• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4909

30 Dec 2025 10:52AM UTC coverage: 65.542% (+0.2%) from 65.386%
#4909

push

travis-ci

web-flow
enh: drop multi-stream (#33962)

60 of 106 new or added lines in 4 files covered. (56.6%)

857 existing lines in 113 files now uncovered.

193924 of 295877 relevant lines covered (65.54%)

120594206.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.66
/source/libs/executor/src/dataInserter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <stdint.h>
17
#include <stdlib.h>
18
#include <string.h>
19
#include "dataSinkInt.h"
20
#include "dataSinkMgt.h"
21
#include "executor.h"
22
#include "executorInt.h"
23
#include "functionMgt.h"
24
#include "libs/new-stream/stream.h"
25
#include "osAtomic.h"
26
#include "osMemPool.h"
27
#include "osMemory.h"
28
#include "osSemaphore.h"
29
#include "planner.h"
30
#include "query.h"
31
#include "querytask.h"
32
#include "storageapi.h"
33
#include "taoserror.h"
34
#include "tarray.h"
35
#include "tcompression.h"
36
#include "tdatablock.h"
37
#include "tdataformat.h"
38
#include "tglobal.h"
39
#include "thash.h"
40
#include "tmsg.h"
41
#include "tqueue.h"
42

43
extern SDataSinkStat gDataSinkStat;
44
SHashObj*            gStreamGrpTableHash = NULL;
45
// Outcome of submit requests as recorded by inserterCallback(); embedded in
// SDataInserterHandle as `submitRes`.
typedef struct SSubmitRes {
  int64_t      affectedRows;  // running total: each successful response adds its own affectedRows
  int32_t      code;          // result code stored by inserterCallback() for the latest response
  SSubmitRsp2* pRsp;          // decoded response; allocated and destroyed inside inserterCallback()
} SSubmitRes;
50

51
// A submit payload addressed to one vgroup. destroySSubmitTbDataMsg() frees
// pData and then the struct itself.
typedef struct SSubmitTbDataMsg {
  int32_t vgId;
  int32_t len;    // presumably the byte length of pData -- TODO confirm against the producer
  void*   pData;  // heap buffer, released with taosMemoryFree()
} SSubmitTbDataMsg;
56

57
// A submit payload paired with an endpoint set. destroySSubmitTbDataSendInfo()
// frees msg.pData and then the struct itself.
typedef struct SSubmitTbDataSendInfo {
  SSubmitTbDataMsg msg;
  SEpSet epSet;
} SSubmitTbDataSendInfo;
61

62
// A not-yet-serialized submit request paired with an endpoint set.
// NOTE(review): ownership of `msg` is not visible in this part of the file -- confirm at the use site.
typedef struct SSubmitReqSendInfo {
  SSubmitReq2* msg;
  SEpSet epSet;
} SSubmitReqSendInfo;
66

67
static void destroySSubmitTbDataMsg(void* p) {
×
68
  if (p == NULL) return;
×
69
  SSubmitTbDataMsg* pVg = p;
×
70
  taosMemoryFree(pVg->pData);
×
71
  taosMemoryFree(pVg);
×
72
}
73

74
static void destroySSubmitTbDataSendInfo(void* p) {
×
75
  if (p == NULL) return;
×
76
  SSubmitTbDataSendInfo* pSendInfo = p;
×
77
  taosMemoryFree(pSendInfo->msg.pData);
×
78
  taosMemoryFree(pSendInfo);
×
79
}
80

81
// State of one data-inserter sink. Only the members commented below are
// exercised by the code visible in this part of the file; the remaining
// members are used elsewhere.
typedef struct SDataInserterHandle {
  SDataSinkHandle     sink;
  SDataSinkManager*   pManager;
  STSchema*           pSchema;
  SQueryInserterNode* pNode;
  SSubmitRes          submitRes;    // written by inserterCallback() for every submit response
  SInserterParam*     pParam;       // provides streamInserterParam and readHandle->pMsgCb->clientRpc
  SArray*             pDataBlocks;
  SHashObj*           pCols;
  int32_t             status;
  bool                queryEnd;
  bool                fullOrderColList;
  uint64_t            useconds;
  uint64_t            cachedSize;
  uint64_t            flags;
  TdThreadMutex       mutex;
  tsem_t              ready;        // posted by inserterCallback() once a response has been handled
  bool                explain;
  bool                isStbInserter;
  SSchemaWrapper*     pTagSchema;
  const char*         dbFName;
  SHashObj*           dbVgInfoMap;  // db full name -> SUseDbOutput*, created lazily by inserterGetDbVgInfo()
  SUseDbRsp*          pRsp;
} SDataInserterHandle;
105

106
// Callback context built by sendSubmitRequest() and consumed by inserterCallback().
typedef struct SSubmitRspParam {
  SDataInserterHandle* pInserter;
  void*                putParam;  // may be NULL; inserterCallback() casts it to SStreamDataInserterInfo*
} SSubmitRspParam;
110

111
// Scratch state used while building insert data; reset by initInsertProcessInfo().
typedef struct SBuildInsertDataInfo {
  SSubmitTbData  pTbData;        // aRowP is (re)created as an array of SRow* by initInsertProcessInfo()
  bool           isFirstBlock;   // initialised to true
  bool           isLastBlock;    // initialised to false
  int64_t        lastTs;         // initialised to TSKEY_MIN
  bool           needSortMerge;  // initialised to false
} SBuildInsertDataInfo;
118

119
// Context for a drop-table request.
// NOTE(review): not referenced in the visible part of this file; `ready`/`code`
// presumably mirror the wait-for-callback pattern used by the inserter -- confirm.
typedef struct SDropTbCtx {
  SSTriggerDropRequest* req;
  tsem_t                ready;
  int32_t               code;
} SDropTbCtx;
124
// Message header followed by a payload pointer for a drop-table message.
// NOTE(review): not referenced in the visible part of this file.
typedef struct SDropTbDataMsg {
  SMsgHead header;
  void*    pData;
} SDropTbDataMsg;
128

129
// A drop request together with a result code.
// NOTE(review): not referenced in the visible part of this file.
typedef struct SRunnerDropTableInfo {
  SSTriggerDropRequest* pReq;
  int32_t               code;
} SRunnerDropTableInfo;
133

134
// Resets the per-build bookkeeping flags and allocates the row-pointer array
// with an initial capacity of `rows`.
// Returns TSDB_CODE_SUCCESS, or terrno when the array cannot be allocated.
static int32_t initInsertProcessInfo(SBuildInsertDataInfo* pBuildInsertDataInfo, int32_t rows) {
  pBuildInsertDataInfo->isFirstBlock = true;
  pBuildInsertDataInfo->isLastBlock = false;
  pBuildInsertDataInfo->needSortMerge = false;
  pBuildInsertDataInfo->lastTs = TSKEY_MIN;

  pBuildInsertDataInfo->pTbData.aRowP = taosArrayInit(rows, sizeof(SRow*));
  if (NULL == pBuildInsertDataInfo->pTbData.aRowP) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
146

147
static void freeCacheTbInfo(void* pp) {
224,684✔
148
  if (pp == NULL || *(SInsertTableInfo**)pp == NULL) {
224,684✔
149
    return;
×
150
  }
151
  SInsertTableInfo* pTbInfo = *(SInsertTableInfo**)pp;
224,684✔
152
  if (pTbInfo->tbname) {
224,684✔
153
    taosMemFree(pTbInfo->tbname);
224,684✔
154
    pTbInfo->tbname = NULL;
224,684✔
155
  }
156
  if (pTbInfo->pSchema) {
224,684✔
157
    tDestroyTSchema(pTbInfo->pSchema);
224,684✔
158
    pTbInfo->pSchema = NULL;
224,684✔
159
  }
160
  taosMemoryFree(pTbInfo);
224,684✔
161
}
162

163
int32_t initInserterGrpInfo() {
552,733✔
164
  gStreamGrpTableHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
552,733✔
165
  if (NULL == gStreamGrpTableHash) {
552,733✔
166
    qError("failed to create stream group table hash");
×
167
    return terrno;
×
168
  }
169
  taosHashSetFreeFp(gStreamGrpTableHash, freeCacheTbInfo);
552,733✔
170
  return TSDB_CODE_SUCCESS;
552,733✔
171
}
172

173
void destroyInserterGrpInfo() {
552,733✔
174
  static int8_t destoryGrpInfo = 0;
175
  int8_t        flag = atomic_val_compare_exchange_8(&destoryGrpInfo, 0, 1);
552,733✔
176
  if (flag != 0) {
552,733✔
177
    return;
×
178
  }
179
  if (NULL != gStreamGrpTableHash) {
552,733✔
180
    taosHashCleanup(gStreamGrpTableHash);
552,733✔
181
    gStreamGrpTableHash = NULL;
552,733✔
182
  }
183
}
184

185
// Validates the first create-table entry of a submit response and copies the
// resolved vgId/uid into the cached table info `res`.
//
// pSubmitRes     response holder; pRsp must have been decoded by the caller
// res            cached table info to refresh (vgid and uid are overwritten)
// pSchemaChaned  set to true when the response carries a non-zero schema
//                version that differs from res->version, false otherwise
//
// Returns TSDB_CODE_SUCCESS, the create-table error code, or
// TSDB_CODE_MND_STREAM_INTERNAL_ERROR when the response is missing or incomplete.
static int32_t checkResAndResetTableInfo(const SSubmitRes* pSubmitRes, SInsertTableInfo* res,
                                         bool* pSchemaChaned) {
  if (!pSubmitRes->pRsp) {
    stError("create table response is NULL");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  // taosArrayGetSize() is used instead of reading ->size directly: the
  // create-table array may be absent from a response altogether.
  if (taosArrayGetSize(pSubmitRes->pRsp->aCreateTbRsp) < 1) {
    stError("create table response size is less than 1");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  SVCreateTbRsp* pCreateTbRsp = taosArrayGet(pSubmitRes->pRsp->aCreateTbRsp, 0);
  if (pCreateTbRsp == NULL) {
    stError("create table response size is less than 1");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  if (pCreateTbRsp->code != 0 && pCreateTbRsp->code != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
    stError("create table failed, code:%d", pCreateTbRsp->code);
    return pCreateTbRsp->code;
  }

  if (!pCreateTbRsp->pMeta || pCreateTbRsp->pMeta->tuid == 0) {
    stError("create table can not get tuid");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  *pSchemaChaned = false;
  res->vgid = pCreateTbRsp->pMeta->vgId;
  res->uid = pCreateTbRsp->pMeta->tuid;

  if (pCreateTbRsp->pMeta->sversion != 0 && res->version != pCreateTbRsp->pMeta->sversion) {
    *pSchemaChaned = true;
  }

  stDebug("inserter callback, uid:%" PRId64 "  vgid: %" PRId64 ", version: %d", res->uid, res->vgid, res->version);

  return TSDB_CODE_SUCCESS;
}
218

219
// Builds a fresh SInsertTableInfo from the first create-table entry of a
// submit response, reusing the table name of the previously cached info.
//
// pSubmitRes         response holder with a decoded pRsp
// pOldInsertTbInfo   previously cached info; only its tbname is read
// ppNewInsertTbInfo  receives the new info on success (caller owns it and
//                    releases it with freeCacheTbInfo())
//
// Returns TSDB_CODE_SUCCESS, the create-table error code, terrno on allocation
// failure, or TSDB_CODE_MND_STREAM_INTERNAL_ERROR when the response entry or
// its table meta is missing. Nothing is leaked on any failure path.
static int32_t createNewInsertTbInfo(const SSubmitRes* pSubmitRes, SInsertTableInfo* pOldInsertTbInfo,
                                     SInsertTableInfo** ppNewInsertTbInfo) {
  SVCreateTbRsp* pCreateTbRsp = taosArrayGet(pSubmitRes->pRsp->aCreateTbRsp, 0);
  if (pCreateTbRsp == NULL) {
    stError("create table response size is less than 1");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }
  if (pCreateTbRsp->code != 0 && pCreateTbRsp->code != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
    stError("create table failed, code:%d", pCreateTbRsp->code);
    return pCreateTbRsp->code;
  }
  if (pCreateTbRsp->pMeta == NULL) {
    stError("create table can not get tuid");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  SInsertTableInfo* res = taosMemoryCalloc(1, sizeof(SInsertTableInfo));
  if (res == NULL) {
    return terrno;
  }

  res->tbname = taosStrdup(pOldInsertTbInfo->tbname);
  if (res->tbname == NULL) {
    int32_t code = terrno;  // capture before free/log calls get a chance to touch terrno
    taosMemoryFree(res);
    stError("failed to allocate memory for table name");
    return code;
  }

  res->uid = pCreateTbRsp->pMeta->tuid;
  res->vgid = pCreateTbRsp->pMeta->vgId;
  res->version = pCreateTbRsp->pMeta->sversion;

  res->pSchema = tBuildTSchema(pCreateTbRsp->pMeta->pSchemas, pCreateTbRsp->pMeta->numOfColumns, res->version);
  if (res->pSchema == NULL) {
    int32_t code = terrno;
    stError("failed to build schema for table:%s, uid:%" PRId64 ", vgid:%" PRId64 ", version:%d", res->tbname, res->uid,
            res->vgid, res->version);
    // Release the half-built info (name + struct) instead of leaking it.
    freeCacheTbInfo(&res);
    return code;
  }

  *ppNewInsertTbInfo = res;
  return TSDB_CODE_SUCCESS;
}
253

254
// Refreshes the cached table info of a (streamId, groupId) pair from a submit
// response. When the response reports a different schema version, the cached
// entry is replaced by a newly built one.
//
// The cache entry is pinned with taosHashAcquire() for the duration of the
// call and is released on EVERY path that obtained it, so that an entry
// removed from the hash can actually be freed once it is no longer referenced.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_MND_STREAM_INTERNAL_ERROR when no cache
// entry exists, or the error reported by validation / allocation / hash insert.
static int32_t updateInsertGrpTableInfo(SStreamDataInserterInfo* pInserterInfo, const SSubmitRes* pSubmitRes) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  int64_t            key[2] = {pInserterInfo->streamId, pInserterInfo->groupId};
  SInsertTableInfo** ppTbRes = taosHashAcquire(gStreamGrpTableHash, key, sizeof(key));
  if (NULL == ppTbRes) {
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }
  if (*ppTbRes == NULL) {
    taosHashRelease(gStreamGrpTableHash, ppTbRes);
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  bool schemaChanged = false;
  code = checkResAndResetTableInfo(pSubmitRes, *ppTbRes, &schemaChanged);
  QUERY_CHECK_CODE(code, lino, _exit);

  if (schemaChanged) {
    SInsertTableInfo* pNewInfo = NULL;
    code = createNewInsertTbInfo(pSubmitRes, *ppTbRes, &pNewInfo);
    QUERY_CHECK_CODE(code, lino, _exit);

    TAOS_UNUSED(taosHashRemove(gStreamGrpTableHash, key, sizeof(key)));

    code = taosHashPut(gStreamGrpTableHash, key, sizeof(key), &pNewInfo, sizeof(SInsertTableInfo*));

    if (code == TSDB_CODE_DUP_KEY) {
      // Another writer re-inserted the key first; keep theirs and drop ours.
      freeCacheTbInfo(&pNewInfo);
      code = TSDB_CODE_SUCCESS;
      goto _exit;
    } else if (code != TSDB_CODE_SUCCESS) {
      freeCacheTbInfo(&pNewInfo);
      stError("failed to put new insert tbInfo for streamId:%" PRIx64 ", groupId:%" PRIx64 ", code:%d",
              pInserterInfo->streamId, pInserterInfo->groupId, code);
      QUERY_CHECK_CODE(code, lino, _exit);
    }

    stInfo("update table info for streamId:%" PRIx64 ", groupId:%" PRIx64 ", uid:%" PRId64 ", vgid:%" PRId64
           ", version:%d",
           pInserterInfo->streamId, pInserterInfo->groupId, pNewInfo->uid, pNewInfo->vgid, pNewInfo->version);
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    stError("failed to check and reset table info for streamId:%" PRIx64 ", groupId:%" PRIx64 ", code:%d",
            pInserterInfo->streamId, pInserterInfo->groupId, code);
  }
  // Drop the reference taken by taosHashAcquire(), on success as well as on failure.
  taosHashRelease(gStreamGrpTableHash, ppTbRes);
  return code;
}
301

302
static int32_t buildTSchmaFromInserter(SStreamInserterParam* pInsertParam, STSchema** ppTSchema);
303
// Ensures the global cache holds a table-info entry for the
// (streamId, groupId) pair of `pInserterInfo`.
//
// An existing non-NULL entry is left untouched. Otherwise a new
// SInsertTableInfo is built (uid 0; version 1 for normal tables, the stream's
// `sver` otherwise; a copy of the table name; a schema derived from the stream
// inserter parameters) and inserted. Losing an insert race (TSDB_CODE_DUP_KEY)
// counts as success.
//
// Returns TSDB_CODE_SUCCESS, terrno on allocation failure, or the error of the
// schema build / hash insert.
static int32_t initTableInfo(SDataInserterHandle* pInserter, SStreamDataInserterInfo* pInserterInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t key[2] = {pInserterInfo->streamId, pInserterInfo->groupId};

  // Cheap lookup first so the common "already cached" case allocates nothing.
  SInsertTableInfo** ppCached = taosHashGet(gStreamGrpTableHash, key, sizeof(key));
  if (ppCached != NULL && *ppCached != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SInsertTableInfo* pInfo = taosMemoryCalloc(1, sizeof(SInsertTableInfo));
  if (pInfo == NULL) {
    return terrno;
  }

  SStreamInserterParam* pInsertParam = pInserter->pParam->streamInserterParam;
  pInfo->uid = 0;
  pInfo->version = (pInsertParam->tbType == TSDB_NORMAL_TABLE) ? 1 : pInsertParam->sver;

  pInfo->tbname = taosStrdup(pInserterInfo->tbName);
  if (pInfo->tbname == NULL) {
    taosMemoryFree(pInfo);
    stError("failed to allocate memory for table name");
    return terrno;
  }

  code = buildTSchmaFromInserter(pInserter->pParam->streamInserterParam, &pInfo->pSchema);
  if (code == TSDB_CODE_SUCCESS) {
    code = taosHashPut(gStreamGrpTableHash, key, sizeof(key), &pInfo, sizeof(SInsertTableInfo*));
    if (code == TSDB_CODE_DUP_KEY) {
      freeCacheTbInfo(&pInfo);
      return TSDB_CODE_SUCCESS;
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    stError("failed to build table info for streamId:%" PRIx64 ", groupId:%" PRIx64 ", code:%d",
            pInserterInfo->streamId, pInserterInfo->groupId, code);
    freeCacheTbInfo(&pInfo);
  }
  return code;
}
351

352
static bool colsIsSupported(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
25,259✔
353
  SArray* pCreatingFields = pInserterParam->pFields;
25,259✔
354

355
  for (int32_t i = 0; i < pCreatingFields->size; ++i) {
141,153✔
356
    SFieldWithOptions* pField = taosArrayGet(pCreatingFields, i);
115,894✔
357
    if (NULL == pField) {
115,894✔
358
      stError("isSupportedSTableSchema: failed to get field from array");
×
359
      return false;
×
360
    }
361

362
    for (int j = 0; j < pTableMetaRsp->numOfColumns; ++j) {
342,812✔
363
      if (strncmp(pTableMetaRsp->pSchemas[j].name, pField->name, TSDB_COL_NAME_LEN) == 0) {
341,180✔
364
        if (pTableMetaRsp->pSchemas[j].type == pField->type && pTableMetaRsp->pSchemas[j].bytes == pField->bytes) {
114,262✔
365
          break;
366
        } else {
367
          return false;
×
368
        }
369
      }
370
    }
371
  }
372
  return true;
25,259✔
373
}
374

375
static bool TagsIsSupported(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
816✔
376
  SArray* pCreatingTags = pInserterParam->pTagFields;
816✔
377

378
  int32_t            tagIndexOffset = -1;
816✔
379
  SFieldWithOptions* pField = taosArrayGet(pCreatingTags, 0);
816✔
380
  if (NULL == pField) {
816✔
381
    stError("isSupportedSTableSchema: failed to get field from array");
×
382
    return false;
×
383
  }
384
  for (int32_t i = 0; i < pTableMetaRsp->numOfColumns + pTableMetaRsp->numOfTags; ++i) {
816✔
385
    if (strncmp(pTableMetaRsp->pSchemas[i].name, pField->name, TSDB_COL_NAME_LEN) != 0) {
408✔
386
      tagIndexOffset = i;
408✔
387
      break;
408✔
388
    }
389
  }
390
  if (tagIndexOffset == -1) {
816✔
391
    stError("isSupportedSTableSchema: failed to get tag index");
408✔
392
    return false;
408✔
393
  }
394

395
  for (int32_t i = 0; i < pTableMetaRsp->numOfTags; ++i) {
816✔
396
    int32_t            index = i + tagIndexOffset;
408✔
397
    SFieldWithOptions* pField = taosArrayGet(pCreatingTags, i);
408✔
398
    if (NULL == pField) {
408✔
399
      stError("isSupportedSTableSchema: failed to get field from array");
×
400
      return false;
×
401
    }
402

403
    for(int32_t j = 0; j < pTableMetaRsp->numOfTags; ++j) {
816✔
404
      if (strncmp(pTableMetaRsp->pSchemas[index].name, pField->name, TSDB_COL_NAME_LEN) == 0) {
408✔
405
        if (pTableMetaRsp->pSchemas[index].type == pField->type &&
×
406
            pTableMetaRsp->pSchemas[index].bytes == pField->bytes) {
×
407
          break;
408
        } else {
409
          return false;
×
410
        }
411
      }
412
    }
413
  }
414
  return true;
408✔
415
}
416

417
// A super/child table schema is accepted when the column check passes and,
// after that, the tag check passes as well (the tag check is skipped when the
// column check already failed).
static bool isSupportedSTableSchema(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
  return colsIsSupported(pTableMetaRsp, pInserterParam) && TagsIsSupported(pTableMetaRsp, pInserterParam);
}
426

427
// A normal table has no tags, so only the column check applies.
static bool isSupportedNTableSchema(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
  bool columnsOk = colsIsSupported(pTableMetaRsp, pInserterParam);
  return columnsOk;
}
430

431
static int32_t checkAndSaveCreateGrpTableInfo(SDataInserterHandle*     pInserthandle,
25,259✔
432
                                              SStreamDataInserterInfo* pInserterInfo) {
433
  int32_t     code = TSDB_CODE_SUCCESS;
25,259✔
434
  SSubmitRes* pSubmitRes = &pInserthandle->submitRes;
25,259✔
435
  int8_t      tbType = pInserthandle->pParam->streamInserterParam->tbType;
25,259✔
436

437
  SVCreateTbRsp*        pCreateTbRsp = taosArrayGet(pSubmitRes->pRsp->aCreateTbRsp, 0);
25,259✔
438
  SSchema*              pExistRow = pCreateTbRsp->pMeta->pSchemas;
25,259✔
439
  SStreamInserterParam* pInserterParam = pInserthandle->pParam->streamInserterParam;
25,259✔
440

441
  if (tbType == TSDB_CHILD_TABLE || tbType == TSDB_SUPER_TABLE) {
25,259✔
442
    if (!isSupportedSTableSchema(pCreateTbRsp->pMeta, pInserterParam)) {
816✔
443
      stError("create table failed, schema is not supported");
408✔
444
      return TSDB_CODE_STREAM_INSERT_SCHEMA_NOT_MATCH;
408✔
445
    }
446
  } else if (tbType == TSDB_NORMAL_TABLE) {
24,443✔
447
    if (!isSupportedNTableSchema(pCreateTbRsp->pMeta, pInserterParam)) {
24,443✔
448
      stError("create table failed, schema is not supported");
×
449
      return TSDB_CODE_STREAM_INSERT_SCHEMA_NOT_MATCH;
×
450
    }
451
  } else {
452
    stError("checkAndSaveCreateGrpTableInfo failed, tbType:%d is not supported", tbType);
×
453
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
×
454
  }
455

456
  return updateInsertGrpTableInfo(pInserterInfo, pSubmitRes);
24,851✔
457
}
458

459
int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
3,060,981✔
460
  SSubmitRspParam*     pParam = (SSubmitRspParam*)param;
3,060,981✔
461
  SDataInserterHandle* pInserter = pParam->pInserter;
3,060,981✔
462
  int32_t              code2 = 0;
3,060,981✔
463

464
  if (code) {
3,060,981✔
465
    pInserter->submitRes.code = code;
28,933✔
466
  } else {
467
    pInserter->submitRes.code = TSDB_CODE_SUCCESS;
3,032,048✔
468
  }
469
  SDecoder coder = {0};
3,060,981✔
470

471
  if (code == TSDB_CODE_SUCCESS) {
3,060,981✔
472
    pInserter->submitRes.pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2));
3,032,048✔
473
    if (NULL == pInserter->submitRes.pRsp) {
3,032,048✔
474
      pInserter->submitRes.code = terrno;
×
475
      goto _return;
×
476
    }
477

478
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
3,032,048✔
479
    code = tDecodeSSubmitRsp2(&coder, pInserter->submitRes.pRsp);
3,032,048✔
480
    if (code) {
3,032,048✔
481
      tDestroySSubmitRsp2(pInserter->submitRes.pRsp, TSDB_MSG_FLG_DECODE);
×
482
      taosMemoryFree(pInserter->submitRes.pRsp);
×
483
      pInserter->submitRes.code = code;
×
484
      goto _return;
×
485
    }
486

487
    if (pInserter->submitRes.pRsp->affectedRows > 0) {
3,032,048✔
488
      SArray* pCreateTbList = pInserter->submitRes.pRsp->aCreateTbRsp;
1,100,856✔
489
      int32_t numOfTables = taosArrayGetSize(pCreateTbList);
1,100,856✔
490

491
      for (int32_t i = 0; i < numOfTables; ++i) {
1,382,049✔
492
        SVCreateTbRsp* pRsp = taosArrayGet(pCreateTbList, i);
281,193✔
493
        if (NULL == pRsp) {
281,193✔
494
          pInserter->submitRes.code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
495
          goto _return;
×
496
        }
497
        if (TSDB_CODE_SUCCESS != pRsp->code) {
281,193✔
498
          code = pRsp->code;
×
499
          tDestroySSubmitRsp2(pInserter->submitRes.pRsp, TSDB_MSG_FLG_DECODE);
×
500
          taosMemoryFree(pInserter->submitRes.pRsp);
×
501
          pInserter->submitRes.code = code;
×
502
          goto _return;
×
503
        }
504
      }
505
    }
506

507
    if (pParam->putParam != NULL && ((SStreamDataInserterInfo*)pParam->putParam)->isAutoCreateTable) {
3,032,048✔
508
      code2 = updateInsertGrpTableInfo((SStreamDataInserterInfo*)pParam->putParam, &pInserter->submitRes);
282,688✔
509
    }
510

511
    pInserter->submitRes.affectedRows += pInserter->submitRes.pRsp->affectedRows;
3,032,048✔
512
    qDebug("submit rsp received, affectedRows:%d, total:%" PRId64, pInserter->submitRes.pRsp->affectedRows,
3,032,048✔
513
           pInserter->submitRes.affectedRows);
514
    tDestroySSubmitRsp2(pInserter->submitRes.pRsp, TSDB_MSG_FLG_DECODE);
3,032,048✔
515
    taosMemoryFree(pInserter->submitRes.pRsp);
3,032,048✔
516
  } else if ((TSDB_CODE_TDB_TABLE_ALREADY_EXIST == code && pParam->putParam != NULL &&
28,933✔
517
              ((SStreamDataInserterInfo*)pParam->putParam)->isAutoCreateTable) ||
28,933✔
518
             TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER == code) {
519
    pInserter->submitRes.code = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
25,259✔
520
    pInserter->submitRes.pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2));
25,259✔
521
    if (NULL == pInserter->submitRes.pRsp) {
25,259✔
522
      code2 = terrno;
×
523
      goto _return;
×
524
    }
525

526
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
25,259✔
527
    code2 = tDecodeSSubmitRsp2(&coder, pInserter->submitRes.pRsp);
25,259✔
528
    if (code2 == TSDB_CODE_SUCCESS) {
25,259✔
529
      code2 = checkAndSaveCreateGrpTableInfo(pInserter, (SStreamDataInserterInfo*)pParam->putParam);
25,259✔
530
    }
531
    tDestroySSubmitRsp2(pInserter->submitRes.pRsp, TSDB_MSG_FLG_DECODE);
25,259✔
532
    taosMemoryFree(pInserter->submitRes.pRsp);
25,259✔
533
  }
534

535
_return:
3,058,920✔
536

537
  if (code2) {
3,060,981✔
538
    qError("update inserter table info failed, error:%s", tstrerror(code2));
408✔
539
  }
540
  tDecoderClear(&coder);
3,060,981✔
541
  TAOS_UNUSED(tsem_post(&pInserter->ready));
3,060,981✔
542

543
  taosMemoryFree(pMsg->pData);
3,060,981✔
544

545
  return TSDB_CODE_SUCCESS;
3,060,981✔
546
}
547

548
void freeUseDbOutput_tmp(void* ppOutput) {
9,690✔
549
  SUseDbOutput* pOut = *(SUseDbOutput**)ppOutput;
9,690✔
550
  if (NULL == ppOutput) {
9,690✔
551
    return;
×
552
  }
553

554
  if (pOut->dbVgroup) {
9,690✔
555
    freeVgInfo(pOut->dbVgroup);
9,690✔
556
  }
557
  taosMemFree(pOut);
9,690✔
558
  *(SUseDbOutput**)ppOutput = NULL;
9,690✔
559
}
560

561
// RPC completion callback for the "get db info" request sent by
// buildDbVgInfoMap().
//
// param  SDBVgInfoReq* living on the waiter's stack
// pMsg   response buffer; pData and pEpSet are released here
// code   transport/server result of the request
//
// On success the deserialized SUseDbRsp is stored in pVgInfoReq->pRsp (the
// waiter frees it). When the request itself failed, pRsp is left untouched.
// The waiter's semaphore is posted on every path. Returns the resulting code.
static int32_t processUseDbRspForInserter(void* param, SDataBuf* pMsg, int32_t code) {
  int32_t       lino = 0;
  SDBVgInfoReq* pVgInfoReq = (SDBVgInfoReq*)param;

  if (TSDB_CODE_SUCCESS != code) {
    // Request failed: nothing to decode; the error is logged below.
    goto _return;
  }

  // Zero-initialised so that the waiter's cleanup of the response never reads
  // indeterminate members if deserialization stops part-way.
  pVgInfoReq->pRsp = taosMemoryCalloc(1, sizeof(SUseDbRsp));
  QUERY_CHECK_NULL(pVgInfoReq->pRsp, code, lino, _return, terrno);

  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pVgInfoReq->pRsp);
  QUERY_CHECK_CODE(code, lino, _return);

_return:
  taosMemoryFreeClear(pMsg->pData);
  taosMemoryFreeClear(pMsg->pEpSet);
  if (code != 0){
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  int ret = tsem_post(&pVgInfoReq->ready);
  if (ret != 0) {
    qError("%s failed code: %d", __func__, ret);
  }
  return code;
}
594

595

596
int inserterVgInfoComp(const void* lp, const void* rp) {
49,848✔
597
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
49,848✔
598
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
49,848✔
599
  if (pLeft->hashBegin < pRight->hashBegin) {
49,848✔
600
    return -1;
42,501✔
601
  } else if (pLeft->hashBegin > pRight->hashBegin) {
7,347✔
602
    return 1;
7,347✔
603
  }
604

605
  return 0;
×
606
}
607

608
// Synchronously fetches the vgroup layout of database `dbFName` from the
// mnode and fills `output` with it, including a vgArray sorted by hashBegin.
//
// clientRpc  transport handle used to send the request
// dbFName    full database name
// output     caller-owned result object; on failure it may be partially
//            filled and the caller is expected to release it
//
// The call blocks on a semaphore until processUseDbRspForInserter() posts it.
// Every failure is reported through the return value and every path runs the
// common cleanup at _return.
//
// NOTE(review): the callback writes into a stack-allocated request object; if
// tsem_wait() itself fails after the request was sent, a late callback would
// touch a dead stack frame. Not changed here.
static int32_t buildDbVgInfoMap(void* clientRpc, const char* dbFName, SUseDbOutput* output) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  int32_t       contLen = 0;
  char*         buf1 = NULL;
  SUseDbReq*    pReq = NULL;
  SMsgSendInfo* pMsgSendInfo = NULL;
  SEpSet        epSet = {0};
  SDBVgInfoReq  dbVgInfoReq = {0};

  code = tsem_init(&dbVgInfoReq.ready, 0, 0);
  if (code != TSDB_CODE_SUCCESS) {
    qError("tsem_init failed, error:%s", tstrerror(code));
    return code;
  }

  // Zero-initialised: only `db` is filled in below, and the whole struct is serialized.
  pReq = taosMemoryCalloc(1, sizeof(SUseDbReq));
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno);

  tstrncpy(pReq->db, dbFName, TSDB_DB_FNAME_LEN);

  contLen = tSerializeSUseDbReq(NULL, 0, pReq);
  if (contLen < 0) {
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    lino = __LINE__;
    goto _return;
  }

  buf1 = taosMemoryCalloc(1, contLen);
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno);

  if (tSerializeSUseDbReq(buf1, contLen, pReq) < 0) {
    // Set `code` explicitly; jumping to cleanup with code == 0 would report success.
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    lino = __LINE__;
    goto _return;
  }

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno);

  code = getCurrentMnodeEpset(&epSet);
  QUERY_CHECK_CODE(code, lino, _return);

  pMsgSendInfo->param = &dbVgInfoReq;
  pMsgSendInfo->msgInfo.pData = buf1;
  buf1 = NULL;  // now owned by pMsgSendInfo
  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
  pMsgSendInfo->fp = processUseDbRspForInserter;

  code = asyncSendMsgToServer(clientRpc, &epSet, NULL, pMsgSendInfo);
  // As in the original flow, the send-info is never touched again after the
  // send call, whatever its result.
  pMsgSendInfo = NULL;
  QUERY_CHECK_CODE(code, lino, _return);

  code = tsem_wait(&dbVgInfoReq.ready);
  QUERY_CHECK_CODE(code, lino, _return);

  // The callback leaves pRsp NULL when the request failed or allocation failed.
  if (dbVgInfoReq.pRsp == NULL) {
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    lino = __LINE__;
    goto _return;
  }

  code = queryBuildUseDbOutput(output, dbVgInfoReq.pRsp);
  QUERY_CHECK_CODE(code, lino, _return);

  output->dbVgroup->vgArray = taosArrayInit(dbVgInfoReq.pRsp->vgNum, sizeof(SVgroupInfo));
  if (NULL == output->dbVgroup->vgArray) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _return);
  }

  void* pIter = taosHashIterate(output->dbVgroup->vgHash, NULL);
  while (pIter) {
    if (NULL == taosArrayPush(output->dbVgroup->vgArray, pIter)) {
      code = terrno;
      lino = __LINE__;
      taosHashCancelIterate(output->dbVgroup->vgHash, pIter);
      goto _return;  // run the common cleanup instead of returning directly
    }

    pIter = taosHashIterate(output->dbVgroup->vgHash, pIter);
  }

  taosArraySort(output->dbVgroup->vgArray, inserterVgInfoComp);

_return:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  taosMemoryFree(buf1);          // NULL once handed to pMsgSendInfo
  taosMemoryFree(pMsgSendInfo);  // non-NULL only when the request was never sent
  taosMemoryFree(pReq);
  TAOS_UNUSED(tsem_destroy(&dbVgInfoReq.ready));
  if (dbVgInfoReq.pRsp) {
    tFreeSUsedbRsp(dbVgInfoReq.pRsp);
    taosMemoryFreeClear(dbVgInfoReq.pRsp);
  }
  return code;
}
690

691
int32_t inserterBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
216,539✔
692
                                 SArray* tagName, uint8_t tagNum, int32_t ttl) {
693
  pTbReq->type = TD_CHILD_TABLE;
216,539✔
694
  pTbReq->ctb.pTag = (uint8_t*)pTag;
216,774✔
695
  pTbReq->name = taosStrdup(tname);
216,570✔
696
  if (!pTbReq->name) return terrno;
216,774✔
697
  pTbReq->ctb.suid = suid;
216,774✔
698
  pTbReq->ctb.tagNum = tagNum;
216,774✔
699
  if (sname) {
216,774✔
700
    pTbReq->ctb.stbName = taosStrdup(sname);
216,774✔
701
    if (!pTbReq->ctb.stbName) {
216,774✔
702
      taosMemoryFree(pTbReq->name);
×
703
      return terrno;
×
704
    }
705
  }
706
  pTbReq->ctb.tagName = tagName;
216,774✔
707
  pTbReq->ttl = ttl;
216,774✔
708
  pTbReq->commentLen = -1;
216,774✔
709

710
  return TSDB_CODE_SUCCESS;
216,774✔
711
}
712

713
int32_t inserterHashValueComp(void const* lp, void const* rp) {
3,662,642✔
714
  uint32_t*    key = (uint32_t*)lp;
3,662,642✔
715
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
3,662,642✔
716

717
  if (*key < pVg->hashBegin) {
3,662,642✔
718
    return -1;
857,261✔
719
  } else if (*key > pVg->hashEnd) {
2,804,978✔
720
    return 1;
206,697✔
721
  }
722

723
  return 0;
2,598,716✔
724
}
725

726

727
// Resolves the vgroup responsible for table `tbName` by hashing the name and
// searching the db's vgArray for the range containing the hash value.
// On success the matching entry is copied into `*pVgInfo`.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when
// `dbInfo` or its vgArray is missing or no range contains the hash value.
int32_t inserterGetVgInfo(SDBVgInfo* dbInfo, char* tbName, SVgroupInfo* pVgInfo) {
  if (dbInfo == NULL) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  if (dbInfo->vgArray == NULL) {
    qError("empty db vgArray, hashSize:%d", taosHashGetSize(dbInfo->vgHash));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t  nameLen = (int32_t)strlen(tbName);
  uint32_t hashValue = taosGetTbHashVal(tbName, nameLen, dbInfo->hashMethod, dbInfo->hashPrefix, dbInfo->hashSuffix);

  SVgroupInfo* pMatch = taosArraySearch(dbInfo->vgArray, &hashValue, inserterHashValueComp, TD_EQ);
  if (pMatch != NULL) {
    *pVgInfo = *pMatch;
    qDebug("insert get vgInfo, tbName:%s vgId:%d epset(%s:%d)", tbName, pVgInfo->vgId, pVgInfo->epSet.eps[0].fqdn,
          pVgInfo->epSet.eps[0].port);
    return TSDB_CODE_SUCCESS;
  }

  qError("no hash range found for hash value [%u], table:%s, numOfVgId:%d", hashValue, tbName,
         (int32_t)taosArrayGetSize(dbInfo->vgArray));
  return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
752

753
// Convenience wrapper around inserterGetVgInfo() that yields only the vgroup
// id. `*vgId` is written on success only.
int32_t inserterGetVgId(SDBVgInfo* dbInfo, char* tbName, int32_t* vgId) {
  SVgroupInfo resolved = {0};

  int32_t code = inserterGetVgInfo(dbInfo, tbName, &resolved);
  if (code == TSDB_CODE_SUCCESS) {
    *vgId = resolved.vgId;
    return TSDB_CODE_SUCCESS;
  }

  qError("inserterGetVgId failed, code:%d", code);
  return code;
}
764

765
// Returns the vgroup layout of database `dbFName`, fetching it from the mnode
// on first use and caching it in pInserter->dbVgInfoMap afterwards.
//
// pInserter  inserter owning the cache (created lazily here)
// dbFName    full database name, used as the cache key
// dbVgInfo   receives the cached SDBVgInfo on success
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure,
// TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for an empty cache slot, or the error
// of the fetch / cache insert. Nothing is cached or leaked on failure.
int32_t inserterGetDbVgInfo(SDataInserterHandle* pInserter, const char* dbFName, SDBVgInfo** dbVgInfo) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       line = 0;
  SUseDbOutput* output = NULL;

  if (pInserter->dbVgInfoMap == NULL) {
    pInserter->dbVgInfoMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (pInserter->dbVgInfoMap == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  SUseDbOutput** find = (SUseDbOutput**)taosHashGet(pInserter->dbVgInfoMap, dbFName, strlen(dbFName));
  if (find != NULL) {
    if (*find == NULL) {
      qError("%s failed since cached db vgroup info is NULL, db:%s", __func__, dbFName);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    *dbVgInfo = (*find)->dbVgroup;
    return TSDB_CODE_SUCCESS;
  }

  // Zero-initialised: if the fetch fails before dbVgroup is assigned, the
  // cleanup below must see NULL rather than an indeterminate pointer.
  output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
  if (output == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  code = buildDbVgInfoMap(pInserter->pParam->readHandle->pMsgCb->clientRpc, dbFName, output);
  QUERY_CHECK_CODE(code, line, _return);

  code = taosHashPut(pInserter->dbVgInfoMap, dbFName, strlen(dbFName), &output, POINTER_BYTES);
  QUERY_CHECK_CODE(code, line, _return);

  *dbVgInfo = output->dbVgroup;
  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  freeUseDbOutput_tmp(&output);
  return code;
}
806

807
// Looks up the vgroup of `tbName` in database `dbFName` by delegating to
// getDbVgInfoForExec() with the inserter's client RPC handle.
int32_t getTableVgInfo(SDataInserterHandle* pInserter, const char* dbFName,
                       const char* tbName, SVgroupInfo* pVgInfo) {
  void* pClientRpc = pInserter->pParam->readHandle->pMsgCb->clientRpc;
  return getDbVgInfoForExec(pClientRpc, dbFName, tbName, pVgInfo);
}
812

813
// Wrap an already-encoded submit message in an SMsgSendInfo and send it asynchronously as
// TDMT_VND_SUBMIT to `pEpset` through `pTransporter`.
//
// pMsg is freed here if either bookkeeping allocation fails; otherwise it is attached to the
// send-info as msgInfo.pData. The send-info's param is an SSubmitRspParam carrying pInserter and
// putParam (released via taosAutoMemoryFree), and inserterCallback is registered as `fp`.
//
// Returns terrno when an allocation fails, otherwise the result of asyncSendMsgToServer().
static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* putParam, void* pMsg, int32_t msgLen,
                                 void* pTransporter, SEpSet* pEpset) {
  SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pSendInfo == NULL) {
    taosMemoryFreeClear(pMsg);
    return terrno;
  }

  SSubmitRspParam* pRspParam = taosMemoryCalloc(1, sizeof(SSubmitRspParam));
  if (pRspParam == NULL) {
    taosMemoryFreeClear(pMsg);
    taosMemoryFreeClear(pSendInfo);
    return terrno;
  }

  // Context handed back with the response.
  pRspParam->pInserter = pInserter;
  pRspParam->putParam = putParam;

  // Message envelope.
  pSendInfo->msgType = TDMT_VND_SUBMIT;
  pSendInfo->msgInfo.pData = pMsg;
  pSendInfo->msgInfo.len = msgLen;
  pSendInfo->param = pRspParam;
  pSendInfo->paramFreeFp = taosAutoMemoryFree;
  pSendInfo->fp = inserterCallback;

  return asyncSendMsgToServer(pTransporter, pEpset, NULL, pSendInfo);
}
840

841
// Serialise `pReq` into a newly allocated buffer laid out as an SSubmitReq2Msg header followed
// by the tEncodeSubmitReq() payload.
//
// The header carries vgId and the total length (both via htonl) and version 1 (via htobe64).
// On success *pData receives the buffer and *pLen its total length (header included).
// On failure the outputs are left untouched and the buffer, if any, is freed; the return value
// is terrno for an allocation failure or the encoder's error code otherwise.
static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int32_t* pLen) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         len = 0;
  SSubmitReq2Msg* pHead = NULL;

  // First pass only measures the encoded payload size.
  tEncodeSize(tEncodeSubmitReq, pReq, len, code);

  if (code == TSDB_CODE_SUCCESS) {
    SEncoder encoder;

    len += sizeof(SSubmitReq2Msg);
    pHead = taosMemoryMalloc(len);
    if (pHead == NULL) {
      return terrno;
    }

    pHead->header.vgId = htonl(vgId);
    pHead->header.contLen = htonl(len);
    pHead->version = htobe64(1);

    // Second pass writes the payload right behind the header.
    tEncoderInit(&encoder, POINTER_SHIFT(pHead, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
    code = tEncodeSubmitReq(&encoder, pReq);
    tEncoderClear(&encoder);
  }

  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pHead);
    return code;
  }

  *pData = pHead;
  *pLen = len;
  return code;
}
870

871
int32_t buildSubmitReqFromStbBlock(SDataInserterHandle* pInserter, SHashObj* pHash, const SSDataBlock* pDataBlock,
2,268✔
872
                                   const STSchema* pTSchema, int64_t uid, int32_t vgId, tb_uid_t suid) {
873
  SArray* pVals = NULL;
2,268✔
874
  SArray* pTagVals = NULL;
2,268✔
875
  SSubmitReqSendInfo** ppSendInfo = NULL;
2,268✔
876
  int32_t numOfBlks = 0;
2,268✔
877

878
  terrno = TSDB_CODE_SUCCESS;
2,268✔
879

880
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
2,268✔
881
  int32_t rows = pDataBlock->info.rows;
2,268✔
882

883
  if (!pTagVals && !(pTagVals = taosArrayInit(colNum, sizeof(STagVal)))) {
2,268✔
884
    goto _end;
×
885
  }
886

887
  if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
2,268✔
888
    goto _end;
×
889
  }
890

891
  SDBVgInfo* dbInfo = NULL;
2,268✔
892
  int32_t    code = inserterGetDbVgInfo(pInserter, pInserter->dbFName, &dbInfo);
2,268✔
893
  if (code != TSDB_CODE_SUCCESS) {
2,268✔
894
    terrno = code;
×
895
    goto _end;
×
896
  }
897

898
  for (int32_t j = 0; j < rows; ++j) {
6,804✔
899
    SSubmitTbData tbData = {0};
4,536✔
900
    if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
4,536✔
901
      goto _end;
×
902
    }
903
    tbData.suid = suid;
4,536✔
904
    tbData.uid = uid;
4,536✔
905
    tbData.sver = pTSchema->version;
4,536✔
906

907
    int64_t lastTs = TSKEY_MIN;
4,536✔
908

909
    taosArrayClear(pVals);
4,536✔
910

911
    int32_t offset = 0;
4,536✔
912
    taosArrayClear(pTagVals);
4,536✔
913
    tbData.uid = 0;
4,536✔
914
    tbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
4,536✔
915
    if (NULL == tbData.pCreateTbReq) {
4,536✔
916
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
917
      goto _end;
×
918
    }
919
    tbData.flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
4,536✔
920

921
    SColumnInfoData* tbname = taosArrayGet(pDataBlock->pDataBlock, 0);
4,536✔
922
    if (NULL == tbname) {
4,536✔
923
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
924
      qError("Insert into stable must have tbname column");
×
925
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
926
      goto _end;
×
927
    }
928
    if (tbname->info.type != TSDB_DATA_TYPE_BINARY) {
4,536✔
929
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
930
      qError("tbname column must be binary");
×
931
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
932
      goto _end;
×
933
    }
934

935
    if (colDataIsNull_s(tbname, j)) {
9,072✔
936
      qError("insert into stable tbname column is null");
×
937
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
938
      goto _end;
×
939
    }
940
    void*   data = colDataGetVarData(tbname, j);
4,536✔
941
    SValue  sv = (SValue){TSDB_DATA_TYPE_VARCHAR, .nData = varDataLen(data), .pData = varDataVal(data)};
4,536✔
942
    SColVal cv = COL_VAL_VALUE(0, sv);
4,536✔
943

944
    char tbFullName[TSDB_TABLE_FNAME_LEN];
4,536✔
945
    char tableName[TSDB_TABLE_FNAME_LEN];
4,536✔
946
    memcpy(tableName, sv.pData, sv.nData);
4,536✔
947
    tableName[sv.nData] = '\0';
4,536✔
948

949
    int32_t len = snprintf(tbFullName, TSDB_TABLE_FNAME_LEN, "%s.%s", pInserter->dbFName, tableName);
4,536✔
950
    if (len >= TSDB_TABLE_FNAME_LEN) {
4,536✔
951
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
952
      qError("table name too long after format, len:%d, maxLen:%d", len, TSDB_TABLE_FNAME_LEN);
×
953
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
954
      goto _end;
×
955
    }
956
    SVgroupInfo vgInfo = {0};
4,536✔
957
    code = inserterGetVgInfo(dbInfo, tbFullName, &vgInfo);
4,536✔
958
    if (code != TSDB_CODE_SUCCESS) {
4,536✔
959
      terrno = code;
×
960
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
961
      goto _end;
×
962
    }
963
    SSubmitReq2* pReq = NULL;
4,536✔
964
    ppSendInfo = taosHashGet(pHash, &vgInfo.vgId, sizeof(int32_t));
4,536✔
965
    if (ppSendInfo == NULL) {
4,536✔
966
      SSubmitReqSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SSubmitReqSendInfo));
4,536✔
967
      if (NULL == pSendInfo) {
4,536✔
968
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
969
        goto _end;
×
970
      }
971
      
972
      pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2));
4,536✔
973
      if (NULL == pReq) {
4,536✔
974
        taosMemoryFree(pSendInfo);
×
975
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
976
        goto _end;
×
977
      }
978

979
      if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
4,536✔
980
        taosMemoryFree(pReq);
×
981
        taosMemoryFree(pSendInfo);
×
982
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
983
        goto _end;
×
984
      }
985
      
986
      pSendInfo->msg = pReq;
4,536✔
987
      pSendInfo->epSet = vgInfo.epSet;
4,536✔
988
      code = taosHashPut(pHash, &vgInfo.vgId, sizeof(int32_t), &pSendInfo, POINTER_BYTES);
4,536✔
989
      if (code != TSDB_CODE_SUCCESS) {
4,536✔
990
        tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
×
991
        taosMemoryFree(pReq);
×
992
        taosMemoryFree(pSendInfo);
×
993
        terrno = code;
×
994
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
995
        goto _end;
×
996
      }
997
    } else {
998
      pReq = (*ppSendInfo)->msg;
×
999
    }
1000
    SArray* TagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
4,536✔
1001
    if (!TagNames) {
4,536✔
1002
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1003
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1004
      goto _end;
×
1005
    }
1006
    for (int32_t i = 0; i < pInserter->pTagSchema->nCols; ++i) {
13,608✔
1007
      SSchema* tSchema = &pInserter->pTagSchema->pSchema[i];
9,072✔
1008
      int16_t  colIdx = tSchema->colId;
9,072✔
1009
      int16_t* slotId = taosHashGet(pInserter->pCols, &colIdx, sizeof(colIdx));
9,072✔
1010
      if (NULL == slotId) {
9,072✔
1011
        continue;
4,536✔
1012
      }
1013
      if (NULL == taosArrayPush(TagNames, tSchema->name)) {
9,072✔
1014
        taosArrayDestroy(TagNames);
×
1015
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1016
        goto _end;
×
1017
      }
1018

1019
      colIdx = *slotId;
4,536✔
1020
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
4,536✔
1021
      if (NULL == pColInfoData) {
4,536✔
1022
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1023
        taosArrayDestroy(TagNames);
×
1024
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1025
        goto _end;
×
1026
      }
1027
      // void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
1028
      switch (pColInfoData->info.type) {
4,536✔
1029
        case TSDB_DATA_TYPE_NCHAR:
3,402✔
1030
        case TSDB_DATA_TYPE_VARBINARY:
1031
        case TSDB_DATA_TYPE_VARCHAR: {
1032
          if (pColInfoData->info.type != tSchema->type) {
3,402✔
1033
            qError("tag:%d type:%d in block dismatch with schema tag:%d type:%d", colIdx, pColInfoData->info.type, i,
×
1034
                   tSchema->type);
1035
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1036
            taosArrayDestroy(TagNames);
×
1037
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1038
            goto _end;
×
1039
          }
1040
          if (colDataIsNull_s(pColInfoData, j)) {
6,804✔
1041
            continue;
×
1042
          } else {
1043
            void*   data = colDataGetVarData(pColInfoData, j);
3,402✔
1044
            STagVal tv = (STagVal){
3,402✔
1045
                .cid = tSchema->colId, .type = tSchema->type, .nData = varDataLen(data), .pData = varDataVal(data)};
3,402✔
1046
            if (NULL == taosArrayPush(pTagVals, &tv)) {
3,402✔
1047
              taosArrayDestroy(TagNames);
×
1048
              tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1049
              goto _end;
×
1050
            }
1051
          }
1052
          break;
3,402✔
1053
        }
1054
        case TSDB_DATA_TYPE_BLOB:
×
1055
        case TSDB_DATA_TYPE_JSON:
1056
        case TSDB_DATA_TYPE_MEDIUMBLOB:
1057
          qError("the tag type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1058
          terrno = TSDB_CODE_APP_ERROR;
×
1059
          taosArrayDestroy(TagNames);
×
1060
          tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1061
          goto _end;
×
1062
          break;
1063
        default:
1,134✔
1064
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
1,134✔
1065
            if (colDataIsNull_s(pColInfoData, j)) {
2,268✔
1066
              continue;
×
1067
            } else {
1068
              void*   data = colDataGetData(pColInfoData, j);
1,134✔
1069
              STagVal tv = {.cid = tSchema->colId, .type = tSchema->type};
1,134✔
1070
              memcpy(&tv.i64, data, tSchema->bytes);
1,134✔
1071
              if (NULL == taosArrayPush(pTagVals, &tv)) {
1,134✔
1072
                taosArrayDestroy(TagNames);
×
1073
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1074
                goto _end;
×
1075
              }
1076
            }
1077
          } else {
1078
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1079
            terrno = TSDB_CODE_APP_ERROR;
×
1080
            taosArrayDestroy(TagNames);
×
1081
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1082
            goto _end;
×
1083
          }
1084
          break;
1,134✔
1085
      }
1086
    }
1087
    STag* pTag = NULL;
4,536✔
1088
    code = tTagNew(pTagVals, 1, false, &pTag);
4,536✔
1089
    if (code != TSDB_CODE_SUCCESS) {
4,536✔
1090
      terrno = code;
×
1091
      qError("failed to create tag, error:%s", tstrerror(code));
×
1092
      taosArrayDestroy(TagNames);
×
1093
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1094
      goto _end;
×
1095
    }
1096

1097
    code = inserterBuildCreateTbReq(tbData.pCreateTbReq, tableName, pTag, suid, pInserter->pNode->tableName, TagNames,
4,536✔
1098
                                    pInserter->pTagSchema->nCols, TSDB_DEFAULT_TABLE_TTL);
4,536✔
1099
    if (code != TSDB_CODE_SUCCESS) {
4,536✔
1100
      terrno = code;
×
1101
      qError("failed to build create table request, error:%s", tstrerror(code));
×
1102
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1103
      goto _end;
×
1104
    }
1105

1106
    for (int32_t k = 0; k < pTSchema->numOfCols; ++k) {
22,680✔
1107
      int16_t         colIdx = k;
18,144✔
1108
      const STColumn* pCol = &pTSchema->columns[k];
18,144✔
1109
      int16_t*        slotId = taosHashGet(pInserter->pCols, &pCol->colId, sizeof(pCol->colId));
18,144✔
1110
      if (NULL == slotId) {
18,144✔
1111
        continue;
3,402✔
1112
      }
1113
      colIdx = *slotId;
14,742✔
1114

1115
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
14,742✔
1116
      if (NULL == pColInfoData) {
14,742✔
1117
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1118
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1119
        goto _end;
×
1120
      }
1121
      void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
14,742✔
1122

1123
      switch (pColInfoData->info.type) {
14,742✔
1124
        case TSDB_DATA_TYPE_NCHAR:
×
1125
        case TSDB_DATA_TYPE_VARBINARY:
1126
        case TSDB_DATA_TYPE_VARCHAR: {
1127
          if (pColInfoData->info.type != pCol->type) {
×
1128
            qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k,
×
1129
                   pCol->type);
1130
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1131
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1132
            goto _end;
×
1133
          }
1134
          if (colDataIsNull_s(pColInfoData, j)) {
×
1135
            SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
×
1136
            if (NULL == taosArrayPush(pVals, &cv)) {
×
1137
              tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1138
              goto _end;
×
1139
            }
1140
          } else {
1141
            void*   data = colDataGetVarData(pColInfoData, j);
×
1142
            SValue  sv = (SValue){.type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)};
×
1143
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
×
1144
            if (NULL == taosArrayPush(pVals, &cv)) {
×
1145
              tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1146
              goto _end;
×
1147
            }
1148
          }
1149
          break;
×
1150
        }
1151
        case TSDB_DATA_TYPE_BLOB:
×
1152
        case TSDB_DATA_TYPE_JSON:
1153
        case TSDB_DATA_TYPE_MEDIUMBLOB:
1154
          qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1155
          terrno = TSDB_CODE_APP_ERROR;
×
1156
          tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1157
          goto _end;
×
1158
          break;
1159
        default:
14,742✔
1160
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
14,742✔
1161
            if (colDataIsNull_s(pColInfoData, j)) {
29,484✔
1162
              if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
×
1163
                qError("Primary timestamp column should not be null");
×
1164
                terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
×
1165
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1166
                goto _end;
×
1167
              }
1168

1169
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
×
1170
              if (NULL == taosArrayPush(pVals, &cv)) {
×
1171
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1172
                goto _end;
×
1173
              }
1174
            } else {
1175
              // if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && !needSortMerge) {
1176
              //   if (*(int64_t*)var <= lastTs) {
1177
              //     needSortMerge = true;
1178
              //   } else {
1179
              //     lastTs = *(int64_t*)var;
1180
              //   }
1181
              // }
1182

1183
              SValue sv = {.type = pCol->type};
14,742✔
1184
              valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
14,742✔
1185
              SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
14,742✔
1186
              if (NULL == taosArrayPush(pVals, &cv)) {
14,742✔
1187
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1188
                goto _end;
×
1189
              }
1190
            }
1191
          } else {
1192
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1193
            terrno = TSDB_CODE_APP_ERROR;
×
1194
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1195
            goto _end;
×
1196
          }
1197
          break;
14,742✔
1198
      }
1199
    }
1200

1201
    SRow* pRow = NULL;
4,536✔
1202
    SRowBuildScanInfo sinfo = {0};
4,536✔
1203
    if ((terrno = tRowBuild(pVals, pTSchema, &pRow, &sinfo)) < 0) {
4,536✔
1204
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1205
      goto _end;
×
1206
    }
1207
    if (NULL == taosArrayPush(tbData.aRowP, &pRow)) {
9,072✔
1208
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1209
      goto _end;
×
1210
    }
1211

1212
    if (NULL == taosArrayPush(pReq->aSubmitTbData, &tbData)) {
9,072✔
1213
      goto _end;
×
1214
    }
1215
  }
1216

1217
_end:
2,268✔
1218
  taosArrayDestroy(pTagVals);
2,268✔
1219
  taosArrayDestroy(pVals);
2,268✔
1220

1221
  return terrno;
2,268✔
1222
}
1223

1224
int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** ppReq, const SSDataBlock* pDataBlock,
471,200✔
1225
                                const STSchema* pTSchema, int64_t* uid, int32_t* vgId, tb_uid_t* suid) {
1226
  SSubmitReq2* pReq = *ppReq;
471,200✔
1227
  SArray*      pVals = NULL;
471,200✔
1228
  SArray*      pTagVals = NULL;
471,200✔
1229
  int32_t      numOfBlks = 0;
471,200✔
1230
  char*        tableName = NULL;
471,200✔
1231
  int32_t      code = 0, lino = 0;
471,200✔
1232

1233
  terrno = TSDB_CODE_SUCCESS;
471,200✔
1234

1235
  if (NULL == pReq) {
471,200✔
1236
    if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
471,200✔
1237
      goto _end;
×
1238
    }
1239

1240
    if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
471,200✔
1241
      goto _end;
×
1242
    }
1243
  }
1244

1245
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
471,200✔
1246
  int32_t rows = pDataBlock->info.rows;
471,200✔
1247

1248
  SSubmitTbData tbData = {0};
471,200✔
1249
  if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
471,200✔
1250
    goto _end;
×
1251
  }
1252
  tbData.suid = *suid;
471,200✔
1253
  tbData.uid = *uid;
471,200✔
1254
  tbData.sver = pTSchema->version;
471,200✔
1255

1256
  if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
471,200✔
1257
    taosArrayDestroy(tbData.aRowP);
×
1258
    goto _end;
×
1259
  }
1260

1261
  if (pInserter->isStbInserter) {
471,200✔
1262
    if (!pTagVals && !(pTagVals = taosArrayInit(colNum, sizeof(STagVal)))) {
×
1263
      taosArrayDestroy(tbData.aRowP);
×
1264
      goto _end;
×
1265
    }
1266
  }
1267

1268
  int64_t lastTs = TSKEY_MIN;
471,200✔
1269
  bool    needSortMerge = false;
471,200✔
1270

1271
  for (int32_t j = 0; j < rows; ++j) {  // iterate by row
420,525,799✔
1272
    taosArrayClear(pVals);
420,061,931✔
1273

1274
    int32_t offset = 0;
420,061,931✔
1275
    // 处理超级表的tbname和tags
1276
    if (pInserter->isStbInserter) {
420,061,931✔
1277
      taosArrayClear(pTagVals);
×
1278
      tbData.uid = 0;
×
1279
      *uid = 0;
×
1280
      tbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
×
1281
      tbData.flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
×
1282

1283
      SColumnInfoData* tbname = taosArrayGet(pDataBlock->pDataBlock, 0);
×
1284
      if (NULL == tbname) {
×
1285
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1286
        qError("Insert into stable must have tbname column");
×
1287
        goto _end;
×
1288
      }
1289
      if (tbname->info.type != TSDB_DATA_TYPE_BINARY) {
×
1290
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1291
        qError("tbname column must be binary");
×
1292
        goto _end;
×
1293
      }
1294

1295
      if (colDataIsNull_s(tbname, j)) {
×
1296
        qError("insert into stable tbname column is null");
×
1297
        goto _end;
×
1298
      }
1299
      void*   data = colDataGetVarData(tbname, j);
×
1300
      SValue  sv = (SValue){TSDB_DATA_TYPE_VARCHAR, .nData = varDataLen(data),
×
1301
                            .pData = varDataVal(data)};  // address copy, no value
×
1302
      SColVal cv = COL_VAL_VALUE(0, sv);
×
1303

1304
      // 获取子表vgId
1305
      SDBVgInfo* dbInfo = NULL;
×
1306
      code = inserterGetDbVgInfo(pInserter, pInserter->dbFName, &dbInfo);
×
1307
      if (code != TSDB_CODE_SUCCESS) {
×
1308
        goto _end;
×
1309
      }
1310

1311
      char tbFullName[TSDB_TABLE_FNAME_LEN];
×
1312
      taosMemoryFreeClear(tableName);
×
1313
      tableName = taosMemoryCalloc(1, sv.nData + 1);
×
1314
      TSDB_CHECK_NULL(tableName, code, lino, _end, terrno);
×
1315
      tstrncpy(tableName, sv.pData, sv.nData);
×
1316
      tableName[sv.nData] = '\0';
×
1317

1318
      int32_t len = snprintf(tbFullName, TSDB_TABLE_FNAME_LEN, "%s.%s", pInserter->dbFName, tableName);
×
1319
      if (len >= TSDB_TABLE_FNAME_LEN) {
×
1320
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1321
        qError("table name too long after format, len:%d, maxLen:%d", len, TSDB_TABLE_FNAME_LEN);
×
1322
        goto _end;
×
1323
      }
1324
      code = inserterGetVgId(dbInfo, tbFullName, vgId);
×
1325
      if (code != TSDB_CODE_SUCCESS) {
×
1326
        terrno = code;
×
1327
        goto _end;
×
1328
      }
1329
      // 解析tag
1330
      SArray* TagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
×
1331
      if (!TagNames) {
×
1332
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1333
        goto _end;
×
1334
      }
1335
      for (int32_t i = 0; i < pInserter->pTagSchema->nCols; ++i) {
×
1336
        SSchema* tSchema = &pInserter->pTagSchema->pSchema[i];
×
1337
        int16_t  colIdx = tSchema->colId;
×
1338
        if (NULL == taosArrayPush(TagNames, tSchema->name)) {
×
1339
          goto _end;
×
1340
        }
1341
        int16_t* slotId = taosHashGet(pInserter->pCols, &colIdx, sizeof(colIdx));
×
1342
        if (NULL == slotId) {
×
1343
          continue;
×
1344
        }
1345

1346
        colIdx = *slotId;
×
1347
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
×
1348
        if (NULL == pColInfoData) {
×
1349
          terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1350
          goto _end;
×
1351
        }
1352
        // void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
1353
        switch (pColInfoData->info.type) {
×
1354
          case TSDB_DATA_TYPE_NCHAR:
×
1355
          case TSDB_DATA_TYPE_VARBINARY:
1356
          case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
1357
            if (pColInfoData->info.type != tSchema->type) {
×
1358
              qError("tag:%d type:%d in block dismatch with schema tag:%d type:%d", colIdx, pColInfoData->info.type, i,
×
1359
                     tSchema->type);
1360
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1361
              goto _end;
×
1362
            }
1363
            if (colDataIsNull_s(pColInfoData, j)) {
×
1364
              continue;
×
1365
            } else {
1366
              void*   data = colDataGetVarData(pColInfoData, j);
×
1367
              STagVal tv = (STagVal){.cid = tSchema->colId,
×
1368
                                     .type = tSchema->type,
×
1369
                                     .nData = varDataLen(data),
×
1370
                                     .pData = varDataVal(data)};  // address copy, no value
×
1371
              if (NULL == taosArrayPush(pTagVals, &tv)) {
×
1372
                goto _end;
×
1373
              }
1374
            }
1375
            break;
×
1376
          }
1377
          case TSDB_DATA_TYPE_BLOB:
×
1378
          case TSDB_DATA_TYPE_JSON:
1379
          case TSDB_DATA_TYPE_MEDIUMBLOB:
1380
            qError("the tag type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1381
            terrno = TSDB_CODE_APP_ERROR;
×
1382
            goto _end;
×
1383
            break;
1384
          default:
×
1385
            if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
×
1386
              if (colDataIsNull_s(pColInfoData, j)) {
×
1387
                continue;
×
1388
              } else {
1389
                void*   data = colDataGetData(pColInfoData, j);
×
1390
                STagVal tv = {.cid = tSchema->colId, .type = tSchema->type};
×
1391
                memcpy(&tv.i64, data, tSchema->bytes);
×
1392
                if (NULL == taosArrayPush(pTagVals, &tv)) {
×
1393
                  goto _end;
×
1394
                }
1395
              }
1396
            } else {
1397
              uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1398
              terrno = TSDB_CODE_APP_ERROR;
×
1399
              goto _end;
×
1400
            }
1401
            break;
×
1402
        }
1403
      }
1404
      STag* pTag = NULL;
×
1405
      code = tTagNew(pTagVals, 1, false, &pTag);
×
1406
      if (code != TSDB_CODE_SUCCESS) {
×
1407
        terrno = code;
×
1408
        qError("failed to create tag, error:%s", tstrerror(code));
×
1409
        goto _end;
×
1410
      }
1411

1412
      code = inserterBuildCreateTbReq(tbData.pCreateTbReq, tableName, pTag, *suid, pInserter->pNode->tableName, TagNames,
×
1413
                               pInserter->pTagSchema->nCols, TSDB_DEFAULT_TABLE_TTL);
×
1414
    }
1415

1416
    for (int32_t k = 0; k < pTSchema->numOfCols; ++k) {  // iterate by column
2,147,483,647✔
1417
      int16_t         colIdx = k;
2,147,483,647✔
1418
      const STColumn* pCol = &pTSchema->columns[k];
2,147,483,647✔
1419
      if (!pInserter->fullOrderColList) {
2,147,483,647✔
1420
        int16_t* slotId = taosHashGet(pInserter->pCols, &pCol->colId, sizeof(pCol->colId));
130,202✔
1421
        if (NULL == slotId) {
130,202✔
1422
          continue;
31,700✔
1423
        }
1424

1425
        colIdx = *slotId;
98,502✔
1426
      }
1427

1428
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
2,147,483,647✔
1429
      if (NULL == pColInfoData) {
2,147,483,647✔
1430
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1431
        goto _end;
×
1432
      }
1433
      void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
2,147,483,647✔
1434

1435
      switch (pColInfoData->info.type) {
2,147,483,647✔
1436
        case TSDB_DATA_TYPE_NCHAR:
1,636,806,164✔
1437
        case TSDB_DATA_TYPE_VARBINARY:
1438
        case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
1439
          if (pColInfoData->info.type != pCol->type) {
1,636,806,164✔
1440
            qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k,
×
1441
                   pCol->type);
1442
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1443
            goto _end;
×
1444
          }
1445
          if (colDataIsNull_s(pColInfoData, j)) {
2,147,483,647✔
1446
            SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
2,444✔
1447
            if (NULL == taosArrayPush(pVals, &cv)) {
2,444✔
1448
              goto _end;
×
1449
            }
1450
          } else {
1451
            void*  data = colDataGetVarData(pColInfoData, j);
1,636,803,720✔
1452
            SValue sv = (SValue){
2,147,483,647✔
1453
                .type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)};  // address copy, no value
1,636,803,720✔
1454
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
1,636,803,720✔
1455
            if (NULL == taosArrayPush(pVals, &cv)) {
1,636,803,720✔
1456
              goto _end;
×
1457
            }
1458
          }
1459
          break;
1,636,806,164✔
1460
        }
1461
        case TSDB_DATA_TYPE_BLOB:
×
1462
        case TSDB_DATA_TYPE_MEDIUMBLOB:
1463
        case TSDB_DATA_TYPE_JSON:
1464
          qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1465
          terrno = TSDB_CODE_APP_ERROR;
×
1466
          goto _end;
×
1467
          break;
1468
        default:
2,147,483,647✔
1469
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
2,147,483,647✔
1470
            if (colDataIsNull_s(pColInfoData, j)) {
2,147,483,647✔
1471
              if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
20,704✔
1472
                qError("Primary timestamp column should not be null");
×
1473
                terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
×
1474
                goto _end;
×
1475
              }
1476

1477
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);  // should use pCol->type
20,704✔
1478
              if (NULL == taosArrayPush(pVals, &cv)) {
20,704✔
1479
                goto _end;
×
1480
              }
1481
            } else {
1482
              if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && !needSortMerge) {
2,147,483,647✔
1483
                if (*(int64_t*)var <= lastTs) {
419,988,633✔
1484
                  needSortMerge = true;
11,151✔
1485
                } else {
1486
                  lastTs = *(int64_t*)var;
419,977,482✔
1487
                }
1488
              }
1489

1490
              SValue sv = {.type = pCol->type};
2,147,483,647✔
1491
              valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
2,147,483,647✔
1492
              SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
2,147,483,647✔
1493
              if (NULL == taosArrayPush(pVals, &cv)) {
2,147,483,647✔
1494
                goto _end;
×
1495
              }
1496
            }
1497
          } else {
1498
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1499
            terrno = TSDB_CODE_APP_ERROR;
×
1500
            goto _end;
×
1501
          }
1502
          break;
2,147,483,647✔
1503
      }
1504
    }
1505

1506
    SRow*             pRow = NULL;
420,061,931✔
1507
    SRowBuildScanInfo sinfo = {0};
420,061,931✔
1508
    if ((terrno = tRowBuild(pVals, pTSchema, &pRow, &sinfo)) < 0) {
420,061,931✔
1509
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
7,332✔
1510
      goto _end;
7,332✔
1511
    }
1512
    if (NULL == taosArrayPush(tbData.aRowP, &pRow)) {
840,109,198✔
1513
      goto _end;
×
1514
    }
1515
  }
1516

1517
  if (needSortMerge) {
463,868✔
1518
    if ((tRowSort(tbData.aRowP) != TSDB_CODE_SUCCESS) ||
11,151✔
1519
        (terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, KEEP_CONSISTENCY)) != 0) {
11,151✔
1520
      goto _end;
×
1521
    }
1522
  }
1523

1524
  if (NULL == taosArrayPush(pReq->aSubmitTbData, &tbData)) {
927,736✔
1525
    goto _end;
×
1526
  }
1527

1528
_end:
471,200✔
1529

1530
  taosMemoryFreeClear(tableName);
471,200✔
1531

1532
  taosArrayDestroy(pTagVals);
471,200✔
1533
  taosArrayDestroy(pVals);
471,200✔
1534

1535
  if (terrno != 0) {
471,200✔
1536
    *ppReq = NULL;
7,332✔
1537
    if (pReq) {
7,332✔
1538
      tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
7,332✔
1539
      taosMemoryFree(pReq);
7,332✔
1540
    }
1541

1542
    return terrno;
7,332✔
1543
  }
1544
  *ppReq = pReq;
463,868✔
1545

1546
  return TSDB_CODE_SUCCESS;
463,868✔
1547
}
1548

1549
static void destroySubmitReqWrapper(void* p) {
4,536✔
1550
  SSubmitReqSendInfo* pSendInfo = *(SSubmitReqSendInfo**)p;
4,536✔
1551
  if (pSendInfo != NULL) {
4,536✔
1552
    if (pSendInfo->msg != NULL) {
4,536✔
1553
      tDestroySubmitReq(pSendInfo->msg, TSDB_MSG_FLG_ENCODE);
4,536✔
1554
      taosMemoryFree(pSendInfo->msg);
4,536✔
1555
    }
1556
    taosMemoryFree(pSendInfo);
4,536✔
1557
  }
1558
}
4,536✔
1559

1560
/**
 * Converts the data blocks buffered in a super-table inserter into serialized submit messages.
 *
 * Each block is folded into an intermediate hash (keyed by an int32 vgroup id) through
 * buildSubmitReqFromStbBlock; afterwards every hash entry is serialized with submitReqToMsg
 * and a heap-allocated SSubmitTbDataSendInfo* describing it is appended to pMsgs.
 *
 * @param pInserter inserter handle providing the blocks, the schema and the target table ids
 * @param pMsgs     output array of SSubmitTbDataSendInfo*; entries pushed before a failure stay
 *                  in the array, so the caller has to release them when an error is returned
 * @return TSDB_CODE_SUCCESS, or the first error code encountered
 */
int32_t dataBlocksToSubmitReqArray(SDataInserterHandle* pInserter, SArray* pMsgs) {
  const SArray*   pBlocks = pInserter->pDataBlocks;
  const STSchema* pTSchema = pInserter->pSchema;
  int64_t         uid = pInserter->pNode->tableId;
  int64_t         suid = pInserter->pNode->stableId;
  int32_t         vgId = pInserter->pNode->vgId;
  int32_t         sz = taosArrayGetSize(pBlocks);
  int32_t         code = 0;

  SHashObj* pHash = NULL;
  void*     iterator = NULL;

  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);  // result block produced by the SELECT query
    if (NULL == pDataBlock) {
      // Go through _end instead of returning: pHash may already exist from an earlier block.
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _end;
    }
    if (pHash == NULL) {
      // The hash is created lazily so it can be sized from the first block.
      pHash = taosHashInit(sz * pDataBlock->info.rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false,
                           HASH_ENTRY_LOCK);
      if (NULL == pHash) {
        return terrno;
      }
      taosHashSetFreeFp(pHash, destroySubmitReqWrapper);
    }
    code = buildSubmitReqFromStbBlock(pInserter, pHash, pDataBlock, pTSchema, uid, vgId, suid);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }
  }

  size_t keyLen = 0;
  while ((iterator = taosHashIterate(pHash, iterator))) {
    SSubmitReqSendInfo* pReqSendInfo = *(SSubmitReqSendInfo**)iterator;
    int32_t*            ctbVgId = taosHashGetKey(iterator, &keyLen);

    SSubmitTbDataSendInfo* pTbSendInfo = taosMemoryCalloc(1, sizeof(SSubmitTbDataSendInfo));
    if (NULL == pTbSendInfo) {
      code = terrno;
      goto _end;
    }
    code = submitReqToMsg(*ctbVgId, pReqSendInfo->msg, &pTbSendInfo->msg.pData, &pTbSendInfo->msg.len);
    if (code != TSDB_CODE_SUCCESS) {
      taosMemoryFree(pTbSendInfo);
      goto _end;
    }
    pTbSendInfo->epSet = pReqSendInfo->epSet;
    if (NULL == taosArrayPush(pMsgs, &pTbSendInfo)) {
      // Capture the error before the frees below get a chance to touch terrno.
      code = terrno;
      taosMemoryFree(pTbSendInfo->msg.pData);
      taosMemoryFree(pTbSendInfo);
      goto _end;
    }
  }

_end:
  // Cleaning up the hash also destroys the intermediate submit requests (destroySubmitReqWrapper).
  if (pHash != NULL) {
    taosHashCleanup(pHash);
  }

  return code;
}
1622

1623
/**
 * Builds a single submit request from all data blocks buffered in the inserter and
 * serializes it into a message.
 *
 * @param pInserter inserter handle providing the blocks, the schema and the target table ids
 * @param pMsg      receives the serialized message produced by submitReqToMsg
 * @param msgLen    receives the length of the serialized message
 * @return TSDB_CODE_SUCCESS, or the first error code encountered; the intermediate request
 *         is always released before returning
 */
int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32_t* msgLen) {
  const SArray*   pBlocks = pInserter->pDataBlocks;
  const STSchema* pTSchema = pInserter->pSchema;
  int64_t         uid = pInserter->pNode->tableId;
  int64_t         suid = pInserter->pNode->stableId;
  int32_t         vgId = pInserter->pNode->vgId;
  int32_t         sz = taosArrayGetSize(pBlocks);
  int32_t         code = 0;
  SSubmitReq2*    pReq = NULL;

  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);  // result block produced by the SELECT query
    if (NULL == pDataBlock) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    } else {
      code = buildSubmitReqFromBlock(pInserter, &pReq, pDataBlock, pTSchema, &uid, &vgId, &suid);
    }
    if (code) {
      // A request accumulated from earlier blocks must not leak on any failure path.
      // NOTE(review): buildSubmitReqFromBlock appears to reset *ppReq to NULL on its own
      // failures, in which case this is a no-op for that path.
      if (pReq) {
        tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
        taosMemoryFree(pReq);
      }

      return code;
    }
  }

  code = submitReqToMsg(vgId, pReq, pMsg, msgLen);
  tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
  taosMemoryFree(pReq);

  return code;
}
1655

1656
/**
 * Looks up the cached insert-table info of a stream group in gStreamGrpTableHash.
 *
 * The hash key is the pair {streamId, groupId}. On success the acquired hash slot is handed
 * to the caller through ppTbInfo and must later be given back with taosHashRelease
 * (see releaseStreamInsertTableInfo).
 *
 * @param streamId stream identifier, first half of the key
 * @param groupId  group identifier, second half of the key
 * @param ppTbInfo receives the acquired slot; left untouched on failure
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND when there is no
 *         entry or the entry holds a NULL pointer
 */
int32_t getStreamInsertTableInfo(int64_t streamId, int64_t groupId, SInsertTableInfo*** ppTbInfo) {
  int64_t            key[2] = {streamId, groupId};
  SInsertTableInfo** pTmp = taosHashAcquire(gStreamGrpTableHash, key, sizeof(key));
  if (NULL == pTmp) {
    return TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
  }

  if (NULL == *pTmp) {
    // The slot was acquired but is empty: give the reference back, the caller never sees it.
    taosHashRelease(gStreamGrpTableHash, pTmp);
    return TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
  }

  *ppTbInfo = pTmp;
  return TSDB_CODE_SUCCESS;
}
1666

1667
/*
 * Gives back a slot of gStreamGrpTableHash previously obtained through
 * getStreamInsertTableInfo. Always reports TSDB_CODE_SUCCESS.
 */
static int32_t releaseStreamInsertTableInfo(SInsertTableInfo** ppTbInfo) {
  void* pSlot = ppTbInfo;

  taosHashRelease(gStreamGrpTableHash, pSlot);

  return TSDB_CODE_SUCCESS;
}
1671

1672
int32_t buildNormalTableCreateReq(SDataInserterHandle* pInserter, SStreamInserterParam* pInsertParam,
97,485✔
1673
                                  SSubmitTbData* tbData) {
1674
  int32_t code = TSDB_CODE_SUCCESS;
97,485✔
1675

1676
  tbData->suid = 0;
97,485✔
1677

1678
  tbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
97,485✔
1679
  if (NULL == tbData->pCreateTbReq) {
97,485✔
1680
    goto _end;
×
1681
  }
1682
  tbData->flags |= (SUBMIT_REQ_AUTO_CREATE_TABLE | SUBMIT_REQ_SCHEMA_RES);
97,485✔
1683
  tbData->pCreateTbReq->type = TSDB_NORMAL_TABLE;
97,485✔
1684
  tbData->pCreateTbReq->flags |= (TD_CREATE_NORMAL_TB_IN_STREAM | TD_CREATE_IF_NOT_EXISTS);
97,485✔
1685
  tbData->pCreateTbReq->uid = 0;
97,286✔
1686
  tbData->sver = pInsertParam->sver;
97,286✔
1687

1688
  tbData->pCreateTbReq->name = taosStrdup(pInsertParam->tbname);
97,485✔
1689
  if (!tbData->pCreateTbReq->name) return terrno;
97,485✔
1690

1691
  int32_t numOfCols = pInsertParam->pFields->size;
97,485✔
1692
  tbData->pCreateTbReq->ntb.schemaRow.nCols = numOfCols;
97,485✔
1693
  tbData->pCreateTbReq->ntb.schemaRow.version = 1;
97,485✔
1694

1695
  tbData->pCreateTbReq->ntb.schemaRow.pSchema = taosMemoryCalloc(numOfCols, sizeof(SSchema));
97,485✔
1696
  if (NULL == tbData->pCreateTbReq->ntb.schemaRow.pSchema) {
97,485✔
1697
    goto _end;
×
1698
  }
1699
  for (int32_t i = 0; i < numOfCols; ++i) {
547,964✔
1700
    SFieldWithOptions* pField = taosArrayGet(pInsertParam->pFields, i);
450,479✔
1701
    if (NULL == pField) {
450,479✔
1702
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1703
      goto _end;
×
1704
    }
1705
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].colId = i + 1;
450,479✔
1706
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].type = pField->type;
450,280✔
1707
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].bytes = pField->bytes;
450,280✔
1708
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].flags = pField->flags;
450,479✔
1709
    if (i == 0 && pField->type != TSDB_DATA_TYPE_TIMESTAMP) {
450,479✔
1710
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1711
      qError("buildNormalTableCreateReq, the first column must be timestamp.");
×
1712
      goto _end;
×
1713
    }
1714
    snprintf(tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].name, TSDB_COL_NAME_LEN, "%s", pField->name);
450,280✔
1715
    if (IS_DECIMAL_TYPE(pField->type)) {
450,479✔
1716
      if (!tbData->pCreateTbReq->pExtSchemas) {
×
1717
        tbData->pCreateTbReq->pExtSchemas = taosMemoryCalloc(numOfCols, sizeof(SExtSchema));
×
1718
        if (NULL == tbData->pCreateTbReq->pExtSchemas) {
×
1719
          tdDestroySVCreateTbReq(tbData->pCreateTbReq);
×
1720
          tbData->pCreateTbReq = NULL;
×
1721
          return terrno;
×
1722
        }
1723
      }
1724
      tbData->pCreateTbReq->pExtSchemas[i].typeMod = pField->typeMod;
×
1725
    }
1726
  }
1727
  return TSDB_CODE_SUCCESS;
97,485✔
1728
_end:
×
1729
  return code;
×
1730
}
1731

1732
// reference tBuildTSchema funciton
1733
static int32_t buildTSchmaFromInserter(SStreamInserterParam* pInsertParam, STSchema** ppTSchema) {
224,276✔
1734
  int32_t code = TSDB_CODE_SUCCESS;
224,276✔
1735

1736
  int32_t   numOfCols = pInsertParam->pFields->size;
224,276✔
1737
  STSchema* pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn) * numOfCols);
224,276✔
1738
  if (NULL == pTSchema) {
224,072✔
1739
    return terrno;
×
1740
  }
1741
  if (pInsertParam->tbType == TSDB_NORMAL_TABLE) {
224,072✔
1742
    pTSchema->version =
73,036✔
1743
        1;  // normal table version start from 1, if has exist table, it will be reset by resetInserterTbVersion
1744
  } else {
1745
    pTSchema->version = pInsertParam->sver;
151,036✔
1746
  }
1747
  pTSchema->numOfCols = numOfCols;
224,276✔
1748

1749
  SFieldWithOptions* pField = taosArrayGet(pInsertParam->pFields, 0);
224,072✔
1750
  if (NULL == pField) {
224,072✔
1751
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1752
    goto _end;
×
1753
  }
1754
  pTSchema->columns[0].colId = PRIMARYKEY_TIMESTAMP_COL_ID;
224,072✔
1755
  pTSchema->columns[0].type = pField->type;
224,072✔
1756
  pTSchema->columns[0].flags = pField->flags;
224,276✔
1757
  pTSchema->columns[0].bytes = TYPE_BYTES[pField->type];
224,276✔
1758
  pTSchema->columns[0].offset = -1;
224,276✔
1759

1760
  pTSchema->tlen = 0;
224,276✔
1761
  pTSchema->flen = 0;
224,276✔
1762
  for (int32_t i = 1; i < numOfCols; ++i) {
1,065,448✔
1763
    SFieldWithOptions* pField = taosArrayGet(pInsertParam->pFields, i);
841,172✔
1764
    if (NULL == pField) {
840,968✔
1765
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1766
      goto _end;
×
1767
    }
1768
    pTSchema->columns[i].colId = i + 1;
840,968✔
1769
    pTSchema->columns[i].type = pField->type;
840,968✔
1770
    pTSchema->columns[i].flags = pField->flags;
840,968✔
1771
    pTSchema->columns[i].bytes = pField->bytes;
840,968✔
1772
    pTSchema->columns[i].offset = pTSchema->flen;
841,172✔
1773

1774
    if (IS_VAR_DATA_TYPE(pField->type)) {
841,376✔
1775
      pTSchema->columns[i].bytes = pField->bytes;
13,051✔
1776
      pTSchema->tlen += (TYPE_BYTES[pField->type] + pField->bytes);
13,051✔
1777
    } else {
1778
      pTSchema->columns[i].bytes = TYPE_BYTES[pField->type];
828,325✔
1779
      pTSchema->tlen += TYPE_BYTES[pField->type];
828,325✔
1780
    }
1781

1782
    pTSchema->flen += TYPE_BYTES[pField->type];
841,172✔
1783
  }
1784

1785
#if 1
1786
  pTSchema->tlen += (int32_t)TD_BITMAP_BYTES(numOfCols);
224,276✔
1787
#endif
1788

1789
_end:
224,276✔
1790
  if (code != TSDB_CODE_SUCCESS) {
224,276✔
1791
    taosMemoryFree(pTSchema);
×
1792
    *ppTSchema = NULL;
×
1793
  } else {
1794
    *ppTSchema = pTSchema;
224,276✔
1795
  }
1796
  return code;
224,276✔
1797
}
1798

1799
/**
 * Collects the non-NULL tag values of a stream inserter into a new array of STagVal.
 *
 * The column id of tag i is taken from pTagCids[i] when pTagCids is given, otherwise it is
 * preCols + i + 1. Variable-length values are referenced, not copied: the STagVal points at
 * the buffer held by pInserterInfo. Tags whose value is NULL are skipped.
 *
 * @param pInserterInfo source of the tag values (pTagVals must be non-NULL)
 * @param preCols       number of data columns preceding the tags, used to derive column ids
 * @param ppTagVals     receives the new array; set to NULL when an error is returned
 * @param pTagCids      optional array of col_id_t with one id per tag
 * @return TSDB_CODE_SUCCESS, or an error code
 */
static int32_t getTagValsFromStreamInserterInfo(SStreamDataInserterInfo* pInserterInfo, int32_t preCols,
                                                SArray** ppTagVals, SArray* pTagCids) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t nTags = pInserterInfo->pTagVals->size;
  *ppTagVals = taosArrayInit(nTags, sizeof(STagVal));
  if (NULL == *ppTagVals) {
    // Test the allocated array, not the out-parameter itself.
    return terrno;
  }
  for (int32_t i = 0; i < nTags; ++i) {
    SStreamTagInfo* pTagInfo = taosArrayGet(pInserterInfo->pTagVals, i);
    if (NULL == pTagInfo) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _end;
    }

    col_id_t cid = preCols + i + 1;
    if (pTagCids) {
      col_id_t* pCid = taosArrayGet(pTagCids, i);
      if (NULL == pCid) {
        // pTagCids holds fewer ids than there are tags.
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        goto _end;
      }
      cid = *pCid;
    }

    STagVal tagVal = {
        .cid = cid,
        .type = pTagInfo->val.data.type,
    };
    if (!pTagInfo->val.isNull) {
      if (IS_VAR_DATA_TYPE(pTagInfo->val.data.type)) {
        tagVal.nData = pTagInfo->val.data.nData;
        tagVal.pData = pTagInfo->val.data.pData;
      } else {
        tagVal.i64 = pTagInfo->val.data.val;
      }

      if (NULL == taosArrayPush(*ppTagVals, &tagVal)) {
        code = terrno;
        goto _end;
      }
    }
  }
_end:
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(*ppTagVals);
    *ppTagVals = NULL;
  }
  return code;
}
1834

1835
static int32_t buildStreamSubTableCreateReq(SStreamRunnerTask* pTask, SDataInserterHandle* pInserter,
212,238✔
1836
                                            SStreamInserterParam* pInsertParam, SStreamDataInserterInfo* pInserterInfo,
1837
                                            SSubmitTbData* tbData) {
1838
  int32_t code = TSDB_CODE_SUCCESS;
212,238✔
1839
  STag*   pTag = NULL;
212,238✔
1840
  SArray* pTagVals = NULL;
212,238✔
1841
  SArray* TagNames = NULL;
212,238✔
1842

1843
  if (pInsertParam->pTagFields == NULL) {
212,238✔
1844
    ST_TASK_ELOG("buildStreamSubTableCreateReq, pTagFields is NULL, suid:%" PRId64 ", sver:%d", pInsertParam->suid,
×
1845
                 pInsertParam->sver);
1846
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1847
  }
1848
  if (pInserterInfo->pTagVals == NULL || pInserterInfo->pTagVals->size == 0) {
212,238✔
UNCOV
1849
    ST_TASK_ELOG("buildStreamSubTableCreateReq, pTagVals is NULL, suid:%" PRId64 ", sver:%d", pInsertParam->suid,
×
1850
                 pInsertParam->sver);
1851
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1852
  }
1853
  if (pInsertParam->suid <= 0 || pInsertParam->sver <= 0) {
212,238✔
1854
    ST_TASK_ELOG("buildStreamSubTableCreateReq, suid:%" PRId64
×
1855
                 ", sver:%d"
1856
                 " must be greater than 0",
1857
                 pInsertParam->suid, pInsertParam->sver);
1858
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1859
  }
1860
  int32_t nTags = pInserterInfo->pTagVals->size;
212,238✔
1861

1862
  TagNames = taosArrayInit(nTags, TSDB_COL_NAME_LEN);
212,238✔
1863
  if (!TagNames) {
212,238✔
1864
    code = terrno;
×
1865
    goto _end;
×
1866
  }
1867
  for (int32_t i = 0; i < nTags; ++i) {
561,059✔
1868
    SFieldWithOptions* pField = taosArrayGet(pInsertParam->pTagFields, i);
348,821✔
1869
    if (NULL == taosArrayPush(TagNames, pField->name)) {
697,642✔
1870
      code = terrno;
×
1871
      goto _end;
×
1872
    }
1873
  }
1874

1875
  tbData->flags |= (SUBMIT_REQ_AUTO_CREATE_TABLE | SUBMIT_REQ_SCHEMA_RES);
212,238✔
1876
  tbData->uid = 0;
212,238✔
1877
  tbData->suid = pInsertParam->suid;
212,238✔
1878
  tbData->sver = pInsertParam->sver;
212,238✔
1879

1880
  tbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
212,238✔
1881
  if (NULL == tbData->pCreateTbReq) {
212,238✔
1882
    code = terrno;
×
1883
    goto _end;
×
1884
  }
1885
  tbData->pCreateTbReq->type = TSDB_CHILD_TABLE;
212,238✔
1886
  tbData->pCreateTbReq->flags |= (TD_CREATE_SUB_TB_IN_STREAM | TD_CREATE_IF_NOT_EXISTS);
212,238✔
1887

1888
  code = getTagValsFromStreamInserterInfo(pInserterInfo, pInsertParam->pFields->size, &pTagVals, pInsertParam->tagCids);
212,033✔
1889
  if (code != TSDB_CODE_SUCCESS) {
212,238✔
1890
    goto _end;
×
1891
  }
1892

1893
  code = tTagNew(pTagVals, pInsertParam->sver, false, &pTag);
212,238✔
1894
  if (code != TSDB_CODE_SUCCESS) {
212,238✔
1895
    ST_TASK_ELOG("failed to create tag, error:%s", tstrerror(code));
×
1896
    goto _end;
×
1897
  }
1898
  code = inserterBuildCreateTbReq(tbData->pCreateTbReq, pInserterInfo->tbName, pTag, tbData->suid,
212,238✔
1899
                                  pInsertParam->stbname, TagNames, nTags, TSDB_DEFAULT_TABLE_TTL);
212,238✔
1900
  if (code != TSDB_CODE_SUCCESS) {
212,238✔
1901
    ST_TASK_ELOG("failed to build create table request, error:%s", tstrerror(code));
×
1902
    goto _end;
×
1903
  }
1904

1905
_end:
212,238✔
1906
  if (code != TSDB_CODE_SUCCESS) {
212,238✔
1907
    ST_TASK_ELOG("buildStreamSubTableCreateReq failed, error:%s", tstrerror(code));
×
1908
    if (tbData->pCreateTbReq) {
×
1909
      taosMemoryFreeClear(tbData->pCreateTbReq->name);
×
1910
      taosMemoryFreeClear(tbData->pCreateTbReq);
×
1911
    }
1912
    if (TagNames) {
×
1913
      taosArrayDestroy(TagNames);
×
1914
    }
1915
  }
1916

1917
  if (pTagVals) {
212,238✔
1918
    taosArrayDestroy(pTagVals);
212,003✔
1919
  }
1920
  return code;
212,238✔
1921
}
1922

1923
static int32_t appendInsertData(SStreamInserterParam* pInsertParam, const SSDataBlock* pDataBlock,
2,594,149✔
1924
                                SSubmitTbData* tbData, STSchema* pTSchema, SBuildInsertDataInfo* dataInsertInfo) {
1925
  int32_t code = TSDB_CODE_SUCCESS;
2,594,149✔
1926
  int32_t lino = 0;
2,594,149✔
1927

1928
  int32_t rows = pDataBlock ? pDataBlock->info.rows : 0;
2,594,149✔
1929
  int32_t numOfCols = pInsertParam->pFields->size;
2,593,945✔
1930
  int32_t colNum = pDataBlock ? taosArrayGetSize(pDataBlock->pDataBlock) : 0;
2,593,741✔
1931

1932
  SArray* pVals = NULL;
2,593,945✔
1933
  if (!(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
2,593,945✔
1934
    code = terrno;
×
1935
    QUERY_CHECK_CODE(code, lino, _end);
×
1936
  }
1937

1938
  for (int32_t j = 0; j < rows; ++j) {  // iterate by row
26,939,435✔
1939
    taosArrayClear(pVals);
24,345,051✔
1940

1941
    bool tsOrPrimaryKeyIsNull = false;
24,343,406✔
1942
    for (int32_t k = 0; k < numOfCols; ++k) {  // iterate by column
85,837,747✔
1943
      int16_t colIdx = k + 1;
61,561,187✔
1944

1945
      SFieldWithOptions* pCol = taosArrayGet(pInsertParam->pFields, k);
61,561,187✔
1946
      if (PRIMARYKEY_TIMESTAMP_COL_ID != colIdx && TSDB_DATA_TYPE_NULL == pCol->type) {
61,531,572✔
1947
        SColVal cv = COL_VAL_NULL(colIdx, pCol->type);
×
1948
        if (NULL == taosArrayPush(pVals, &cv)) {
×
1949
          code = terrno;
×
1950
          QUERY_CHECK_CODE(code, lino, _end);
×
1951
        }
1952
        continue;
×
1953
      }
1954

1955
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
61,530,162✔
1956
      if (NULL == pColInfoData) {
61,494,490✔
1957
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1958
        QUERY_CHECK_CODE(code, lino, _end);
×
1959
      }
1960
      void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
61,494,490✔
1961

1962
      if (colDataIsNull_s(pColInfoData, j) && (pCol->flags & COL_IS_KEY)) {
123,082,479✔
1963
        tsOrPrimaryKeyIsNull = true;
43,620✔
1964
        qDebug("Primary key column should not be null, skip this row");
43,620✔
1965
        break;
43,620✔
1966
      }
1967
      switch (pColInfoData->info.type) {
61,536,379✔
1968
        case TSDB_DATA_TYPE_NCHAR:
3,004,764✔
1969
        case TSDB_DATA_TYPE_VARBINARY:
1970
        case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
1971
          if (pColInfoData->info.type != pCol->type) {
3,004,764✔
1972
            qError("tb:%s column:%d type:%d in block dismatch with schema col:%d type:%d", pInsertParam->tbname, k,
×
1973
                   pColInfoData->info.type, k, pCol->type);
1974
            code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1975
            QUERY_CHECK_CODE(code, lino, _end);
×
1976
          }
1977
          if (colDataIsNull_s(pColInfoData, j)) {
6,009,528✔
1978
            SColVal cv = COL_VAL_NULL(colIdx, pCol->type);
2,928✔
1979
            if (NULL == taosArrayPush(pVals, &cv)) {
2,928✔
1980
              code = terrno;
×
1981
              QUERY_CHECK_CODE(code, lino, _end);
×
1982
            }
1983
          } else {
1984
            if (pColInfoData->pData == NULL) {
3,001,836✔
1985
              qError("build insert tb:%s, column:%d data is NULL in block", pInsertParam->tbname, k);
×
1986
              code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1987
              QUERY_CHECK_CODE(code, lino, _end);
×
1988
            }
1989
            void*  data = colDataGetVarData(pColInfoData, j);
3,001,836✔
1990
            SValue sv = (SValue){
9,005,508✔
1991
                .type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)};  // address copy, no value
3,001,836✔
1992
            SColVal cv = COL_VAL_VALUE(colIdx, sv);
3,001,836✔
1993
            if (NULL == taosArrayPush(pVals, &cv)) {
3,001,836✔
1994
              code = terrno;
×
1995
              QUERY_CHECK_CODE(code, lino, _end);
×
1996
            }
1997
          }
1998
          break;
3,004,764✔
1999
        }
2000
        case TSDB_DATA_TYPE_BLOB:
×
2001
        case TSDB_DATA_TYPE_JSON:
2002
        case TSDB_DATA_TYPE_MEDIUMBLOB:
2003
          qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
2004
          code = TSDB_CODE_APP_ERROR;
×
2005
          QUERY_CHECK_CODE(code, lino, _end);
×
2006
          break;
×
2007
        default:
58,531,509✔
2008
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
58,531,509✔
2009
            if (colDataIsNull_s(pColInfoData, j)) {
117,072,759✔
2010
              if (PRIMARYKEY_TIMESTAMP_COL_ID == colIdx) {
1,130,721✔
2011
                tsOrPrimaryKeyIsNull = true;
2,045✔
2012
                qDebug("Primary timestamp column should not be null, skip this row");
2,045✔
2013
                break;
2,045✔
2014
              }
2015

2016
              SColVal cv = COL_VAL_NULL(colIdx, pCol->type);  // should use pCol->type
1,128,676✔
2017
              if (NULL == taosArrayPush(pVals, &cv)) {
1,128,676✔
2018
                code = terrno;
×
2019
                QUERY_CHECK_CODE(code, lino, _end);
×
2020
              }
2021
            } else {
2022
              if (PRIMARYKEY_TIMESTAMP_COL_ID == colIdx && !dataInsertInfo->needSortMerge) {
57,414,591✔
2023
                if (*(int64_t*)var <= dataInsertInfo->lastTs) {
2,634,557✔
2024
                  dataInsertInfo->needSortMerge = true;
20,279✔
2025
                } else {
2026
                  dataInsertInfo->lastTs = *(int64_t*)var;
2,614,278✔
2027
                }
2028
              }
2029

2030
              SValue sv = {.type = pCol->type};
57,416,440✔
2031
              valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
57,376,725✔
2032
              SColVal cv = COL_VAL_VALUE(colIdx, sv);
57,366,608✔
2033
              if (NULL == taosArrayPush(pVals, &cv)) {
57,369,999✔
2034
                code = terrno;
×
2035
                QUERY_CHECK_CODE(code, lino, _end);
1,175✔
2036
              }
2037
            }
2038
          } else {
2039
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
2040
            code = TSDB_CODE_APP_ERROR;
9,671✔
2041
            QUERY_CHECK_CODE(code, lino, _end);
9,671✔
2042
          }
2043
          break;
58,505,761✔
2044
      }
2045
      if (tsOrPrimaryKeyIsNull) break;  // skip remaining columns because the primary key is null
61,496,386✔
2046
    }
2047
    if (tsOrPrimaryKeyIsNull) continue;  // skip this row if primary key is null
24,322,225✔
2048
    SRow*             pRow = NULL;
24,276,560✔
2049
    SRowBuildScanInfo sinfo = {0};
24,275,651✔
2050
    if ((code = tRowBuild(pVals, pTSchema, &pRow, &sinfo)) != TSDB_CODE_SUCCESS) {
24,277,766✔
2051
      QUERY_CHECK_CODE(code, lino, _end);
×
2052
    }
2053
    if (NULL == taosArrayPush(tbData->aRowP, &pRow)) {
48,588,197✔
2054
      taosMemFree(pRow);
×
2055
      code = terrno;
×
2056
      QUERY_CHECK_CODE(code, lino, _end);
×
2057
    }
2058
  }
2059
  if (dataInsertInfo->isLastBlock) {
2,594,384✔
2060
    int32_t nRows = taosArrayGetSize(tbData->aRowP);
2,592,577✔
2061
    if (taosArrayGetSize(tbData->aRowP) == 0) {
2,592,373✔
2062
      tbData->flags |= SUBMIT_REQ_ONLY_CREATE_TABLE;
1,932,491✔
2063
      stDebug("no valid data to insert, try to only create tabale:%s", pInsertParam->tbname);
1,932,491✔
2064
    }
2065
    stDebug("appendInsertData, isLastBlock:%d, needSortMerge:%d, totalRows:%d", dataInsertInfo->isLastBlock,
2,592,373✔
2066
            dataInsertInfo->needSortMerge, nRows);
2067
    if (dataInsertInfo->needSortMerge) {
2,592,373✔
2068
      if ((tRowSort(tbData->aRowP) != TSDB_CODE_SUCCESS) ||
40,558✔
2069
          (code = tRowMerge(tbData->aRowP, (STSchema*)pTSchema, KEEP_CONSISTENCY)) != 0) {
20,279✔
2070
        QUERY_CHECK_CODE(code, lino, _end);
×
2071
      }
2072
    }
2073
    nRows = taosArrayGetSize(tbData->aRowP);
2,592,373✔
2074
    stDebug("appendInsertData, after merge, totalRows:%d", nRows);
2,592,373✔
2075
  }
2076

2077
_end:
144,661✔
2078
  taosArrayDestroy(pVals);
2,593,710✔
2079
  return code;
2,594,149✔
2080
}
2081

2082
int32_t buildStreamSubmitReqFromBlock(SStreamRunnerTask* pTask, SDataInserterHandle* pInserter,
2,594,149✔
2083
                                      SStreamDataInserterInfo* pInserterInfo, SSubmitReq2** ppReq,
2084
                                      const SSDataBlock* pDataBlock, SVgroupInfo* vgInfo,
2085
                                      SBuildInsertDataInfo* tbDataInfo) {
2086
  SSubmitReq2* pReq = *ppReq;
2,594,149✔
2087
  int32_t      numOfBlks = 0;
2,594,149✔
2088

2089
  int32_t               code = TSDB_CODE_SUCCESS;
2,594,149✔
2090
  int32_t               lino = 0;
2,594,149✔
2091
  SStreamInserterParam* pInsertParam = pInserter->pParam->streamInserterParam;
2,594,149✔
2092
  SInsertTableInfo**    ppTbInfo = NULL;
2,594,149✔
2093
  SInsertTableInfo*     pTbInfo = NULL;
2,594,149✔
2094
  STSchema*             pTSchema = NULL;
2,594,149✔
2095
  SSubmitTbData*        tbData = &tbDataInfo->pTbData;
2,594,149✔
2096
  int32_t               colNum = 0;
2,594,149✔
2097
  int32_t               rows = 0;
2,594,149✔
2098

2099
  if (NULL == pReq) {
2,594,149✔
2100
    if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
2,592,577✔
2101
      code = terrno;
×
2102
      QUERY_CHECK_CODE(code, lino, _end);
×
2103
    }
2104
    *ppReq = pReq;
2,592,577✔
2105

2106
    if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
2,592,577✔
2107
      code = terrno;
×
2108
      QUERY_CHECK_CODE(code, lino, _end);
×
2109
    }
2110
  }
2111

2112
  if (pDataBlock) {
2,594,149✔
2113
    colNum = taosArrayGetSize(pDataBlock->pDataBlock);
663,703✔
2114
    rows = pDataBlock->info.rows;
663,703✔
2115
  }
2116

2117
  tbData->flags |= SUBMIT_REQ_SCHEMA_RES;
2,594,149✔
2118

2119
  if (tbDataInfo->isFirstBlock) {
2,594,149✔
2120
    if (pInserterInfo->isAutoCreateTable) {
2,592,577✔
2121
      code = initTableInfo(pInserter, pInserterInfo);
309,723✔
2122
      QUERY_CHECK_CODE(code, lino, _end);
309,723✔
2123
      if (pInsertParam->tbType == TSDB_NORMAL_TABLE) {
309,723✔
2124
        code = buildNormalTableCreateReq(pInserter, pInsertParam, tbData);
97,485✔
2125
      } else if (pInsertParam->tbType == TSDB_SUPER_TABLE) {
212,238✔
2126
        code = buildStreamSubTableCreateReq(pTask, pInserter, pInsertParam, pInserterInfo, tbData);
212,238✔
2127
      } else {
2128
        code = TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
×
2129
        ST_TASK_ELOG("buildStreamSubmitReqFromBlock, unknown table type %d", pInsertParam->tbType);
×
2130
      }
2131
      QUERY_CHECK_CODE(code, lino, _end);
309,723✔
2132
    }
2133
  }
2134

2135
  code = getStreamInsertTableInfo(pInserterInfo->streamId, pInserterInfo->groupId, &ppTbInfo);
2,594,149✔
2136
  QUERY_CHECK_CODE(code, lino, _end);
2,593,914✔
2137

2138
  pTbInfo =  *ppTbInfo;
2,593,914✔
2139
  if (tbDataInfo->isFirstBlock) {
2,593,914✔
2140
    if (!pInserterInfo->isAutoCreateTable) {
2,592,143✔
2141
      tstrncpy(pInserterInfo->tbName, pTbInfo->tbname, TSDB_TABLE_NAME_LEN);
2,282,619✔
2142
    }
2143

2144
    tbData->uid = pTbInfo->uid;
2,592,378✔
2145
    tbData->sver = pTbInfo->version;
2,592,577✔
2146

2147
    if (pInsertParam->tbType == TSDB_SUPER_TABLE) {
2,592,373✔
2148
      tbData->suid = pInsertParam->suid;
2,171,839✔
2149
    }
2150

2151
    pTSchema = pTbInfo->pSchema;
2,592,577✔
2152
  } else {
2153
    pTSchema = pTbInfo->pSchema;
1,572✔
2154
  }
2155

2156
  code = getTableVgInfo(pInserter, pInsertParam->dbFName, pTbInfo->tbname, vgInfo);
2,594,149✔
2157
  QUERY_CHECK_CODE(code, lino, _end);
2,593,945✔
2158

2159
  ST_TASK_DLOG("[data inserter], Handle:%p, GROUP:%" PRId64 " tbname:%s autoCreate:%d uid:%" PRId64 " suid:%" PRId64
2,593,945✔
2160
               " sver:%d vgid:%d isLastBlock:%d",
2161
               pInserter, pInserterInfo->groupId, pInserterInfo->tbName, pInserterInfo->isAutoCreateTable, tbData->uid,
2162
               tbData->suid, tbData->sver, vgInfo->vgId, tbDataInfo->isFirstBlock);
2163

2164
  code = appendInsertData(pInsertParam, pDataBlock, tbData, pTSchema, tbDataInfo);
2,593,945✔
2165
  QUERY_CHECK_CODE(code, lino, _end);
2,593,945✔
2166

2167
_end:
2,593,945✔
2168
  releaseStreamInsertTableInfo(ppTbInfo);
2,593,945✔
2169
  if (code != TSDB_CODE_SUCCESS) {
2,594,149✔
2170
    ST_TASK_ELOG("buildStreamSubmitReqFromBlock, code:0x%0x, groupId:%" PRId64 " tbname:%s autoCreate:%d", code,
×
2171
                 pInserterInfo->groupId, pInserterInfo->tbName, pInserterInfo->isAutoCreateTable);
2172
  }
2173
  return code;
2,594,149✔
2174
}
2175

2176
int32_t streamDataBlocksToSubmitReq(SStreamRunnerTask* pTask, SDataInserterHandle* pInserter,
2,592,577✔
2177
                                    SStreamDataInserterInfo* pInserterInfo, void** pMsg, int32_t* msgLen,
2178
                                    SVgroupInfo* vgInfo) {
2179
  int32_t code = 0;
2,592,577✔
2180
  int32_t lino = 0;
2,592,577✔
2181

2182
  const SArray*        pBlocks = pInserter->pDataBlocks;
2,592,577✔
2183
  int32_t              sz = taosArrayGetSize(pBlocks);
2,592,577✔
2184
  SSubmitReq2*         pReq = NULL;
2,592,577✔
2185
  SBuildInsertDataInfo tbDataInfo = {0};
2,592,577✔
2186

2187
  int32_t rows = 0;
2,592,577✔
2188
  for (int32_t i = 0; i < sz; i++) {
5,186,726✔
2189
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);
2,594,149✔
2190
    if (NULL == pDataBlock) {
2,594,149✔
2191
      stDebug("data block is NULL, just create empty table");
1,930,446✔
2192
      continue;
1,930,446✔
2193
    }
2194
    rows += pDataBlock->info.rows;
663,703✔
2195
  }
2196
  code = initInsertProcessInfo(&tbDataInfo, rows);
2,592,577✔
2197
  if (code != TSDB_CODE_SUCCESS) {
2,592,577✔
2198
    ST_TASK_ELOG("streamDataBlocksToSubmitReq, initInsertDataInfo failed, code:%d", code);
×
2199
    return code;
×
2200
  }
2201

2202
  for (int32_t i = 0; i < sz; i++) {
5,186,726✔
2203
    tbDataInfo.isFirstBlock = (i == 0);
2,594,149✔
2204
    tbDataInfo.isLastBlock = (i == sz - 1);
2,594,149✔
2205
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);  // pDataBlock select查询到的结果
2,594,149✔
2206
    ST_TASK_DLOG("[data inserter], Handle:%p, GROUP:%" PRId64
2,594,149✔
2207
            " tbname:%s autoCreate:%d block: %d/%d rows:%" PRId64,
2208
            pInserter, pInserterInfo->groupId, pInserterInfo->tbName,
2209
            pInserterInfo->isAutoCreateTable, i + 1, sz, (pDataBlock != NULL ? pDataBlock->info.rows : 0));
2210
    code = buildStreamSubmitReqFromBlock(pTask, pInserter, pInserterInfo, &pReq, pDataBlock, vgInfo, &tbDataInfo);
2,594,149✔
2211
    QUERY_CHECK_CODE(code, lino, _end);
2,594,149✔
2212
  }
2213

2214
  if (NULL == taosArrayPush(pReq->aSubmitTbData, &tbDataInfo.pTbData)) {
5,185,154✔
2215
    code = terrno;
×
2216
    QUERY_CHECK_CODE(code, lino, _end);
×
2217
  }
2218

2219
  code = submitReqToMsg(vgInfo->vgId, pReq, pMsg, msgLen);
2,592,577✔
2220
  tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
2,592,378✔
2221
  taosMemoryFree(pReq);
2,592,204✔
2222
  ST_TASK_DLOG("[data inserter], submit req, vgid:%d, GROUP:%" PRId64 " tbname:%s autoCreate:%d code:%d ", vgInfo->vgId,
2,592,142✔
2223
               pInserterInfo->groupId, pInserterInfo->tbName, pInserterInfo->isAutoCreateTable, code);
2224

2225
_end:
2,590,516✔
2226
  if (code != 0) {
2,592,577✔
2227
    tDestroySubmitTbData(&tbDataInfo.pTbData, TSDB_MSG_FLG_ENCODE);
×
2228
    tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
×
2229
    taosMemoryFree(pReq);
×
2230
  }
2231

2232
  return code;
2,592,577✔
2233
}
2234

2235
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
477,851✔
2236
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
477,851✔
2237
  if (!pInserter->explain) {
477,851✔
2238
    if (NULL == taosArrayPush(pInserter->pDataBlocks, &pInput->pData)) {
946,936✔
2239
      return terrno;
×
2240
    }
2241
    if (pInserter->isStbInserter) {
473,468✔
2242
      SArray* pMsgs = taosArrayInit(4, sizeof(POINTER_BYTES));
2,268✔
2243
      if (NULL == pMsgs) {
2,268✔
2244
        return terrno;
×
2245
      }
2246
      int32_t code = dataBlocksToSubmitReqArray(pInserter, pMsgs);
2,268✔
2247
      if (code) {
2,268✔
2248
        taosArrayDestroyP(pMsgs, destroySSubmitTbDataSendInfo);
×
2249
        return code;
×
2250
      }
2251
      taosArrayClear(pInserter->pDataBlocks);
2,268✔
2252
      for (int32_t i = 0; i < taosArrayGetSize(pMsgs); ++i) {
6,804✔
2253
        SSubmitTbDataSendInfo* pSendInfo = taosArrayGetP(pMsgs, i);
4,536✔
2254
        code = sendSubmitRequest(pInserter, NULL, pSendInfo->msg.pData, pSendInfo->msg.len,
9,072✔
2255
                                 pInserter->pParam->readHandle->pMsgCb->clientRpc, &pSendInfo->epSet);
4,536✔
2256
        taosMemoryFree(pSendInfo);
4,536✔
2257
        if (code) {
4,536✔
2258
          for (int j = i + 1; j < taosArrayGetSize(pMsgs); ++j) {
×
2259
            SSubmitTbDataSendInfo* pSendInfo2 = taosArrayGetP(pMsgs, j);
×
2260
            destroySSubmitTbDataSendInfo(pSendInfo2);
×
2261
          }
2262
          taosArrayDestroy(pMsgs);
×
2263
          return code;
×
2264
        }
2265
        QRY_ERR_RET(tsem_wait(&pInserter->ready));
4,536✔
2266

2267
        if (pInserter->submitRes.code) {
4,536✔
2268
          for (int j = i + 1; j < taosArrayGetSize(pMsgs); ++j) {
×
2269
            SSubmitTbDataSendInfo* pSendInfo2 = taosArrayGetP(pMsgs, j);
×
2270
            destroySSubmitTbDataSendInfo(pSendInfo2);
×
2271
          }
2272
          taosArrayDestroy(pMsgs);
×
2273
          return pInserter->submitRes.code;
×
2274
        }
2275
      }
2276

2277
      taosArrayDestroy(pMsgs);
2,268✔
2278

2279
    } else {
2280
      void*   pMsg = NULL;
471,200✔
2281
      int32_t msgLen = 0;
471,200✔
2282
      int32_t code = dataBlocksToSubmitReq(pInserter, &pMsg, &msgLen);
471,200✔
2283
      if (code) {
471,200✔
2284
        return code;
7,332✔
2285
      }
2286

2287
      taosArrayClear(pInserter->pDataBlocks);
463,868✔
2288

2289
      code = sendSubmitRequest(pInserter, NULL, pMsg, msgLen, pInserter->pParam->readHandle->pMsgCb->clientRpc,
463,868✔
2290
                               &pInserter->pNode->epSet);
463,868✔
2291
      if (code) {
463,868✔
2292
        return code;
×
2293
      }
2294

2295
      QRY_ERR_RET(tsem_wait(&pInserter->ready));
463,868✔
2296

2297
      if (pInserter->submitRes.code) {
463,868✔
2298
        return pInserter->submitRes.code;
×
2299
      }
2300
    }
2301
  }
2302

2303
  *pContinue = true;
470,519✔
2304

2305
  return TSDB_CODE_SUCCESS;
470,519✔
2306
}
2307

2308
/*
 * Look up the cached insert-table info for the stream/group named in pInput
 * and, unless the stream target is a normal table, copy its version into the
 * stream inserter parameter's sver. The looked-up entry is released before
 * returning.
 *
 * @return the lookup error if the entry cannot be obtained, otherwise the
 *         result of releaseStreamInsertTableInfo().
 */
static int32_t resetInserterTbVersion(SDataInserterHandle* pInserter, const SInputData* pInput) {
  SInsertTableInfo** ppCached = NULL;

  int32_t rc = getStreamInsertTableInfo(pInput->pStreamDataInserterInfo->streamId,
                                        pInput->pStreamDataInserterInfo->groupId, &ppCached);
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  SInsertTableInfo* pCached = *ppCached;
  stDebug("resetInserterTbVersion, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbName:%s, uid:%" PRId64 ", version:%d",
          pInput->pStreamDataInserterInfo->streamId, pInput->pStreamDataInserterInfo->groupId,
          pInput->pStreamDataInserterInfo->tbName, pCached->uid, pCached->version);

  bool isNormalTable = (pInserter->pParam->streamInserterParam->tbType == TSDB_NORMAL_TABLE);
  if (!isNormalTable) {
    pInserter->pParam->streamInserterParam->sver = pCached->version;
  }

  return releaseStreamInsertTableInfo(ppCached);
}
2325

2326
/*
 * fPut callback of the stream data inserter.
 *
 * Queues pInput->pData, builds a submit request for the target table described
 * by pInput->pStreamDataInserterInfo, sends it and waits on pInserter->ready
 * for the response. The response code in pInserter->submitRes.code is then
 * handled as follows:
 *
 *  - TSDB_CODE_TDB_TABLE_ALREADY_EXIST: auto-create is switched off, the table
 *    version is refreshed and the request is rebuilt and sent once more.
 *  - TSDB_CODE_VND_INVALID_VGROUP_ID while auto-create is on: the cached db
 *    vgroup info is dropped and the whole put is retried.
 *  - TSDB_CODE_TDB_TABLE_NOT_EXIST without auto-create, or
 *    TSDB_CODE_VND_INVALID_VGROUP_ID: the cached db vgroup info is dropped and
 *    TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND is returned.
 *  - any other non-zero code is returned as is.
 *
 * TSDB_CODE_STREAM_NO_DATA is not treated as an error: the block is skipped
 * and TSDB_CODE_SUCCESS is returned. The pending block list is emptied on
 * every exit taken after the block has been queued.
 *
 * @param pHandle   sink handle; actually a SDataInserterHandle
 * @param pInput    stream input (task, data block and inserter info)
 * @param pContinue set to true when the block was submitted successfully
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t putStreamDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  int32_t              code = 0;
  int32_t              lino = 0;
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
  SStreamRunnerTask*   pTask = pInput->pTask;
  if (!pInserter || !pInserter->pParam || !pInserter->pParam->streamInserterParam) {
    ST_TASK_ELOG("putStreamDataBlock invalid param, pInserter: %p, pParam:%p", pInserter,
                 pInserter ? pInserter->pParam : NULL);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }
  if (!pInserter->explain) {
    code = TSDB_CODE_SUCCESS;
    if (NULL == taosArrayPush(pInserter->pDataBlocks, &pInput->pData)) {
      return terrno;
    }
    void*       pMsg = NULL;
    int32_t     msgLen = 0;
    SVgroupInfo vgInfo = {0};

    code = streamDataBlocksToSubmitReq(pTask, pInserter, pInput->pStreamDataInserterInfo, &pMsg, &msgLen, &vgInfo);
    QUERY_CHECK_CODE(code, lino, _return);

    code = sendSubmitRequest(pInserter, pInput->pStreamDataInserterInfo, pMsg, msgLen,
                             pInserter->pParam->readHandle->pMsgCb->clientRpc, &vgInfo.epSet);
    QUERY_CHECK_CODE(code, lino, _return);

    code = tsem_wait(&pInserter->ready);
    QUERY_CHECK_CODE(code, lino, _return);

    if (pInserter->submitRes.code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
      // The table already exists: resend as a plain insert using the refreshed table version.
      pInput->pStreamDataInserterInfo->isAutoCreateTable = false;
      code = resetInserterTbVersion(pInserter, pInput);
      QUERY_CHECK_CODE(code, lino, _return);

      code = streamDataBlocksToSubmitReq(pTask, pInserter, pInput->pStreamDataInserterInfo, &pMsg, &msgLen, &vgInfo);
      QUERY_CHECK_CODE(code, lino, _return);

      code = sendSubmitRequest(pInserter, pInput->pStreamDataInserterInfo, pMsg, msgLen,
                               pInserter->pParam->readHandle->pMsgCb->clientRpc, &vgInfo.epSet);
      QUERY_CHECK_CODE(code, lino, _return);

      code = tsem_wait(&pInserter->ready);
      QUERY_CHECK_CODE(code, lino, _return);
    }

    if (pInput->pStreamDataInserterInfo->isAutoCreateTable &&
        pInserter->submitRes.code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
      rmDbVgInfoFromCache(pInserter->pParam->streamInserterParam->dbFName);
      ST_TASK_ILOG("putStreamDataBlock, stream inserter table info not found, groupId:%" PRId64
                   ", tbName:%s. so reset dbVgInfo and try again",
                   pInput->pStreamDataInserterInfo->groupId, pInput->pStreamDataInserterInfo->tbName);
      // The retry queues pInput->pData again, so the copy queued above must be
      // dropped first; otherwise the same block is submitted twice.
      taosArrayClear(pInserter->pDataBlocks);
      // NOTE(review): the retry is unbounded; if the vgroup error persists this keeps recursing.
      return putStreamDataBlock(pHandle, pInput, pContinue);
    }

    if ((pInserter->submitRes.code == TSDB_CODE_TDB_TABLE_NOT_EXIST &&
         !pInput->pStreamDataInserterInfo->isAutoCreateTable) ||
        pInserter->submitRes.code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
      rmDbVgInfoFromCache(pInserter->pParam->streamInserterParam->dbFName);
      ST_TASK_ILOG("putStreamDataBlock, stream inserter table info not found, groupId:%" PRId64
                   ", tbName:%s. so reset dbVgInfo",
                   pInput->pStreamDataInserterInfo->groupId, pInput->pStreamDataInserterInfo->tbName);
      code = TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
      QUERY_CHECK_CODE(code, lino, _return);
    }

    if (pInserter->submitRes.code) {
      code = pInserter->submitRes.code;
      ST_TASK_ELOG("submitRes err:%s, code:%0x", tstrerror(pInserter->submitRes.code), pInserter->submitRes.code);
      QUERY_CHECK_CODE(code, lino, _return);
    }

    *pContinue = true;

  _return:
    taosArrayClear(pInserter->pDataBlocks);
    if (code == TSDB_CODE_STREAM_NO_DATA) {
      ST_TASK_DLOG("putStreamDataBlock, no valid data to insert, skip this block, groupID:%" PRId64,
                   pInput->pStreamDataInserterInfo->groupId);
      code = TSDB_CODE_SUCCESS;
    } else if (code) {
      ST_TASK_ELOG("submitRes err:%s, code:%0x lino:%d", tstrerror(code), code, lino);
    }
    return code;
  }
  return TSDB_CODE_SUCCESS;
}
2413

2414
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
99,997✔
2415
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
99,997✔
2416
  (void)taosThreadMutexLock(&pInserter->mutex);
99,997✔
2417
  pInserter->queryEnd = true;
99,997✔
2418
  pInserter->useconds = useconds;
99,997✔
2419
  (void)taosThreadMutexUnlock(&pInserter->mutex);
99,997✔
2420
}
99,997✔
2421

2422
/*
 * fGetLen callback: reports the accumulated number of affected rows through
 * pLen. pRawLen and pQueryEnd are not written by this implementation.
 */
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) {
  SDataInserterHandle* pSink = (SDataInserterHandle*)pHandle;

  int64_t totalAffected = pSink->submitRes.affectedRows;
  *pLen = totalAffected;
  qDebug("got total affectedRows %" PRId64, totalAffected);
}
2427

2428
/*
 * fDestroy callback: releases everything the inserter handle owns (pending
 * block array, schemas, parameter block, column map, plan node, mutex,
 * manager and the optional per-inserter vgroup map) and subtracts the
 * handle's cached size from the global sink statistics.
 *
 * The handle structure itself is NOT freed here.
 *
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataInserterHandle* pSink = (SDataInserterHandle*)pHandle;

  (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pSink->cachedSize);

  // Buffered blocks and the table schema.
  taosArrayDestroy(pSink->pDataBlocks);
  taosMemoryFree(pSink->pSchema);

  // Parameter block; the stream variant additionally owns its read handle.
  if (NULL != pSink->pParam->streamInserterParam) {
    destroyStreamInserterParam(pSink->pParam->streamInserterParam);
    taosMemoryFree(pSink->pParam->readHandle);  // only for stream
  }
  taosMemoryFree(pSink->pParam);

  // Column map and plan node.
  taosHashCleanup(pSink->pCols);
  nodesDestroyNode((SNode*)pSink->pNode);
  pSink->pNode = NULL;

  (void)taosThreadMutexDestroy(&pSink->mutex);

  taosMemoryFree(pSink->pManager);

  // Per-inserter vgroup cache: install the value destructor before tearing the map down.
  if (NULL != pSink->dbVgInfoMap) {
    taosHashSetFreeFp(pSink->dbVgInfoMap, freeUseDbOutput_tmp);
    taosHashCleanup(pSink->dbVgInfoMap);
  }

  // Tag schema, inner array first.
  if (NULL != pSink->pTagSchema) {
    taosMemoryFreeClear(pSink->pTagSchema->pSchema);
    taosMemoryFree(pSink->pTagSchema);
  }

  return TSDB_CODE_SUCCESS;
}
2458

2459
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
×
2460
  SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
×
2461

2462
  *size = atomic_load_64(&pDispatcher->cachedSize);
×
2463
  return TSDB_CODE_SUCCESS;
×
2464
}
2465

2466
static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) {
107,329✔
2467
  SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
107,329✔
2468

2469
  *pFlags = atomic_load_64(&pDispatcher->flags);
107,329✔
2470
  return TSDB_CODE_SUCCESS;
107,329✔
2471
}
2472

2473
/*
 * Create the data sink used by query-driven inserts.
 *
 * On success the inserter plan node in *ppDataSink is taken over by the new
 * handle (and *ppDataSink is set to NULL), the handle is returned through
 * pHandle and TSDB_CODE_SUCCESS is returned.
 *
 * On failure everything acquired so far is released - including pManager,
 * pParam and the plan node - and terrno is returned.
 *
 * The function also:
 *  - loads the target table schema and checks that the plan's stableId
 *    matches the suid reported for the table;
 *  - builds pCols, a colId -> slotId map over the plan's column list (a
 *    TBNAME function node is stored under colId 0 / slotId 0);
 *  - computes fullOrderColList: true only when the plan lists exactly
 *    numOfCols entries whose colIds match the schema columns in order;
 *  - rejects a super-table insert that has no TBNAME column.
 */
int32_t createDataInserter(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle,
                           void* pParam) {
  int32_t              code = TSDB_CODE_SUCCESS;
  int64_t              suid = 0;
  SQueryInserterNode*  pPlanNode = (SQueryInserterNode*)(*ppDataSink);
  SDataInserterHandle* pSink = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
  if (NULL == pSink) {
    taosMemoryFree(pParam);
    goto _return;
  }

  // Sink callbacks.
  pSink->sink.fPut = putDataBlock;
  pSink->sink.fEndPut = endPut;
  pSink->sink.fGetLen = getDataLength;
  pSink->sink.fGetData = NULL;
  pSink->sink.fDestroy = destroyDataSinker;
  pSink->sink.fGetCacheSize = getCacheSize;
  pSink->sink.fGetFlags = getSinkFlags;

  // Initial state; from here on the handle owns the manager, the plan node and the parameter block.
  pSink->pManager = pManager;
  pSink->pNode = pPlanNode;
  pSink->pParam = pParam;
  pSink->status = DS_BUF_EMPTY;
  pSink->queryEnd = false;
  pSink->explain = pPlanNode->explain;
  *ppDataSink = NULL;

  code = pManager->pAPI->metaFn.getTableSchema(pSink->pParam->readHandle->vnode, pPlanNode->tableId, &pSink->pSchema,
                                               &suid, &pSink->pTagSchema);
  if (code) {
    terrno = code;
    goto _return;
  }

  pManager->pAPI->metaFn.getBasicInfo(pSink->pParam->readHandle->vnode, &pSink->dbFName, NULL, NULL, NULL);

  if (TSDB_SUPER_TABLE == pPlanNode->tableType) {
    pSink->isStbInserter = true;
  }

  if (suid != pPlanNode->stableId) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    goto _return;
  }

  pSink->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  if (NULL == pSink->pDataBlocks) {
    goto _return;
  }
  QRY_ERR_JRET(taosThreadMutexInit(&pSink->mutex, NULL));

  pSink->fullOrderColList = pPlanNode->pCols->length == pSink->pSchema->numOfCols;

  pSink->pCols = taosHashInit(pPlanNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false,
                              HASH_NO_LOCK);
  if (NULL == pSink->pCols) {
    goto _return;
  }

  SNode*  pColNode = NULL;
  int32_t schemaIdx = 0;  // advances only for real columns, not for the TBNAME pseudo column
  bool    hasTbname = false;
  FOREACH(pColNode, pPlanNode->pCols) {
    if (pColNode->type == QUERY_NODE_FUNCTION && ((SFunctionNode*)pColNode)->funcType == FUNCTION_TYPE_TBNAME) {
      int16_t colId = 0;
      int16_t slotId = 0;
      QRY_ERR_JRET(taosHashPut(pSink->pCols, &colId, sizeof(colId), &slotId, sizeof(slotId)));
      hasTbname = true;
    } else {
      SColumnNode* pCol = (SColumnNode*)pColNode;
      QRY_ERR_JRET(taosHashPut(pSink->pCols, &pCol->colId, sizeof(pCol->colId), &pCol->slotId, sizeof(pCol->slotId)));
      if (pSink->fullOrderColList && pCol->colId != pSink->pSchema->columns[schemaIdx].colId) {
        pSink->fullOrderColList = false;
      }
      ++schemaIdx;
    }
  }

  if (pSink->isStbInserter && !hasTbname) {
    QRY_ERR_JRET(TSDB_CODE_PAR_TBNAME_ERROR);
  }

  QRY_ERR_JRET(tsem_init(&pSink->ready, 0, 0));

  pSink->dbVgInfoMap = NULL;

  *pHandle = pSink;
  return TSDB_CODE_SUCCESS;

_return:

  if (pSink) {
    (void)destroyDataSinker((SDataSinkHandle*)pSink);
    taosMemoryFree(pSink);
  } else {
    taosMemoryFree(pManager);
  }

  // Non-NULL only when the handle allocation failed before ownership moved.
  nodesDestroyNode((SNode*)*ppDataSink);
  *ppDataSink = NULL;

  return terrno;
}
2575

2576
                           
2577
// Once-control passed to taosThreadOnce() so that dbVgInfoMgrInitOnce() is run a single time.
static TdThreadOnce g_dbVgInfoMgrInit = PTHREAD_ONCE_INIT;
2578

2579
// Global db vgroup-info cache: dbVgInfoMap is keyed by db full name and is accessed under the `lock` latch.
SDBVgInfoMgr g_dbVgInfoMgr = {0};
2580
                           
2581
void dbVgInfoMgrInitOnce() {
14,203✔
2582
  g_dbVgInfoMgr.dbVgInfoMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
14,203✔
2583
  if (g_dbVgInfoMgr.dbVgInfoMap == NULL) {
14,203✔
2584
    stError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
×
2585
    return;
×
2586
  }
2587

2588
  taosHashSetFreeFp(g_dbVgInfoMgr.dbVgInfoMap, freeUseDbOutput_tmp);
14,203✔
2589
}
2590

2591

2592

2593
/*
 * Create the data sink used by stream tasks.
 *
 * Makes sure the global db vgroup-info cache has been initialised, allocates
 * the handle, wires the sink callbacks (fPut is putStreamDataBlock) and
 * initialises the pending block array, the mutex and the `ready` semaphore.
 *
 * @param pManager sink manager; stored in the handle. On failure it is freed
 *                 here, either directly or through destroyDataSinker().
 * @param pHandle  receives the new handle on success
 * @param pParam   inserter parameters; stored in the handle.
 *                 NOTE(review): when the failure happens before the handle is
 *                 allocated, pParam is not released here - confirm the caller
 *                 does so.
 * @return TSDB_CODE_SUCCESS, or the error code of the failing step.
 *         NOTE(review): the NULL checks take their code from terrno; if terrno
 *         is 0 in the calling thread a failure would be reported as success -
 *         verify.
 */
int32_t createStreamDataInserter(SDataSinkManager* pManager, DataSinkHandle* pHandle, void* pParam) {
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
  // Must be initialised before the first jump to _exit: the cleanup code tests
  // this pointer, and a jump past an initialising declaration would leave it
  // indeterminate.
  SDataInserterHandle* inserter = NULL;

  TAOS_UNUSED(taosThreadOnce(&g_dbVgInfoMgrInit, dbVgInfoMgrInitOnce));
  TSDB_CHECK_NULL(g_dbVgInfoMgr.dbVgInfoMap, code, lino, _exit, terrno);

  inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
  TSDB_CHECK_NULL(inserter, code, lino, _exit, terrno);

  inserter->sink.fPut = putStreamDataBlock;
  inserter->sink.fEndPut = endPut;
  inserter->sink.fGetLen = getDataLength;
  inserter->sink.fGetData = NULL;
  inserter->sink.fDestroy = destroyDataSinker;
  inserter->sink.fGetCacheSize = getCacheSize;
  inserter->sink.fGetFlags = getSinkFlags;
  inserter->pManager = pManager;
  inserter->pNode = NULL;
  inserter->pParam = pParam;
  inserter->status = DS_BUF_EMPTY;
  inserter->queryEnd = false;
  inserter->explain = false;

  inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  TSDB_CHECK_NULL(inserter->pDataBlocks, code, lino, _exit, terrno);

  TAOS_CHECK_EXIT(taosThreadMutexInit(&inserter->mutex, NULL));
  TAOS_CHECK_EXIT(tsem_init(&inserter->ready, 0, 0));

  inserter->dbVgInfoMap = NULL;

  *pHandle = inserter;
  return TSDB_CODE_SUCCESS;

_exit:

  if (inserter) {
    // destroyDataSinker releases what the handle owns (including pManager and pParam).
    (void)destroyDataSinker((SDataSinkHandle*)inserter);
    taosMemoryFree(inserter);
  } else {
    taosMemoryFree(pManager);
  }

  if (code) {
    stError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2642

2643
/*
 * Return the vgroup info of database `dbFName` from the global cache,
 * building and caching it first when it is not present yet.
 *
 * When the insert into the cache reports TSDB_CODE_DUP_KEY, the freshly built
 * entry is discarded and the entry already stored in the cache is used
 * instead.
 *
 * @param clientRpc passed through to buildDbVgInfoMap()
 * @param dbFName   database full name; used as the cache key
 * @param dbVgInfo  receives the dbVgroup pointer of the cached entry; the
 *                  entry itself stays in the cache
 * @return TSDB_CODE_SUCCESS or an error code; *dbVgInfo is not written on error
 */
int32_t getDbVgInfoByTbName(void* clientRpc, const char* dbFName, SDBVgInfo** dbVgInfo) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       line = 0;
  SUseDbOutput* output = NULL;
  size_t        keyLen = strlen(dbFName);

  SUseDbOutput** find = (SUseDbOutput**)taosHashGet(g_dbVgInfoMgr.dbVgInfoMap, dbFName, keyLen);

  if (find == NULL) {
    output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    if (output == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    code = buildDbVgInfoMap(clientRpc, dbFName, output);
    QUERY_CHECK_CODE(code, line, _return);

    code = taosHashPut(g_dbVgInfoMgr.dbVgInfoMap, dbFName, keyLen, &output, POINTER_BYTES);
    if (code == TSDB_CODE_DUP_KEY) {
      code = TSDB_CODE_SUCCESS;
      // another thread has put the same dbFName, so we need to free the output
      freeUseDbOutput_tmp(&output);
      // Forget the pointer so the error path below can never release it a second time.
      output = NULL;
      find = (SUseDbOutput**)taosHashGet(g_dbVgInfoMgr.dbVgInfoMap, dbFName, keyLen);
      if (find == NULL) {
        // Set the code with plain statements rather than an assignment inside a macro argument.
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        line = __LINE__;
        goto _return;
      }
      output = *find;
    }
    QUERY_CHECK_CODE(code, line, _return);
  } else {
    output = *find;
  }

  *dbVgInfo = output->dbVgroup;
  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  // Reached only while `output` is still private to this call (or already released and NULL).
  if (output != NULL) {
    freeUseDbOutput_tmp(&output);
  }
  return code;
}
2683

2684
/*
 * Resolve the vgroup that holds table `tbName` of database `dbFName`.
 *
 * The lookup key is "<dbFName>.<tbName>". The global cache's read latch is
 * held while the db vgroup info is fetched (or built) and the vgroup is
 * selected.
 *
 * @param pVgInfo receives the selected vgroup
 * @return TSDB_CODE_SUCCESS or the error of the failing step (also logged)
 */
int32_t getDbVgInfoForExec(void* clientRpc, const char* dbFName, const char* tbName, SVgroupInfo* pVgInfo) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SDBVgInfo* pDbVgroups = NULL;
  char       fullTbName[TSDB_TABLE_FNAME_LEN];

  snprintf(fullTbName, sizeof(fullTbName), "%s.%s", dbFName, tbName);

  taosRLockLatch(&g_dbVgInfoMgr.lock);

  TAOS_CHECK_EXIT(getDbVgInfoByTbName(clientRpc, dbFName, &pDbVgroups));
  TAOS_CHECK_EXIT(inserterGetVgInfo(pDbVgroups, fullTbName, pVgInfo));

_exit:

  taosRUnLockLatch(&g_dbVgInfoMgr.lock);

  if (0 != code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
2706

2707
void rmDbVgInfoFromCache(const char* dbFName) {
2,171✔
2708
  taosWLockLatch(&g_dbVgInfoMgr.lock);
2,171✔
2709

2710
  TAOS_UNUSED(taosHashRemove(g_dbVgInfoMgr.dbVgInfoMap, dbFName, strlen(dbFName)));
2,171✔
2711

2712
  taosWUnLockLatch(&g_dbVgInfoMgr.lock);
2,171✔
2713
}
2,171✔
2714

2715
/*
 * Serialise a batch drop-table request into a newly allocated message buffer.
 *
 * Layout: a SMsgHead (vgId and total length, both in network byte order)
 * followed by the encoded SVDropTbBatchReq.
 *
 * @param vgId  target vgroup id written into the header
 * @param pReq  request to encode
 * @param pData receives the buffer on success; the caller owns it
 * @param pLen  receives the total buffer length (header included) on success
 * @return TSDB_CODE_SUCCESS, terrno when the buffer cannot be allocated, or
 *         the encoder's error code. pData/pLen are untouched on failure.
 */
static int32_t dropTableReqToMsg(int32_t vgId, SVDropTbBatchReq* pReq, void** pData, int32_t* pLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t bodyLen = 0;

  // First pass only measures the encoded size.
  tEncodeSize(tEncodeSVDropTbBatchReq, pReq, bodyLen, code);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  int32_t totalLen = (int32_t)(bodyLen + sizeof(SMsgHead));
  void*   pBuf = taosMemoryMalloc(totalLen);
  if (NULL == pBuf) {
    return terrno;
  }

  SDropTbDataMsg* pOut = (SDropTbDataMsg*)pBuf;
  pOut->header.vgId = htonl(vgId);
  pOut->header.contLen = htonl(totalLen);

  // Second pass writes the payload right behind the header.
  SEncoder encoder;
  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), totalLen - sizeof(SMsgHead));
  code = tEncodeSVDropTbBatchReq(&encoder, pReq);
  tEncoderClear(&encoder);

  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pBuf);
    return code;
  }

  *pData = pBuf;
  *pLen = totalLen;
  return code;
}
2744

2745
/*
 * Response callback for a drop-table request sent by sendDropTbRequest().
 *
 * Stores the response code in the SDropTbCtx passed as `param`, posts the
 * context's `ready` semaphore and frees the response payload.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t dropTbCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SDropTbCtx* pDropCtx = (SDropTbCtx*)param;

  if (0 != code) {
    stError("dropTbCallback, code:%d, stream:%" PRId64 " gid:%" PRId64, code, pDropCtx->req->streamId,
            pDropCtx->req->gid);
  }

  // Publish the result before posting the semaphore.
  pDropCtx->code = code;
  (void)tsem_post(&pDropCtx->ready);

  taosMemoryFree(pMsg->pData);

  return TSDB_CODE_SUCCESS;
}
2756

2757
/*
 * Send an encoded drop-table message asynchronously.
 *
 * Wraps pMsg/msgLen in a SMsgSendInfo of type TDMT_VND_SNODE_DROP_TABLE whose
 * response is delivered to dropTbCallback() with `ctx` as its parameter.
 *
 * @return terrno when the send-info cannot be allocated, otherwise the result
 *         of asyncSendMsgToServer()
 */
static int32_t sendDropTbRequest(SDropTbCtx* ctx, void* pMsg, int32_t msgLen, void* pTransporter, SEpSet* pEpset) {
  SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pSendInfo) {
    return terrno;
  }

  // Message payload.
  pSendInfo->msgType = TDMT_VND_SNODE_DROP_TABLE;
  pSendInfo->msgInfo.pData = pMsg;
  pSendInfo->msgInfo.len = msgLen;

  // Response handling; no paramFreeFp is registered for ctx.
  pSendInfo->fp = dropTbCallback;
  pSendInfo->param = ctx;
  pSendInfo->paramFreeFp = NULL;

  return asyncSendMsgToServer(pTransporter, pEpset, NULL, pSendInfo);
}
2773

2774
/*
 * Drop the output table of one stream group and wait for the result.
 *
 * The table is identified through the cached insert-table info of
 * (pReq->streamId, pReq->gid). That cache entry is removed from
 * gStreamGrpTableHash, a single-entry SVDropTbBatchReq is encoded and sent to
 * the vgroup resolved for the table, and the call blocks until
 * dropTbCallback() posts the result.
 *
 * @param pMsgCb      message callbacks; clientRpc is used for sending
 * @param pTaskOutput actually a SStreamRunnerTaskOutput; outDbFName is read
 * @param pReq        drop request (stream id and group id)
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND when no
 *         insert-table info is cached for the group (not logged as an error);
 *         otherwise the first local error or the code reported by the server.
 */
int32_t doDropStreamTable(SMsgCb* pMsgCb, void* pTaskOutput, SSTriggerDropRequest* pReq) {
  SStreamRunnerTaskOutput* pOutput = pTaskOutput;
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SVDropTbBatchReq         req = {.nReqs = 1};
  SVDropTbReq*             pDropReq = NULL;
  int32_t                  msgLen = 0;
  tsem_t*                  pSem = NULL;
  SDropTbDataMsg*          pMsg = NULL;

  SInsertTableInfo** ppTbInfo = NULL;
  int32_t            vgId = 0;

  req.pArray = taosArrayInit_s(sizeof(SVDropTbReq), 1);
  if (!req.pArray) return terrno;

  pDropReq = taosArrayGet(req.pArray, 0);

  code = getStreamInsertTableInfo(pReq->streamId, pReq->gid, &ppTbInfo);
  if (TSDB_CODE_SUCCESS == code) {
    pDropReq->name = taosStrdup((*ppTbInfo)->tbname);
    if (NULL == pDropReq->name) {
      // Without a name the request cannot be built; bail out before the cache
      // entry is removed so nothing is lost.
      code = terrno;
      lino = __LINE__;
      goto _end;
    }
    pDropReq->suid = (*ppTbInfo)->uid;
    pDropReq->uid = (*ppTbInfo)->uid;
    pDropReq->igNotExists = true;
    vgId = (*ppTbInfo)->vgid;

    int64_t key[2] = {pReq->streamId, pReq->gid};
    TAOS_UNUSED(taosHashRemove(gStreamGrpTableHash, key, sizeof(key)));
  } else {
    code = TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
  }
  QUERY_CHECK_CODE(code, lino, _end);

  code = dropTableReqToMsg(vgId, &req, (void**)&pMsg, &msgLen);
  QUERY_CHECK_CODE(code, lino, _end);

  SVgroupInfo vgInfo = {0};
  code = getDbVgInfoForExec(pMsgCb->clientRpc, pOutput->outDbFName, pDropReq->name, &vgInfo);
  QUERY_CHECK_CODE(code, lino, _end);

  SDropTbCtx ctx = {.req = pReq};
  code = tsem_init(&ctx.ready, 0, 0);
  QUERY_CHECK_CODE(code, lino, _end);
  pSem = &ctx.ready;

  // NOTE(review): if sendDropTbRequest fails, pMsg is freed at _end; confirm the
  // send path does not also release the payload on failure.
  code = sendDropTbRequest(&ctx, pMsg, msgLen, pMsgCb->clientRpc, &vgInfo.epSet);
  QUERY_CHECK_CODE(code, lino, _end);
  pMsg = NULL;  // now owned by sendDropTbRequest

  // Keep a wait failure instead of overwriting it with ctx.code, which the
  // callback may not have written yet.
  // NOTE(review): ctx lives on this stack frame; returning after a failed wait
  // while the response is still pending would leave the callback with a stale
  // pointer - verify.
  code = tsem_wait(&ctx.ready);
  if (TSDB_CODE_SUCCESS == code) {
    code = ctx.code;
  }
  stDebug("doDropStreamTable,  code:0x%" PRIx32 " req:%p, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq,
          pReq->streamId, pReq->gid, (pDropReq && pDropReq->name) ? pDropReq->name : "unknown");

_end:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND) {
    // The name may be NULL here (duplication failed), so guard it before formatting with %s.
    stError("doDropStreamTable, code:0x%" PRIx32 ", streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq->streamId,
            pReq->gid, (pDropReq && pDropReq->name) ? pDropReq->name : "unknown");
    if (pMsg) {
      taosMemoryFreeClear(pMsg);
    }
  }
  if (pSem) tsem_destroy(pSem);
  if (pDropReq && pDropReq->name) taosMemoryFreeClear(pDropReq->name);
  if (ppTbInfo) releaseStreamInsertTableInfo(ppTbInfo);
  taosArrayDestroy(req.pArray);

  return code;
}
2843

2844
/*
 * Drop a stream output table identified directly by name and wait for the
 * result.
 *
 * Unlike doDropStreamTable() the table name is supplied by the caller and the
 * target vgroup id is taken from the vgroup resolved for that name. The cache
 * entry for (pReq->streamId, pReq->gid) is removed from gStreamGrpTableHash
 * whether or not the drop succeeds.
 *
 * @param pMsgCb      message callbacks; clientRpc is used for sending
 * @param pTaskOutput actually a SStreamRunnerTaskOutput; outDbFName is read
 * @param pReq        drop request (stream id and group id)
 * @param tbName      table name; only borrowed, never freed here
 * @return TSDB_CODE_SUCCESS, the first local error, or the code reported by
 *         the server
 */
int32_t doDropStreamTableByTbName(SMsgCb* pMsgCb, void* pTaskOutput, SSTriggerDropRequest* pReq, char* tbName) {
  SStreamRunnerTaskOutput* pOutput = pTaskOutput;
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SVDropTbBatchReq         req = {.nReqs = 1};
  SVDropTbReq*             pDropReq = NULL;
  int32_t                  msgLen = 0;
  tsem_t*                  pSem = NULL;
  SDropTbDataMsg*          pMsg = NULL;

  // The global vgroup cache may not have been set up yet on this path.
  TAOS_UNUSED(taosThreadOnce(&g_dbVgInfoMgrInit, dbVgInfoMgrInitOnce));

  req.pArray = taosArrayInit_s(sizeof(SVDropTbReq), 1);
  if (!req.pArray) return terrno;

  pDropReq = taosArrayGet(req.pArray, 0);

  pDropReq->name = tbName;
  pDropReq->igNotExists = true;

  int64_t key[2] = {pReq->streamId, pReq->gid};
  TAOS_UNUSED(taosHashRemove(gStreamGrpTableHash, key, sizeof(key)));

  SVgroupInfo vgInfo = {0};
  code = getDbVgInfoForExec(pMsgCb->clientRpc, pOutput->outDbFName, pDropReq->name, &vgInfo);
  QUERY_CHECK_CODE(code, lino, _end);

  code = dropTableReqToMsg(vgInfo.vgId, &req, (void**)&pMsg, &msgLen);
  QUERY_CHECK_CODE(code, lino, _end);

  SDropTbCtx ctx = {.req = pReq};
  code = tsem_init(&ctx.ready, 0, 0);
  QUERY_CHECK_CODE(code, lino, _end);
  pSem = &ctx.ready;

  // NOTE(review): if sendDropTbRequest fails, pMsg is freed at _end; confirm the
  // send path does not also release the payload on failure.
  code = sendDropTbRequest(&ctx, pMsg, msgLen, pMsgCb->clientRpc, &vgInfo.epSet);
  QUERY_CHECK_CODE(code, lino, _end);
  pMsg = NULL;  // now owned by sendDropTbRequest

  // Keep a wait failure instead of overwriting it with ctx.code, which the
  // callback may not have written yet.
  // NOTE(review): ctx lives on this stack frame; returning after a failed wait
  // while the response is still pending would leave the callback with a stale
  // pointer - verify.
  code = tsem_wait(&ctx.ready);
  if (TSDB_CODE_SUCCESS == code) {
    code = ctx.code;
  }
  stDebug("doDropStreamTableByTbName,  code:%d req:%p, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq,
          pReq->streamId, pReq->gid, pDropReq ? pDropReq->name : "unknown");

_end:
  if (code != TSDB_CODE_SUCCESS) {
    stError("doDropStreamTableByTbName, code:%d, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq->streamId,
            pReq->gid, pDropReq ? pDropReq->name : "unknown");
    if (pMsg) {
      taosMemoryFreeClear(pMsg);
    }
  }
  if (pSem) tsem_destroy(pSem);
  taosArrayDestroy(req.pArray);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc