• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4877

11 Dec 2025 02:43AM UTC coverage: 64.586% (-0.05%) from 64.632%
#4877

push

travis-ci

guanshengliang
feat(TS-7270): internal dependence

307 of 617 new or added lines in 24 files covered. (49.76%)

673 existing lines in 130 files now uncovered.

163673 of 253417 relevant lines covered (64.59%)

105540806.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.7
/source/libs/executor/src/dataInserter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <stdint.h>
17
#include <stdlib.h>
18
#include <string.h>
19
#include "dataSinkInt.h"
20
#include "dataSinkMgt.h"
21
#include "executor.h"
22
#include "executorInt.h"
23
#include "functionMgt.h"
24
#include "libs/new-stream/stream.h"
25
#include "osAtomic.h"
26
#include "osMemPool.h"
27
#include "osMemory.h"
28
#include "osSemaphore.h"
29
#include "planner.h"
30
#include "query.h"
31
#include "querytask.h"
32
#include "storageapi.h"
33
#include "taoserror.h"
34
#include "tarray.h"
35
#include "tcompression.h"
36
#include "tdatablock.h"
37
#include "tdataformat.h"
38
#include "tglobal.h"
39
#include "thash.h"
40
#include "tmsg.h"
41
#include "tqueue.h"
42

43
extern SDataSinkStat gDataSinkStat;
44
// Cache of SInsertTableInfo* values keyed by the {streamId, groupId} pair.
// Created by initInserterGrpInfo() and torn down by destroyInserterGrpInfo().
SHashObj*            gStreamGrpTableHash = NULL;
45
/* Outcome of submit requests as recorded by inserterCallback(). */
typedef struct SSubmitRes {
46
  int64_t      affectedRows;  // running total; each successful response adds its affectedRows
47
  int32_t      code;          // result code stored by the most recent callback
48
  SSubmitRsp2* pRsp;          // decoded response; allocated and freed inside inserterCallback()
49
} SSubmitRes;
50

51
/* A payload buffer together with its length and a vgroup id; released by destroySSubmitTbDataMsg(). */
typedef struct SSubmitTbDataMsg {
52
  int32_t vgId;
53
  int32_t len;    // presumably the size of pData in bytes -- TODO confirm against the producer
54
  void*   pData;  // heap buffer, freed by destroySSubmitTbDataMsg()
55
} SSubmitTbDataMsg;
56

57
/* An SSubmitTbDataMsg paired with an endpoint set; released by destroySSubmitTbDataSendInfo(). */
typedef struct SSubmitTbDataSendInfo {
58
  SSubmitTbDataMsg msg;  // embedded by value; only msg.pData is a separate allocation
59
  SEpSet epSet;
60
} SSubmitTbDataSendInfo;
61

62
/* An SSubmitReq2 pointer paired with an endpoint set. */
// NOTE(review): ownership of msg is not visible in this part of the file -- confirm who frees it.
typedef struct SSubmitReqSendInfo {
63
  SSubmitReq2* msg;
64
  SEpSet epSet;
65
} SSubmitReqSendInfo;
66

67
static void destroySSubmitTbDataMsg(void* p) {
×
68
  if (p == NULL) return;
×
69
  SSubmitTbDataMsg* pVg = p;
×
70
  taosMemoryFree(pVg->pData);
×
71
  taosMemoryFree(pVg);
×
72
}
73

NEW
74
static void destroySSubmitTbDataSendInfo(void* p) {
×
NEW
75
  if (p == NULL) return;
×
NEW
76
  SSubmitTbDataSendInfo* pSendInfo = p;
×
NEW
77
  taosMemoryFree(pSendInfo->msg.pData);
×
NEW
78
  taosMemoryFree(pSendInfo);
×
79
}
80

81
/* State of one data-inserter sink. Only the fields used in the visible code are annotated. */
typedef struct SDataInserterHandle {
82
  SDataSinkHandle     sink;
83
  SDataSinkManager*   pManager;
84
  STSchema*           pSchema;
85
  SQueryInserterNode* pNode;
86
  SSubmitRes          submitRes;  // written by inserterCallback()
87
  SInserterParam*     pParam;     // supplies streamInserterParam and readHandle->pMsgCb->clientRpc
88
  SArray*             pDataBlocks;
89
  SHashObj*           pCols;
90
  int32_t             status;
91
  bool                queryEnd;
92
  bool                fullOrderColList;
93
  uint64_t            useconds;
94
  uint64_t            cachedSize;
95
  uint64_t            flags;
96
  TdThreadMutex       mutex;
97
  tsem_t              ready;  // posted once by every inserterCallback() invocation
98
  bool                explain;
99
  bool                isStbInserter;
100
  SSchemaWrapper*     pTagSchema;
101
  const char*         dbFName;
102
  SHashObj*           dbVgInfoMap;  // created lazily by inserterGetDbVgInfo(); db name -> SUseDbOutput*
103
  SUseDbRsp*          pRsp;
104
} SDataInserterHandle;
105

106
/* Callback argument built by sendSubmitRequest() and consumed by inserterCallback(). */
typedef struct SSubmitRspParam {
107
  SDataInserterHandle* pInserter;
108
  void*                putParam;  // when non-NULL, inserterCallback() casts it to SStreamDataInserterInfo*
109
} SSubmitRspParam;
110

111
/* Working state for building insert data; reset by initInsertProcessInfo(). */
typedef struct SBuildInsertDataInfo {
112
  SSubmitTbData  pTbData;        // embedded by value despite the 'p' prefix; aRowP is allocated on init
113
  bool           isFirstBlock;   // starts as true
114
  bool           isLastBlock;    // starts as false
115
  int64_t        lastTs;         // starts as TSKEY_MIN
116
  bool           needSortMerge;  // starts as false
117
} SBuildInsertDataInfo;
118

119
/* Context for a drop-table request: the request, a semaphore and a result code. */
// NOTE(review): not used in the visible part of the file; presumably 'ready' is posted
// when the drop response arrives and 'code' carries its result -- confirm.
typedef struct SDropTbCtx {
120
  SSTriggerDropRequest* req;
121
  tsem_t                ready;
122
  int32_t               code;
123
} SDropTbCtx;
124
/* A message head followed by a payload pointer. */
// NOTE(review): not used in the visible part of the file -- confirm layout expectations with the sender.
typedef struct SDropTbDataMsg {
125
  SMsgHead header;
126
  void*    pData;
127
} SDropTbDataMsg;
128

129
/* A drop request paired with a result code. */
// NOTE(review): not used in the visible part of the file -- confirm who fills in 'code'.
typedef struct SRunnerDropTableInfo {
130
  SSTriggerDropRequest* pReq;
131
  int32_t               code;
132
} SRunnerDropTableInfo;
133

134
/*
 * Reset the build state and allocate the row-pointer array for one table.
 *
 * pBuildInsertDataInfo  state to reset
 * rows                  passed to taosArrayInit() as its first argument
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the array cannot be created.
 */
static int32_t initInsertProcessInfo(SBuildInsertDataInfo* pBuildInsertDataInfo, int32_t rows) {
  pBuildInsertDataInfo->isFirstBlock = true;
  pBuildInsertDataInfo->isLastBlock = false;
  pBuildInsertDataInfo->needSortMerge = false;
  pBuildInsertDataInfo->lastTs = TSKEY_MIN;

  SArray* pRowArray = taosArrayInit(rows, sizeof(SRow*));
  pBuildInsertDataInfo->pTbData.aRowP = pRowArray;
  if (pRowArray == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
146

147
static void freeCacheTbInfo(void* pp) {
451,907✔
148
  if (pp == NULL || *(SInsertTableInfo**)pp == NULL) {
451,907✔
149
    return;
×
150
  }
151
  SInsertTableInfo* pTbInfo = *(SInsertTableInfo**)pp;
451,907✔
152
  if (pTbInfo->tbname) {
451,907✔
153
    taosMemFree(pTbInfo->tbname);
452,337✔
154
    pTbInfo->tbname = NULL;
452,337✔
155
  }
156
  if (pTbInfo->pSchema) {
452,337✔
157
    tDestroyTSchema(pTbInfo->pSchema);
452,337✔
158
    pTbInfo->pSchema = NULL;
452,337✔
159
  }
160
  taosMemoryFree(pTbInfo);
451,907✔
161
}
162

163
int32_t initInserterGrpInfo() {
689,924✔
164
  gStreamGrpTableHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
689,924✔
165
  if (NULL == gStreamGrpTableHash) {
689,924✔
166
    qError("failed to create stream group table hash");
×
167
    return terrno;
×
168
  }
169
  taosHashSetFreeFp(gStreamGrpTableHash, freeCacheTbInfo);
689,924✔
170
  return TSDB_CODE_SUCCESS;
689,924✔
171
}
172

173
void destroyInserterGrpInfo() {
689,924✔
174
  static int8_t destoryGrpInfo = 0;
175
  int8_t        flag = atomic_val_compare_exchange_8(&destoryGrpInfo, 0, 1);
689,924✔
176
  if (flag != 0) {
689,924✔
177
    return;
×
178
  }
179
  if (NULL != gStreamGrpTableHash) {
689,924✔
180
    taosHashCleanup(gStreamGrpTableHash);
689,924✔
181
    gStreamGrpTableHash = NULL;
689,924✔
182
  }
183
}
184

185
/*
 * Validate the create-table part of a submit response and copy the table
 * identity it reports into the cached table info.
 *
 * pSubmitRes     submit result whose pRsp holds the decoded response
 * res            cached table info; vgid and uid are overwritten on success
 * pSchemaChaned  set to true when the response reports a non-zero schema
 *                version different from res->version, false otherwise;
 *                left untouched when validation fails
 *
 * Returns TSDB_CODE_SUCCESS, the create-table error code carried by the
 * response, or TSDB_CODE_MND_STREAM_INTERNAL_ERROR when the response is
 * missing or incomplete.
 */
static int32_t checkResAndResetTableInfo(const SSubmitRes* pSubmitRes, SInsertTableInfo* res,
                                         bool* pSchemaChaned) {
  if (!pSubmitRes->pRsp) {
    stError("create table response is NULL");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  // A response without any create-table list must be rejected, not dereferenced.
  SArray* pCreateTbList = pSubmitRes->pRsp->aCreateTbRsp;
  if (pCreateTbList == NULL || taosArrayGetSize(pCreateTbList) < 1) {
    stError("create table response size is less than 1");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  SVCreateTbRsp* pCreateTbRsp = taosArrayGet(pCreateTbList, 0);
  if (pCreateTbRsp == NULL) {
    stError("create table response size is less than 1");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }
  if (pCreateTbRsp->code != 0 && pCreateTbRsp->code != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
    stError("create table failed, code:%d", pCreateTbRsp->code);
    return pCreateTbRsp->code;
  }
  if (!pCreateTbRsp->pMeta || pCreateTbRsp->pMeta->tuid == 0) {
    stError("create table can not get tuid");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  res->vgid = pCreateTbRsp->pMeta->vgId;
  res->uid = pCreateTbRsp->pMeta->tuid;

  // A zero sversion in the response is never treated as a schema change.
  *pSchemaChaned = (pCreateTbRsp->pMeta->sversion != 0 && res->version != pCreateTbRsp->pMeta->sversion);

  stDebug("inserter callback, uid:%" PRId64 "  vgid: %" PRId64 ", version: %d", res->uid, res->vgid, res->version);

  return TSDB_CODE_SUCCESS;
}
218

219
/*
 * Build a fresh SInsertTableInfo from the first create-table entry of a
 * submit response, reusing the table name of the existing cache entry.
 *
 * pSubmitRes         submit result; pRsp and its create-table list are assumed
 *                    to have been validated by the caller
 * pOldInsertTbInfo   existing cache entry; only its tbname is read
 * ppNewInsertTbInfo  receives the new object on success; the caller owns it
 *
 * Returns TSDB_CODE_SUCCESS, the create-table error code from the response,
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR when the entry or its meta is missing,
 * or terrno on allocation / schema-build failure. Nothing is leaked on failure.
 */
static int32_t createNewInsertTbInfo(const SSubmitRes* pSubmitRes, SInsertTableInfo* pOldInsertTbInfo,
                                     SInsertTableInfo** ppNewInsertTbInfo) {
  SVCreateTbRsp* pCreateTbRsp = taosArrayGet(pSubmitRes->pRsp->aCreateTbRsp, 0);
  if (pCreateTbRsp == NULL) {
    stError("create table response is empty");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }
  if (pCreateTbRsp->code != 0 && pCreateTbRsp->code != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
    stError("create table failed, code:%d", pCreateTbRsp->code);
    return pCreateTbRsp->code;
  }
  if (pCreateTbRsp->pMeta == NULL) {
    stError("create table response has no table meta");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  SInsertTableInfo* res = taosMemoryCalloc(1, sizeof(SInsertTableInfo));
  if (res == NULL) {
    return terrno;
  }

  res->tbname = taosStrdup(pOldInsertTbInfo->tbname);
  if (res->tbname == NULL) {
    int32_t code = terrno;  // read terrno first in case logging or freeing modifies it
    stError("failed to allocate memory for table name");
    taosMemoryFree(res);
    return code;
  }

  res->uid = pCreateTbRsp->pMeta->tuid;
  res->vgid = pCreateTbRsp->pMeta->vgId;
  res->version = pCreateTbRsp->pMeta->sversion;

  res->pSchema = tBuildTSchema(pCreateTbRsp->pMeta->pSchemas, pCreateTbRsp->pMeta->numOfColumns, res->version);
  if (res->pSchema == NULL) {
    int32_t code = terrno;
    stError("failed to build schema for table:%s, uid:%" PRId64 ", vgid:%" PRId64 ", version:%d", res->tbname, res->uid,
            res->vgid, res->version);
    // The partially built object is still owned here; release it before bailing out.
    taosMemoryFree(res->tbname);
    taosMemoryFree(res);
    return code;
  }

  *ppNewInsertTbInfo = res;
  return TSDB_CODE_SUCCESS;
}
253

254
/*
 * Refresh the cached table info of one stream group from a submit response.
 *
 * The cache entry keyed by {streamId, groupId} is acquired, validated against
 * the response and updated in place. When the response reports a different
 * schema version, the entry is replaced by a newly built one.
 *
 * pInserterInfo  supplies streamId and groupId
 * pSubmitRes     submit result holding the decoded response
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_MND_STREAM_INTERNAL_ERROR when no
 * usable cache entry exists, or the error of the failing step.
 *
 * The acquired entry is released on every path, including success.
 */
static int32_t updateInsertGrpTableInfo(SStreamDataInserterInfo* pInserterInfo, const SSubmitRes* pSubmitRes) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  bool               schemaChanged = false;
  int64_t            key[2] = {pInserterInfo->streamId, pInserterInfo->groupId};
  SInsertTableInfo** ppTbRes = taosHashAcquire(gStreamGrpTableHash, key, sizeof(key));
  if (NULL == ppTbRes) {
    // Nothing was acquired, so there is nothing to release.
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }
  if (*ppTbRes == NULL) {
    code = TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
    goto _exit;
  }

  code = checkResAndResetTableInfo(pSubmitRes, *ppTbRes, &schemaChanged);
  QUERY_CHECK_CODE(code, lino, _exit);

  if (schemaChanged) {
    SInsertTableInfo* pNewInfo = NULL;
    code = createNewInsertTbInfo(pSubmitRes, *ppTbRes, &pNewInfo);
    QUERY_CHECK_CODE(code, lino, _exit);

    TAOS_UNUSED(taosHashRemove(gStreamGrpTableHash, key, sizeof(key)));

    code = taosHashPut(gStreamGrpTableHash, key, sizeof(key), &pNewInfo, sizeof(SInsertTableInfo*));

    if (code == TSDB_CODE_DUP_KEY) {
      // Another entry was inserted in the meantime; keep that one and discard ours.
      freeCacheTbInfo(&pNewInfo);
      code = TSDB_CODE_SUCCESS;
      goto _exit;
    } else if (code != TSDB_CODE_SUCCESS) {
      freeCacheTbInfo(&pNewInfo);
      stError("failed to put new insert tbInfo for streamId:%" PRIx64 ", groupId:%" PRIx64 ", code:%d",
              pInserterInfo->streamId, pInserterInfo->groupId, code);
      goto _exit;
    }

    stInfo("update table info for streamId:%" PRIx64 ", groupId:%" PRIx64 ", uid:%" PRId64 ", vgid:%" PRId64
           ", version:%d",
           pInserterInfo->streamId, pInserterInfo->groupId, pNewInfo->uid, pNewInfo->vgid, pNewInfo->version);
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    stError("failed to check and reset table info for streamId:%" PRIx64 ", groupId:%" PRIx64 ", code:%d",
            pInserterInfo->streamId, pInserterInfo->groupId, code);
  }
  taosHashRelease(gStreamGrpTableHash, ppTbRes);
  return code;
}
301

302
static int32_t buildTSchmaFromInserter(SStreamInserterParam* pInsertParam, STSchema** ppTSchema);
303
/*
 * Create the cache entry for one stream group and store it in
 * gStreamGrpTableHash under the key {streamId, groupId}.
 *
 * The entry starts with uid 0. Its version is 1 for normal tables and the
 * stream parameter's sver otherwise. If an entry for the key already exists
 * (TSDB_CODE_DUP_KEY) the new object is discarded and success is returned.
 *
 * Returns TSDB_CODE_SUCCESS, terrno on allocation failure, or the error of
 * the schema build / hash insertion.
 */
static int32_t initTableInfo(SDataInserterHandle* pInserter, SStreamDataInserterInfo* pInserterInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SInsertTableInfo* pInfo = taosMemoryCalloc(1, sizeof(SInsertTableInfo));
  if (pInfo == NULL) {
    return terrno;
  }

  SStreamInserterParam* pStreamParam = pInserter->pParam->streamInserterParam;
  pInfo->uid = 0;
  pInfo->version = (pStreamParam->tbType == TSDB_NORMAL_TABLE) ? 1 : pStreamParam->sver;

  pInfo->tbname = taosStrdup(pInserterInfo->tbName);
  if (pInfo->tbname == NULL) {
    taosMemoryFree(pInfo);
    stError("failed to allocate memory for table name");
    return terrno;
  }

  code = buildTSchmaFromInserter(pInserter->pParam->streamInserterParam, &pInfo->pSchema);
  QUERY_CHECK_CODE(code, lino, _return);

  int64_t key[2] = {pInserterInfo->streamId, pInserterInfo->groupId};
  code = taosHashPut(gStreamGrpTableHash, key, sizeof(key), &pInfo, sizeof(SInsertTableInfo*));
  if (code == TSDB_CODE_DUP_KEY) {
    // Somebody else registered this group first; their entry wins.
    freeCacheTbInfo(&pInfo);
    return TSDB_CODE_SUCCESS;
  }

_return:
  if (code != TSDB_CODE_SUCCESS) {
    stError("failed to build table info for streamId:%" PRIx64 ", groupId:%" PRIx64 ", code:%d",
            pInserterInfo->streamId, pInserterInfo->groupId, code);
    freeCacheTbInfo(&pInfo);
  }
  return code;
}
344

345
static bool colsIsSupported(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
30,345✔
346
  SArray* pCreatingFields = pInserterParam->pFields;
30,345✔
347

348
  for (int32_t i = 0; i < pCreatingFields->size; ++i) {
178,493✔
349
    SFieldWithOptions* pField = taosArrayGet(pCreatingFields, i);
148,148✔
350
    if (NULL == pField) {
148,148✔
351
      stError("isSupportedSTableSchema: failed to get field from array");
×
352
      return false;
×
353
    }
354

355
    for (int j = 0; j < pTableMetaRsp->numOfColumns; ++j) {
457,738✔
356
      if (strncmp(pTableMetaRsp->pSchemas[j].name, pField->name, TSDB_COL_NAME_LEN) == 0) {
454,514✔
357
        if (pTableMetaRsp->pSchemas[j].type == pField->type && pTableMetaRsp->pSchemas[j].bytes == pField->bytes) {
144,924✔
358
          break;
359
        } else {
360
          return false;
×
361
        }
362
      }
363
    }
364
  }
365
  return true;
30,345✔
366
}
367

368
static bool TagsIsSupported(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
1,612✔
369
  SArray* pCreatingTags = pInserterParam->pTagFields;
1,612✔
370

371
  int32_t            tagIndexOffset = -1;
1,612✔
372
  SFieldWithOptions* pField = taosArrayGet(pCreatingTags, 0);
1,612✔
373
  if (NULL == pField) {
1,612✔
374
    stError("isSupportedSTableSchema: failed to get field from array");
×
375
    return false;
×
376
  }
377
  for (int32_t i = 0; i < pTableMetaRsp->numOfColumns + pTableMetaRsp->numOfTags; ++i) {
1,612✔
378
    if (strncmp(pTableMetaRsp->pSchemas[i].name, pField->name, TSDB_COL_NAME_LEN) != 0) {
806✔
379
      tagIndexOffset = i;
806✔
380
      break;
806✔
381
    }
382
  }
383
  if (tagIndexOffset == -1) {
1,612✔
384
    stError("isSupportedSTableSchema: failed to get tag index");
806✔
385
    return false;
806✔
386
  }
387

388
  for (int32_t i = 0; i < pTableMetaRsp->numOfTags; ++i) {
1,612✔
389
    int32_t            index = i + tagIndexOffset;
806✔
390
    SFieldWithOptions* pField = taosArrayGet(pCreatingTags, i);
806✔
391
    if (NULL == pField) {
806✔
392
      stError("isSupportedSTableSchema: failed to get field from array");
×
393
      return false;
×
394
    }
395

396
    for(int32_t j = 0; j < pTableMetaRsp->numOfTags; ++j) {
1,612✔
397
      if (strncmp(pTableMetaRsp->pSchemas[index].name, pField->name, TSDB_COL_NAME_LEN) == 0) {
806✔
398
        if (pTableMetaRsp->pSchemas[index].type == pField->type &&
×
399
            pTableMetaRsp->pSchemas[index].bytes == pField->bytes) {
×
400
          break;
401
        } else {
402
          return false;
×
403
        }
404
      }
405
    }
406
  }
407
  return true;
806✔
408
}
409

410
/*
 * Schema check for super/child tables: both the columns and the tags must
 * pass. The tag check is skipped when the column check already failed.
 */
static bool isSupportedSTableSchema(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
  return colsIsSupported(pTableMetaRsp, pInserterParam) && TagsIsSupported(pTableMetaRsp, pInserterParam);
}
419

420
/* Schema check for normal tables: only the columns are compared. */
static bool isSupportedNTableSchema(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
  bool columnsOk = colsIsSupported(pTableMetaRsp, pInserterParam);
  return columnsOk;
}
423

424
static int32_t checkAndSaveCreateGrpTableInfo(SDataInserterHandle*     pInserthandle,
30,345✔
425
                                              SStreamDataInserterInfo* pInserterInfo) {
426
  int32_t     code = TSDB_CODE_SUCCESS;
30,345✔
427
  SSubmitRes* pSubmitRes = &pInserthandle->submitRes;
30,345✔
428
  int8_t      tbType = pInserthandle->pParam->streamInserterParam->tbType;
30,345✔
429

430
  SVCreateTbRsp*        pCreateTbRsp = taosArrayGet(pSubmitRes->pRsp->aCreateTbRsp, 0);
30,345✔
431
  SSchema*              pExistRow = pCreateTbRsp->pMeta->pSchemas;
30,345✔
432
  SStreamInserterParam* pInserterParam = pInserthandle->pParam->streamInserterParam;
30,345✔
433

434
  if (tbType == TSDB_CHILD_TABLE || tbType == TSDB_SUPER_TABLE) {
30,345✔
435
    if (!isSupportedSTableSchema(pCreateTbRsp->pMeta, pInserterParam)) {
1,612✔
436
      stError("create table failed, schema is not supported");
806✔
437
      return TSDB_CODE_STREAM_INSERT_SCHEMA_NOT_MATCH;
806✔
438
    }
439
  } else if (tbType == TSDB_NORMAL_TABLE) {
28,733✔
440
    if (!isSupportedNTableSchema(pCreateTbRsp->pMeta, pInserterParam)) {
28,733✔
441
      stError("create table failed, schema is not supported");
×
442
      return TSDB_CODE_STREAM_INSERT_SCHEMA_NOT_MATCH;
×
443
    }
444
  } else {
445
    stError("checkAndSaveCreateGrpTableInfo failed, tbType:%d is not supported", tbType);
×
446
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
×
447
  }
448

449
  return updateInsertGrpTableInfo(pInserterInfo, pSubmitRes);
29,539✔
450
}
451

452
/*
 * Response handler for a submit request sent by sendSubmitRequest().
 *
 * param  SSubmitRspParam* carrying the inserter and the optional put parameter
 * pMsg   response buffer; pMsg->pData is freed here
 * code   transport/server result of the request
 *
 * On success the response is decoded, per-table create results are checked,
 * the group table cache is refreshed for auto-create streams and the affected
 * row count is added to the inserter's running total.
 * For TSDB_CODE_TDB_TABLE_ALREADY_EXIST on an auto-create stream, or for
 * TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER, the result code is normalised to
 * TSDB_CODE_TDB_TABLE_ALREADY_EXIST and the existing table is checked against
 * the stream's schema.
 *
 * The outcome is reported through pInserter->submitRes.code. A failure to
 * update the cache is only logged. The decoded response is released on every
 * path and submitRes.pRsp is reset to NULL before the waiter is woken through
 * pInserter->ready. Always returns TSDB_CODE_SUCCESS.
 *
 * NOTE(review): the TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER branch is entered
 * even when putParam is NULL -- confirm that this cannot happen for non-stream
 * inserts.
 */
int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SSubmitRspParam*         pParam = (SSubmitRspParam*)param;
  SDataInserterHandle*     pInserter = pParam->pInserter;
  SStreamDataInserterInfo* pStreamInfo = (SStreamDataInserterInfo*)pParam->putParam;
  bool                     autoCreate = (pStreamInfo != NULL) && pStreamInfo->isAutoCreateTable;
  SSubmitRsp2*             pSubmitRsp = NULL;  // response owned by this invocation
  SDecoder                 coder = {0};
  int32_t                  code2 = 0;

  pInserter->submitRes.code = code ? code : TSDB_CODE_SUCCESS;

  if (code == TSDB_CODE_SUCCESS) {
    pSubmitRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2));
    pInserter->submitRes.pRsp = pSubmitRsp;
    if (NULL == pSubmitRsp) {
      pInserter->submitRes.code = terrno;
      goto _return;
    }

    tDecoderInit(&coder, pMsg->pData, pMsg->len);
    code = tDecodeSSubmitRsp2(&coder, pSubmitRsp);
    if (code) {
      pInserter->submitRes.code = code;
      goto _return;
    }

    if (pSubmitRsp->affectedRows > 0) {
      SArray* pCreateTbList = pSubmitRsp->aCreateTbRsp;
      int32_t numOfTables = taosArrayGetSize(pCreateTbList);

      for (int32_t i = 0; i < numOfTables; ++i) {
        SVCreateTbRsp* pTbRsp = taosArrayGet(pCreateTbList, i);
        if (NULL == pTbRsp) {
          pInserter->submitRes.code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
          goto _return;
        }
        if (TSDB_CODE_SUCCESS != pTbRsp->code) {
          pInserter->submitRes.code = pTbRsp->code;
          goto _return;
        }
      }
    }

    if (autoCreate) {
      code2 = updateInsertGrpTableInfo(pStreamInfo, &pInserter->submitRes);
    }

    pInserter->submitRes.affectedRows += pSubmitRsp->affectedRows;
    qDebug("submit rsp received, affectedRows:%d, total:%" PRId64, pSubmitRsp->affectedRows,
           pInserter->submitRes.affectedRows);
  } else if ((TSDB_CODE_TDB_TABLE_ALREADY_EXIST == code && autoCreate) ||
             TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER == code) {
    pInserter->submitRes.code = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
    pSubmitRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2));
    pInserter->submitRes.pRsp = pSubmitRsp;
    if (NULL == pSubmitRsp) {
      code2 = terrno;
      goto _return;
    }

    tDecoderInit(&coder, pMsg->pData, pMsg->len);
    code2 = tDecodeSSubmitRsp2(&coder, pSubmitRsp);
    if (code2 == TSDB_CODE_SUCCESS) {
      code2 = checkAndSaveCreateGrpTableInfo(pInserter, pStreamInfo);
    }
  }

_return:

  if (code2) {
    qError("update inserter table info failed, error:%s", tstrerror(code2));
  }

  // Single release point: no early exit can leak the decoded response, and
  // the handle never keeps a pointer to freed memory.
  if (pSubmitRsp != NULL) {
    tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_DECODE);
    taosMemoryFree(pSubmitRsp);
    pInserter->submitRes.pRsp = NULL;
  }

  tDecoderClear(&coder);
  TAOS_UNUSED(tsem_post(&pInserter->ready));

  taosMemoryFree(pMsg->pData);

  return TSDB_CODE_SUCCESS;
}
540

541
void freeUseDbOutput_tmp(void* ppOutput) {
12,255✔
542
  SUseDbOutput* pOut = *(SUseDbOutput**)ppOutput;
12,255✔
543
  if (NULL == ppOutput) {
12,255✔
544
    return;
×
545
  }
546

547
  if (pOut->dbVgroup) {
12,255✔
548
    freeVgInfo(pOut->dbVgroup);
12,255✔
549
  }
550
  taosMemFree(pOut);
12,255✔
551
  *(SUseDbOutput**)ppOutput = NULL;
12,255✔
552
}
553

554
/*
 * Response handler for the use-db request sent by buildDbVgInfoMap().
 *
 * param  SDBVgInfoReq* of the waiting caller
 * pMsg   response buffer; pData and pEpSet are freed here
 * code   transport/server result of the request
 *
 * On success pVgInfoReq->pRsp receives the deserialised response, which the
 * waiter then owns. On any failure pRsp is left NULL, so the waiter never sees
 * a partly initialised response. The error is logged and returned but not
 * stored for the waiter. The waiter's semaphore is posted in every case.
 */
static int32_t processUseDbRspForInserter(void* param, SDataBuf* pMsg, int32_t code) {
  int32_t       lino = 0;
  SDBVgInfoReq* pVgInfoReq = (SDBVgInfoReq*)param;

  if (TSDB_CODE_SUCCESS != code) {
    goto _return;
  }

  // Zero-initialise so the response can be torn down safely if decoding stops half way.
  pVgInfoReq->pRsp = taosMemoryCalloc(1, sizeof(SUseDbRsp));
  QUERY_CHECK_NULL(pVgInfoReq->pRsp, code, lino, _return, terrno);

  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pVgInfoReq->pRsp);
  if (code != TSDB_CODE_SUCCESS) {
    lino = __LINE__;
    tFreeSUsedbRsp(pVgInfoReq->pRsp);
    taosMemoryFreeClear(pVgInfoReq->pRsp);
  }

_return:
  taosMemoryFreeClear(pMsg->pData);
  taosMemoryFreeClear(pMsg->pEpSet);
  if (code != 0){
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  int ret = tsem_post(&pVgInfoReq->ready);
  if (ret != 0) {
    qError("%s failed code: %d", __func__, ret);
  }
  return code;
}
587

588

589
int inserterVgInfoComp(const void* lp, const void* rp) {
88,672✔
590
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
88,672✔
591
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
88,672✔
592
  if (pLeft->hashBegin < pRight->hashBegin) {
88,672✔
593
    return -1;
77,950✔
594
  } else if (pLeft->hashBegin > pRight->hashBegin) {
10,722✔
595
    return 1;
10,722✔
596
  }
597

598
  return 0;
×
599
}
600

601
/*
 * Fetch the vgroup layout of a database from the mnode and fill 'output'.
 *
 * A use-db request is sent asynchronously and this function blocks on a
 * semaphore until processUseDbRspForInserter() has run. The response is then
 * converted with queryBuildUseDbOutput(), and output->dbVgroup->vgArray is
 * built from the vgroup hash and sorted with inserterVgInfoComp().
 *
 * clientRpc  transport handle passed to asyncSendMsgToServer()
 * dbFName    database name copied into the request
 * output     receives the result
 *
 * Returns TSDB_CODE_SUCCESS or the error of the failing step. Every failure
 * goes through the common cleanup and is reported with a non-zero code.
 */
static int32_t buildDbVgInfoMap(void* clientRpc, const char* dbFName, SUseDbOutput* output) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  int32_t       contLen = 0;
  char*         buf1 = NULL;
  void*         pIter = NULL;
  SUseDbReq*    pReq = NULL;
  SMsgSendInfo* pMsgSendInfo = NULL;
  SEpSet        pEpSet = {0};
  SDBVgInfoReq  dbVgInfoReq = {0};

  code = tsem_init(&dbVgInfoReq.ready, 0, 0);
  if (code != TSDB_CODE_SUCCESS) {
    qError("tsem_init failed, error:%s", tstrerror(code));
    return code;
  }

  // Zeroed so that request fields other than the db name are not sent as garbage.
  pReq = taosMemoryCalloc(1, sizeof(SUseDbReq));
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno);

  tstrncpy(pReq->db, dbFName, TSDB_DB_FNAME_LEN);

  contLen = tSerializeSUseDbReq(NULL, 0, pReq);
  if (contLen < 0) {
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    lino = __LINE__;
    goto _return;
  }
  buf1 = taosMemoryCalloc(1, contLen);
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno);

  if (tSerializeSUseDbReq(buf1, contLen, pReq) < 0) {
    // Store the error in 'code'; checking terrno alone would reach cleanup with code == 0.
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    lino = __LINE__;
    goto _return;
  }

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno);

  code = getCurrentMnodeEpset(&pEpSet);
  QUERY_CHECK_CODE(code, lino, _return);

  pMsgSendInfo->param = &dbVgInfoReq;
  pMsgSendInfo->msgInfo.pData = buf1;
  buf1 = NULL;  // the buffer now belongs to the send info
  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
  pMsgSendInfo->fp = processUseDbRspForInserter;

  code = asyncSendMsgToServer(clientRpc, &pEpSet, NULL, pMsgSendInfo);
  // The send info is never freed here after this call, matching the previous
  // code; presumably the transport layer owns it from now on -- TODO confirm.
  pMsgSendInfo = NULL;
  QUERY_CHECK_CODE(code, lino, _return);

  // NOTE(review): dbVgInfoReq lives on this stack frame; if the wait fails the
  // callback may still run later and touch it -- confirm this cannot happen.
  code = tsem_wait(&dbVgInfoReq.ready);
  QUERY_CHECK_CODE(code, lino, _return);

  if (dbVgInfoReq.pRsp == NULL) {
    // The callback leaves pRsp NULL whenever the request failed.
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    lino = __LINE__;
    goto _return;
  }

  code = queryBuildUseDbOutput(output, dbVgInfoReq.pRsp);
  QUERY_CHECK_CODE(code, lino, _return);

  output->dbVgroup->vgArray = taosArrayInit(dbVgInfoReq.pRsp->vgNum, sizeof(SVgroupInfo));
  QUERY_CHECK_NULL(output->dbVgroup->vgArray, code, lino, _return, terrno);

  pIter = taosHashIterate(output->dbVgroup->vgHash, NULL);
  while (pIter) {
    if (NULL == taosArrayPush(output->dbVgroup->vgArray, pIter)) {
      code = terrno;
      lino = __LINE__;
      taosHashCancelIterate(output->dbVgroup->vgHash, pIter);
      goto _return;  // go through cleanup instead of returning directly
    }

    pIter = taosHashIterate(output->dbVgroup->vgHash, pIter);
  }

  taosArraySort(output->dbVgroup->vgArray, inserterVgInfoComp);

_return:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  taosMemoryFree(buf1);
  taosMemoryFree(pMsgSendInfo);  // non-NULL only if the request was never handed to the transport
  taosMemoryFree(pReq);
  TAOS_UNUSED(tsem_destroy(&dbVgInfoReq.ready));
  if (dbVgInfoReq.pRsp) {
    tFreeSUsedbRsp(dbVgInfoReq.pRsp);
    taosMemoryFreeClear(dbVgInfoReq.pRsp);
  }
  return code;
}
683

684
int32_t inserterBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
328,706✔
685
                                 SArray* tagName, uint8_t tagNum, int32_t ttl) {
686
  pTbReq->type = TD_CHILD_TABLE;
328,706✔
687
  pTbReq->ctb.pTag = (uint8_t*)pTag;
329,509✔
688
  pTbReq->name = taosStrdup(tname);
329,079✔
689
  if (!pTbReq->name) return terrno;
328,701✔
690
  pTbReq->ctb.suid = suid;
329,509✔
691
  pTbReq->ctb.tagNum = tagNum;
328,702✔
692
  if (sname) {
328,706✔
693
    pTbReq->ctb.stbName = taosStrdup(sname);
329,106✔
694
    if (!pTbReq->ctb.stbName) {
329,079✔
695
      taosMemoryFree(pTbReq->name);
×
696
      return terrno;
×
697
    }
698
  }
699
  pTbReq->ctb.tagName = tagName;
328,679✔
700
  pTbReq->ttl = ttl;
329,079✔
701
  pTbReq->commentLen = -1;
328,674✔
702

703
  return TSDB_CODE_SUCCESS;
327,898✔
704
}
705

706
int32_t inserterHashValueComp(void const* lp, void const* rp) {
7,396,141✔
707
  uint32_t*    key = (uint32_t*)lp;
7,396,141✔
708
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
7,396,141✔
709

710
  if (*key < pVg->hashBegin) {
7,396,141✔
711
    return -1;
1,793,592✔
712
  } else if (*key > pVg->hashEnd) {
5,602,980✔
713
    return 1;
296,964✔
714
  }
715

716
  return 0;
5,305,585✔
717
}
718

719

720
/*
 * Find the vgroup responsible for a table.
 *
 * The table name is hashed with the database's hash settings and the result
 * is looked up in dbInfo->vgArray using inserterHashValueComp().
 *
 * dbInfo   vgroup layout of the database
 * tbName   table name to hash
 * pVgInfo  receives a copy of the matching vgroup info
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when
 * dbInfo or its vgArray is missing or no range contains the hash value.
 */
int32_t inserterGetVgInfo(SDBVgInfo* dbInfo, char* tbName, SVgroupInfo* pVgInfo) {
  if (dbInfo == NULL) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  if (dbInfo->vgArray == NULL) {
    qError("empty db vgArray, hashSize:%d", taosHashGetSize(dbInfo->vgHash));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t  nameLen = (int32_t)strlen(tbName);
  uint32_t tbHash = taosGetTbHashVal(tbName, nameLen, dbInfo->hashMethod, dbInfo->hashPrefix, dbInfo->hashSuffix);

  SVgroupInfo* pMatch = taosArraySearch(dbInfo->vgArray, &tbHash, inserterHashValueComp, TD_EQ);
  if (pMatch != NULL) {
    *pVgInfo = *pMatch;
    qDebug("insert get vgInfo, tbName:%s vgId:%d epset(%s:%d)", tbName, pVgInfo->vgId, pVgInfo->epSet.eps[0].fqdn,
           pVgInfo->epSet.eps[0].port);
    return TSDB_CODE_SUCCESS;
  }

  qError("no hash range found for hash value [%u], table:%s, numOfVgId:%d", tbHash, tbName,
         (int32_t)taosArrayGetSize(dbInfo->vgArray));
  return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
745

UNCOV
746
/*
 * Convenience wrapper around inserterGetVgInfo() that reports only the
 * vgroup id. *vgId is written on success only; on failure the error is
 * logged and returned.
 */
int32_t inserterGetVgId(SDBVgInfo* dbInfo, char* tbName, int32_t* vgId) {
  SVgroupInfo found = {0};
  int32_t     rc = inserterGetVgInfo(dbInfo, tbName, &found);

  if (rc == TSDB_CODE_SUCCESS) {
    *vgId = found.vgId;
    return TSDB_CODE_SUCCESS;
  }

  qError("inserterGetVgId failed, code:%d", rc);
  return rc;
}
757

758
/*
 * Return the vgroup layout of a database, fetching and caching it on first use.
 *
 * pInserter->dbVgInfoMap is created lazily and maps the database name to an
 * SUseDbOutput*. On a cache miss the layout is fetched with buildDbVgInfoMap()
 * and stored in the map.
 *
 * pInserter  inserter owning the cache and the RPC handle
 * dbFName    database name used as the cache key
 * dbVgInfo   receives the cached SDBVgInfo; the cache keeps ownership
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure,
 * or the error of the fetch / cache insertion. A freshly allocated output is
 * released on failure.
 */
int32_t inserterGetDbVgInfo(SDataInserterHandle* pInserter, const char* dbFName, SDBVgInfo** dbVgInfo) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       line = 0;
  SUseDbOutput* output = NULL;

  if (pInserter->dbVgInfoMap == NULL) {
    pInserter->dbVgInfoMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (pInserter->dbVgInfoMap == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  size_t         nameLen = strlen(dbFName);
  SUseDbOutput** find = (SUseDbOutput**)taosHashGet(pInserter->dbVgInfoMap, dbFName, nameLen);

  if (find == NULL) {
    // Zeroed so the error path below never hands a garbage dbVgroup pointer to freeVgInfo().
    output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    if (output == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    code = buildDbVgInfoMap(pInserter->pParam->readHandle->pMsgCb->clientRpc, dbFName, output);
    QUERY_CHECK_CODE(code, line, _return);

    code = taosHashPut(pInserter->dbVgInfoMap, dbFName, nameLen, &output, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
  } else {
    output = *find;
  }

  *dbVgInfo = output->dbVgroup;
  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  freeUseDbOutput_tmp(&output);
  return code;
}
799

800
/*
 * Look up the vgroup of a table by delegating to getDbVgInfoForExec() with
 * the inserter's client RPC handle. Returns whatever that call returns.
 */
int32_t getTableVgInfo(SDataInserterHandle* pInserter, const char* dbFName,
                       const char* tbName, SVgroupInfo* pVgInfo) {
  void* clientRpc = pInserter->pParam->readHandle->pMsgCb->clientRpc;
  return getDbVgInfoForExec(clientRpc, dbFName, tbName, pVgInfo);
}
805

806
/*
 * Wrap an encoded submit message into an SMsgSendInfo and hand it to asyncSendMsgToServer().
 *
 * If either bookkeeping allocation fails, `pMsg` is released here and terrno is returned. Otherwise the result of
 * asyncSendMsgToServer() is returned as is.
 * NOTE(review): who releases pMsg / the send info when asyncSendMsgToServer() itself fails is not visible here.
 *
 * @param pInserter    inserter handle, passed to the response callback through SSubmitRspParam
 * @param putParam     opaque value stored in SSubmitRspParam for the callback
 * @param pMsg         encoded message buffer
 * @param msgLen       length of pMsg in bytes
 * @param pTransporter transport handle
 * @param pEpset       endpoint set of the target vnode
 */
static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* putParam, void* pMsg, int32_t msgLen,
                                 void* pTransporter, SEpSet* pEpset) {
  SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pSendInfo == NULL) {
    taosMemoryFreeClear(pMsg);
    return terrno;
  }

  SSubmitRspParam* pRspParam = taosMemoryCalloc(1, sizeof(SSubmitRspParam));
  if (pRspParam == NULL) {
    taosMemoryFreeClear(pMsg);
    taosMemoryFreeClear(pSendInfo);
    return terrno;
  }

  // context handed back to inserterCallback
  pRspParam->pInserter = pInserter;
  pRspParam->putParam = putParam;

  // describe the message: TDMT_VND_SUBMIT payload plus callback and its parameter
  pSendInfo->msgType = TDMT_VND_SUBMIT;
  pSendInfo->msgInfo.pData = pMsg;
  pSendInfo->msgInfo.len = msgLen;
  pSendInfo->fp = inserterCallback;
  pSendInfo->param = pRspParam;
  pSendInfo->paramFreeFp = taosAutoMemoryFree;

  return asyncSendMsgToServer(pTransporter, pEpset, NULL, pSendInfo);
}
833

834
/*
 * Serialize a submit request into a newly allocated message buffer.
 *
 * The buffer starts with an SSubmitReq2Msg header (vgId and total length in network byte order, version 1 in
 * big-endian) followed by the encoded request.
 *
 * @param vgId  target vgroup id written into the header
 * @param pReq  request to encode
 * @param pData [out] receives the buffer on success; untouched on failure
 * @param pLen  [out] receives the total buffer length (header included) on success
 * @return TSDB_CODE_SUCCESS, terrno when the buffer allocation fails, or the encoder's error code
 */
static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int32_t* pLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t msgLen = 0;
  void*   pMsgBuf = NULL;

  // first pass: compute the encoded size only
  tEncodeSize(tEncodeSubmitReq, pReq, msgLen, code);
  if (TSDB_CODE_SUCCESS == code) {
    msgLen += sizeof(SSubmitReq2Msg);
    pMsgBuf = taosMemoryMalloc(msgLen);
    if (NULL == pMsgBuf) {
      return terrno;
    }

    SSubmitReq2Msg* pHead = (SSubmitReq2Msg*)pMsgBuf;
    pHead->header.vgId = htonl(vgId);
    pHead->header.contLen = htonl(msgLen);
    pHead->version = htobe64(1);

    // second pass: encode the request right behind the header
    SEncoder encoder;
    tEncoderInit(&encoder, POINTER_SHIFT(pMsgBuf, sizeof(SSubmitReq2Msg)), msgLen - sizeof(SSubmitReq2Msg));
    code = tEncodeSubmitReq(&encoder, pReq);
    tEncoderClear(&encoder);
  }

  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pMsgBuf);
    return code;
  }

  *pData = pMsgBuf;
  *pLen = msgLen;
  return code;
}
863

864
int32_t buildSubmitReqFromStbBlock(SDataInserterHandle* pInserter, SHashObj* pHash, const SSDataBlock* pDataBlock,
2,544✔
865
                                   const STSchema* pTSchema, int64_t uid, int32_t vgId, tb_uid_t suid) {
866
  SArray* pVals = NULL;
2,544✔
867
  SArray* pTagVals = NULL;
2,544✔
868
  SSubmitReqSendInfo** ppSendInfo = NULL;
2,544✔
869
  int32_t numOfBlks = 0;
2,544✔
870

871
  terrno = TSDB_CODE_SUCCESS;
2,544✔
872

873
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
2,544✔
874
  int32_t rows = pDataBlock->info.rows;
2,544✔
875

876
  if (!pTagVals && !(pTagVals = taosArrayInit(colNum, sizeof(STagVal)))) {
2,544✔
877
    goto _end;
×
878
  }
879

880
  if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
2,544✔
881
    goto _end;
×
882
  }
883

884
  SDBVgInfo* dbInfo = NULL;
2,544✔
885
  int32_t    code = inserterGetDbVgInfo(pInserter, pInserter->dbFName, &dbInfo);
2,544✔
886
  if (code != TSDB_CODE_SUCCESS) {
2,544✔
887
    terrno = code;
×
888
    goto _end;
×
889
  }
890

891
  for (int32_t j = 0; j < rows; ++j) {
7,632✔
892
    SSubmitTbData tbData = {0};
5,088✔
893
    if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
5,088✔
894
      goto _end;
×
895
    }
896
    tbData.suid = suid;
5,088✔
897
    tbData.uid = uid;
5,088✔
898
    tbData.sver = pTSchema->version;
5,088✔
899

900
    int64_t lastTs = TSKEY_MIN;
5,088✔
901

902
    taosArrayClear(pVals);
5,088✔
903

904
    int32_t offset = 0;
5,088✔
905
    taosArrayClear(pTagVals);
5,088✔
906
    tbData.uid = 0;
5,088✔
907
    tbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
5,088✔
908
    if (NULL == tbData.pCreateTbReq) {
5,088✔
909
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
910
      goto _end;
×
911
    }
912
    tbData.flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
5,088✔
913

914
    SColumnInfoData* tbname = taosArrayGet(pDataBlock->pDataBlock, 0);
5,088✔
915
    if (NULL == tbname) {
5,088✔
916
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
917
      qError("Insert into stable must have tbname column");
×
918
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
919
      goto _end;
×
920
    }
921
    if (tbname->info.type != TSDB_DATA_TYPE_BINARY) {
5,088✔
922
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
923
      qError("tbname column must be binary");
×
924
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
925
      goto _end;
×
926
    }
927

928
    if (colDataIsNull_s(tbname, j)) {
10,176✔
929
      qError("insert into stable tbname column is null");
×
930
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
931
      goto _end;
×
932
    }
933
    void*   data = colDataGetVarData(tbname, j);
5,088✔
934
    SValue  sv = (SValue){TSDB_DATA_TYPE_VARCHAR, .nData = varDataLen(data), .pData = varDataVal(data)};
5,088✔
935
    SColVal cv = COL_VAL_VALUE(0, sv);
5,088✔
936

937
    char tbFullName[TSDB_TABLE_FNAME_LEN];
5,088✔
938
    char tableName[TSDB_TABLE_FNAME_LEN];
5,088✔
939
    memcpy(tableName, sv.pData, sv.nData);
5,088✔
940
    tableName[sv.nData] = '\0';
5,088✔
941

942
    int32_t len = snprintf(tbFullName, TSDB_TABLE_FNAME_LEN, "%s.%s", pInserter->dbFName, tableName);
5,088✔
943
    if (len >= TSDB_TABLE_FNAME_LEN) {
5,088✔
944
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
945
      qError("table name too long after format, len:%d, maxLen:%d", len, TSDB_TABLE_FNAME_LEN);
×
946
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
947
      goto _end;
×
948
    }
949
    SVgroupInfo vgInfo = {0};
5,088✔
950
    code = inserterGetVgInfo(dbInfo, tbFullName, &vgInfo);
5,088✔
951
    if (code != TSDB_CODE_SUCCESS) {
5,088✔
952
      terrno = code;
×
953
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
954
      goto _end;
×
955
    }
956
    SSubmitReq2* pReq = NULL;
5,088✔
957
    ppSendInfo = taosHashGet(pHash, &vgInfo.vgId, sizeof(int32_t));
5,088✔
958
    if (ppSendInfo == NULL) {
5,088✔
959
      SSubmitReqSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SSubmitReqSendInfo));
5,088✔
960
      if (NULL == pSendInfo) {
5,088✔
NEW
961
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
NEW
962
        goto _end;
×
963
      }
964
      
965
      pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2));
5,088✔
966
      if (NULL == pReq) {
5,088✔
NEW
967
        taosMemoryFree(pSendInfo);
×
968
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
969
        goto _end;
×
970
      }
971

972
      if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
5,088✔
NEW
973
        taosMemoryFree(pReq);
×
NEW
974
        taosMemoryFree(pSendInfo);
×
NEW
975
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
NEW
976
        goto _end;
×
977
      }
978
      
979
      pSendInfo->msg = pReq;
5,088✔
980
      pSendInfo->epSet = vgInfo.epSet;
5,088✔
981
      code = taosHashPut(pHash, &vgInfo.vgId, sizeof(int32_t), &pSendInfo, POINTER_BYTES);
5,088✔
982
      if (code != TSDB_CODE_SUCCESS) {
5,088✔
NEW
983
        tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
×
NEW
984
        taosMemoryFree(pReq);
×
NEW
985
        taosMemoryFree(pSendInfo);
×
NEW
986
        terrno = code;
×
987
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
988
        goto _end;
×
989
      }
990
    } else {
NEW
991
      pReq = (*ppSendInfo)->msg;
×
992
    }
993
    SArray* TagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
5,088✔
994
    if (!TagNames) {
5,088✔
995
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
996
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
997
      goto _end;
×
998
    }
999
    for (int32_t i = 0; i < pInserter->pTagSchema->nCols; ++i) {
15,264✔
1000
      SSchema* tSchema = &pInserter->pTagSchema->pSchema[i];
10,176✔
1001
      int16_t  colIdx = tSchema->colId;
10,176✔
1002
      int16_t* slotId = taosHashGet(pInserter->pCols, &colIdx, sizeof(colIdx));
10,176✔
1003
      if (NULL == slotId) {
10,176✔
1004
        continue;
5,088✔
1005
      }
1006
      if (NULL == taosArrayPush(TagNames, tSchema->name)) {
10,176✔
1007
        taosArrayDestroy(TagNames);
×
1008
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1009
        goto _end;
×
1010
      }
1011

1012
      colIdx = *slotId;
5,088✔
1013
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
5,088✔
1014
      if (NULL == pColInfoData) {
5,088✔
1015
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1016
        taosArrayDestroy(TagNames);
×
1017
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1018
        goto _end;
×
1019
      }
1020
      // void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
1021
      switch (pColInfoData->info.type) {
5,088✔
1022
        case TSDB_DATA_TYPE_NCHAR:
3,816✔
1023
        case TSDB_DATA_TYPE_VARBINARY:
1024
        case TSDB_DATA_TYPE_VARCHAR: {
1025
          if (pColInfoData->info.type != tSchema->type) {
3,816✔
1026
            qError("tag:%d type:%d in block dismatch with schema tag:%d type:%d", colIdx, pColInfoData->info.type, i,
×
1027
                   tSchema->type);
1028
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1029
            taosArrayDestroy(TagNames);
×
1030
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1031
            goto _end;
×
1032
          }
1033
          if (colDataIsNull_s(pColInfoData, j)) {
7,632✔
1034
            continue;
×
1035
          } else {
1036
            void*   data = colDataGetVarData(pColInfoData, j);
3,816✔
1037
            STagVal tv = (STagVal){
3,816✔
1038
                .cid = tSchema->colId, .type = tSchema->type, .nData = varDataLen(data), .pData = varDataVal(data)};
3,816✔
1039
            if (NULL == taosArrayPush(pTagVals, &tv)) {
3,816✔
1040
              taosArrayDestroy(TagNames);
×
1041
              tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1042
              goto _end;
×
1043
            }
1044
          }
1045
          break;
3,816✔
1046
        }
1047
        case TSDB_DATA_TYPE_BLOB:
×
1048
        case TSDB_DATA_TYPE_JSON:
1049
        case TSDB_DATA_TYPE_MEDIUMBLOB:
1050
          qError("the tag type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1051
          terrno = TSDB_CODE_APP_ERROR;
×
1052
          taosArrayDestroy(TagNames);
×
1053
          tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1054
          goto _end;
×
1055
          break;
1056
        default:
1,272✔
1057
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
1,272✔
1058
            if (colDataIsNull_s(pColInfoData, j)) {
2,544✔
1059
              continue;
×
1060
            } else {
1061
              void*   data = colDataGetData(pColInfoData, j);
1,272✔
1062
              STagVal tv = {.cid = tSchema->colId, .type = tSchema->type};
1,272✔
1063
              memcpy(&tv.i64, data, tSchema->bytes);
1,272✔
1064
              if (NULL == taosArrayPush(pTagVals, &tv)) {
1,272✔
1065
                taosArrayDestroy(TagNames);
×
1066
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1067
                goto _end;
×
1068
              }
1069
            }
1070
          } else {
1071
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1072
            terrno = TSDB_CODE_APP_ERROR;
×
1073
            taosArrayDestroy(TagNames);
×
1074
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1075
            goto _end;
×
1076
          }
1077
          break;
1,272✔
1078
      }
1079
    }
1080
    STag* pTag = NULL;
5,088✔
1081
    code = tTagNew(pTagVals, 1, false, &pTag);
5,088✔
1082
    if (code != TSDB_CODE_SUCCESS) {
5,088✔
1083
      terrno = code;
×
1084
      qError("failed to create tag, error:%s", tstrerror(code));
×
1085
      taosArrayDestroy(TagNames);
×
1086
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1087
      goto _end;
×
1088
    }
1089

1090
    code = inserterBuildCreateTbReq(tbData.pCreateTbReq, tableName, pTag, suid, pInserter->pNode->tableName, TagNames,
5,088✔
1091
                                    pInserter->pTagSchema->nCols, TSDB_DEFAULT_TABLE_TTL);
5,088✔
1092
    if (code != TSDB_CODE_SUCCESS) {
5,088✔
1093
      terrno = code;
×
1094
      qError("failed to build create table request, error:%s", tstrerror(code));
×
1095
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1096
      goto _end;
×
1097
    }
1098

1099
    for (int32_t k = 0; k < pTSchema->numOfCols; ++k) {
25,440✔
1100
      int16_t         colIdx = k;
20,352✔
1101
      const STColumn* pCol = &pTSchema->columns[k];
20,352✔
1102
      int16_t*        slotId = taosHashGet(pInserter->pCols, &pCol->colId, sizeof(pCol->colId));
20,352✔
1103
      if (NULL == slotId) {
20,352✔
1104
        continue;
3,816✔
1105
      }
1106
      colIdx = *slotId;
16,536✔
1107

1108
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
16,536✔
1109
      if (NULL == pColInfoData) {
16,536✔
1110
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1111
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1112
        goto _end;
×
1113
      }
1114
      void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
16,536✔
1115

1116
      switch (pColInfoData->info.type) {
16,536✔
1117
        case TSDB_DATA_TYPE_NCHAR:
×
1118
        case TSDB_DATA_TYPE_VARBINARY:
1119
        case TSDB_DATA_TYPE_VARCHAR: {
1120
          if (pColInfoData->info.type != pCol->type) {
×
1121
            qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k,
×
1122
                   pCol->type);
1123
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1124
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1125
            goto _end;
×
1126
          }
1127
          if (colDataIsNull_s(pColInfoData, j)) {
×
1128
            SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
×
1129
            if (NULL == taosArrayPush(pVals, &cv)) {
×
1130
              tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1131
              goto _end;
×
1132
            }
1133
          } else {
1134
            void*   data = colDataGetVarData(pColInfoData, j);
×
1135
            SValue  sv = (SValue){.type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)};
×
1136
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
×
1137
            if (NULL == taosArrayPush(pVals, &cv)) {
×
1138
              tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1139
              goto _end;
×
1140
            }
1141
          }
1142
          break;
×
1143
        }
1144
        case TSDB_DATA_TYPE_BLOB:
×
1145
        case TSDB_DATA_TYPE_JSON:
1146
        case TSDB_DATA_TYPE_MEDIUMBLOB:
1147
          qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1148
          terrno = TSDB_CODE_APP_ERROR;
×
1149
          tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1150
          goto _end;
×
1151
          break;
1152
        default:
16,536✔
1153
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
16,536✔
1154
            if (colDataIsNull_s(pColInfoData, j)) {
33,072✔
1155
              if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
×
1156
                qError("Primary timestamp column should not be null");
×
1157
                terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
×
1158
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1159
                goto _end;
×
1160
              }
1161

1162
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
×
1163
              if (NULL == taosArrayPush(pVals, &cv)) {
×
1164
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1165
                goto _end;
×
1166
              }
1167
            } else {
1168
              // if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && !needSortMerge) {
1169
              //   if (*(int64_t*)var <= lastTs) {
1170
              //     needSortMerge = true;
1171
              //   } else {
1172
              //     lastTs = *(int64_t*)var;
1173
              //   }
1174
              // }
1175

1176
              SValue sv = {.type = pCol->type};
16,536✔
1177
              valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
16,536✔
1178
              SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
16,536✔
1179
              if (NULL == taosArrayPush(pVals, &cv)) {
16,536✔
1180
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1181
                goto _end;
×
1182
              }
1183
            }
1184
          } else {
1185
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1186
            terrno = TSDB_CODE_APP_ERROR;
×
1187
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1188
            goto _end;
×
1189
          }
1190
          break;
16,536✔
1191
      }
1192
    }
1193

1194
    SRow* pRow = NULL;
5,088✔
1195
    SRowBuildScanInfo sinfo = {0};
5,088✔
1196
    if ((terrno = tRowBuild(pVals, pTSchema, &pRow, &sinfo)) < 0) {
5,088✔
1197
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1198
      goto _end;
×
1199
    }
1200
    if (NULL == taosArrayPush(tbData.aRowP, &pRow)) {
10,176✔
1201
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1202
      goto _end;
×
1203
    }
1204

1205
    if (NULL == taosArrayPush(pReq->aSubmitTbData, &tbData)) {
10,176✔
1206
      goto _end;
×
1207
    }
1208
  }
1209

1210
_end:
2,544✔
1211
  taosArrayDestroy(pTagVals);
2,544✔
1212
  taosArrayDestroy(pVals);
2,544✔
1213

1214
  return terrno;
2,544✔
1215
}
1216

1217
int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** ppReq, const SSDataBlock* pDataBlock,
60,256✔
1218
                                const STSchema* pTSchema, int64_t* uid, int32_t* vgId, tb_uid_t* suid) {
1219
  SSubmitReq2* pReq = *ppReq;
60,256✔
1220
  SArray*      pVals = NULL;
60,256✔
1221
  SArray*      pTagVals = NULL;
60,256✔
1222
  int32_t      numOfBlks = 0;
60,256✔
1223
  char*        tableName = NULL;
60,256✔
1224
  int32_t      code = 0, lino = 0;
60,256✔
1225

1226
  terrno = TSDB_CODE_SUCCESS;
60,256✔
1227

1228
  if (NULL == pReq) {
60,256✔
1229
    if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
60,256✔
1230
      goto _end;
×
1231
    }
1232

1233
    if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
60,256✔
1234
      goto _end;
×
1235
    }
1236
  }
1237

1238
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
60,256✔
1239
  int32_t rows = pDataBlock->info.rows;
60,256✔
1240

1241
  SSubmitTbData tbData = {0};
60,256✔
1242
  if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
60,256✔
1243
    goto _end;
×
1244
  }
1245
  tbData.suid = *suid;
60,256✔
1246
  tbData.uid = *uid;
60,256✔
1247
  tbData.sver = pTSchema->version;
60,256✔
1248

1249
  if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
60,256✔
1250
    taosArrayDestroy(tbData.aRowP);
×
1251
    goto _end;
×
1252
  }
1253

1254
  if (pInserter->isStbInserter) {
60,256✔
1255
    if (!pTagVals && !(pTagVals = taosArrayInit(colNum, sizeof(STagVal)))) {
×
1256
      taosArrayDestroy(tbData.aRowP);
×
1257
      goto _end;
×
1258
    }
1259
  }
1260

1261
  int64_t lastTs = TSKEY_MIN;
60,256✔
1262
  bool    needSortMerge = false;
60,256✔
1263

1264
  for (int32_t j = 0; j < rows; ++j) {  // iterate by row
2,888,099✔
1265
    taosArrayClear(pVals);
2,835,775✔
1266

1267
    int32_t offset = 0;
2,835,775✔
1268
    // 处理超级表的tbname和tags
1269
    if (pInserter->isStbInserter) {
2,835,775✔
1270
      taosArrayClear(pTagVals);
×
1271
      tbData.uid = 0;
×
1272
      *uid = 0;
×
1273
      tbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
×
1274
      tbData.flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
×
1275

1276
      SColumnInfoData* tbname = taosArrayGet(pDataBlock->pDataBlock, 0);
×
1277
      if (NULL == tbname) {
×
1278
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1279
        qError("Insert into stable must have tbname column");
×
1280
        goto _end;
×
1281
      }
1282
      if (tbname->info.type != TSDB_DATA_TYPE_BINARY) {
×
1283
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1284
        qError("tbname column must be binary");
×
1285
        goto _end;
×
1286
      }
1287

1288
      if (colDataIsNull_s(tbname, j)) {
×
1289
        qError("insert into stable tbname column is null");
×
1290
        goto _end;
×
1291
      }
1292
      void*   data = colDataGetVarData(tbname, j);
×
1293
      SValue  sv = (SValue){TSDB_DATA_TYPE_VARCHAR, .nData = varDataLen(data),
×
1294
                            .pData = varDataVal(data)};  // address copy, no value
×
1295
      SColVal cv = COL_VAL_VALUE(0, sv);
×
1296

1297
      // 获取子表vgId
1298
      SDBVgInfo* dbInfo = NULL;
×
1299
      code = inserterGetDbVgInfo(pInserter, pInserter->dbFName, &dbInfo);
×
1300
      if (code != TSDB_CODE_SUCCESS) {
×
1301
        goto _end;
×
1302
      }
1303

1304
      char tbFullName[TSDB_TABLE_FNAME_LEN];
×
1305
      taosMemoryFreeClear(tableName);
×
1306
      tableName = taosMemoryCalloc(1, sv.nData + 1);
×
1307
      TSDB_CHECK_NULL(tableName, code, lino, _end, terrno);
×
1308
      tstrncpy(tableName, sv.pData, sv.nData);
×
1309
      tableName[sv.nData] = '\0';
×
1310

1311
      int32_t len = snprintf(tbFullName, TSDB_TABLE_FNAME_LEN, "%s.%s", pInserter->dbFName, tableName);
×
1312
      if (len >= TSDB_TABLE_FNAME_LEN) {
×
1313
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1314
        qError("table name too long after format, len:%d, maxLen:%d", len, TSDB_TABLE_FNAME_LEN);
×
1315
        goto _end;
×
1316
      }
1317
      code = inserterGetVgId(dbInfo, tbFullName, vgId);
×
1318
      if (code != TSDB_CODE_SUCCESS) {
×
1319
        terrno = code;
×
1320
        goto _end;
×
1321
      }
1322
      // 解析tag
1323
      SArray* TagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
×
1324
      if (!TagNames) {
×
1325
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1326
        goto _end;
×
1327
      }
1328
      for (int32_t i = 0; i < pInserter->pTagSchema->nCols; ++i) {
×
1329
        SSchema* tSchema = &pInserter->pTagSchema->pSchema[i];
×
1330
        int16_t  colIdx = tSchema->colId;
×
1331
        if (NULL == taosArrayPush(TagNames, tSchema->name)) {
×
1332
          goto _end;
×
1333
        }
1334
        int16_t* slotId = taosHashGet(pInserter->pCols, &colIdx, sizeof(colIdx));
×
1335
        if (NULL == slotId) {
×
1336
          continue;
×
1337
        }
1338

1339
        colIdx = *slotId;
×
1340
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
×
1341
        if (NULL == pColInfoData) {
×
1342
          terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1343
          goto _end;
×
1344
        }
1345
        // void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
1346
        switch (pColInfoData->info.type) {
×
1347
          case TSDB_DATA_TYPE_NCHAR:
×
1348
          case TSDB_DATA_TYPE_VARBINARY:
1349
          case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
1350
            if (pColInfoData->info.type != tSchema->type) {
×
1351
              qError("tag:%d type:%d in block dismatch with schema tag:%d type:%d", colIdx, pColInfoData->info.type, i,
×
1352
                     tSchema->type);
1353
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1354
              goto _end;
×
1355
            }
1356
            if (colDataIsNull_s(pColInfoData, j)) {
×
1357
              continue;
×
1358
            } else {
1359
              void*   data = colDataGetVarData(pColInfoData, j);
×
1360
              STagVal tv = (STagVal){.cid = tSchema->colId,
×
1361
                                     .type = tSchema->type,
×
1362
                                     .nData = varDataLen(data),
×
1363
                                     .pData = varDataVal(data)};  // address copy, no value
×
1364
              if (NULL == taosArrayPush(pTagVals, &tv)) {
×
1365
                goto _end;
×
1366
              }
1367
            }
1368
            break;
×
1369
          }
1370
          case TSDB_DATA_TYPE_BLOB:
×
1371
          case TSDB_DATA_TYPE_JSON:
1372
          case TSDB_DATA_TYPE_MEDIUMBLOB:
1373
            qError("the tag type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1374
            terrno = TSDB_CODE_APP_ERROR;
×
1375
            goto _end;
×
1376
            break;
1377
          default:
×
1378
            if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
×
1379
              if (colDataIsNull_s(pColInfoData, j)) {
×
1380
                continue;
×
1381
              } else {
1382
                void*   data = colDataGetData(pColInfoData, j);
×
1383
                STagVal tv = {.cid = tSchema->colId, .type = tSchema->type};
×
1384
                memcpy(&tv.i64, data, tSchema->bytes);
×
1385
                if (NULL == taosArrayPush(pTagVals, &tv)) {
×
1386
                  goto _end;
×
1387
                }
1388
              }
1389
            } else {
1390
              uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1391
              terrno = TSDB_CODE_APP_ERROR;
×
1392
              goto _end;
×
1393
            }
1394
            break;
×
1395
        }
1396
      }
1397
      STag* pTag = NULL;
×
1398
      code = tTagNew(pTagVals, 1, false, &pTag);
×
1399
      if (code != TSDB_CODE_SUCCESS) {
×
1400
        terrno = code;
×
1401
        qError("failed to create tag, error:%s", tstrerror(code));
×
1402
        goto _end;
×
1403
      }
1404

1405
      code = inserterBuildCreateTbReq(tbData.pCreateTbReq, tableName, pTag, *suid, pInserter->pNode->tableName, TagNames,
×
1406
                               pInserter->pTagSchema->nCols, TSDB_DEFAULT_TABLE_TTL);
×
1407
    }
1408

1409
    for (int32_t k = 0; k < pTSchema->numOfCols; ++k) {  // iterate by column
14,203,436✔
1410
      int16_t         colIdx = k;
11,367,661✔
1411
      const STColumn* pCol = &pTSchema->columns[k];
11,367,661✔
1412
      if (!pInserter->fullOrderColList) {
11,367,661✔
1413
        int16_t* slotId = taosHashGet(pInserter->pCols, &pCol->colId, sizeof(pCol->colId));
178,024✔
1414
        if (NULL == slotId) {
178,024✔
1415
          continue;
46,312✔
1416
        }
1417

1418
        colIdx = *slotId;
131,712✔
1419
      }
1420

1421
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
11,321,349✔
1422
      if (NULL == pColInfoData) {
11,321,349✔
1423
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1424
        goto _end;
×
1425
      }
1426
      void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
11,321,349✔
1427

1428
      switch (pColInfoData->info.type) {
11,321,349✔
1429
        case TSDB_DATA_TYPE_NCHAR:
141,119✔
1430
        case TSDB_DATA_TYPE_VARBINARY:
1431
        case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
1432
          if (pColInfoData->info.type != pCol->type) {
141,119✔
1433
            qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k,
×
1434
                   pCol->type);
1435
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1436
            goto _end;
×
1437
          }
1438
          if (colDataIsNull_s(pColInfoData, j)) {
282,238✔
1439
            SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
2,644✔
1440
            if (NULL == taosArrayPush(pVals, &cv)) {
2,644✔
1441
              goto _end;
×
1442
            }
1443
          } else {
1444
            void*  data = colDataGetVarData(pColInfoData, j);
138,475✔
1445
            SValue sv = (SValue){
415,425✔
1446
                .type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)};  // address copy, no value
138,475✔
1447
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
138,475✔
1448
            if (NULL == taosArrayPush(pVals, &cv)) {
138,475✔
1449
              goto _end;
×
1450
            }
1451
          }
1452
          break;
141,119✔
1453
        }
1454
        case TSDB_DATA_TYPE_BLOB:
×
1455
        case TSDB_DATA_TYPE_MEDIUMBLOB:
1456
        case TSDB_DATA_TYPE_JSON:
1457
          qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1458
          terrno = TSDB_CODE_APP_ERROR;
×
1459
          goto _end;
×
1460
          break;
1461
        default:
11,180,230✔
1462
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
11,180,230✔
1463
            if (colDataIsNull_s(pColInfoData, j)) {
22,360,460✔
1464
              if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
8,228✔
1465
                qError("Primary timestamp column should not be null");
×
1466
                terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
×
1467
                goto _end;
×
1468
              }
1469

1470
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);  // should use pCol->type
8,228✔
1471
              if (NULL == taosArrayPush(pVals, &cv)) {
8,228✔
1472
                goto _end;
×
1473
              }
1474
            } else {
1475
              if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && !needSortMerge) {
11,172,002✔
1476
                if (*(int64_t*)var <= lastTs) {
2,789,919✔
1477
                  needSortMerge = true;
11,048✔
1478
                } else {
1479
                  lastTs = *(int64_t*)var;
2,778,871✔
1480
                }
1481
              }
1482

1483
              SValue sv = {.type = pCol->type};
11,172,002✔
1484
              valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
11,172,002✔
1485
              SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
11,172,002✔
1486
              if (NULL == taosArrayPush(pVals, &cv)) {
11,172,002✔
1487
                goto _end;
×
1488
              }
1489
            }
1490
          } else {
1491
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1492
            terrno = TSDB_CODE_APP_ERROR;
×
1493
            goto _end;
×
1494
          }
1495
          break;
11,180,230✔
1496
      }
1497
    }
1498

1499
    SRow*             pRow = NULL;
2,835,775✔
1500
    SRowBuildScanInfo sinfo = {0};
2,835,775✔
1501
    if ((terrno = tRowBuild(pVals, pTSchema, &pRow, &sinfo)) < 0) {
2,835,775✔
1502
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
7,932✔
1503
      goto _end;
7,932✔
1504
    }
1505
    if (NULL == taosArrayPush(tbData.aRowP, &pRow)) {
5,655,686✔
1506
      goto _end;
×
1507
    }
1508
  }
1509

1510
  if (needSortMerge) {
52,324✔
1511
    if ((tRowSort(tbData.aRowP) != TSDB_CODE_SUCCESS) ||
11,048✔
1512
        (terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, KEEP_CONSISTENCY)) != 0) {
11,048✔
1513
      goto _end;
×
1514
    }
1515
  }
1516

1517
  if (NULL == taosArrayPush(pReq->aSubmitTbData, &tbData)) {
104,648✔
1518
    goto _end;
×
1519
  }
1520

1521
_end:
60,256✔
1522

1523
  taosMemoryFreeClear(tableName);
60,256✔
1524

1525
  taosArrayDestroy(pTagVals);
60,256✔
1526
  taosArrayDestroy(pVals);
60,256✔
1527

1528
  if (terrno != 0) {
60,256✔
1529
    *ppReq = NULL;
7,932✔
1530
    if (pReq) {
7,932✔
1531
      tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
7,932✔
1532
      taosMemoryFree(pReq);
7,932✔
1533
    }
1534

1535
    return terrno;
7,932✔
1536
  }
1537
  *ppReq = pReq;
52,324✔
1538

1539
  return TSDB_CODE_SUCCESS;
52,324✔
1540
}
1541

1542
static void destroySubmitReqWrapper(void* p) {
5,088✔
1543
  SSubmitReqSendInfo* pSendInfo = *(SSubmitReqSendInfo**)p;
5,088✔
1544
  if (pSendInfo != NULL) {
5,088✔
1545
    if (pSendInfo->msg != NULL) {
5,088✔
1546
      tDestroySubmitReq(pSendInfo->msg, TSDB_MSG_FLG_ENCODE);
5,088✔
1547
      taosMemoryFree(pSendInfo->msg);
5,088✔
1548
    }
1549
    taosMemoryFree(pSendInfo);
5,088✔
1550
  }
1551
}
5,088✔
1552

1553
/*
 * Convert every buffered data block of a super-table inserter into serialized
 * submit messages and append one SSubmitTbDataSendInfo* per hash entry to pMsgs.
 *
 * The blocks are first folded into a hash by buildSubmitReqFromStbBlock(); each
 * hash entry is then serialized with submitReqToMsg() using the entry key as vgId.
 *
 * @param pInserter  inserter handle holding the buffered blocks and schema
 * @param pMsgs      output array of SSubmitTbDataSendInfo*; entries already pushed
 *                   stay in the array when a later step fails (caller cleans up)
 * @return TSDB_CODE_SUCCESS, or the first error code encountered
 */
int32_t dataBlocksToSubmitReqArray(SDataInserterHandle* pInserter, SArray* pMsgs) {
  const SArray*   pBlocks = pInserter->pDataBlocks;
  const STSchema* pTSchema = pInserter->pSchema;
  int64_t         uid = pInserter->pNode->tableId;
  int64_t         suid = pInserter->pNode->stableId;
  int32_t         vgId = pInserter->pNode->vgId;
  int32_t         sz = taosArrayGetSize(pBlocks);
  int32_t         code = 0;

  SHashObj* pHash = NULL;
  void*     iterator = NULL;
  size_t    keyLen = 0;

  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);  // result rows produced by the SELECT query
    if (NULL == pDataBlock) {
      // Go through _end so a hash created by an earlier iteration is not leaked.
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _end;
    }
    if (pHash == NULL) {
      pHash = taosHashInit(sz * pDataBlock->info.rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false,
                           HASH_ENTRY_LOCK);
      if (NULL == pHash) {
        return terrno;
      }
      taosHashSetFreeFp(pHash, destroySubmitReqWrapper);
    }
    code = buildSubmitReqFromStbBlock(pInserter, pHash, pDataBlock, pTSchema, uid, vgId, suid);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }
  }

  if (NULL == pHash) {
    // No blocks were buffered, so there is nothing to serialize.
    return code;
  }

  while ((iterator = taosHashIterate(pHash, iterator))) {
    SSubmitReqSendInfo* pReqSendInfo = *(SSubmitReqSendInfo**)iterator;
    int32_t*            ctbVgId = taosHashGetKey(iterator, &keyLen);

    SSubmitTbDataSendInfo* pTbSendInfo = taosMemoryCalloc(1, sizeof(SSubmitTbDataSendInfo));
    if (NULL == pTbSendInfo) {
      code = terrno;
      goto _end;
    }
    code = submitReqToMsg(*ctbVgId, pReqSendInfo->msg, &pTbSendInfo->msg.pData, &pTbSendInfo->msg.len);
    if (code != TSDB_CODE_SUCCESS) {
      taosMemoryFree(pTbSendInfo);
      goto _end;
    }
    pTbSendInfo->epSet = pReqSendInfo->epSet;
    if (NULL == taosArrayPush(pMsgs, &pTbSendInfo)) {
      // The array did not take ownership, so release the message and its wrapper here.
      taosMemoryFree(pTbSendInfo->msg.pData);
      taosMemoryFree(pTbSendInfo);
      code = terrno;
      goto _end;
    }
  }

_end:
  if (pHash != NULL) {
    taosHashCleanup(pHash);
  }

  return code;
}
1615

1616
/*
 * Build one submit request from all buffered data blocks and serialize it.
 *
 * @param pInserter  inserter handle holding the buffered blocks and schema
 * @param pMsg       receives the serialized message produced by submitReqToMsg()
 * @param msgLen     receives the serialized message length
 * @return TSDB_CODE_SUCCESS, or the first error code encountered; the intermediate
 *         request is always destroyed before returning
 */
int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32_t* msgLen) {
  const SArray*   pBlocks = pInserter->pDataBlocks;
  const STSchema* pTSchema = pInserter->pSchema;
  int64_t         uid = pInserter->pNode->tableId;
  int64_t         suid = pInserter->pNode->stableId;
  int32_t         vgId = pInserter->pNode->vgId;
  int32_t         sz = taosArrayGetSize(pBlocks);
  int32_t         code = 0;
  SSubmitReq2*    pReq = NULL;

  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);  // result rows produced by the SELECT query
    if (NULL == pDataBlock) {
      // Handled by the shared failure path below so a request built from earlier
      // blocks is released instead of leaked.
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    } else {
      code = buildSubmitReqFromBlock(pInserter, &pReq, pDataBlock, pTSchema, &uid, &vgId, &suid);
    }
    if (code) {
      if (pReq) {
        tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
        taosMemoryFree(pReq);
      }

      return code;
    }
  }

  code = submitReqToMsg(vgId, pReq, pMsg, msgLen);
  tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
  taosMemoryFree(pReq);

  return code;
}
1648

1649
/*
 * Look up the insert-table info registered for (streamId, groupId) in
 * gStreamGrpTableHash and hand back the acquired hash slot.
 *
 * On success *ppTbInfo holds the slot returned by taosHashAcquire(); the caller
 * must give it back with taosHashRelease() on the same hash.
 * On failure *ppTbInfo is set to NULL and nothing remains acquired.
 *
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND
 */
int32_t getStreamInsertTableInfo(int64_t streamId, int64_t groupId, SInsertTableInfo*** ppTbInfo) {
  int64_t            key[2] = {streamId, groupId};
  SInsertTableInfo** pTmp = taosHashAcquire(gStreamGrpTableHash, key, sizeof(key));
  if (NULL == pTmp || NULL == *pTmp) {
    if (NULL != pTmp) {
      // The slot exists but is empty: drop the reference we just took.
      taosHashRelease(gStreamGrpTableHash, pTmp);
    }
    *ppTbInfo = NULL;
    return TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
  }

  *ppTbInfo = pTmp;
  return TSDB_CODE_SUCCESS;
}
1659

1660
/*
 * Give back a slot of gStreamGrpTableHash via taosHashRelease().
 * Always reports TSDB_CODE_SUCCESS.
 */
static int32_t releaseStreamInsertTableInfo(SInsertTableInfo** ppTbInfo) {
  const int32_t code = TSDB_CODE_SUCCESS;

  taosHashRelease(gStreamGrpTableHash, ppTbInfo);

  return code;
}
1664

1665
int32_t buildNormalTableCreateReq(SDataInserterHandle* pInserter, SStreamInserterParam* pInsertParam,
127,110✔
1666
                                  SSubmitTbData* tbData) {
1667
  int32_t code = TSDB_CODE_SUCCESS;
127,110✔
1668

1669
  tbData->suid = 0;
127,110✔
1670

1671
  tbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
127,110✔
1672
  if (NULL == tbData->pCreateTbReq) {
127,015✔
1673
    goto _end;
×
1674
  }
1675
  tbData->flags |= (SUBMIT_REQ_AUTO_CREATE_TABLE | SUBMIT_REQ_SCHEMA_RES);
127,015✔
1676
  tbData->pCreateTbReq->type = TSDB_NORMAL_TABLE;
127,015✔
1677
  tbData->pCreateTbReq->flags |= (TD_CREATE_NORMAL_TB_IN_STREAM | TD_CREATE_IF_NOT_EXISTS);
127,015✔
1678
  tbData->pCreateTbReq->uid = 0;
127,015✔
1679
  tbData->sver = pInsertParam->sver;
127,015✔
1680

1681
  tbData->pCreateTbReq->name = taosStrdup(pInsertParam->tbname);
127,015✔
1682
  if (!tbData->pCreateTbReq->name) return terrno;
127,110✔
1683

1684
  int32_t numOfCols = pInsertParam->pFields->size;
127,110✔
1685
  tbData->pCreateTbReq->ntb.schemaRow.nCols = numOfCols;
127,110✔
1686
  tbData->pCreateTbReq->ntb.schemaRow.version = 1;
127,110✔
1687

1688
  tbData->pCreateTbReq->ntb.schemaRow.pSchema = taosMemoryCalloc(numOfCols, sizeof(SSchema));
127,110✔
1689
  if (NULL == tbData->pCreateTbReq->ntb.schemaRow.pSchema) {
127,110✔
1690
    goto _end;
×
1691
  }
1692
  for (int32_t i = 0; i < numOfCols; ++i) {
727,039✔
1693
    SFieldWithOptions* pField = taosArrayGet(pInsertParam->pFields, i);
599,929✔
1694
    if (NULL == pField) {
599,929✔
1695
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1696
      goto _end;
×
1697
    }
1698
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].colId = i + 1;
599,929✔
1699
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].type = pField->type;
599,929✔
1700
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].bytes = pField->bytes;
599,929✔
1701
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].flags = pField->flags;
599,929✔
1702
    if (i == 0 && pField->type != TSDB_DATA_TYPE_TIMESTAMP) {
599,929✔
1703
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1704
      qError("buildNormalTableCreateReq, the first column must be timestamp.");
×
1705
      goto _end;
×
1706
    }
1707
    snprintf(tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].name, TSDB_COL_NAME_LEN, "%s", pField->name);
599,929✔
1708
    if (IS_DECIMAL_TYPE(pField->type)) {
599,929✔
1709
      if (!tbData->pCreateTbReq->pExtSchemas) {
×
1710
        tbData->pCreateTbReq->pExtSchemas = taosMemoryCalloc(numOfCols, sizeof(SExtSchema));
×
1711
        if (NULL == tbData->pCreateTbReq->pExtSchemas) {
×
1712
          tdDestroySVCreateTbReq(tbData->pCreateTbReq);
×
1713
          tbData->pCreateTbReq = NULL;
×
1714
          return terrno;
×
1715
        }
1716
      }
1717
      tbData->pCreateTbReq->pExtSchemas[i].typeMod = pField->typeMod;
×
1718
    }
1719
  }
1720
  return TSDB_CODE_SUCCESS;
127,110✔
1721
_end:
×
1722
  return code;
×
1723
}
1724

1725
// reference tBuildTSchema function
1726
/*
 * Allocate an STSchema describing the columns listed in pInsertParam->pFields.
 *
 * Column 0 takes PRIMARYKEY_TIMESTAMP_COL_ID and offset -1; every later column i
 * gets colId i + 1 and an offset equal to the running fixed length (flen).
 * On success *ppTSchema owns the new schema; on failure it is set to NULL.
 *
 * @return TSDB_CODE_SUCCESS, terrno when the allocation fails, or
 *         TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when a field cannot be fetched
 */
static int32_t buildTSchmaFromInserter(SStreamInserterParam* pInsertParam, STSchema** ppTSchema) {
  int32_t   status = TSDB_CODE_SUCCESS;
  int32_t   nCols = pInsertParam->pFields->size;
  STSchema* pSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn) * nCols);
  if (NULL == pSchema) {
    return terrno;
  }

  // Normal tables start at version 1 (reset later by resetInserterTbVersion if the
  // table already exists); other table types carry the version from the parameters.
  if (TSDB_NORMAL_TABLE == pInsertParam->tbType) {
    pSchema->version = 1;
  } else {
    pSchema->version = pInsertParam->sver;
  }
  pSchema->numOfCols = nCols;

  SFieldWithOptions* pTsField = taosArrayGet(pInsertParam->pFields, 0);
  if (NULL == pTsField) {
    status = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  } else {
    STColumn* pFirst = &pSchema->columns[0];
    pFirst->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
    pFirst->type = pTsField->type;
    pFirst->flags = pTsField->flags;
    pFirst->bytes = TYPE_BYTES[pTsField->type];
    pFirst->offset = -1;

    pSchema->tlen = 0;
    pSchema->flen = 0;

    int32_t idx = 1;
    while (idx < nCols) {
      SFieldWithOptions* pFld = taosArrayGet(pInsertParam->pFields, idx);
      if (NULL == pFld) {
        status = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        break;
      }

      STColumn* pColumn = &pSchema->columns[idx];
      pColumn->colId = idx + 1;
      pColumn->type = pFld->type;
      pColumn->flags = pFld->flags;
      pColumn->offset = pSchema->flen;

      if (IS_VAR_DATA_TYPE(pFld->type)) {
        // Variable-length column: keeps the declared width.
        pColumn->bytes = pFld->bytes;
        pSchema->tlen += (TYPE_BYTES[pFld->type] + pFld->bytes);
      } else {
        pColumn->bytes = TYPE_BYTES[pFld->type];
        pSchema->tlen += TYPE_BYTES[pFld->type];
      }

      pSchema->flen += TYPE_BYTES[pFld->type];
      ++idx;
    }

    if (TSDB_CODE_SUCCESS == status) {
      pSchema->tlen += (int32_t)TD_BITMAP_BYTES(nCols);
    }
  }

  if (TSDB_CODE_SUCCESS != status) {
    taosMemoryFree(pSchema);
    pSchema = NULL;
  }
  *ppTSchema = pSchema;
  return status;
}
1791

1792
static int32_t getTagValsFromStreamInserterInfo(SStreamDataInserterInfo* pInserterInfo, int32_t preCols,
323,991✔
1793
                                                SArray** ppTagVals) {
1794
  int32_t code = TSDB_CODE_SUCCESS;
323,991✔
1795
  int32_t nTags = pInserterInfo->pTagVals->size;
323,991✔
1796
  *ppTagVals = taosArrayInit(nTags, sizeof(STagVal));
323,991✔
1797
  if (!ppTagVals) {
324,421✔
1798
    return terrno;
×
1799
  }
1800
  for (int32_t i = 0; i < pInserterInfo->pTagVals->size; ++i) {
867,997✔
1801
    SStreamTagInfo* pTagInfo = taosArrayGet(pInserterInfo->pTagVals, i);
543,519✔
1802
    STagVal         tagVal = {
543,139✔
1803
                .cid = preCols + i + 1,
543,114✔
1804
                .type = pTagInfo->val.data.type,
543,114✔
1805
    };
1806
    if (!pTagInfo->val.isNull) {
542,741✔
1807
      if (IS_VAR_DATA_TYPE(pTagInfo->val.data.type)) {
543,139✔
1808
        tagVal.nData = pTagInfo->val.data.nData;
369,076✔
1809
        tagVal.pData = pTagInfo->val.data.pData;
369,506✔
1810
      } else {
1811
        tagVal.i64 = pTagInfo->val.data.val;
174,443✔
1812
      }
1813

1814
      if (NULL == taosArrayPush(*ppTagVals, &tagVal)) {
1,086,260✔
1815
        code = terrno;
×
1816
        goto _end;
×
1817
      }
1818
    }
1819
  }
1820
_end:
324,048✔
1821
  if (code != TSDB_CODE_SUCCESS) {
323,618✔
1822
    taosArrayDestroy(*ppTagVals);
×
1823
    *ppTagVals = NULL;
×
1824
  }
1825
  return code;
323,618✔
1826
}
1827

1828
static int32_t buildStreamSubTableCreateReq(SStreamRunnerTask* pTask, SDataInserterHandle* pInserter,
323,618✔
1829
                                            SStreamInserterParam* pInsertParam, SStreamDataInserterInfo* pInserterInfo,
1830
                                            SSubmitTbData* tbData) {
1831
  int32_t code = TSDB_CODE_SUCCESS;
323,618✔
1832
  STag*   pTag = NULL;
323,618✔
1833
  SArray* pTagVals = NULL;
324,421✔
1834
  SArray* TagNames = NULL;
324,421✔
1835

1836
  if (pInsertParam->pTagFields == NULL) {
324,421✔
1837
    ST_TASK_ELOG("buildStreamSubTableCreateReq, pTagFields is NULL, suid:%" PRId64 ", sver:%d", pInsertParam->suid,
×
1838
                 pInsertParam->sver);
1839
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1840
  }
1841
  if (pInserterInfo->pTagVals == NULL || pInserterInfo->pTagVals->size == 0) {
323,991✔
1842
    ST_TASK_ELOG("buildStreamSubTableCreateReq, pTagVals is NULL, suid:%" PRId64 ", sver:%d", pInsertParam->suid,
430✔
1843
                 pInsertParam->sver);
1844
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1845
  }
1846
  if (pInsertParam->suid <= 0 || pInsertParam->sver <= 0) {
323,991✔
1847
    ST_TASK_ELOG("buildStreamSubTableCreateReq, suid:%" PRId64
430✔
1848
                 ", sver:%d"
1849
                 " must be greater than 0",
1850
                 pInsertParam->suid, pInsertParam->sver);
1851
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1852
  }
1853
  int32_t nTags = pInserterInfo->pTagVals->size;
323,991✔
1854

1855
  TagNames = taosArrayInit(nTags, TSDB_COL_NAME_LEN);
324,421✔
1856
  if (!TagNames) {
323,586✔
1857
    code = terrno;
×
1858
    goto _end;
×
1859
  }
1860
  for (int32_t i = 0; i < nTags; ++i) {
866,757✔
1861
    SFieldWithOptions* pField = taosArrayGet(pInsertParam->pTagFields, i);
542,709✔
1862
    if (NULL == taosArrayPush(TagNames, pField->name)) {
1,086,715✔
1863
      code = terrno;
×
1864
      goto _end;
×
1865
    }
1866
  }
1867

1868
  tbData->flags |= (SUBMIT_REQ_AUTO_CREATE_TABLE | SUBMIT_REQ_SCHEMA_RES);
324,048✔
1869
  tbData->uid = 0;
324,421✔
1870
  tbData->suid = pInsertParam->suid;
324,421✔
1871
  tbData->sver = pInsertParam->sver;
323,586✔
1872

1873
  tbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
324,421✔
1874
  if (NULL == tbData->pCreateTbReq) {
323,586✔
1875
    code = terrno;
×
1876
    goto _end;
×
1877
  }
1878
  tbData->pCreateTbReq->type = TSDB_CHILD_TABLE;
324,016✔
1879
  tbData->pCreateTbReq->flags |= (TD_CREATE_SUB_TB_IN_STREAM | TD_CREATE_IF_NOT_EXISTS);
324,048✔
1880

1881
  code = getTagValsFromStreamInserterInfo(pInserterInfo, pInsertParam->pFields->size, &pTagVals);
323,586✔
1882
  if (code != TSDB_CODE_SUCCESS) {
323,991✔
1883
    goto _end;
×
1884
  }
1885

1886
  code = tTagNew(pTagVals, pInsertParam->sver, false, &pTag);
323,991✔
1887
  if (code != TSDB_CODE_SUCCESS) {
323,991✔
1888
    ST_TASK_ELOG("failed to create tag, error:%s", tstrerror(code));
×
1889
    goto _end;
×
1890
  }
1891
  code = inserterBuildCreateTbReq(tbData->pCreateTbReq, pInserterInfo->tbName, pTag, tbData->suid,
324,421✔
1892
                                  pInsertParam->stbname, TagNames, nTags, TSDB_DEFAULT_TABLE_TTL);
323,991✔
1893
  if (code != TSDB_CODE_SUCCESS) {
322,810✔
1894
    ST_TASK_ELOG("failed to build create table request, error:%s", tstrerror(code));
×
1895
    goto _end;
×
1896
  }
1897

1898
_end:
322,810✔
1899
  if (code != TSDB_CODE_SUCCESS) {
323,618✔
1900
    ST_TASK_ELOG("buildStreamSubTableCreateReq failed, error:%s", tstrerror(code));
×
1901
    if (tbData->pCreateTbReq) {
×
1902
      taosMemoryFreeClear(tbData->pCreateTbReq->name);
×
1903
      taosMemoryFreeClear(tbData->pCreateTbReq);
×
1904
    }
1905
    if (TagNames) {
×
1906
      taosArrayDestroy(TagNames);
×
1907
    }
1908
  }
1909

1910
  if (pTagVals) {
323,618✔
1911
    taosArrayDestroy(pTagVals);
323,618✔
1912
  }
1913
  return code;
323,186✔
1914
}
1915

1916
static int32_t appendInsertData(SStreamInserterParam* pInsertParam, const SSDataBlock* pDataBlock,
5,301,015✔
1917
                                SSubmitTbData* tbData, STSchema* pTSchema, SBuildInsertDataInfo* dataInsertInfo) {
1918
  int32_t code = TSDB_CODE_SUCCESS;
5,301,015✔
1919
  int32_t lino = 0;
5,301,015✔
1920

1921
  int32_t rows = pDataBlock ? pDataBlock->info.rows : 0;
5,301,015✔
1922
  int32_t numOfCols = pInsertParam->pFields->size;
5,301,015✔
1923
  int32_t colNum = pDataBlock ? taosArrayGetSize(pDataBlock->pDataBlock) : 0;
5,301,418✔
1924

1925
  SArray* pVals = NULL;
5,301,418✔
1926
  if (!(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
5,301,418✔
1927
    code = terrno;
×
1928
    QUERY_CHECK_CODE(code, lino, _end);
×
1929
  }
1930

1931
  for (int32_t j = 0; j < rows; ++j) {  // iterate by row
49,797,227✔
1932
    taosArrayClear(pVals);
44,495,836✔
1933

1934
    bool tsOrPrimaryKeyIsNull = false;
44,495,836✔
1935
    for (int32_t k = 0; k < numOfCols; ++k) {  // iterate by column
149,076,783✔
1936
      int16_t colIdx = k + 1;
104,595,132✔
1937

1938
      SFieldWithOptions* pCol = taosArrayGet(pInsertParam->pFields, k);
104,595,132✔
1939
      if (PRIMARYKEY_TIMESTAMP_COL_ID != colIdx && TSDB_DATA_TYPE_NULL == pCol->type) {
104,595,535✔
1940
        SColVal cv = COL_VAL_NULL(colIdx, pCol->type);
×
1941
        if (NULL == taosArrayPush(pVals, &cv)) {
×
1942
          code = terrno;
×
1943
          QUERY_CHECK_CODE(code, lino, _end);
×
1944
        }
1945
        continue;
×
1946
      }
1947

1948
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
104,595,938✔
1949
      if (NULL == pColInfoData) {
104,593,360✔
1950
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1951
        QUERY_CHECK_CODE(code, lino, _end);
×
1952
      }
1953
      void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
104,593,360✔
1954

1955
      if (colDataIsNull_s(pColInfoData, j) && (pCol->flags & COL_IS_KEY)) {
209,198,405✔
1956
        tsOrPrimaryKeyIsNull = true;
12,325✔
1957
        qDebug("Primary key column should not be null, skip this row");
12,325✔
1958
        break;
12,325✔
1959
      }
1960
      switch (pColInfoData->info.type) {
104,590,865✔
1961
        case TSDB_DATA_TYPE_NCHAR:
4,747,381✔
1962
        case TSDB_DATA_TYPE_VARBINARY:
1963
        case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
1964
          if (pColInfoData->info.type != pCol->type) {
4,747,381✔
1965
            qError("tb:%s column:%d type:%d in block dismatch with schema col:%d type:%d", pInsertParam->tbname, k,
×
1966
                   pColInfoData->info.type, k, pCol->type);
1967
            code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1968
            QUERY_CHECK_CODE(code, lino, _end);
×
1969
          }
1970
          if (colDataIsNull_s(pColInfoData, j)) {
9,494,762✔
1971
            SColVal cv = COL_VAL_NULL(colIdx, pCol->type);
4,073✔
1972
            if (NULL == taosArrayPush(pVals, &cv)) {
4,073✔
1973
              code = terrno;
×
1974
              QUERY_CHECK_CODE(code, lino, _end);
×
1975
            }
1976
          } else {
1977
            if (pColInfoData->pData == NULL) {
4,743,308✔
1978
              qError("build insert tb:%s, column:%d data is NULL in block", pInsertParam->tbname, k);
×
1979
              code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1980
              QUERY_CHECK_CODE(code, lino, _end);
×
1981
            }
1982
            void*  data = colDataGetVarData(pColInfoData, j);
4,743,308✔
1983
            SValue sv = (SValue){
14,229,924✔
1984
                .type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)};  // address copy, no value
4,743,308✔
1985
            SColVal cv = COL_VAL_VALUE(colIdx, sv);
4,743,308✔
1986
            if (NULL == taosArrayPush(pVals, &cv)) {
4,743,308✔
1987
              code = terrno;
×
1988
              QUERY_CHECK_CODE(code, lino, _end);
×
1989
            }
1990
          }
1991
          break;
4,747,381✔
1992
        }
1993
        case TSDB_DATA_TYPE_BLOB:
×
1994
        case TSDB_DATA_TYPE_JSON:
1995
        case TSDB_DATA_TYPE_MEDIUMBLOB:
1996
          qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1997
          code = TSDB_CODE_APP_ERROR;
×
1998
          QUERY_CHECK_CODE(code, lino, _end);
×
1999
          break;
×
2000
        default:
99,837,922✔
2001
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
99,837,922✔
2002
            if (colDataIsNull_s(pColInfoData, j)) {
199,680,515✔
2003
              if (PRIMARYKEY_TIMESTAMP_COL_ID == colIdx) {
2,048,794✔
2004
                tsOrPrimaryKeyIsNull = true;
4,035✔
2005
                qDebug("Primary timestamp column should not be null, skip this row");
4,035✔
2006
                break;
4,035✔
2007
              }
2008

2009
              SColVal cv = COL_VAL_NULL(colIdx, pCol->type);  // should use pCol->type
2,044,759✔
2010
              if (NULL == taosArrayPush(pVals, &cv)) {
2,045,162✔
2011
                code = terrno;
×
2012
                QUERY_CHECK_CODE(code, lino, _end);
×
2013
              }
2014
            } else {
2015
              if (PRIMARYKEY_TIMESTAMP_COL_ID == colIdx && !dataInsertInfo->needSortMerge) {
97,793,477✔
2016
                if (*(int64_t*)var <= dataInsertInfo->lastTs) {
3,533,552✔
2017
                  dataInsertInfo->needSortMerge = true;
33,109✔
2018
                } else {
2019
                  dataInsertInfo->lastTs = *(int64_t*)var;
3,500,039✔
2020
                }
2021
              }
2022

2023
              SValue sv = {.type = pCol->type};
97,793,878✔
2024
              valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
97,794,287✔
2025
              SColVal cv = COL_VAL_VALUE(colIdx, sv);
97,793,562✔
2026
              if (NULL == taosArrayPush(pVals, &cv)) {
97,794,287✔
2027
                code = terrno;
×
2028
                QUERY_CHECK_CODE(code, lino, _end);
×
2029
              }
2030
            }
2031
          } else {
2032
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
2033
            code = TSDB_CODE_APP_ERROR;
×
2034
            QUERY_CHECK_CODE(code, lino, _end);
×
2035
          }
2036
          break;
99,833,163✔
2037
      }
2038
      if (tsOrPrimaryKeyIsNull) break;  // skip remaining columns because the primary key is null
104,584,982✔
2039
    }
2040
    if (tsOrPrimaryKeyIsNull) continue;  // skip this row if primary key is null
44,498,011✔
2041
    SRow*             pRow = NULL;
44,481,651✔
2042
    SRowBuildScanInfo sinfo = {0};
44,479,129✔
2043
    if ((code = tRowBuild(pVals, pTSchema, &pRow, &sinfo)) != TSDB_CODE_SUCCESS) {
44,479,559✔
2044
      QUERY_CHECK_CODE(code, lino, _end);
×
2045
    }
2046
    if (NULL == taosArrayPush(tbData->aRowP, &pRow)) {
88,960,568✔
2047
      taosMemFree(pRow);
×
2048
      code = terrno;
×
2049
      QUERY_CHECK_CODE(code, lino, _end);
×
2050
    }
2051
  }
2052
  if (dataInsertInfo->isLastBlock) {
5,301,391✔
2053
    int32_t nRows = taosArrayGetSize(tbData->aRowP);
5,299,474✔
2054
    if (taosArrayGetSize(tbData->aRowP) == 0) {
5,299,069✔
2055
      tbData->flags |= SUBMIT_REQ_ONLY_CREATE_TABLE;
3,991,764✔
2056
      stDebug("no valid data to insert, try to only create tabale:%s", pInsertParam->tbname);
3,992,169✔
2057
    }
2058
    stDebug("appendInsertData, isLastBlock:%d, needSortMerge:%d, totalRows:%d", dataInsertInfo->isLastBlock,
5,299,474✔
2059
            dataInsertInfo->needSortMerge, nRows);
2060
    if (dataInsertInfo->needSortMerge) {
5,299,474✔
2061
      if ((tRowSort(tbData->aRowP) != TSDB_CODE_SUCCESS) ||
66,218✔
2062
          (code = tRowMerge(tbData->aRowP, (STSchema*)pTSchema, KEEP_CONSISTENCY)) != 0) {
33,109✔
2063
        QUERY_CHECK_CODE(code, lino, _end);
×
2064
      }
2065
    }
2066
    nRows = taosArrayGetSize(tbData->aRowP);
5,299,071✔
2067
    stDebug("appendInsertData, after merge, totalRows:%d", nRows);
5,299,069✔
2068
  }
2069

2070
_end:
265,886✔
2071
  taosArrayDestroy(pVals);
5,301,443✔
2072
  return code;
5,300,613✔
2073
}
2074

2075
int32_t buildStreamSubmitReqFromBlock(SStreamRunnerTask* pTask, SDataInserterHandle* pInserter,
5,301,418✔
2076
                                      SStreamDataInserterInfo* pInserterInfo, SSubmitReq2** ppReq,
2077
                                      const SSDataBlock* pDataBlock, SVgroupInfo* vgInfo,
2078
                                      SBuildInsertDataInfo* tbDataInfo) {
2079
  SSubmitReq2* pReq = *ppReq;
5,301,418✔
2080
  int32_t      numOfBlks = 0;
5,301,418✔
2081

2082
  int32_t               code = TSDB_CODE_SUCCESS;
5,301,418✔
2083
  int32_t               lino = 0;
5,301,418✔
2084
  SStreamInserterParam* pInsertParam = pInserter->pParam->streamInserterParam;
5,301,418✔
2085
  SInsertTableInfo**    ppTbInfo = NULL;
5,301,418✔
2086
  SInsertTableInfo*     pTbInfo = NULL;
5,301,418✔
2087
  STSchema*             pTSchema = NULL;
5,301,418✔
2088
  SSubmitTbData*        tbData = &tbDataInfo->pTbData;
5,301,418✔
2089
  int32_t               colNum = 0;
5,301,418✔
2090
  int32_t               rows = 0;
5,301,418✔
2091

2092
  if (NULL == pReq) {
5,301,418✔
2093
    if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
5,299,474✔
2094
      code = terrno;
×
2095
      QUERY_CHECK_CODE(code, lino, _end);
×
2096
    }
2097
    *ppReq = pReq;
5,299,474✔
2098

2099
    if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
5,299,474✔
2100
      code = terrno;
×
2101
      QUERY_CHECK_CODE(code, lino, _end);
×
2102
    }
2103
  }
2104

2105
  if (pDataBlock) {
5,301,418✔
2106
    colNum = taosArrayGetSize(pDataBlock->pDataBlock);
1,313,284✔
2107
    rows = pDataBlock->info.rows;
1,313,284✔
2108
  }
2109

2110
  tbData->flags |= SUBMIT_REQ_SCHEMA_RES;
5,301,418✔
2111

2112
  if (tbDataInfo->isFirstBlock) {
5,301,418✔
2113
    if (pInserterInfo->isAutoCreateTable) {
5,299,474✔
2114
      code = initTableInfo(pInserter, pInserterInfo);
451,531✔
2115
      QUERY_CHECK_CODE(code, lino, _end);
451,531✔
2116
      if (pInsertParam->tbType == TSDB_NORMAL_TABLE) {
451,531✔
2117
        code = buildNormalTableCreateReq(pInserter, pInsertParam, tbData);
127,110✔
2118
      } else if (pInsertParam->tbType == TSDB_SUPER_TABLE) {
323,991✔
2119
        code = buildStreamSubTableCreateReq(pTask, pInserter, pInsertParam, pInserterInfo, tbData);
323,991✔
2120
      } else {
2121
        code = TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
×
2122
        ST_TASK_ELOG("buildStreamSubmitReqFromBlock, unknown table type %d", pInsertParam->tbType);
×
2123
      }
2124
      QUERY_CHECK_CODE(code, lino, _end);
451,101✔
2125
    }
2126
  }
2127

2128
  code = getStreamInsertTableInfo(pInserterInfo->streamId, pInserterInfo->groupId, &ppTbInfo);
5,300,988✔
2129
  pTbInfo = *ppTbInfo;
5,301,418✔
2130
  if (tbDataInfo->isFirstBlock) {
5,301,418✔
2131
    if (!pInserterInfo->isAutoCreateTable) {
5,299,044✔
2132
      tstrncpy(pInserterInfo->tbName, pTbInfo->tbname, TSDB_TABLE_NAME_LEN);
4,847,943✔
2133
    }
2134

2135
    tbData->uid = pTbInfo->uid;
5,299,474✔
2136
    tbData->sver = pTbInfo->version;
5,299,474✔
2137

2138
    if (pInsertParam->tbType == TSDB_SUPER_TABLE) {
5,299,069✔
2139
      tbData->suid = pInsertParam->suid;
4,360,547✔
2140
    }
2141

2142
    pTSchema = pTbInfo->pSchema;
5,298,261✔
2143
  } else {
2144
    pTSchema = pTbInfo->pSchema;
1,944✔
2145
  }
2146

2147
  code = getTableVgInfo(pInserter, pInsertParam->dbFName, pTbInfo->tbname, vgInfo);
5,300,585✔
2148
  QUERY_CHECK_CODE(code, lino, _end);
5,301,418✔
2149

2150
  ST_TASK_DLOG("[data inserter], Handle:%p, GROUP:%" PRId64 " tbname:%s autoCreate:%d uid:%" PRId64 " suid:%" PRId64
5,301,418✔
2151
               " sver:%d vgid:%d isLastBlock:%d",
2152
               pInserter, pInserterInfo->groupId, pInserterInfo->tbName, pInserterInfo->isAutoCreateTable, tbData->uid,
2153
               tbData->suid, tbData->sver, vgInfo->vgId, tbDataInfo->isFirstBlock);
2154

2155
  code = appendInsertData(pInsertParam, pDataBlock, tbData, pTSchema, tbDataInfo);
5,301,418✔
2156
  QUERY_CHECK_CODE(code, lino, _end);
5,301,015✔
2157

2158
_end:
5,301,015✔
2159
  releaseStreamInsertTableInfo(ppTbInfo);
5,301,015✔
2160
  if (code != TSDB_CODE_SUCCESS) {
5,301,418✔
2161
    ST_TASK_ELOG("buildStreamSubmitReqFromBlock, code:0x%0x, groupId:%" PRId64 " tbname:%s autoCreate:%d", code,
×
2162
                 pInserterInfo->groupId, pInserterInfo->tbName, pInserterInfo->isAutoCreateTable);
2163
  }
2164
  return code;
5,301,418✔
2165
}
2166

2167
int32_t streamDataBlocksToSubmitReq(SStreamRunnerTask* pTask, SDataInserterHandle* pInserter,
5,298,576✔
2168
                                    SStreamDataInserterInfo* pInserterInfo, void** pMsg, int32_t* msgLen,
2169
                                    SVgroupInfo* vgInfo) {
2170
  int32_t code = 0;
5,298,576✔
2171
  int32_t lino = 0;
5,298,576✔
2172

2173
  const SArray*        pBlocks = pInserter->pDataBlocks;
5,298,576✔
2174
  int32_t              sz = taosArrayGetSize(pBlocks);
5,299,379✔
2175
  SSubmitReq2*         pReq = NULL;
5,299,474✔
2176
  SBuildInsertDataInfo tbDataInfo = {0};
5,299,474✔
2177

2178
  int32_t rows = 0;
5,299,044✔
2179
  for (int32_t i = 0; i < sz; i++) {
10,600,109✔
2180
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);
5,300,988✔
2181
    if (NULL == pDataBlock) {
5,301,418✔
2182
      stDebug("data block is NULL, just create empty table");
3,988,134✔
2183
      continue;
3,988,134✔
2184
    }
2185
    rows += pDataBlock->info.rows;
1,313,284✔
2186
  }
2187
  code = initInsertProcessInfo(&tbDataInfo, rows);
5,299,121✔
2188
  if (code != TSDB_CODE_SUCCESS) {
5,299,474✔
2189
    ST_TASK_ELOG("streamDataBlocksToSubmitReq, initInsertDataInfo failed, code:%d", code);
430✔
2190
    return code;
×
2191
  }
2192

2193
  for (int32_t i = 0; i < sz; i++) {
10,600,462✔
2194
    tbDataInfo.isFirstBlock = (i == 0);
5,300,988✔
2195
    tbDataInfo.isLastBlock = (i == sz - 1);
5,300,988✔
2196
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);  // pDataBlock select查询到的结果
5,300,988✔
2197
    ST_TASK_DLOG("[data inserter], Handle:%p, GROUP:%" PRId64
5,301,418✔
2198
            " tbname:%s autoCreate:%d block: %d/%d rows:%" PRId64,
2199
            pInserter, pInserterInfo->groupId, pInserterInfo->tbName,
2200
            pInserterInfo->isAutoCreateTable, i + 1, sz, (pDataBlock != NULL ? pDataBlock->info.rows : 0));
2201
    code = buildStreamSubmitReqFromBlock(pTask, pInserter, pInserterInfo, &pReq, pDataBlock, vgInfo, &tbDataInfo);
5,301,848✔
2202
    QUERY_CHECK_CODE(code, lino, _end);
5,301,418✔
2203
  }
2204

2205
  if (NULL == taosArrayPush(pReq->aSubmitTbData, &tbDataInfo.pTbData)) {
10,598,948✔
2206
    code = terrno;
×
2207
    QUERY_CHECK_CODE(code, lino, _end);
×
2208
  }
2209

2210
  code = submitReqToMsg(vgInfo->vgId, pReq, pMsg, msgLen);
5,299,474✔
2211
  tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
5,298,229✔
2212
  taosMemoryFree(pReq);
5,297,420✔
2213
  ST_TASK_DLOG("[data inserter], submit req, vgid:%d, GROUP:%" PRId64 " tbname:%s autoCreate:%d code:%d ", vgInfo->vgId,
5,298,625✔
2214
               pInserterInfo->groupId, pInserterInfo->tbName, pInserterInfo->isAutoCreateTable, code);
2215

2216
_end:
5,282,412✔
2217
  if (code != 0) {
5,299,072✔
2218
    tDestroySubmitTbData(&tbDataInfo.pTbData, TSDB_MSG_FLG_ENCODE);
×
2219
    tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
×
2220
    taosMemoryFree(pReq);
×
2221
  }
2222

2223
  return code;
5,299,474✔
2224
}
2225

2226
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
68,083✔
2227
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
68,083✔
2228
  if (!pInserter->explain) {
68,083✔
2229
    if (NULL == taosArrayPush(pInserter->pDataBlocks, &pInput->pData)) {
125,600✔
2230
      return terrno;
×
2231
    }
2232
    if (pInserter->isStbInserter) {
62,800✔
2233
      SArray* pMsgs = taosArrayInit(4, sizeof(POINTER_BYTES));
2,544✔
2234
      if (NULL == pMsgs) {
2,544✔
2235
        return terrno;
×
2236
      }
2237
      int32_t code = dataBlocksToSubmitReqArray(pInserter, pMsgs);
2,544✔
2238
      if (code) {
2,544✔
NEW
2239
        taosArrayDestroyP(pMsgs, destroySSubmitTbDataSendInfo);
×
2240
        return code;
×
2241
      }
2242
      taosArrayClear(pInserter->pDataBlocks);
2,544✔
2243
      for (int32_t i = 0; i < taosArrayGetSize(pMsgs); ++i) {
7,632✔
2244
        SSubmitTbDataSendInfo* pSendInfo = taosArrayGetP(pMsgs, i);
5,088✔
2245
        code = sendSubmitRequest(pInserter, NULL, pSendInfo->msg.pData, pSendInfo->msg.len,
10,176✔
2246
                                 pInserter->pParam->readHandle->pMsgCb->clientRpc, &pSendInfo->epSet);
5,088✔
2247
        taosMemoryFree(pSendInfo);
5,088✔
2248
        if (code) {
5,088✔
2249
          for (int j = i + 1; j < taosArrayGetSize(pMsgs); ++j) {
×
NEW
2250
            SSubmitTbDataSendInfo* pSendInfo2 = taosArrayGetP(pMsgs, j);
×
NEW
2251
            destroySSubmitTbDataSendInfo(pSendInfo2);
×
2252
          }
2253
          taosArrayDestroy(pMsgs);
×
2254
          return code;
×
2255
        }
2256
        QRY_ERR_RET(tsem_wait(&pInserter->ready));
5,088✔
2257

2258
        if (pInserter->submitRes.code) {
5,088✔
2259
          for (int j = i + 1; j < taosArrayGetSize(pMsgs); ++j) {
×
NEW
2260
            SSubmitTbDataSendInfo* pSendInfo2 = taosArrayGetP(pMsgs, j);
×
NEW
2261
            destroySSubmitTbDataSendInfo(pSendInfo2);
×
2262
          }
2263
          taosArrayDestroy(pMsgs);
×
2264
          return pInserter->submitRes.code;
×
2265
        }
2266
      }
2267

2268
      taosArrayDestroy(pMsgs);
2,544✔
2269

2270
    } else {
2271
      void*   pMsg = NULL;
60,256✔
2272
      int32_t msgLen = 0;
60,256✔
2273
      int32_t code = dataBlocksToSubmitReq(pInserter, &pMsg, &msgLen);
60,256✔
2274
      if (code) {
60,256✔
2275
        return code;
7,932✔
2276
      }
2277

2278
      taosArrayClear(pInserter->pDataBlocks);
52,324✔
2279

2280
      code = sendSubmitRequest(pInserter, NULL, pMsg, msgLen, pInserter->pParam->readHandle->pMsgCb->clientRpc,
52,324✔
2281
                               &pInserter->pNode->epSet);
52,324✔
2282
      if (code) {
52,324✔
2283
        return code;
×
2284
      }
2285

2286
      QRY_ERR_RET(tsem_wait(&pInserter->ready));
52,324✔
2287

2288
      if (pInserter->submitRes.code) {
52,324✔
2289
        return pInserter->submitRes.code;
×
2290
      }
2291
    }
2292
  }
2293

2294
  *pContinue = true;
60,151✔
2295

2296
  return TSDB_CODE_SUCCESS;
60,151✔
2297
}
2298

2299
// Looks up the cached insert-table info for the stream/group named in pInput
// and, unless the stream target is a normal table, copies the cached schema
// version into the stream inserter parameter (sver).
//
// Returns the lookup error if the info cannot be obtained, otherwise the
// result of releasing the looked-up entry.
static int32_t resetInserterTbVersion(SDataInserterHandle* pInserter, const SInputData* pInput) {
  SInsertTableInfo** ppCached = NULL;

  int32_t rc = getStreamInsertTableInfo(pInput->pStreamDataInserterInfo->streamId,
                                        pInput->pStreamDataInserterInfo->groupId, &ppCached);
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  SInsertTableInfo* pCached = *ppCached;
  stDebug("resetInserterTbVersion, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbName:%s, uid:%" PRId64 ", version:%d",
          pInput->pStreamDataInserterInfo->streamId, pInput->pStreamDataInserterInfo->groupId,
          pInput->pStreamDataInserterInfo->tbName, pCached->uid, pCached->version);

  bool isNormalTable = (TSDB_NORMAL_TABLE == pInserter->pParam->streamInserterParam->tbType);
  if (!isNormalTable) {
    pInserter->pParam->streamInserterParam->sver = pCached->version;
  }

  return releaseStreamInsertTableInfo(ppCached);
}
2316

2317
// Sink "put" callback for the stream data inserter.
//
// Queues pInput->pData, builds a submit message for the target table, sends
// it and waits on pInserter->ready for the response. Response handling:
//   - TSDB_CODE_TDB_TABLE_ALREADY_EXIST: auto-create is switched off, the
//     schema version is refreshed and the request is rebuilt and resent once.
//   - TSDB_CODE_VND_INVALID_VGROUP_ID with auto-create on: the db vgroup cache
//     entry is dropped and the whole put is retried.
//   - TSDB_CODE_TDB_TABLE_NOT_EXIST without auto-create, or
//     TSDB_CODE_VND_INVALID_VGROUP_ID: the db vgroup cache entry is dropped and
//     TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND is returned.
//   - any other non-zero submitRes.code is returned as is.
// TSDB_CODE_STREAM_NO_DATA from any step is mapped to success.
//
// *pContinue is set to true only when the submit completed without error.
// The queued blocks are cleared before returning from the send path.
static int32_t putStreamDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  int32_t              code = TSDB_CODE_SUCCESS;
  int32_t              lino = 0;
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
  SStreamRunnerTask*   pTask = pInput->pTask;
  void*                pMsg = NULL;
  int32_t              msgLen = 0;
  SVgroupInfo          vgInfo = {0};

  if (!pInserter || !pInserter->pParam || !pInserter->pParam->streamInserterParam) {
    ST_TASK_ELOG("putStreamDataBlock invalid param, pInserter: %p, pParam:%p", pInserter,
                 pInserter ? pInserter->pParam : NULL);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  if (pInserter->explain) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == taosArrayPush(pInserter->pDataBlocks, &pInput->pData)) {
    return terrno;
  }

  code = streamDataBlocksToSubmitReq(pTask, pInserter, pInput->pStreamDataInserterInfo, &pMsg, &msgLen, &vgInfo);
  QUERY_CHECK_CODE(code, lino, _return);

  code = sendSubmitRequest(pInserter, pInput->pStreamDataInserterInfo, pMsg, msgLen,
                           pInserter->pParam->readHandle->pMsgCb->clientRpc, &vgInfo.epSet);
  QUERY_CHECK_CODE(code, lino, _return);

  code = tsem_wait(&pInserter->ready);
  QUERY_CHECK_CODE(code, lino, _return);

  if (pInserter->submitRes.code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
    // The table exists already: resend as a plain insert with the refreshed schema version.
    pInput->pStreamDataInserterInfo->isAutoCreateTable = false;
    code = resetInserterTbVersion(pInserter, pInput);
    QUERY_CHECK_CODE(code, lino, _return);

    code = streamDataBlocksToSubmitReq(pTask, pInserter, pInput->pStreamDataInserterInfo, &pMsg, &msgLen, &vgInfo);
    QUERY_CHECK_CODE(code, lino, _return);

    code = sendSubmitRequest(pInserter, pInput->pStreamDataInserterInfo, pMsg, msgLen,
                             pInserter->pParam->readHandle->pMsgCb->clientRpc, &vgInfo.epSet);
    QUERY_CHECK_CODE(code, lino, _return);

    code = tsem_wait(&pInserter->ready);
    QUERY_CHECK_CODE(code, lino, _return);
  }

  if (pInput->pStreamDataInserterInfo->isAutoCreateTable &&
      pInserter->submitRes.code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
    rmDbVgInfoFromCache(pInserter->pParam->streamInserterParam->dbFName);
    ST_TASK_ILOG("putStreamDataBlock, stream inserter table info not found, groupId:%" PRId64
                 ", tbName:%s. so reset dbVgInfo and try again",
                 pInput->pStreamDataInserterInfo->groupId, pInput->pStreamDataInserterInfo->tbName);
    // The retry pushes pInput->pData again, so drop the copy queued by this
    // attempt first; otherwise the retried request would carry the block twice.
    taosArrayClear(pInserter->pDataBlocks);
    return putStreamDataBlock(pHandle, pInput, pContinue);
  }

  if ((pInserter->submitRes.code == TSDB_CODE_TDB_TABLE_NOT_EXIST &&
       !pInput->pStreamDataInserterInfo->isAutoCreateTable) ||
      pInserter->submitRes.code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
    rmDbVgInfoFromCache(pInserter->pParam->streamInserterParam->dbFName);
    ST_TASK_ILOG("putStreamDataBlock, stream inserter table info not found, groupId:%" PRId64
                 ", tbName:%s. so reset dbVgInfo",
                 pInput->pStreamDataInserterInfo->groupId, pInput->pStreamDataInserterInfo->tbName);
    code = TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
    QUERY_CHECK_CODE(code, lino, _return);
  }

  if (pInserter->submitRes.code) {
    code = pInserter->submitRes.code;
    ST_TASK_ELOG("submitRes err:%s, code:%0x", tstrerror(pInserter->submitRes.code), pInserter->submitRes.code);
    QUERY_CHECK_CODE(code, lino, _return);
  }

  *pContinue = true;

_return:
  taosArrayClear(pInserter->pDataBlocks);
  if (code == TSDB_CODE_STREAM_NO_DATA) {
    ST_TASK_DLOG("putStreamDataBlock, no valid data to insert, skip this block, groupID:%" PRId64,
                 pInput->pStreamDataInserterInfo->groupId);
    code = TSDB_CODE_SUCCESS;
  } else if (code) {
    ST_TASK_ELOG("submitRes err:%s, code:%0x lino:%d", tstrerror(code), code, lino);
  }
  return code;
}
2404

2405
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
59,567✔
2406
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
59,567✔
2407
  (void)taosThreadMutexLock(&pInserter->mutex);
59,567✔
2408
  pInserter->queryEnd = true;
59,567✔
2409
  pInserter->useconds = useconds;
59,567✔
2410
  (void)taosThreadMutexUnlock(&pInserter->mutex);
59,567✔
2411
}
59,567✔
2412

2413
// Sink "get length" callback: reports the affected-row count of the last
// submit result through pLen. pRawLen and pQueryEnd are not written.
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) {
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;

  *pLen = pInserter->submitRes.affectedRows;

  qDebug("got total affectedRows %" PRId64, *pLen);
}
59,567✔
2418

2419
// Sink "destroy" callback: releases everything the inserter handle owns
// (queued block array, schemas, parameters, column map, plan node, mutex,
// manager and the optional per-inserter vgroup map). The handle structure
// itself is not freed here. Always returns TSDB_CODE_SUCCESS.
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataInserterHandle* pSink = (SDataInserterHandle*)pHandle;

  // Give the cached size back to the global sink statistics.
  (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pSink->cachedSize);

  taosArrayDestroy(pSink->pDataBlocks);
  taosMemoryFree(pSink->pSchema);

  // Stream inserters additionally own the stream parameter and the read handle.
  if (NULL != pSink->pParam->streamInserterParam) {
    destroyStreamInserterParam(pSink->pParam->streamInserterParam);
    taosMemoryFree(pSink->pParam->readHandle);
  }
  taosMemoryFree(pSink->pParam);

  taosHashCleanup(pSink->pCols);

  nodesDestroyNode((SNode*)pSink->pNode);
  pSink->pNode = NULL;

  (void)taosThreadMutexDestroy(&pSink->mutex);

  taosMemoryFree(pSink->pManager);

  if (NULL != pSink->dbVgInfoMap) {
    taosHashSetFreeFp(pSink->dbVgInfoMap, freeUseDbOutput_tmp);
    taosHashCleanup(pSink->dbVgInfoMap);
  }

  if (NULL != pSink->pTagSchema) {
    taosMemoryFreeClear(pSink->pTagSchema->pSchema);
    taosMemoryFree(pSink->pTagSchema);
  }

  return TSDB_CODE_SUCCESS;
}
2449

2450
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
×
2451
  SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
×
2452

2453
  *size = atomic_load_64(&pDispatcher->cachedSize);
×
2454
  return TSDB_CODE_SUCCESS;
×
2455
}
2456

2457
static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) {
67,499✔
2458
  SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
67,499✔
2459

2460
  *pFlags = atomic_load_64(&pDispatcher->flags);
67,499✔
2461
  return TSDB_CODE_SUCCESS;
67,499✔
2462
}
2463

2464
// Creates the data inserter sink for an INSERT ... SELECT style query.
//
// The new handle takes over pManager, pParam and the sink node (*ppDataSink
// is reset to NULL once the node is attached). It loads the target table
// schema, builds the colId -> slotId map from the node's column list and
// records whether the columns are listed in full schema order. A super-table
// target must contain a tbname pseudo column.
//
// On success *pHandle receives the handle and TSDB_CODE_SUCCESS is returned.
// On failure everything handed over so far is released and terrno is returned.
int32_t createDataInserter(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle,
                           void* pParam) {
  SDataSinkNode*       pDataSink = *ppDataSink;
  SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
  if (NULL == inserter) {
    taosMemoryFree(pParam);
    goto _return;
  }

  SQueryInserterNode* pSinkNode = (SQueryInserterNode*)pDataSink;

  // Callback table of the sink.
  inserter->sink.fPut = putDataBlock;
  inserter->sink.fEndPut = endPut;
  inserter->sink.fGetLen = getDataLength;
  inserter->sink.fGetData = NULL;
  inserter->sink.fDestroy = destroyDataSinker;
  inserter->sink.fGetCacheSize = getCacheSize;
  inserter->sink.fGetFlags = getSinkFlags;

  // From here on the handle owns the manager, the node and the parameter.
  inserter->pManager = pManager;
  inserter->pNode = pSinkNode;
  inserter->pParam = pParam;
  inserter->status = DS_BUF_EMPTY;
  inserter->queryEnd = false;
  inserter->explain = pSinkNode->explain;
  *ppDataSink = NULL;

  int64_t suid = 0;
  int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pSinkNode->tableId,
                                                       &inserter->pSchema, &suid, &inserter->pTagSchema);
  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    goto _return;
  }

  pManager->pAPI->metaFn.getBasicInfo(inserter->pParam->readHandle->vnode, &inserter->dbFName, NULL, NULL, NULL);

  if (TSDB_SUPER_TABLE == pSinkNode->tableType) {
    inserter->isStbInserter = true;
  }

  if (suid != pSinkNode->stableId) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    goto _return;
  }

  inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  if (NULL == inserter->pDataBlocks) {
    goto _return;
  }
  QRY_ERR_JRET(taosThreadMutexInit(&inserter->mutex, NULL));

  inserter->fullOrderColList = pSinkNode->pCols->length == inserter->pSchema->numOfCols;

  inserter->pCols = taosHashInit(pSinkNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT),
                                 false, HASH_NO_LOCK);
  if (NULL == inserter->pCols) {
    goto _return;
  }

  SNode*  pColNode = NULL;
  int32_t schemaPos = 0;  // index of the next schema column to compare against
  bool    hasTbnameCol = false;
  FOREACH(pColNode, pSinkNode->pCols) {
    bool isTbnameFunc =
        (pColNode->type == QUERY_NODE_FUNCTION) && (((SFunctionNode*)pColNode)->funcType == FUNCTION_TYPE_TBNAME);

    if (isTbnameFunc) {
      // The tbname pseudo column is registered under colId 0 / slotId 0.
      int16_t colId = 0;
      int16_t slotId = 0;
      QRY_ERR_JRET(taosHashPut(inserter->pCols, &colId, sizeof(colId), &slotId, sizeof(slotId)));
      hasTbnameCol = true;
    } else {
      SColumnNode* pCol = (SColumnNode*)pColNode;
      QRY_ERR_JRET(
          taosHashPut(inserter->pCols, &pCol->colId, sizeof(pCol->colId), &pCol->slotId, sizeof(pCol->slotId)));
      if (inserter->fullOrderColList && pCol->colId != inserter->pSchema->columns[schemaPos].colId) {
        inserter->fullOrderColList = false;
      }
      ++schemaPos;
    }
  }

  if (inserter->isStbInserter && !hasTbnameCol) {
    QRY_ERR_JRET(TSDB_CODE_PAR_TBNAME_ERROR);
  }

  QRY_ERR_JRET(tsem_init(&inserter->ready, 0, 0));

  inserter->dbVgInfoMap = NULL;

  *pHandle = inserter;
  return TSDB_CODE_SUCCESS;

_return:

  if (NULL == inserter) {
    taosMemoryFree(pManager);
  } else {
    (void)destroyDataSinker((SDataSinkHandle*)inserter);
    taosMemoryFree(inserter);
  }

  nodesDestroyNode((SNode*)*ppDataSink);
  *ppDataSink = NULL;

  return terrno;
}
2566

2567
                           
2568
// Once-control passed to taosThreadOnce() together with dbVgInfoMgrInitOnce.
static TdThreadOnce g_dbVgInfoMgrInit = PTHREAD_ONCE_INIT;
2569

2570
// Global db vgroup-info cache state; its dbVgInfoMap and lock members are used by the functions in this file.
SDBVgInfoMgr g_dbVgInfoMgr = {0};
2571
                           
2572
void dbVgInfoMgrInitOnce() {
27,532✔
2573
  g_dbVgInfoMgr.dbVgInfoMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
27,532✔
2574
  if (g_dbVgInfoMgr.dbVgInfoMap == NULL) {
27,532✔
2575
    stError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
×
2576
    return;
×
2577
  }
2578

2579
  taosHashSetFreeFp(g_dbVgInfoMgr.dbVgInfoMap, freeUseDbOutput_tmp);
27,532✔
2580
}
2581

2582

2583

2584
// Creates the data inserter sink used by stream runner tasks.
//
// Makes sure the global db vgroup-info cache is initialized, allocates the
// handle, wires the sink callbacks (putStreamDataBlock as fPut) and creates
// the block array, mutex and ready semaphore. The handle takes over pManager
// and pParam.
//
// On success *pHandle receives the handle and TSDB_CODE_SUCCESS is returned.
// On failure the partially built handle (or, if none was allocated, pManager)
// is released and a non-zero error code is returned.
int32_t createStreamDataInserter(SDataSinkManager* pManager, DataSinkHandle* pHandle, void* pParam) {
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
  // Must be initialized before the first jump to _exit: the cleanup code below
  // inspects it, and the cache check can fail before the allocation happens.
  SDataInserterHandle* inserter = NULL;

  TAOS_UNUSED(taosThreadOnce(&g_dbVgInfoMgrInit, dbVgInfoMgrInitOnce));
  TSDB_CHECK_NULL(g_dbVgInfoMgr.dbVgInfoMap, code, lino, _exit, terrno);

  inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
  TSDB_CHECK_NULL(inserter, code, lino, _exit, terrno);

  inserter->sink.fPut = putStreamDataBlock;
  inserter->sink.fEndPut = endPut;
  inserter->sink.fGetLen = getDataLength;
  inserter->sink.fGetData = NULL;
  inserter->sink.fDestroy = destroyDataSinker;
  inserter->sink.fGetCacheSize = getCacheSize;
  inserter->sink.fGetFlags = getSinkFlags;
  inserter->pManager = pManager;
  inserter->pNode = NULL;
  inserter->pParam = pParam;
  inserter->status = DS_BUF_EMPTY;
  inserter->queryEnd = false;
  inserter->explain = false;

  inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  TSDB_CHECK_NULL(inserter->pDataBlocks, code, lino, _exit, terrno);

  TAOS_CHECK_EXIT(taosThreadMutexInit(&inserter->mutex, NULL));
  TAOS_CHECK_EXIT(tsem_init(&inserter->ready, 0, 0));

  inserter->dbVgInfoMap = NULL;

  *pHandle = inserter;
  return TSDB_CODE_SUCCESS;

_exit:

  // _exit is only reached on failure. terrno may be stale (e.g. the cache map
  // failed to initialize on another thread), so never report success here:
  // the caller would then use an unset *pHandle.
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  if (inserter) {
    (void)destroyDataSinker((SDataSinkHandle*)inserter);
    taosMemoryFree(inserter);
  } else {
    taosMemoryFree(pManager);
  }

  stError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));

  return code;
}
2633

2634
// Returns, through *dbVgInfo, the vgroup info of database dbFName from the
// global cache (g_dbVgInfoMgr.dbVgInfoMap), building and caching it via
// buildDbVgInfoMap() on a miss.
//
// If the insert reports TSDB_CODE_DUP_KEY, the freshly built entry is
// discarded and the entry already in the cache is used.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the entry cannot be
// allocated, or the error from building / inserting the entry.
// *dbVgInfo is only written on success.
int32_t getDbVgInfoByTbName(void* clientRpc, const char* dbFName, SDBVgInfo** dbVgInfo) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       line = 0;
  SUseDbOutput* output = NULL;  // non-NULL only while this function still owns the entry
  size_t        keyLen = strlen(dbFName);

  SUseDbOutput** find = (SUseDbOutput**)taosHashGet(g_dbVgInfoMgr.dbVgInfoMap, dbFName, keyLen);
  if (find != NULL) {
    *dbVgInfo = (*find)->dbVgroup;
    return code;
  }

  output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
  if (output == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  code = buildDbVgInfoMap(clientRpc, dbFName, output);
  QUERY_CHECK_CODE(code, line, _return);

  code = taosHashPut(g_dbVgInfoMgr.dbVgInfoMap, dbFName, keyLen, &output, POINTER_BYTES);
  if (code == TSDB_CODE_DUP_KEY) {
    // another thread has put the same dbFName, so we need to free the output
    freeUseDbOutput_tmp(&output);
    // Make sure the error path below cannot release the same entry again.
    output = NULL;

    find = (SUseDbOutput**)taosHashGet(g_dbVgInfoMgr.dbVgInfoMap, dbFName, keyLen);
    if (find == NULL) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      QUERY_CHECK_CODE(code, line, _return);
    }

    *dbVgInfo = (*find)->dbVgroup;
    return TSDB_CODE_SUCCESS;
  }
  QUERY_CHECK_CODE(code, line, _return);

  // The cache now holds the pointer; hand out its vgroup info.
  *dbVgInfo = output->dbVgroup;
  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  if (output != NULL) {
    freeUseDbOutput_tmp(&output);
  }
  return code;
}
2674

2675
// Resolves the vgroup that holds table tbName of database dbFName.
//
// Builds the full table name "<dbFName>.<tbName>", then, while holding the
// read latch of the global vgroup cache, fetches the database's vgroup info
// and looks up the table's vgroup into *pVgInfo. Failures are logged and the
// error code is returned.
int32_t getDbVgInfoForExec(void* clientRpc, const char* dbFName, const char* tbName, SVgroupInfo* pVgInfo) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SDBVgInfo* pDbVgroups = NULL;
  char       fullTbName[TSDB_TABLE_FNAME_LEN];

  snprintf(fullTbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbFName, tbName);

  taosRLockLatch(&g_dbVgInfoMgr.lock);

  TAOS_CHECK_EXIT(getDbVgInfoByTbName(clientRpc, dbFName, &pDbVgroups));
  TAOS_CHECK_EXIT(inserterGetVgInfo(pDbVgroups, fullTbName, pVgInfo));

_exit:

  taosRUnLockLatch(&g_dbVgInfoMgr.lock);

  if (0 != code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
2697

2698
void rmDbVgInfoFromCache(const char* dbFName) {
3,127✔
2699
  taosWLockLatch(&g_dbVgInfoMgr.lock);
3,127✔
2700

2701
  TAOS_UNUSED(taosHashRemove(g_dbVgInfoMgr.dbVgInfoMap, dbFName, strlen(dbFName)));
3,127✔
2702

2703
  taosWUnLockLatch(&g_dbVgInfoMgr.lock);
3,127✔
2704
}
3,127✔
2705

2706
// Serializes a drop-table batch request into a newly allocated message:
// an SMsgHead (vgId and total length in network byte order) followed by the
// encoded request.
//
// On success *pData / *pLen receive the buffer and its length and the caller
// owns the buffer. On failure the buffer is released, the outputs are left
// untouched and the error code is returned.
static int32_t dropTableReqToMsg(int32_t vgId, SVDropTbBatchReq* pReq, void** pData, int32_t* pLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t len = 0;

  // First pass: compute the encoded size only.
  tEncodeSize(tEncodeSVDropTbBatchReq, pReq, len, code);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  len += sizeof(SMsgHead);
  void* pBuf = taosMemoryMalloc(len);
  if (NULL == pBuf) {
    return terrno;
  }

  SDropTbDataMsg* pHead = (SDropTbDataMsg*)pBuf;
  pHead->header.vgId = htonl(vgId);
  pHead->header.contLen = htonl(len);

  // Second pass: encode the body right behind the message head.
  SEncoder encoder;
  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
  code = tEncodeSVDropTbBatchReq(&encoder, pReq);
  tEncoderClear(&encoder);

  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pBuf);
    return code;
  }

  *pData = pBuf;
  *pLen = len;
  return code;
}
2735

2736
// Response callback of a drop-table request: records the result code in the
// waiting SDropTbCtx, posts its ready semaphore and frees the response
// payload. Always returns TSDB_CODE_SUCCESS.
int32_t dropTbCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SDropTbCtx* pCtx = (SDropTbCtx*)param;

  if (0 != code) {
    stError("dropTbCallback, code:%d, stream:%" PRId64 " gid:%" PRId64, code, pCtx->req->streamId, pCtx->req->gid);
  }

  pCtx->code = code;
  // The post result is not propagated to the caller.
  (void)tsem_post(&pCtx->ready);

  taosMemoryFree(pMsg->pData);

  return TSDB_CODE_SUCCESS;
}
2747

2748
// Wraps an encoded drop-table message into an SMsgSendInfo (callback
// dropTbCallback, callback parameter ctx, message type
// TDMT_VND_SNODE_DROP_TABLE) and sends it asynchronously to pEpset.
// Returns terrno if the send info cannot be allocated, otherwise the result
// of asyncSendMsgToServer().
static int32_t sendDropTbRequest(SDropTbCtx* ctx, void* pMsg, int32_t msgLen, void* pTransporter, SEpSet* pEpset) {
  SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (!pSendInfo) {
    return terrno;
  }

  pSendInfo->msgType = TDMT_VND_SNODE_DROP_TABLE;
  pSendInfo->fp = dropTbCallback;
  pSendInfo->param = ctx;
  pSendInfo->paramFreeFp = NULL;  // ctx is not freed through the send info
  pSendInfo->msgInfo.pData = pMsg;
  pSendInfo->msgInfo.len = msgLen;

  return asyncSendMsgToServer(pTransporter, pEpset, NULL, pSendInfo);
}
2764

2765
// Drops the output table that a stream created for (pReq->streamId, pReq->gid).
//
// The table is taken from the cached stream insert-table info; that cache
// entry is removed from gStreamGrpTableHash. A drop-table batch request is
// then sent to the table's vgroup and this function blocks until
// dropTbCallback() reports the result.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND when no
// table info is cached for the group (not logged as an error), or the error
// of the failing step / the code reported by the server.
int32_t doDropStreamTable(SMsgCb* pMsgCb, void* pTaskOutput, SSTriggerDropRequest* pReq) {
  SStreamRunnerTaskOutput* pOutput = pTaskOutput;
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SVDropTbBatchReq         req = {.nReqs = 1};
  SVDropTbReq*             pDropReq = NULL;
  int32_t                  msgLen = 0;
  tsem_t*                  pSem = NULL;  // set once ctx.ready has been initialized
  SDropTbDataMsg*          pMsg = NULL;
  SInsertTableInfo**       ppTbInfo = NULL;
  int32_t                  vgId = 0;
  SVgroupInfo              vgInfo = {0};
  SDropTbCtx               ctx = {.req = pReq};

  req.pArray = taosArrayInit_s(sizeof(SVDropTbReq), 1);
  if (!req.pArray) return terrno;

  pDropReq = taosArrayGet(req.pArray, 0);

  code = getStreamInsertTableInfo(pReq->streamId, pReq->gid, &ppTbInfo);
  if (TSDB_CODE_SUCCESS != code) {
    code = TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  pDropReq->name = taosStrdup((*ppTbInfo)->tbname);
  if (NULL == pDropReq->name) {
    // Without this check a NULL name would be formatted with "%s" further down.
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    lino = __LINE__;
    goto _end;
  }
  pDropReq->suid = (*ppTbInfo)->uid;
  pDropReq->uid = (*ppTbInfo)->uid;
  pDropReq->igNotExists = true;
  vgId = (*ppTbInfo)->vgid;

  int64_t key[2] = {pReq->streamId, pReq->gid};
  TAOS_UNUSED(taosHashRemove(gStreamGrpTableHash, key, sizeof(key)));

  code = dropTableReqToMsg(vgId, &req, (void**)&pMsg, &msgLen);
  QUERY_CHECK_CODE(code, lino, _end);

  code = getDbVgInfoForExec(pMsgCb->clientRpc, pOutput->outDbFName, pDropReq->name, &vgInfo);
  QUERY_CHECK_CODE(code, lino, _end);

  code = tsem_init(&ctx.ready, 0, 0);
  QUERY_CHECK_CODE(code, lino, _end);
  pSem = &ctx.ready;

  code = sendDropTbRequest(&ctx, pMsg, msgLen, pMsgCb->clientRpc, &vgInfo.epSet);
  QUERY_CHECK_CODE(code, lino, _end);
  pMsg = NULL;  // now owned by sendDropTbRequest

  // The wait result is replaced by the code reported through the callback.
  code = tsem_wait(&ctx.ready);
  code = ctx.code;
  stDebug("doDropStreamTable,  code:0x%" PRIx32 " req:%p, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq,
          pReq->streamId, pReq->gid, pDropReq ? pDropReq->name : "unknown");

_end:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND) {
    stError("doDropStreamTable, code:0x%" PRIx32 ", streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq->streamId,
            pReq->gid, (pDropReq && pDropReq->name) ? pDropReq->name : "unknown");
    if (pMsg) {
      taosMemoryFreeClear(pMsg);
    }
  }
  if (pSem) tsem_destroy(pSem);
  if (pDropReq && pDropReq->name) taosMemoryFreeClear(pDropReq->name);
  if (ppTbInfo) releaseStreamInsertTableInfo(ppTbInfo);
  taosArrayDestroy(req.pArray);

  return code;
}
2834

2835
// Drops the stream output table named tbName.
//
// Unlike doDropStreamTable() the table name is supplied by the caller and is
// not owned by this function. The cache entry for (pReq->streamId, pReq->gid)
// is removed from gStreamGrpTableHash, the target vgroup is resolved by table
// name, and a drop-table batch request is sent; the function blocks until
// dropTbCallback() reports the result.
//
// Returns the error of the failing step or the code reported by the server.
int32_t doDropStreamTableByTbName(SMsgCb* pMsgCb, void* pTaskOutput, SSTriggerDropRequest* pReq, char* tbName) {
  SStreamRunnerTaskOutput* pOutput = pTaskOutput;
  int32_t                  code = 0;
  int32_t                  lino = 0;
  int32_t                  msgLen = 0;
  SDropTbDataMsg*          pMsg = NULL;
  tsem_t*                  pSem = NULL;  // set once ctx.ready has been initialized
  SVDropTbReq*             pDropReq = NULL;
  SVDropTbBatchReq         req = {.nReqs = 1};
  SVgroupInfo              vgInfo = {0};
  SDropTbCtx               ctx = {.req = pReq};
  int64_t                  key[2] = {pReq->streamId, pReq->gid};

  TAOS_UNUSED(taosThreadOnce(&g_dbVgInfoMgrInit, dbVgInfoMgrInitOnce));

  req.pArray = taosArrayInit_s(sizeof(SVDropTbReq), 1);
  if (NULL == req.pArray) {
    return terrno;
  }

  pDropReq = taosArrayGet(req.pArray, 0);
  pDropReq->name = tbName;
  pDropReq->igNotExists = true;

  TAOS_UNUSED(taosHashRemove(gStreamGrpTableHash, key, sizeof(key)));

  code = getDbVgInfoForExec(pMsgCb->clientRpc, pOutput->outDbFName, pDropReq->name, &vgInfo);
  QUERY_CHECK_CODE(code, lino, _end);

  code = dropTableReqToMsg(vgInfo.vgId, &req, (void**)&pMsg, &msgLen);
  QUERY_CHECK_CODE(code, lino, _end);

  code = tsem_init(&ctx.ready, 0, 0);
  QUERY_CHECK_CODE(code, lino, _end);
  pSem = &ctx.ready;

  code = sendDropTbRequest(&ctx, pMsg, msgLen, pMsgCb->clientRpc, &vgInfo.epSet);
  QUERY_CHECK_CODE(code, lino, _end);
  pMsg = NULL;  // now owned by sendDropTbRequest

  // The wait result is replaced by the code reported through the callback.
  code = tsem_wait(&ctx.ready);
  code = ctx.code;
  stDebug("doDropStreamTableByTbName,  code:%d req:%p, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq,
          pReq->streamId, pReq->gid, pDropReq ? pDropReq->name : "unknown");

_end:
  if (TSDB_CODE_SUCCESS != code) {
    stError("doDropStreamTableByTbName, code:%d, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq->streamId,
            pReq->gid, pDropReq ? pDropReq->name : "unknown");
    if (NULL != pMsg) {
      taosMemoryFreeClear(pMsg);
    }
  }
  if (NULL != pSem) {
    tsem_destroy(pSem);
  }
  taosArrayDestroy(req.pArray);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc