• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4875

09 Dec 2025 01:22AM UTC coverage: 64.472% (-0.2%) from 64.623%
#4875

push

travis-ci

guanshengliang
fix: temporarily disable memory leak detection for UDF tests (#33856)

162014 of 251293 relevant lines covered (64.47%)

104318075.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.17
/source/libs/executor/src/dataInserter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <stdint.h>
17
#include <stdlib.h>
18
#include <string.h>
19
#include "dataSinkInt.h"
20
#include "dataSinkMgt.h"
21
#include "executor.h"
22
#include "executorInt.h"
23
#include "functionMgt.h"
24
#include "libs/new-stream/stream.h"
25
#include "osAtomic.h"
26
#include "osMemPool.h"
27
#include "osMemory.h"
28
#include "osSemaphore.h"
29
#include "planner.h"
30
#include "query.h"
31
#include "querytask.h"
32
#include "storageapi.h"
33
#include "taoserror.h"
34
#include "tarray.h"
35
#include "tcompression.h"
36
#include "tdatablock.h"
37
#include "tdataformat.h"
38
#include "tglobal.h"
39
#include "thash.h"
40
#include "tmsg.h"
41
#include "tqueue.h"
42

43
extern SDataSinkStat gDataSinkStat;
44
SHashObj*            gStreamGrpTableHash = NULL;
45
typedef struct SSubmitRes {
46
  int64_t      affectedRows;
47
  int32_t      code;
48
  SSubmitRsp2* pRsp;
49
} SSubmitRes;
50

51
/* A heap-owned message buffer tagged with a vgroup id; released by destroySSubmitTbDataMsg(). */
typedef struct SSubmitTbDataMsg {
  int32_t vgId;
  int32_t len;    // presumably the byte length of pData -- TODO confirm against the producer
  void*   pData;  // freed together with the struct
} SSubmitTbDataMsg;
56

57
static void destroySSubmitTbDataMsg(void* p) {
×
58
  if (p == NULL) return;
×
59
  SSubmitTbDataMsg* pVg = p;
×
60
  taosMemoryFree(pVg->pData);
×
61
  taosMemoryFree(pVg);
×
62
}
63

64
typedef struct SDataInserterHandle {
65
  SDataSinkHandle     sink;
66
  SDataSinkManager*   pManager;
67
  STSchema*           pSchema;
68
  SQueryInserterNode* pNode;
69
  SSubmitRes          submitRes;
70
  SInserterParam*     pParam;
71
  SArray*             pDataBlocks;
72
  SHashObj*           pCols;
73
  int32_t             status;
74
  bool                queryEnd;
75
  bool                fullOrderColList;
76
  uint64_t            useconds;
77
  uint64_t            cachedSize;
78
  uint64_t            flags;
79
  TdThreadMutex       mutex;
80
  tsem_t              ready;
81
  bool                explain;
82
  bool                isStbInserter;
83
  SSchemaWrapper*     pTagSchema;
84
  const char*         dbFName;
85
  SHashObj*           dbVgInfoMap;
86
  SUseDbRsp*          pRsp;
87
} SDataInserterHandle;
88

89
typedef struct SSubmitRspParam {
90
  SDataInserterHandle* pInserter;
91
  void*                putParam;
92
} SSubmitRspParam;
93

94
typedef struct SBuildInsertDataInfo {
95
  SSubmitTbData  pTbData;
96
  bool           isFirstBlock;
97
  bool           isLastBlock;
98
  int64_t        lastTs;
99
  bool           needSortMerge;
100
} SBuildInsertDataInfo;
101

102
typedef struct SDropTbCtx {
103
  SSTriggerDropRequest* req;
104
  tsem_t                ready;
105
  int32_t               code;
106
} SDropTbCtx;
107
typedef struct SDropTbDataMsg {
108
  SMsgHead header;
109
  void*    pData;
110
} SDropTbDataMsg;
111

112
typedef struct SRunnerDropTableInfo {
113
  SSTriggerDropRequest* pReq;
114
  int32_t               code;
115
} SRunnerDropTableInfo;
116

117
/*
 * Resets the block-building state and allocates the row-pointer array.
 *
 * pBuildInsertDataInfo  state to reset
 * rows                  initial capacity of the row-pointer array
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the array cannot be allocated.
 */
static int32_t initInsertProcessInfo(SBuildInsertDataInfo* pBuildInsertDataInfo, int32_t rows) {
  pBuildInsertDataInfo->isFirstBlock = true;
  pBuildInsertDataInfo->isLastBlock = false;
  pBuildInsertDataInfo->needSortMerge = false;
  pBuildInsertDataInfo->lastTs = TSKEY_MIN;

  pBuildInsertDataInfo->pTbData.aRowP = taosArrayInit(rows, sizeof(SRow*));
  if (pBuildInsertDataInfo->pTbData.aRowP == NULL) {
    return terrno;
  }
  return TSDB_CODE_SUCCESS;
}
129

130
static void freeCacheTbInfo(void* pp) {
445,903✔
131
  if (pp == NULL || *(SInsertTableInfo**)pp == NULL) {
445,903✔
132
    return;
×
133
  }
134
  SInsertTableInfo* pTbInfo = *(SInsertTableInfo**)pp;
445,903✔
135
  if (pTbInfo->tbname) {
445,903✔
136
    taosMemFree(pTbInfo->tbname);
445,903✔
137
    pTbInfo->tbname = NULL;
445,903✔
138
  }
139
  if (pTbInfo->pSchema) {
445,903✔
140
    tDestroyTSchema(pTbInfo->pSchema);
445,903✔
141
    pTbInfo->pSchema = NULL;
445,903✔
142
  }
143
  taosMemoryFree(pTbInfo);
445,903✔
144
}
145

146
int32_t initInserterGrpInfo() {
680,207✔
147
  gStreamGrpTableHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
680,207✔
148
  if (NULL == gStreamGrpTableHash) {
680,207✔
149
    qError("failed to create stream group table hash");
×
150
    return terrno;
×
151
  }
152
  taosHashSetFreeFp(gStreamGrpTableHash, freeCacheTbInfo);
680,207✔
153
  return TSDB_CODE_SUCCESS;
680,207✔
154
}
155

156
void destroyInserterGrpInfo() {
680,207✔
157
  static int8_t destoryGrpInfo = 0;
158
  int8_t        flag = atomic_val_compare_exchange_8(&destoryGrpInfo, 0, 1);
680,207✔
159
  if (flag != 0) {
680,207✔
160
    return;
×
161
  }
162
  if (NULL != gStreamGrpTableHash) {
680,207✔
163
    taosHashCleanup(gStreamGrpTableHash);
680,207✔
164
    gStreamGrpTableHash = NULL;
680,207✔
165
  }
166
}
167

168
/*
 * Validates the create-table part of a submit response and refreshes the
 * cached vgroup id and table uid in `res` from the returned table meta.
 *
 * pSubmitRes     submit result whose pRsp holds the decoded response
 * res            cached table info; vgid and uid are overwritten on success
 * pSchemaChaned  set to true when the meta carries a non-zero schema version that
 *                differs from res->version, false otherwise; written only on success
 *
 * Returns TSDB_CODE_SUCCESS, the create-table error reported in the response
 * (TSDB_CODE_TDB_TABLE_ALREADY_EXIST is tolerated), or
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR when a required part is missing.
 */
static int32_t checkResAndResetTableInfo(const SSubmitRes* pSubmitRes, SInsertTableInfo* res,
                                         bool* pSchemaChaned) {
  if (!pSubmitRes->pRsp) {
    stError("create table response is NULL");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  // The list pointer itself may be absent, so it is checked before its size is read.
  SArray* pCreateTbList = pSubmitRes->pRsp->aCreateTbRsp;
  if (pCreateTbList == NULL || taosArrayGetSize(pCreateTbList) < 1) {
    stError("create table response size is less than 1");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  SVCreateTbRsp* pCreateTbRsp = taosArrayGet(pCreateTbList, 0);
  if (pCreateTbRsp == NULL) {
    stError("create table response size is less than 1");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }
  if (pCreateTbRsp->code != 0 && pCreateTbRsp->code != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
    stError("create table failed, code:%d", pCreateTbRsp->code);
    return pCreateTbRsp->code;
  }
  if (!pCreateTbRsp->pMeta || pCreateTbRsp->pMeta->tuid == 0) {
    stError("create table can not get tuid");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  *pSchemaChaned = false;
  res->vgid = pCreateTbRsp->pMeta->vgId;
  res->uid = pCreateTbRsp->pMeta->tuid;

  if (pCreateTbRsp->pMeta->sversion != 0 && res->version != pCreateTbRsp->pMeta->sversion) {
    *pSchemaChaned = true;
  }

  stDebug("inserter callback, uid:%" PRId64 "  vgid: %" PRId64 ", version: %d", res->uid, res->vgid, res->version);

  return TSDB_CODE_SUCCESS;
}
201

202
/*
 * Builds a fresh SInsertTableInfo from the table meta in the first create-table
 * response, reusing the table name of the old cached entry.
 *
 * pSubmitRes         submit result holding the decoded response
 * pOldInsertTbInfo   existing cache entry; only its tbname is read
 * ppNewInsertTbInfo  receives the new entry on success; untouched on failure
 *
 * Returns TSDB_CODE_SUCCESS, the create-table error from the response, terrno on
 * allocation / schema-build failure, or TSDB_CODE_MND_STREAM_INTERNAL_ERROR when
 * the response carries no usable meta. Nothing is leaked on any failure path.
 */
static int32_t createNewInsertTbInfo(const SSubmitRes* pSubmitRes, SInsertTableInfo* pOldInsertTbInfo,
                                     SInsertTableInfo** ppNewInsertTbInfo) {
  SVCreateTbRsp* pCreateTbRsp = taosArrayGet(pSubmitRes->pRsp->aCreateTbRsp, 0);
  if (pCreateTbRsp == NULL) {
    stError("create table response size is less than 1");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }
  if (pCreateTbRsp->code != 0 && pCreateTbRsp->code != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
    stError("create table failed, code:%d", pCreateTbRsp->code);
    return pCreateTbRsp->code;
  }
  if (pCreateTbRsp->pMeta == NULL) {
    stError("create table can not get tuid");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  SInsertTableInfo* res = taosMemoryCalloc(1, sizeof(SInsertTableInfo));
  if (res == NULL) {
    return terrno;
  }

  res->tbname = taosStrdup(pOldInsertTbInfo->tbname);
  if (res->tbname == NULL) {
    int32_t code = terrno;  // read before free/log calls get a chance to change it
    taosMemoryFree(res);
    stError("failed to allocate memory for table name");
    return code;
  }

  res->uid = pCreateTbRsp->pMeta->tuid;
  res->vgid = pCreateTbRsp->pMeta->vgId;
  res->version = pCreateTbRsp->pMeta->sversion;

  res->pSchema = tBuildTSchema(pCreateTbRsp->pMeta->pSchemas, pCreateTbRsp->pMeta->numOfColumns, res->version);
  if (res->pSchema == NULL) {
    int32_t code = terrno;
    stError("failed to build schema for table:%s, uid:%" PRId64 ", vgid:%" PRId64 ", version:%d", res->tbname, res->uid,
            res->vgid, res->version);
    // Releases tbname and the struct itself; the original returned here and leaked both.
    freeCacheTbInfo(&res);
    return code;
  }

  *ppNewInsertTbInfo = res;
  return TSDB_CODE_SUCCESS;
}
236

237
/*
 * Refreshes the cached table info of the {streamId, groupId} pair in
 * gStreamGrpTableHash from a submit response. When the response reports a
 * different schema version, the cache entry is replaced by a new one built
 * from the returned table meta.
 *
 * pInserterInfo  supplies streamId and groupId (the cache key)
 * pSubmitRes     submit result holding the decoded response
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_MND_STREAM_INTERNAL_ERROR when no cache
 * entry exists, or the error of the failing step.
 *
 * The entry obtained with taosHashAcquire() is released on every path; the
 * original released it only on the error path.
 */
static int32_t updateInsertGrpTableInfo(SStreamDataInserterInfo* pInserterInfo, const SSubmitRes* pSubmitRes) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  int64_t            key[2] = {pInserterInfo->streamId, pInserterInfo->groupId};
  SInsertTableInfo** ppTbRes = taosHashAcquire(gStreamGrpTableHash, key, sizeof(key));
  if (NULL == ppTbRes) {
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }
  if (NULL == *ppTbRes) {
    taosHashRelease(gStreamGrpTableHash, ppTbRes);
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  bool schemaChanged = false;
  code = checkResAndResetTableInfo(pSubmitRes, *ppTbRes, &schemaChanged);
  QUERY_CHECK_CODE(code, lino, _exit);

  if (schemaChanged) {
    SInsertTableInfo* pNewInfo = NULL;
    code = createNewInsertTbInfo(pSubmitRes, *ppTbRes, &pNewInfo);
    QUERY_CHECK_CODE(code, lino, _exit);

    // The old entry is still referenced through ppTbRes until the release below.
    TAOS_UNUSED(taosHashRemove(gStreamGrpTableHash, key, sizeof(key)));

    code = taosHashPut(gStreamGrpTableHash, key, sizeof(key), &pNewInfo, sizeof(SInsertTableInfo*));

    if (code == TSDB_CODE_DUP_KEY) {
      // Another entry was stored under this key in the meantime; keep it and drop ours.
      freeCacheTbInfo(&pNewInfo);
      code = TSDB_CODE_SUCCESS;
      goto _exit;
    } else if (code != TSDB_CODE_SUCCESS) {
      freeCacheTbInfo(&pNewInfo);
      stError("failed to put new insert tbInfo for streamId:%" PRIx64 ", groupId:%" PRIx64 ", code:%d",
              pInserterInfo->streamId, pInserterInfo->groupId, code);
      QUERY_CHECK_CODE(code, lino, _exit);
    }

    stInfo("update table info for streamId:%" PRIx64 ", groupId:%" PRIx64 ", uid:%" PRId64 ", vgid:%" PRId64
           ", version:%d",
           pInserterInfo->streamId, pInserterInfo->groupId, pNewInfo->uid, pNewInfo->vgid, pNewInfo->version);
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    stError("failed to check and reset table info for streamId:%" PRIx64 ", groupId:%" PRIx64 ", code:%d",
            pInserterInfo->streamId, pInserterInfo->groupId, code);
  }
  taosHashRelease(gStreamGrpTableHash, ppTbRes);
  return code;
}
284

285
static int32_t buildTSchmaFromInserter(SStreamInserterParam* pInsertParam, STSchema** ppTSchema);
286
/*
 * Creates the cache entry for the {streamId, groupId} pair of `pInserterInfo`
 * and stores it in gStreamGrpTableHash.
 *
 * The entry starts with uid 0, a copy of pInserterInfo->tbName, a schema built
 * from the stream inserter parameters, and version 1 for normal tables or the
 * parameter's `sver` otherwise. If the key already exists the new entry is
 * discarded and success is returned.
 *
 * Returns TSDB_CODE_SUCCESS, terrno on allocation failure, or the error of
 * schema building / hash insertion.
 */
static int32_t initTableInfo(SDataInserterHandle* pInserter, SStreamDataInserterInfo* pInserterInfo) {
  SInsertTableInfo* pInfo = taosMemoryCalloc(1, sizeof(SInsertTableInfo));
  if (pInfo == NULL) {
    return terrno;
  }

  SStreamInserterParam* pStreamParam = pInserter->pParam->streamInserterParam;
  pInfo->uid = 0;
  if (pStreamParam->tbType == TSDB_NORMAL_TABLE) {
    pInfo->version = 1;
  } else {
    pInfo->version = pStreamParam->sver;
  }

  pInfo->tbname = taosStrdup(pInserterInfo->tbName);
  if (pInfo->tbname == NULL) {
    taosMemoryFree(pInfo);
    stError("failed to allocate memory for table name");
    return terrno;
  }

  int32_t code = buildTSchmaFromInserter(pInserter->pParam->streamInserterParam, &pInfo->pSchema);
  if (code == TSDB_CODE_SUCCESS) {
    int64_t key[2] = {pInserterInfo->streamId, pInserterInfo->groupId};
    code = taosHashPut(gStreamGrpTableHash, key, sizeof(key), &pInfo, sizeof(SInsertTableInfo*));
    if (code == TSDB_CODE_DUP_KEY) {
      // An entry for this group already exists; ours is redundant.
      freeCacheTbInfo(&pInfo);
      return TSDB_CODE_SUCCESS;
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    stError("failed to build table info for streamId:%" PRIx64 ", groupId:%" PRIx64 ", code:%d",
            pInserterInfo->streamId, pInserterInfo->groupId, code);
    freeCacheTbInfo(&pInfo);
  }
  return code;
}
327

328
static bool colsIsSupported(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
28,071✔
329
  SArray* pCreatingFields = pInserterParam->pFields;
28,071✔
330

331
  for (int32_t i = 0; i < pCreatingFields->size; ++i) {
164,497✔
332
    SFieldWithOptions* pField = taosArrayGet(pCreatingFields, i);
136,028✔
333
    if (NULL == pField) {
136,028✔
334
      stError("isSupportedSTableSchema: failed to get field from array");
×
335
      return false;
×
336
    }
337

338
    for (int j = 0; j < pTableMetaRsp->numOfColumns; ++j) {
413,940✔
339
      if (strncmp(pTableMetaRsp->pSchemas[j].name, pField->name, TSDB_COL_NAME_LEN) == 0) {
410,350✔
340
        if (pTableMetaRsp->pSchemas[j].type == pField->type && pTableMetaRsp->pSchemas[j].bytes == pField->bytes) {
133,234✔
341
          break;
342
        } else {
343
          return false;
×
344
        }
345
      }
346
    }
347
  }
348
  return true;
28,469✔
349
}
350

351
static bool TagsIsSupported(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
1,596✔
352
  SArray* pCreatingTags = pInserterParam->pTagFields;
1,596✔
353

354
  int32_t            tagIndexOffset = -1;
1,596✔
355
  SFieldWithOptions* pField = taosArrayGet(pCreatingTags, 0);
1,596✔
356
  if (NULL == pField) {
1,596✔
357
    stError("isSupportedSTableSchema: failed to get field from array");
×
358
    return false;
×
359
  }
360
  for (int32_t i = 0; i < pTableMetaRsp->numOfColumns + pTableMetaRsp->numOfTags; ++i) {
1,596✔
361
    if (strncmp(pTableMetaRsp->pSchemas[i].name, pField->name, TSDB_COL_NAME_LEN) != 0) {
798✔
362
      tagIndexOffset = i;
798✔
363
      break;
798✔
364
    }
365
  }
366
  if (tagIndexOffset == -1) {
1,596✔
367
    stError("isSupportedSTableSchema: failed to get tag index");
798✔
368
    return false;
798✔
369
  }
370

371
  for (int32_t i = 0; i < pTableMetaRsp->numOfTags; ++i) {
1,596✔
372
    int32_t            index = i + tagIndexOffset;
798✔
373
    SFieldWithOptions* pField = taosArrayGet(pCreatingTags, i);
798✔
374
    if (NULL == pField) {
798✔
375
      stError("isSupportedSTableSchema: failed to get field from array");
×
376
      return false;
×
377
    }
378

379
    for(int32_t j = 0; j < pTableMetaRsp->numOfTags; ++j) {
1,596✔
380
      if (strncmp(pTableMetaRsp->pSchemas[index].name, pField->name, TSDB_COL_NAME_LEN) == 0) {
798✔
381
        if (pTableMetaRsp->pSchemas[index].type == pField->type &&
×
382
            pTableMetaRsp->pSchemas[index].bytes == pField->bytes) {
×
383
          break;
384
        } else {
385
          return false;
×
386
        }
387
      }
388
    }
389
  }
390
  return true;
798✔
391
}
392

393
/*
 * Schema check for super/child tables: the columns are checked first and the
 * tags only if the columns passed.
 */
static bool isSupportedSTableSchema(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
  return colsIsSupported(pTableMetaRsp, pInserterParam) && TagsIsSupported(pTableMetaRsp, pInserterParam);
}
402

403
/* Schema check for normal tables: only the columns are compared. */
static bool isSupportedNTableSchema(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
  bool columnsOk = colsIsSupported(pTableMetaRsp, pInserterParam);
  return columnsOk;
}
406

407
static int32_t checkAndSaveCreateGrpTableInfo(SDataInserterHandle*     pInserthandle,
28,469✔
408
                                              SStreamDataInserterInfo* pInserterInfo) {
409
  int32_t     code = TSDB_CODE_SUCCESS;
28,469✔
410
  SSubmitRes* pSubmitRes = &pInserthandle->submitRes;
28,469✔
411
  int8_t      tbType = pInserthandle->pParam->streamInserterParam->tbType;
28,469✔
412

413
  SVCreateTbRsp*        pCreateTbRsp = taosArrayGet(pSubmitRes->pRsp->aCreateTbRsp, 0);
28,469✔
414
  SSchema*              pExistRow = pCreateTbRsp->pMeta->pSchemas;
28,469✔
415
  SStreamInserterParam* pInserterParam = pInserthandle->pParam->streamInserterParam;
28,469✔
416

417
  if (tbType == TSDB_CHILD_TABLE || tbType == TSDB_SUPER_TABLE) {
28,071✔
418
    if (!isSupportedSTableSchema(pCreateTbRsp->pMeta, pInserterParam)) {
1,198✔
419
      stError("create table failed, schema is not supported");
798✔
420
      return TSDB_CODE_STREAM_INSERT_SCHEMA_NOT_MATCH;
798✔
421
    }
422
  } else if (tbType == TSDB_NORMAL_TABLE) {
26,873✔
423
    if (!isSupportedNTableSchema(pCreateTbRsp->pMeta, pInserterParam)) {
26,873✔
424
      stError("create table failed, schema is not supported");
×
425
      return TSDB_CODE_STREAM_INSERT_SCHEMA_NOT_MATCH;
×
426
    }
427
  } else {
428
    stError("checkAndSaveCreateGrpTableInfo failed, tbType:%d is not supported", tbType);
×
429
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
×
430
  }
431

432
  return updateInsertGrpTableInfo(pInserterInfo, pSubmitRes);
27,671✔
433
}
434

435
/*
 * Response callback for submit requests sent by sendSubmitRequest().
 *
 * param  SSubmitRspParam* carrying the inserter and an optional
 *        SStreamDataInserterInfo* (putParam)
 * pMsg   response buffer; pMsg->pData is freed here
 * code   transport / server result code of the request
 *
 * Behavior:
 *  - success: decodes the response, records the first per-table creation error
 *    (if any) in submitRes.code, refreshes the cached group table info for
 *    auto-create streams, and adds the affected rows to the running total;
 *  - TSDB_CODE_TDB_TABLE_ALREADY_EXIST (auto-create streams) or
 *    TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER: stores TABLE_ALREADY_EXIST in
 *    submitRes.code, decodes the response and validates / refreshes the cached
 *    group table info;
 *  - any other error: only stored in submitRes.code.
 *
 * Failures while refreshing the cache are logged and not propagated. The decoded
 * response is always destroyed before returning and submitRes.pRsp is reset to
 * NULL, so no path leaks it or leaves a dangling pointer behind. The inserter's
 * `ready` semaphore is posted on every path.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SSubmitRspParam*         pParam = (SSubmitRspParam*)param;
  SDataInserterHandle*     pInserter = pParam->pInserter;
  SStreamDataInserterInfo* pStreamInfo = (SStreamDataInserterInfo*)pParam->putParam;
  SSubmitRsp2*             pRsp = NULL;  // response allocated by this invocation, if any
  int32_t                  code2 = 0;
  SDecoder                 coder = {0};

  if (code) {
    pInserter->submitRes.code = code;
  } else {
    pInserter->submitRes.code = TSDB_CODE_SUCCESS;
  }

  if (code == TSDB_CODE_SUCCESS) {
    pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2));
    pInserter->submitRes.pRsp = pRsp;
    if (NULL == pRsp) {
      pInserter->submitRes.code = terrno;
      goto _return;
    }

    tDecoderInit(&coder, pMsg->pData, pMsg->len);
    code = tDecodeSSubmitRsp2(&coder, pRsp);
    if (code) {
      pInserter->submitRes.code = code;
      goto _return;
    }

    if (pRsp->affectedRows > 0) {
      SArray* pCreateTbList = pRsp->aCreateTbRsp;
      int32_t numOfTables = taosArrayGetSize(pCreateTbList);

      for (int32_t i = 0; i < numOfTables; ++i) {
        SVCreateTbRsp* pTbRsp = taosArrayGet(pCreateTbList, i);
        if (NULL == pTbRsp) {
          pInserter->submitRes.code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
          goto _return;
        }
        if (TSDB_CODE_SUCCESS != pTbRsp->code) {
          pInserter->submitRes.code = pTbRsp->code;
          goto _return;
        }
      }
    }

    if (pStreamInfo != NULL && pStreamInfo->isAutoCreateTable) {
      code2 = updateInsertGrpTableInfo(pStreamInfo, &pInserter->submitRes);
    }

    pInserter->submitRes.affectedRows += pRsp->affectedRows;
    qDebug("submit rsp received, affectedRows:%d, total:%" PRId64, pRsp->affectedRows,
           pInserter->submitRes.affectedRows);
  } else if ((TSDB_CODE_TDB_TABLE_ALREADY_EXIST == code && pStreamInfo != NULL && pStreamInfo->isAutoCreateTable) ||
             TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER == code) {
    pInserter->submitRes.code = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
    pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2));
    pInserter->submitRes.pRsp = pRsp;
    if (NULL == pRsp) {
      code2 = terrno;
      goto _return;
    }

    tDecoderInit(&coder, pMsg->pData, pMsg->len);
    code2 = tDecodeSSubmitRsp2(&coder, pRsp);
    if (code2 == TSDB_CODE_SUCCESS) {
      if (pStreamInfo == NULL) {
        // The schema-version case can arrive without stream info; there is no group to update.
        code2 = TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
      } else {
        code2 = checkAndSaveCreateGrpTableInfo(pInserter, pStreamInfo);
      }
    }
  }

_return:

  if (pRsp != NULL) {
    tDestroySSubmitRsp2(pRsp, TSDB_MSG_FLG_DECODE);
    taosMemoryFree(pRsp);
    pInserter->submitRes.pRsp = NULL;
  }

  if (code2) {
    qError("update inserter table info failed, error:%s", tstrerror(code2));
  }
  tDecoderClear(&coder);
  TAOS_UNUSED(tsem_post(&pInserter->ready));

  taosMemoryFree(pMsg->pData);

  return TSDB_CODE_SUCCESS;
}
523

524
void freeUseDbOutput_tmp(void* ppOutput) {
15,236✔
525
  SUseDbOutput* pOut = *(SUseDbOutput**)ppOutput;
15,236✔
526
  if (NULL == ppOutput) {
15,236✔
527
    return;
×
528
  }
529

530
  if (pOut->dbVgroup) {
15,236✔
531
    freeVgInfo(pOut->dbVgroup);
15,236✔
532
  }
533
  taosMemFree(pOut);
15,236✔
534
  *(SUseDbOutput**)ppOutput = NULL;
15,236✔
535
}
536

537
/*
 * Response callback for the "use db" request issued by buildDbVgInfoMap().
 *
 * param  SDBVgInfoReq* of the waiting requester
 * pMsg   response buffer; pData and pEpSet are freed here
 * code   result code of the request
 *
 * On success the response is deserialized into a newly allocated
 * pVgInfoReq->pRsp. The `ready` semaphore is posted on every path. When `code`
 * is already an error, pRsp is left untouched.
 *
 * Returns the incoming error, the allocation error, or the deserialization result.
 */
static int32_t processUseDbRspForInserter(void* param, SDataBuf* pMsg, int32_t code) {
  int32_t       lino = 0;
  SDBVgInfoReq* pVgInfoReq = (SDBVgInfoReq*)param;

  if (TSDB_CODE_SUCCESS != code) {
    goto _return;
  }

  // Zero-initialized: the waiter frees the contents of pRsp whenever it is non-NULL,
  // including after a deserialization that failed part-way.
  pVgInfoReq->pRsp = taosMemoryCalloc(1, sizeof(SUseDbRsp));
  QUERY_CHECK_NULL(pVgInfoReq->pRsp, code, lino, _return, terrno);

  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pVgInfoReq->pRsp);
  QUERY_CHECK_CODE(code, lino, _return);

_return:
  taosMemoryFreeClear(pMsg->pData);
  taosMemoryFreeClear(pMsg->pEpSet);
  if (code != 0){
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  int ret = tsem_post(&pVgInfoReq->ready);
  if (ret != 0) {
    qError("%s failed code: %d", __func__, ret);
  }
  return code;
}
570

571

572
int inserterVgInfoComp(const void* lp, const void* rp) {
86,793✔
573
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
86,793✔
574
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
86,793✔
575
  if (pLeft->hashBegin < pRight->hashBegin) {
86,793✔
576
    return -1;
77,197✔
577
  } else if (pLeft->hashBegin > pRight->hashBegin) {
9,596✔
578
    return 1;
9,596✔
579
  }
580

581
  return 0;
×
582
}
583

584
/*
 * Fetches the vgroup layout of database `dbFName` from the mnode and fills
 * `output` with it, including a vgroup array sorted by hashBegin.
 *
 * clientRpc  RPC handle used to send the request
 * dbFName    database name copied into the request
 * output     caller-owned result; left partially filled on failure, the caller
 *            is responsible for releasing it
 *
 * The call blocks on a semaphore until processUseDbRspForInserter() has run.
 * Every exit after semaphore creation goes through the cleanup label, so the
 * request, the serialized buffer, the unsent send-info, the semaphore and the
 * response are released on every path.
 *
 * Returns TSDB_CODE_SUCCESS or the error of the failing step.
 */
static int32_t buildDbVgInfoMap(void* clientRpc, const char* dbFName, SUseDbOutput* output) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  char*         buf1 = NULL;
  SUseDbReq*    pReq = NULL;
  SMsgSendInfo* pMsgSendInfo = NULL;
  SDBVgInfoReq  dbVgInfoReq = {0};

  code = tsem_init(&dbVgInfoReq.ready, 0, 0);
  if (code != TSDB_CODE_SUCCESS) {
    qError("tsem_init failed, error:%s", tstrerror(code));
    return code;
  }

  // Zero-initialized so that fields other than `db` are serialized with defined values.
  pReq = taosMemoryCalloc(1, sizeof(SUseDbReq));
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno);

  tstrncpy(pReq->db, dbFName, TSDB_DB_FNAME_LEN);

  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
  if (contLen < 0) {
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    lino = __LINE__;
    goto _return;
  }

  buf1 = taosMemoryCalloc(1, contLen);
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno);

  if (tSerializeSUseDbReq(buf1, contLen, pReq) < 0) {
    // The original jumped to cleanup here with `code` still SUCCESS.
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    lino = __LINE__;
    goto _return;
  }

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno);

  SEpSet pEpSet = {0};
  code = getCurrentMnodeEpset(&pEpSet);
  QUERY_CHECK_CODE(code, lino, _return);

  pMsgSendInfo->param = &dbVgInfoReq;
  pMsgSendInfo->msgInfo.pData = buf1;
  buf1 = NULL;  // now owned by the send-info
  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
  pMsgSendInfo->fp = processUseDbRspForInserter;

  // As in the original, the send-info is treated as handed over to the transport
  // from this point on, whether or not the send succeeds.
  SMsgSendInfo* pSendInfo = pMsgSendInfo;
  pMsgSendInfo = NULL;
  code = asyncSendMsgToServer(clientRpc, &pEpSet, NULL, pSendInfo);
  QUERY_CHECK_CODE(code, lino, _return);

  code = tsem_wait(&dbVgInfoReq.ready);
  QUERY_CHECK_CODE(code, lino, _return);

  if (dbVgInfoReq.pRsp == NULL) {
    // The callback leaves pRsp unset when the request itself failed.
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    lino = __LINE__;
    goto _return;
  }

  code = queryBuildUseDbOutput(output, dbVgInfoReq.pRsp);
  QUERY_CHECK_CODE(code, lino, _return);

  output->dbVgroup->vgArray = taosArrayInit(dbVgInfoReq.pRsp->vgNum, sizeof(SVgroupInfo));
  if (NULL == output->dbVgroup->vgArray) {
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    lino = __LINE__;
    goto _return;
  }

  void* pIter = taosHashIterate(output->dbVgroup->vgHash, NULL);
  while (pIter) {
    if (NULL == taosArrayPush(output->dbVgroup->vgArray, pIter)) {
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      lino = __LINE__;
      taosHashCancelIterate(output->dbVgroup->vgHash, pIter);
      goto _return;
    }

    pIter = taosHashIterate(output->dbVgroup->vgHash, pIter);
  }

  taosArraySort(output->dbVgroup->vgArray, inserterVgInfoComp);

_return:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  taosMemoryFree(pMsgSendInfo);  // non-NULL only if it was never handed to the transport
  taosMemoryFree(buf1);          // non-NULL only if it was never attached to the send-info
  taosMemoryFree(pReq);
  TAOS_UNUSED(tsem_destroy(&dbVgInfoReq.ready));
  if (dbVgInfoReq.pRsp) {
    tFreeSUsedbRsp(dbVgInfoReq.pRsp);
    taosMemoryFreeClear(dbVgInfoReq.pRsp);
  }
  return code;
}
666

667
int32_t inserterBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
320,468✔
668
                                 SArray* tagName, uint8_t tagNum, int32_t ttl) {
669
  pTbReq->type = TD_CHILD_TABLE;
320,468✔
670
  pTbReq->ctb.pTag = (uint8_t*)pTag;
322,005✔
671
  pTbReq->name = taosStrdup(tname);
321,522✔
672
  if (!pTbReq->name) return terrno;
321,522✔
673
  pTbReq->ctb.suid = suid;
322,005✔
674
  pTbReq->ctb.tagNum = tagNum;
321,641✔
675
  if (sname) {
322,005✔
676
    pTbReq->ctb.stbName = taosStrdup(sname);
322,005✔
677
    if (!pTbReq->ctb.stbName) {
321,641✔
678
      taosMemoryFree(pTbReq->name);
×
679
      return terrno;
×
680
    }
681
  }
682
  pTbReq->ctb.tagName = tagName;
322,005✔
683
  pTbReq->ttl = ttl;
321,522✔
684
  pTbReq->commentLen = -1;
322,005✔
685

686
  return TSDB_CODE_SUCCESS;
322,005✔
687
}
688

689
int32_t inserterHashValueComp(void const* lp, void const* rp) {
6,982,043✔
690
  uint32_t*    key = (uint32_t*)lp;
6,982,043✔
691
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
6,982,043✔
692

693
  if (*key < pVg->hashBegin) {
6,982,043✔
694
    return -1;
1,755,134✔
695
  } else if (*key > pVg->hashEnd) {
5,226,023✔
696
    return 1;
198,346✔
697
  }
698

699
  return 0;
5,028,140✔
700
}
701

702

703
/*
 * Looks up the vgroup whose hash range contains the hash of `tbName`.
 *
 * dbInfo   vgroup info of the database; its vgArray is searched with
 *          inserterHashValueComp
 * tbName   table name to hash
 * pVgInfo  receives a copy of the matching vgroup info
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when
 * dbInfo or its vgArray is NULL or no range matches.
 */
int32_t inserterGetVgInfo(SDBVgInfo* dbInfo, char* tbName, SVgroupInfo* pVgInfo) {
  if (dbInfo == NULL) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }
  if (dbInfo->vgArray == NULL) {
    qError("empty db vgArray, hashSize:%d", taosHashGetSize(dbInfo->vgHash));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t  nameLen = (int32_t)strlen(tbName);
  uint32_t hashValue = taosGetTbHashVal(tbName, nameLen, dbInfo->hashMethod, dbInfo->hashPrefix, dbInfo->hashSuffix);

  SVgroupInfo* pMatch = taosArraySearch(dbInfo->vgArray, &hashValue, inserterHashValueComp, TD_EQ);
  if (pMatch != NULL) {
    *pVgInfo = *pMatch;
    qDebug("insert get vgInfo, tbName:%s vgId:%d epset(%s:%d)", tbName, pVgInfo->vgId, pVgInfo->epSet.eps[0].fqdn,
           pVgInfo->epSet.eps[0].port);
    return TSDB_CODE_SUCCESS;
  }

  qError("no hash range found for hash value [%u], table:%s, numOfVgId:%d", hashValue, tbName,
         (int32_t)taosArrayGetSize(dbInfo->vgArray));
  return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
728

729
/*
 * Convenience wrapper around inserterGetVgInfo() that returns only the vgroup id.
 * `*vgId` is written only on success.
 */
int32_t inserterGetVgId(SDBVgInfo* dbInfo, char* tbName, int32_t* vgId) {
  SVgroupInfo found = {0};

  int32_t code = inserterGetVgInfo(dbInfo, tbName, &found);
  if (code == TSDB_CODE_SUCCESS) {
    *vgId = found.vgId;
    return TSDB_CODE_SUCCESS;
  }

  qError("inserterGetVgId failed, code:%d", code);
  return code;
}
740

741
/*
 * Returns the vgroup info of database `dbFName`, fetching it on first use and
 * caching it in pInserter->dbVgInfoMap (created lazily).
 *
 * pInserter  inserter owning the cache and the RPC handle
 * dbFName    database name used as cache key
 * dbVgInfo   receives the cached SDBVgInfo on success
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or
 * the error from fetching / caching the info.
 */
int32_t inserterGetDbVgInfo(SDataInserterHandle* pInserter, const char* dbFName, SDBVgInfo** dbVgInfo) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       line = 0;
  SUseDbOutput* output = NULL;

  if (pInserter->dbVgInfoMap == NULL) {
    pInserter->dbVgInfoMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (pInserter->dbVgInfoMap == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  SUseDbOutput** find = (SUseDbOutput**)taosHashGet(pInserter->dbVgInfoMap, dbFName, strlen(dbFName));

  if (find == NULL) {
    // Zero-initialized: the failure path below inspects output->dbVgroup, which
    // must be NULL if the fetch failed before assigning it.
    output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    if (output == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    code = buildDbVgInfoMap(pInserter->pParam->readHandle->pMsgCb->clientRpc, dbFName, output);
    QUERY_CHECK_CODE(code, line, _return);

    code = taosHashPut(pInserter->dbVgInfoMap, dbFName, strlen(dbFName), &output, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
  } else {
    output = *find;
  }

  *dbVgInfo = output->dbVgroup;
  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  freeUseDbOutput_tmp(&output);
  return code;
}
782

783
/*
 * Resolves the vgroup of table `tbName` in database `dbFName` by delegating to
 * getDbVgInfoForExec() with the inserter's client RPC handle.
 */
int32_t getTableVgInfo(SDataInserterHandle* pInserter, const char* dbFName,
                       const char* tbName, SVgroupInfo* pVgInfo) {
  int32_t code = getDbVgInfoForExec(pInserter->pParam->readHandle->pMsgCb->clientRpc, dbFName, tbName, pVgInfo);
  return code;
}
788

789
/*
 * Package an already-encoded submit message and send it asynchronously as TDMT_VND_SUBMIT.
 *
 * A response parameter carrying `pInserter` and `putParam` is attached to the send info so that
 * inserterCallback receives both. If either bookkeeping allocation fails, `pMsg` is freed here
 * and terrno is returned; otherwise the result of asyncSendMsgToServer is returned.
 *
 * @param pInserter     inserter handle passed through to the callback
 * @param putParam      opaque caller parameter passed through to the callback
 * @param pMsg          encoded message buffer of `msgLen` bytes
 * @param msgLen        length of `pMsg`
 * @param pTransporter  transport handle given to asyncSendMsgToServer
 * @param pEpset        endpoint set given to asyncSendMsgToServer
 */
static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* putParam, void* pMsg, int32_t msgLen,
                                 void* pTransporter, SEpSet* pEpset) {
  SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (!pSendInfo) {
    taosMemoryFreeClear(pMsg);
    return terrno;
  }

  SSubmitRspParam* pRspParam = taosMemoryCalloc(1, sizeof(SSubmitRspParam));
  if (!pRspParam) {
    taosMemoryFreeClear(pMsg);
    taosMemoryFreeClear(pSendInfo);
    return terrno;
  }

  // Context handed back to inserterCallback.
  pRspParam->pInserter = pInserter;
  pRspParam->putParam = putParam;

  // Message payload and routing.
  pSendInfo->msgType = TDMT_VND_SUBMIT;
  pSendInfo->msgInfo.pData = pMsg;
  pSendInfo->msgInfo.len = msgLen;

  // Completion handling.
  pSendInfo->fp = inserterCallback;
  pSendInfo->param = pRspParam;
  pSendInfo->paramFreeFp = taosAutoMemoryFree;

  return asyncSendMsgToServer(pTransporter, pEpset, NULL, pSendInfo);
}
816

817
/*
 * Serialise a submit request into a newly allocated wire message.
 *
 * The buffer starts with an SSubmitReq2Msg header (vgId and total length in network byte order,
 * version 1 in big-endian) followed by the encoded request.
 *
 * @param vgId   target vgroup id written into the header
 * @param pReq   request to encode
 * @param pData  out: allocated buffer, written only on success
 * @param pLen   out: total buffer length including the header, written only on success
 * @return TSDB_CODE_SUCCESS, terrno if the buffer cannot be allocated, or the encoder's error
 */
static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int32_t* pLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t bodyLen = 0;

  // First pass: measure the encoded size only.
  tEncodeSize(tEncodeSubmitReq, pReq, bodyLen, code);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  int32_t totalLen = (int32_t)(bodyLen + sizeof(SSubmitReq2Msg));
  void*   pBuf = taosMemoryMalloc(totalLen);
  if (!pBuf) {
    return terrno;
  }

  SSubmitReq2Msg* pHead = (SSubmitReq2Msg*)pBuf;
  pHead->header.vgId = htonl(vgId);
  pHead->header.contLen = htonl(totalLen);
  pHead->version = htobe64(1);

  // Second pass: encode into the space right after the header.
  SEncoder enc;
  tEncoderInit(&enc, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), totalLen - sizeof(SSubmitReq2Msg));
  code = tEncodeSubmitReq(&enc, pReq);
  tEncoderClear(&enc);

  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pBuf);
    return code;
  }

  *pData = pBuf;
  *pLen = totalLen;
  return code;
}
846

847
int32_t buildSubmitReqFromStbBlock(SDataInserterHandle* pInserter, SHashObj* pHash, const SSDataBlock* pDataBlock,
1,884✔
848
                                   const STSchema* pTSchema, int64_t uid, int32_t vgId, tb_uid_t suid) {
849
  SArray* pVals = NULL;
1,884✔
850
  SArray* pTagVals = NULL;
1,884✔
851
  SSubmitReq2** ppReq = NULL;
1,884✔
852
  int32_t numOfBlks = 0;
1,884✔
853

854
  terrno = TSDB_CODE_SUCCESS;
1,884✔
855

856
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
1,884✔
857
  int32_t rows = pDataBlock->info.rows;
1,884✔
858

859
  if (!pTagVals && !(pTagVals = taosArrayInit(colNum, sizeof(STagVal)))) {
1,884✔
860
    goto _end;
×
861
  }
862

863
  if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
1,884✔
864
    goto _end;
×
865
  }
866

867
  SDBVgInfo* dbInfo = NULL;
1,884✔
868
  int32_t    code = inserterGetDbVgInfo(pInserter, pInserter->dbFName, &dbInfo);
1,884✔
869
  if (code != TSDB_CODE_SUCCESS) {
1,884✔
870
    terrno = code;
×
871
    goto _end;
×
872
  }
873

874
  for (int32_t j = 0; j < rows; ++j) {
5,652✔
875
    SSubmitTbData tbData = {0};
3,768✔
876
    if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
3,768✔
877
      goto _end;
×
878
    }
879
    tbData.suid = suid;
3,768✔
880
    tbData.uid = uid;
3,768✔
881
    tbData.sver = pTSchema->version;
3,768✔
882

883
    int64_t lastTs = TSKEY_MIN;
3,768✔
884

885
    taosArrayClear(pVals);
3,768✔
886

887
    int32_t offset = 0;
3,768✔
888
    taosArrayClear(pTagVals);
3,768✔
889
    tbData.uid = 0;
3,768✔
890
    tbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
3,768✔
891
    if (NULL == tbData.pCreateTbReq) {
3,768✔
892
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
893
      goto _end;
×
894
    }
895
    tbData.flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
3,768✔
896

897
    SColumnInfoData* tbname = taosArrayGet(pDataBlock->pDataBlock, 0);
3,768✔
898
    if (NULL == tbname) {
3,768✔
899
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
900
      qError("Insert into stable must have tbname column");
×
901
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
902
      goto _end;
×
903
    }
904
    if (tbname->info.type != TSDB_DATA_TYPE_BINARY) {
3,768✔
905
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
906
      qError("tbname column must be binary");
×
907
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
908
      goto _end;
×
909
    }
910

911
    if (colDataIsNull_s(tbname, j)) {
7,536✔
912
      qError("insert into stable tbname column is null");
×
913
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
914
      goto _end;
×
915
    }
916
    void*   data = colDataGetVarData(tbname, j);
3,768✔
917
    SValue  sv = (SValue){TSDB_DATA_TYPE_VARCHAR, .nData = varDataLen(data), .pData = varDataVal(data)};
3,768✔
918
    SColVal cv = COL_VAL_VALUE(0, sv);
3,768✔
919

920
    char tbFullName[TSDB_TABLE_FNAME_LEN];
3,768✔
921
    char tableName[TSDB_TABLE_FNAME_LEN];
3,768✔
922
    memcpy(tableName, sv.pData, sv.nData);
3,768✔
923
    tableName[sv.nData] = '\0';
3,768✔
924

925
    int32_t len = snprintf(tbFullName, TSDB_TABLE_FNAME_LEN, "%s.%s", pInserter->dbFName, tableName);
3,768✔
926
    if (len >= TSDB_TABLE_FNAME_LEN) {
3,768✔
927
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
928
      qError("table name too long after format, len:%d, maxLen:%d", len, TSDB_TABLE_FNAME_LEN);
×
929
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
930
      goto _end;
×
931
    }
932
    int32_t vgIdForTbName = 0;
3,768✔
933
    code = inserterGetVgId(dbInfo, tbFullName, &vgIdForTbName);
3,768✔
934
    if (code != TSDB_CODE_SUCCESS) {
3,768✔
935
      terrno = code;
×
936
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
937
      goto _end;
×
938
    }
939
    SSubmitReq2* pReq = NULL;
3,768✔
940
    ppReq = taosHashGet(pHash, &vgIdForTbName, sizeof(int32_t));
3,768✔
941
    if (ppReq == NULL) {
3,768✔
942
      pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2));
3,768✔
943
      if (NULL == pReq) {
3,768✔
944
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
945
        goto _end;
×
946
      }
947

948
      if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
3,768✔
949
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
950
        goto _end;
×
951
      }
952
      code = taosHashPut(pHash, &vgIdForTbName, sizeof(int32_t), &pReq, POINTER_BYTES);
3,768✔
953
    } else {
954
      pReq = *ppReq;
×
955
    }
956

957
    if (code != TSDB_CODE_SUCCESS) {
3,768✔
958
      terrno = code;
×
959
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
960
      goto _end;
×
961
    }
962
    SArray* TagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
3,768✔
963
    if (!TagNames) {
3,768✔
964
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
965
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
966
      goto _end;
×
967
    }
968
    for (int32_t i = 0; i < pInserter->pTagSchema->nCols; ++i) {
11,304✔
969
      SSchema* tSchema = &pInserter->pTagSchema->pSchema[i];
7,536✔
970
      int16_t  colIdx = tSchema->colId;
7,536✔
971
      int16_t* slotId = taosHashGet(pInserter->pCols, &colIdx, sizeof(colIdx));
7,536✔
972
      if (NULL == slotId) {
7,536✔
973
        continue;
5,024✔
974
      }
975
      if (NULL == taosArrayPush(TagNames, tSchema->name)) {
5,024✔
976
        taosArrayDestroy(TagNames);
×
977
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
978
        goto _end;
×
979
      }
980

981
      colIdx = *slotId;
2,512✔
982
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
2,512✔
983
      if (NULL == pColInfoData) {
2,512✔
984
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
985
        taosArrayDestroy(TagNames);
×
986
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
987
        goto _end;
×
988
      }
989
      // void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
990
      switch (pColInfoData->info.type) {
2,512✔
991
        case TSDB_DATA_TYPE_NCHAR:
2,512✔
992
        case TSDB_DATA_TYPE_VARBINARY:
993
        case TSDB_DATA_TYPE_VARCHAR: {
994
          if (pColInfoData->info.type != tSchema->type) {
2,512✔
995
            qError("tag:%d type:%d in block dismatch with schema tag:%d type:%d", colIdx, pColInfoData->info.type, i,
×
996
                   tSchema->type);
997
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
998
            taosArrayDestroy(TagNames);
×
999
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1000
            goto _end;
×
1001
          }
1002
          if (colDataIsNull_s(pColInfoData, j)) {
5,024✔
1003
            continue;
×
1004
          } else {
1005
            void*   data = colDataGetVarData(pColInfoData, j);
2,512✔
1006
            STagVal tv = (STagVal){
2,512✔
1007
                .cid = tSchema->colId, .type = tSchema->type, .nData = varDataLen(data), .pData = varDataVal(data)};
2,512✔
1008
            if (NULL == taosArrayPush(pTagVals, &tv)) {
2,512✔
1009
              taosArrayDestroy(TagNames);
×
1010
              tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1011
              goto _end;
×
1012
            }
1013
          }
1014
          break;
2,512✔
1015
        }
1016
        case TSDB_DATA_TYPE_BLOB:
×
1017
        case TSDB_DATA_TYPE_JSON:
1018
        case TSDB_DATA_TYPE_MEDIUMBLOB:
1019
          qError("the tag type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1020
          terrno = TSDB_CODE_APP_ERROR;
×
1021
          taosArrayDestroy(TagNames);
×
1022
          tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1023
          goto _end;
×
1024
          break;
1025
        default:
×
1026
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
×
1027
            if (colDataIsNull_s(pColInfoData, j)) {
×
1028
              continue;
×
1029
            } else {
1030
              void*   data = colDataGetData(pColInfoData, j);
×
1031
              STagVal tv = {.cid = tSchema->colId, .type = tSchema->type};
×
1032
              memcpy(&tv.i64, data, tSchema->bytes);
×
1033
              if (NULL == taosArrayPush(pTagVals, &tv)) {
×
1034
                taosArrayDestroy(TagNames);
×
1035
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1036
                goto _end;
×
1037
              }
1038
            }
1039
          } else {
1040
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1041
            terrno = TSDB_CODE_APP_ERROR;
×
1042
            taosArrayDestroy(TagNames);
×
1043
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1044
            goto _end;
×
1045
          }
1046
          break;
×
1047
      }
1048
    }
1049
    STag* pTag = NULL;
3,768✔
1050
    code = tTagNew(pTagVals, 1, false, &pTag);
3,768✔
1051
    if (code != TSDB_CODE_SUCCESS) {
3,768✔
1052
      terrno = code;
×
1053
      qError("failed to create tag, error:%s", tstrerror(code));
×
1054
      taosArrayDestroy(TagNames);
×
1055
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1056
      goto _end;
×
1057
    }
1058

1059
    code = inserterBuildCreateTbReq(tbData.pCreateTbReq, tableName, pTag, suid, pInserter->pNode->tableName, TagNames,
3,768✔
1060
                                    pInserter->pTagSchema->nCols, TSDB_DEFAULT_TABLE_TTL);
3,768✔
1061
    if (code != TSDB_CODE_SUCCESS) {
3,768✔
1062
      terrno = code;
×
1063
      qError("failed to build create table request, error:%s", tstrerror(code));
×
1064
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1065
      goto _end;
×
1066
    }
1067

1068
    for (int32_t k = 0; k < pTSchema->numOfCols; ++k) {
18,840✔
1069
      int16_t         colIdx = k;
15,072✔
1070
      const STColumn* pCol = &pTSchema->columns[k];
15,072✔
1071
      int16_t*        slotId = taosHashGet(pInserter->pCols, &pCol->colId, sizeof(pCol->colId));
15,072✔
1072
      if (NULL == slotId) {
15,072✔
1073
        continue;
3,768✔
1074
      }
1075
      colIdx = *slotId;
11,304✔
1076

1077
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
11,304✔
1078
      if (NULL == pColInfoData) {
11,304✔
1079
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1080
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1081
        goto _end;
×
1082
      }
1083
      void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
11,304✔
1084

1085
      switch (pColInfoData->info.type) {
11,304✔
1086
        case TSDB_DATA_TYPE_NCHAR:
×
1087
        case TSDB_DATA_TYPE_VARBINARY:
1088
        case TSDB_DATA_TYPE_VARCHAR: {
1089
          if (pColInfoData->info.type != pCol->type) {
×
1090
            qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k,
×
1091
                   pCol->type);
1092
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1093
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1094
            goto _end;
×
1095
          }
1096
          if (colDataIsNull_s(pColInfoData, j)) {
×
1097
            SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
×
1098
            if (NULL == taosArrayPush(pVals, &cv)) {
×
1099
              tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1100
              goto _end;
×
1101
            }
1102
          } else {
1103
            void*   data = colDataGetVarData(pColInfoData, j);
×
1104
            SValue  sv = (SValue){.type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)};
×
1105
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
×
1106
            if (NULL == taosArrayPush(pVals, &cv)) {
×
1107
              tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1108
              goto _end;
×
1109
            }
1110
          }
1111
          break;
×
1112
        }
1113
        case TSDB_DATA_TYPE_BLOB:
×
1114
        case TSDB_DATA_TYPE_JSON:
1115
        case TSDB_DATA_TYPE_MEDIUMBLOB:
1116
          qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1117
          terrno = TSDB_CODE_APP_ERROR;
×
1118
          tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1119
          goto _end;
×
1120
          break;
1121
        default:
11,304✔
1122
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
11,304✔
1123
            if (colDataIsNull_s(pColInfoData, j)) {
22,608✔
1124
              if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
×
1125
                qError("Primary timestamp column should not be null");
×
1126
                terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
×
1127
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1128
                goto _end;
×
1129
              }
1130

1131
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
×
1132
              if (NULL == taosArrayPush(pVals, &cv)) {
×
1133
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1134
                goto _end;
×
1135
              }
1136
            } else {
1137
              // if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && !needSortMerge) {
1138
              //   if (*(int64_t*)var <= lastTs) {
1139
              //     needSortMerge = true;
1140
              //   } else {
1141
              //     lastTs = *(int64_t*)var;
1142
              //   }
1143
              // }
1144

1145
              SValue sv = {.type = pCol->type};
11,304✔
1146
              valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
11,304✔
1147
              SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
11,304✔
1148
              if (NULL == taosArrayPush(pVals, &cv)) {
11,304✔
1149
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1150
                goto _end;
×
1151
              }
1152
            }
1153
          } else {
1154
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1155
            terrno = TSDB_CODE_APP_ERROR;
×
1156
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1157
            goto _end;
×
1158
          }
1159
          break;
11,304✔
1160
      }
1161
    }
1162

1163
    SRow* pRow = NULL;
3,768✔
1164
    SRowBuildScanInfo sinfo = {0};
3,768✔
1165
    if ((terrno = tRowBuild(pVals, pTSchema, &pRow, &sinfo)) < 0) {
3,768✔
1166
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1167
      goto _end;
×
1168
    }
1169
    if (NULL == taosArrayPush(tbData.aRowP, &pRow)) {
7,536✔
1170
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1171
      goto _end;
×
1172
    }
1173

1174
    if (NULL == taosArrayPush(pReq->aSubmitTbData, &tbData)) {
7,536✔
1175
      goto _end;
×
1176
    }
1177
  }
1178

1179
_end:
1,884✔
1180
  taosArrayDestroy(pTagVals);
1,884✔
1181
  taosArrayDestroy(pVals);
1,884✔
1182

1183
  return terrno;
1,884✔
1184
}
1185

1186
int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** ppReq, const SSDataBlock* pDataBlock,
59,735✔
1187
                                const STSchema* pTSchema, int64_t* uid, int32_t* vgId, tb_uid_t* suid) {
1188
  SSubmitReq2* pReq = *ppReq;
59,735✔
1189
  SArray*      pVals = NULL;
59,735✔
1190
  SArray*      pTagVals = NULL;
59,735✔
1191
  int32_t      numOfBlks = 0;
59,735✔
1192
  char*        tableName = NULL;
59,735✔
1193
  int32_t      code = 0, lino = 0;
59,735✔
1194

1195
  terrno = TSDB_CODE_SUCCESS;
59,735✔
1196

1197
  if (NULL == pReq) {
59,735✔
1198
    if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
59,735✔
1199
      goto _end;
×
1200
    }
1201

1202
    if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
59,735✔
1203
      goto _end;
×
1204
    }
1205
  }
1206

1207
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
59,735✔
1208
  int32_t rows = pDataBlock->info.rows;
59,735✔
1209

1210
  SSubmitTbData tbData = {0};
59,735✔
1211
  if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
59,735✔
1212
    goto _end;
×
1213
  }
1214
  tbData.suid = *suid;
59,735✔
1215
  tbData.uid = *uid;
59,735✔
1216
  tbData.sver = pTSchema->version;
59,735✔
1217

1218
  if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
59,735✔
1219
    taosArrayDestroy(tbData.aRowP);
×
1220
    goto _end;
×
1221
  }
1222

1223
  if (pInserter->isStbInserter) {
59,735✔
1224
    if (!pTagVals && !(pTagVals = taosArrayInit(colNum, sizeof(STagVal)))) {
×
1225
      taosArrayDestroy(tbData.aRowP);
×
1226
      goto _end;
×
1227
    }
1228
  }
1229

1230
  int64_t lastTs = TSKEY_MIN;
59,735✔
1231
  bool    needSortMerge = false;
59,735✔
1232

1233
  for (int32_t j = 0; j < rows; ++j) {  // iterate by row
2,425,726✔
1234
    taosArrayClear(pVals);
2,373,911✔
1235

1236
    int32_t offset = 0;
2,373,911✔
1237
    // 处理超级表的tbname和tags
1238
    if (pInserter->isStbInserter) {
2,373,911✔
1239
      taosArrayClear(pTagVals);
×
1240
      tbData.uid = 0;
×
1241
      *uid = 0;
×
1242
      tbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
×
1243
      tbData.flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
×
1244

1245
      SColumnInfoData* tbname = taosArrayGet(pDataBlock->pDataBlock, 0);
×
1246
      if (NULL == tbname) {
×
1247
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1248
        qError("Insert into stable must have tbname column");
×
1249
        goto _end;
×
1250
      }
1251
      if (tbname->info.type != TSDB_DATA_TYPE_BINARY) {
×
1252
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1253
        qError("tbname column must be binary");
×
1254
        goto _end;
×
1255
      }
1256

1257
      if (colDataIsNull_s(tbname, j)) {
×
1258
        qError("insert into stable tbname column is null");
×
1259
        goto _end;
×
1260
      }
1261
      void*   data = colDataGetVarData(tbname, j);
×
1262
      SValue  sv = (SValue){TSDB_DATA_TYPE_VARCHAR, .nData = varDataLen(data),
×
1263
                            .pData = varDataVal(data)};  // address copy, no value
×
1264
      SColVal cv = COL_VAL_VALUE(0, sv);
×
1265

1266
      // 获取子表vgId
1267
      SDBVgInfo* dbInfo = NULL;
×
1268
      code = inserterGetDbVgInfo(pInserter, pInserter->dbFName, &dbInfo);
×
1269
      if (code != TSDB_CODE_SUCCESS) {
×
1270
        goto _end;
×
1271
      }
1272

1273
      char tbFullName[TSDB_TABLE_FNAME_LEN];
×
1274
      taosMemoryFreeClear(tableName);
×
1275
      tableName = taosMemoryCalloc(1, sv.nData + 1);
×
1276
      TSDB_CHECK_NULL(tableName, code, lino, _end, terrno);
×
1277
      tstrncpy(tableName, sv.pData, sv.nData);
×
1278
      tableName[sv.nData] = '\0';
×
1279

1280
      int32_t len = snprintf(tbFullName, TSDB_TABLE_FNAME_LEN, "%s.%s", pInserter->dbFName, tableName);
×
1281
      if (len >= TSDB_TABLE_FNAME_LEN) {
×
1282
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1283
        qError("table name too long after format, len:%d, maxLen:%d", len, TSDB_TABLE_FNAME_LEN);
×
1284
        goto _end;
×
1285
      }
1286
      code = inserterGetVgId(dbInfo, tbFullName, vgId);
×
1287
      if (code != TSDB_CODE_SUCCESS) {
×
1288
        terrno = code;
×
1289
        goto _end;
×
1290
      }
1291
      // 解析tag
1292
      SArray* TagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
×
1293
      if (!TagNames) {
×
1294
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1295
        goto _end;
×
1296
      }
1297
      for (int32_t i = 0; i < pInserter->pTagSchema->nCols; ++i) {
×
1298
        SSchema* tSchema = &pInserter->pTagSchema->pSchema[i];
×
1299
        int16_t  colIdx = tSchema->colId;
×
1300
        if (NULL == taosArrayPush(TagNames, tSchema->name)) {
×
1301
          goto _end;
×
1302
        }
1303
        int16_t* slotId = taosHashGet(pInserter->pCols, &colIdx, sizeof(colIdx));
×
1304
        if (NULL == slotId) {
×
1305
          continue;
×
1306
        }
1307

1308
        colIdx = *slotId;
×
1309
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
×
1310
        if (NULL == pColInfoData) {
×
1311
          terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1312
          goto _end;
×
1313
        }
1314
        // void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
1315
        switch (pColInfoData->info.type) {
×
1316
          case TSDB_DATA_TYPE_NCHAR:
×
1317
          case TSDB_DATA_TYPE_VARBINARY:
1318
          case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
1319
            if (pColInfoData->info.type != tSchema->type) {
×
1320
              qError("tag:%d type:%d in block dismatch with schema tag:%d type:%d", colIdx, pColInfoData->info.type, i,
×
1321
                     tSchema->type);
1322
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1323
              goto _end;
×
1324
            }
1325
            if (colDataIsNull_s(pColInfoData, j)) {
×
1326
              continue;
×
1327
            } else {
1328
              void*   data = colDataGetVarData(pColInfoData, j);
×
1329
              STagVal tv = (STagVal){.cid = tSchema->colId,
×
1330
                                     .type = tSchema->type,
×
1331
                                     .nData = varDataLen(data),
×
1332
                                     .pData = varDataVal(data)};  // address copy, no value
×
1333
              if (NULL == taosArrayPush(pTagVals, &tv)) {
×
1334
                goto _end;
×
1335
              }
1336
            }
1337
            break;
×
1338
          }
1339
          case TSDB_DATA_TYPE_BLOB:
×
1340
          case TSDB_DATA_TYPE_JSON:
1341
          case TSDB_DATA_TYPE_MEDIUMBLOB:
1342
            qError("the tag type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1343
            terrno = TSDB_CODE_APP_ERROR;
×
1344
            goto _end;
×
1345
            break;
1346
          default:
×
1347
            if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
×
1348
              if (colDataIsNull_s(pColInfoData, j)) {
×
1349
                continue;
×
1350
              } else {
1351
                void*   data = colDataGetData(pColInfoData, j);
×
1352
                STagVal tv = {.cid = tSchema->colId, .type = tSchema->type};
×
1353
                memcpy(&tv.i64, data, tSchema->bytes);
×
1354
                if (NULL == taosArrayPush(pTagVals, &tv)) {
×
1355
                  goto _end;
×
1356
                }
1357
              }
1358
            } else {
1359
              uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1360
              terrno = TSDB_CODE_APP_ERROR;
×
1361
              goto _end;
×
1362
            }
1363
            break;
×
1364
        }
1365
      }
1366
      STag* pTag = NULL;
×
1367
      code = tTagNew(pTagVals, 1, false, &pTag);
×
1368
      if (code != TSDB_CODE_SUCCESS) {
×
1369
        terrno = code;
×
1370
        qError("failed to create tag, error:%s", tstrerror(code));
×
1371
        goto _end;
×
1372
      }
1373

1374
      code = inserterBuildCreateTbReq(tbData.pCreateTbReq, tableName, pTag, *suid, pInserter->pNode->tableName, TagNames,
×
1375
                               pInserter->pTagSchema->nCols, TSDB_DEFAULT_TABLE_TTL);
×
1376
    }
1377

1378
    for (int32_t k = 0; k < pTSchema->numOfCols; ++k) {  // iterate by column
11,892,908✔
1379
      int16_t         colIdx = k;
9,518,997✔
1380
      const STColumn* pCol = &pTSchema->columns[k];
9,518,997✔
1381
      if (!pInserter->fullOrderColList) {
9,518,997✔
1382
        int16_t* slotId = taosHashGet(pInserter->pCols, &pCol->colId, sizeof(pCol->colId));
176,976✔
1383
        if (NULL == slotId) {
176,976✔
1384
          continue;
46,008✔
1385
        }
1386

1387
        colIdx = *slotId;
130,968✔
1388
      }
1389

1390
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
9,472,989✔
1391
      if (NULL == pColInfoData) {
9,472,989✔
1392
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1393
        goto _end;
×
1394
      }
1395
      void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
9,472,989✔
1396

1397
      switch (pColInfoData->info.type) {
9,472,989✔
1398
        case TSDB_DATA_TYPE_NCHAR:
139,146✔
1399
        case TSDB_DATA_TYPE_VARBINARY:
1400
        case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
1401
          if (pColInfoData->info.type != pCol->type) {
139,146✔
1402
            qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k,
×
1403
                   pCol->type);
1404
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1405
            goto _end;
×
1406
          }
1407
          if (colDataIsNull_s(pColInfoData, j)) {
278,292✔
1408
            SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
2,640✔
1409
            if (NULL == taosArrayPush(pVals, &cv)) {
2,640✔
1410
              goto _end;
×
1411
            }
1412
          } else {
1413
            void*  data = colDataGetVarData(pColInfoData, j);
136,506✔
1414
            SValue sv = (SValue){
409,518✔
1415
                .type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)};  // address copy, no value
136,506✔
1416
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
136,506✔
1417
            if (NULL == taosArrayPush(pVals, &cv)) {
136,506✔
1418
              goto _end;
×
1419
            }
1420
          }
1421
          break;
139,146✔
1422
        }
1423
        case TSDB_DATA_TYPE_BLOB:
×
1424
        case TSDB_DATA_TYPE_MEDIUMBLOB:
1425
        case TSDB_DATA_TYPE_JSON:
1426
          qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1427
          terrno = TSDB_CODE_APP_ERROR;
×
1428
          goto _end;
×
1429
          break;
1430
        default:
9,333,843✔
1431
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
9,333,843✔
1432
            if (colDataIsNull_s(pColInfoData, j)) {
18,667,686✔
1433
              if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
7,788✔
1434
                qError("Primary timestamp column should not be null");
×
1435
                terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
×
1436
                goto _end;
×
1437
              }
1438

1439
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);  // should use pCol->type
7,788✔
1440
              if (NULL == taosArrayPush(pVals, &cv)) {
7,788✔
1441
                goto _end;
×
1442
              }
1443
            } else {
1444
              if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && !needSortMerge) {
9,326,055✔
1445
                if (*(int64_t*)var <= lastTs) {
2,328,723✔
1446
                  needSortMerge = true;
10,965✔
1447
                } else {
1448
                  lastTs = *(int64_t*)var;
2,317,758✔
1449
                }
1450
              }
1451

1452
              SValue sv = {.type = pCol->type};
9,326,055✔
1453
              valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
9,326,055✔
1454
              SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
9,326,055✔
1455
              if (NULL == taosArrayPush(pVals, &cv)) {
9,326,055✔
1456
                goto _end;
×
1457
              }
1458
            }
1459
          } else {
1460
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1461
            terrno = TSDB_CODE_APP_ERROR;
×
1462
            goto _end;
×
1463
          }
1464
          break;
9,333,843✔
1465
      }
1466
    }
1467

1468
    SRow*             pRow = NULL;
2,373,911✔
1469
    SRowBuildScanInfo sinfo = {0};
2,373,911✔
1470
    if ((terrno = tRowBuild(pVals, pTSchema, &pRow, &sinfo)) < 0) {
2,373,911✔
1471
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
7,920✔
1472
      goto _end;
7,920✔
1473
    }
1474
    if (NULL == taosArrayPush(tbData.aRowP, &pRow)) {
4,731,982✔
1475
      goto _end;
×
1476
    }
1477
  }
1478

1479
  if (needSortMerge) {
51,815✔
1480
    if ((tRowSort(tbData.aRowP) != TSDB_CODE_SUCCESS) ||
10,965✔
1481
        (terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, KEEP_CONSISTENCY)) != 0) {
10,965✔
1482
      goto _end;
×
1483
    }
1484
  }
1485

1486
  if (NULL == taosArrayPush(pReq->aSubmitTbData, &tbData)) {
103,630✔
1487
    goto _end;
×
1488
  }
1489

1490
_end:
59,735✔
1491

1492
  taosMemoryFreeClear(tableName);
59,735✔
1493

1494
  taosArrayDestroy(pTagVals);
59,735✔
1495
  taosArrayDestroy(pVals);
59,735✔
1496
  if (terrno != 0) {
59,735✔
1497
    *ppReq = NULL;
7,920✔
1498
    if (pReq) {
7,920✔
1499
      tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
7,920✔
1500
      taosMemoryFree(pReq);
7,920✔
1501
    }
1502

1503
    return terrno;
7,920✔
1504
  }
1505
  *ppReq = pReq;
51,815✔
1506

1507
  return TSDB_CODE_SUCCESS;
51,815✔
1508
}
1509

1510
static void destroySubmitReqWrapper(void* p) {
3,768✔
1511
  SSubmitReq2* pReq = *(SSubmitReq2**)p;
3,768✔
1512
  if (pReq != NULL) {
3,768✔
1513
    tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
3,768✔
1514
    taosMemoryFree(pReq);
3,768✔
1515
  }
1516
}
3,768✔
1517

1518
/*
 * Turn every buffered data block of the inserter into encoded submit messages.
 *
 * Each block is handed to buildSubmitReqFromStbBlock(), which fills a hash of
 * SSubmitReq2* values. Every hash entry is then encoded with submitReqToMsg()
 * (the entry key is read as an int32_t and passed as the vgId) and appended to
 * `pMsgs` as a heap-allocated SSubmitTbDataMsg*.
 *
 * @param pInserter  inserter handle owning the pending data blocks
 * @param pMsgs      output array of SSubmitTbDataMsg*; entries appended before a
 *                   failure stay in the array and remain the caller's to release
 * @return TSDB_CODE_SUCCESS, or the first error encountered
 *
 * The request hash is always released before returning, including when a NULL
 * block is found, and a message that could not be appended is freed here.
 */
int32_t dataBlocksToSubmitReqArray(SDataInserterHandle* pInserter, SArray* pMsgs) {
  const SArray*   pBlocks = pInserter->pDataBlocks;
  const STSchema* pTSchema = pInserter->pSchema;
  int64_t         uid = pInserter->pNode->tableId;
  int64_t         suid = pInserter->pNode->stableId;
  int32_t         vgId = pInserter->pNode->vgId;
  int32_t         sz = taosArrayGetSize(pBlocks);
  int32_t         code = 0;

  SHashObj* pHash = NULL;
  void*     iterator = NULL;

  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);  // result rows produced by the SELECT query
    if (NULL == pDataBlock) {
      // go through _end so a hash created for an earlier block is not leaked
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _end;
    }
    if (pHash == NULL) {
      pHash = taosHashInit(sz * pDataBlock->info.rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false,
                           HASH_ENTRY_LOCK);
      if (NULL == pHash) {
        return terrno;
      }
      taosHashSetFreeFp(pHash, destroySubmitReqWrapper);
    }
    code = buildSubmitReqFromStbBlock(pInserter, pHash, pDataBlock, pTSchema, uid, vgId, suid);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }
  }

  size_t keyLen = 0;
  while ((iterator = taosHashIterate(pHash, iterator))) {
    SSubmitReq2* pReq = *(SSubmitReq2**)iterator;
    int32_t*     ctbVgId = taosHashGetKey(iterator, &keyLen);

    SSubmitTbDataMsg* pMsg = taosMemoryCalloc(1, sizeof(SSubmitTbDataMsg));
    if (NULL == pMsg) {
      code = terrno;
      goto _end;
    }
    code = submitReqToMsg(*ctbVgId, pReq, &pMsg->pData, &pMsg->len);
    if (code != TSDB_CODE_SUCCESS) {
      // only the wrapper is released here; pData is left to submitReqToMsg's failure handling
      taosMemoryFree(pMsg);
      goto _end;
    }
    if (NULL == taosArrayPush(pMsgs, &pMsg)) {
      code = terrno;
      // the message never reached pMsgs, so nobody else will release it
      // NOTE(review): assumes the encoded buffer is released with taosMemoryFree - confirm against submitReqToMsg
      taosMemoryFree(pMsg->pData);
      taosMemoryFree(pMsg);
      goto _end;
    }
  }

_end:
  if (pHash != NULL) {
    taosHashCleanup(pHash);
  }

  return code;
}
1576

1577
/*
 * Build one submit request from all buffered data blocks of the inserter and
 * encode it into a message.
 *
 * @param pInserter  inserter handle owning the pending data blocks
 * @param pMsg       receives the encoded message buffer (filled by submitReqToMsg)
 * @param msgLen     receives the encoded message length
 * @return TSDB_CODE_SUCCESS, or the first error encountered
 *
 * The intermediate SSubmitReq2 is always destroyed before returning, including
 * when a NULL block is met after earlier blocks were already accumulated.
 */
int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32_t* msgLen) {
  const SArray*   pBlocks = pInserter->pDataBlocks;
  const STSchema* pTSchema = pInserter->pSchema;
  int64_t         uid = pInserter->pNode->tableId;
  int64_t         suid = pInserter->pNode->stableId;
  int32_t         vgId = pInserter->pNode->vgId;
  int32_t         sz = taosArrayGetSize(pBlocks);
  int32_t         code = 0;
  SSubmitReq2*    pReq = NULL;

  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);  // result rows produced by the SELECT query
    if (NULL == pDataBlock) {
      // release whatever earlier blocks already accumulated in pReq
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _error;
    }
    code = buildSubmitReqFromBlock(pInserter, &pReq, pDataBlock, pTSchema, &uid, &vgId, &suid);
    if (code) {
      goto _error;
    }
  }

  code = submitReqToMsg(vgId, pReq, pMsg, msgLen);
  tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
  taosMemoryFree(pReq);

  return code;

_error:
  if (pReq) {
    tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
    taosMemoryFree(pReq);
  }

  return code;
}
1609

1610
/*
 * Look up the insert-table info registered for (streamId, groupId) in
 * gStreamGrpTableHash.
 *
 * @param streamId  stream identifier (first half of the hash key)
 * @param groupId   group identifier (second half of the hash key)
 * @param ppTbInfo  on success receives the acquired hash slot; it is left
 *                  untouched on failure
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND when
 *         no entry exists or the stored pointer is NULL
 *
 * On success the slot stays acquired and must be handed back with
 * taosHashRelease (see releaseStreamInsertTableInfo). When the slot exists but
 * holds NULL, the reference is released here because the caller never sees it.
 */
int32_t getStreamInsertTableInfo(int64_t streamId, int64_t groupId, SInsertTableInfo*** ppTbInfo) {
  int64_t            key[2] = {streamId, groupId};
  SInsertTableInfo** pTmp = taosHashAcquire(gStreamGrpTableHash, key, sizeof(key));
  if (NULL == pTmp) {
    return TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
  }
  if (NULL == *pTmp) {
    taosHashRelease(gStreamGrpTableHash, pTmp);
    return TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
  }

  *ppTbInfo = pTmp;
  return TSDB_CODE_SUCCESS;
}
1620

1621
/*
 * Hand a gStreamGrpTableHash slot pointer back to the hash via taosHashRelease.
 * Always reports TSDB_CODE_SUCCESS.
 */
static int32_t releaseStreamInsertTableInfo(SInsertTableInfo** ppTbInfo) {
  void* pSlot = ppTbInfo;

  taosHashRelease(gStreamGrpTableHash, pSlot);

  return TSDB_CODE_SUCCESS;
}
1625

1626
int32_t buildNormalTableCreateReq(SDataInserterHandle* pInserter, SStreamInserterParam* pInsertParam,
126,868✔
1627
                                  SSubmitTbData* tbData) {
1628
  int32_t code = TSDB_CODE_SUCCESS;
126,868✔
1629

1630
  tbData->suid = 0;
126,868✔
1631

1632
  tbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
126,868✔
1633
  if (NULL == tbData->pCreateTbReq) {
126,868✔
1634
    goto _end;
×
1635
  }
1636
  tbData->flags |= (SUBMIT_REQ_AUTO_CREATE_TABLE | SUBMIT_REQ_SCHEMA_RES);
126,868✔
1637
  tbData->pCreateTbReq->type = TSDB_NORMAL_TABLE;
126,868✔
1638
  tbData->pCreateTbReq->flags |= (TD_CREATE_NORMAL_TB_IN_STREAM | TD_CREATE_IF_NOT_EXISTS);
126,868✔
1639
  tbData->pCreateTbReq->uid = 0;
126,868✔
1640
  tbData->sver = pInsertParam->sver;
126,868✔
1641

1642
  tbData->pCreateTbReq->name = taosStrdup(pInsertParam->tbname);
126,868✔
1643
  if (!tbData->pCreateTbReq->name) return terrno;
126,868✔
1644

1645
  int32_t numOfCols = pInsertParam->pFields->size;
126,868✔
1646
  tbData->pCreateTbReq->ntb.schemaRow.nCols = numOfCols;
126,868✔
1647
  tbData->pCreateTbReq->ntb.schemaRow.version = 1;
126,868✔
1648

1649
  tbData->pCreateTbReq->ntb.schemaRow.pSchema = taosMemoryCalloc(numOfCols, sizeof(SSchema));
126,868✔
1650
  if (NULL == tbData->pCreateTbReq->ntb.schemaRow.pSchema) {
126,868✔
1651
    goto _end;
×
1652
  }
1653
  for (int32_t i = 0; i < numOfCols; ++i) {
722,858✔
1654
    SFieldWithOptions* pField = taosArrayGet(pInsertParam->pFields, i);
595,990✔
1655
    if (NULL == pField) {
595,990✔
1656
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1657
      goto _end;
×
1658
    }
1659
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].colId = i + 1;
595,990✔
1660
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].type = pField->type;
595,990✔
1661
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].bytes = pField->bytes;
595,990✔
1662
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].flags = pField->flags;
595,990✔
1663
    if (i == 0 && pField->type != TSDB_DATA_TYPE_TIMESTAMP) {
595,990✔
1664
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1665
      qError("buildNormalTableCreateReq, the first column must be timestamp.");
×
1666
      goto _end;
×
1667
    }
1668
    snprintf(tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].name, TSDB_COL_NAME_LEN, "%s", pField->name);
595,990✔
1669
    if (IS_DECIMAL_TYPE(pField->type)) {
595,990✔
1670
      if (!tbData->pCreateTbReq->pExtSchemas) {
×
1671
        tbData->pCreateTbReq->pExtSchemas = taosMemoryCalloc(numOfCols, sizeof(SExtSchema));
×
1672
        if (NULL == tbData->pCreateTbReq->pExtSchemas) {
×
1673
          tdDestroySVCreateTbReq(tbData->pCreateTbReq);
×
1674
          tbData->pCreateTbReq = NULL;
×
1675
          return terrno;
×
1676
        }
1677
      }
1678
      tbData->pCreateTbReq->pExtSchemas[i].typeMod = pField->typeMod;
×
1679
    }
1680
  }
1681
  return TSDB_CODE_SUCCESS;
126,868✔
1682
_end:
×
1683
  return code;
×
1684
}
1685

1686
// modeled on the tBuildTSchema function
1687
/*
 * Allocate an STSchema describing the fields in pInsertParam->pFields.
 *
 * Column 0 takes PRIMARYKEY_TIMESTAMP_COL_ID, its width from TYPE_BYTES and
 * offset -1; every later column i gets colId i + 1 and an offset equal to the
 * running fixed length (flen). Variable-length columns keep the declared byte
 * width, all others use TYPE_BYTES. tlen finally grows by the bitmap size for
 * numOfCols.
 *
 * @param pInsertParam  stream inserter parameters (fields, table type, sver)
 * @param ppTSchema     receives the new schema, or NULL when a field lookup
 *                      fails; left untouched when the allocation itself fails
 * @return TSDB_CODE_SUCCESS, terrno on allocation failure, or
 *         TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when a field cannot be fetched
 */
static int32_t buildTSchmaFromInserter(SStreamInserterParam* pInsertParam, STSchema** ppTSchema) {
  int32_t   numOfCols = pInsertParam->pFields->size;
  STSchema* pSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn) * numOfCols);
  if (NULL == pSchema) {
    return terrno;
  }

  // normal tables start at version 1 (an existing table's version is reset later
  // by resetInserterTbVersion, according to the original note); other table
  // types take the version from the insert parameters
  pSchema->version = (pInsertParam->tbType == TSDB_NORMAL_TABLE) ? 1 : pInsertParam->sver;
  pSchema->numOfCols = numOfCols;

  SFieldWithOptions* pTsField = taosArrayGet(pInsertParam->pFields, 0);
  if (NULL == pTsField) {
    goto _fail;
  }

  STColumn* pTsCol = &pSchema->columns[0];
  pTsCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsCol->type = pTsField->type;
  pTsCol->flags = pTsField->flags;
  pTsCol->bytes = TYPE_BYTES[pTsField->type];
  pTsCol->offset = -1;

  pSchema->tlen = 0;
  pSchema->flen = 0;

  for (int32_t idx = 1; idx < numOfCols; ++idx) {
    SFieldWithOptions* pField = taosArrayGet(pInsertParam->pFields, idx);
    if (NULL == pField) {
      goto _fail;
    }

    STColumn* pColumn = &pSchema->columns[idx];
    pColumn->colId = idx + 1;
    pColumn->type = pField->type;
    pColumn->flags = pField->flags;
    pColumn->offset = pSchema->flen;

    if (IS_VAR_DATA_TYPE(pField->type)) {
      pColumn->bytes = pField->bytes;
      pSchema->tlen += (TYPE_BYTES[pField->type] + pField->bytes);
    } else {
      pColumn->bytes = TYPE_BYTES[pField->type];
      pSchema->tlen += TYPE_BYTES[pField->type];
    }

    pSchema->flen += TYPE_BYTES[pField->type];
  }

  pSchema->tlen += (int32_t)TD_BITMAP_BYTES(numOfCols);

  *ppTSchema = pSchema;
  return TSDB_CODE_SUCCESS;

_fail:
  taosMemoryFree(pSchema);
  *ppTSchema = NULL;
  return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
1752

1753
static int32_t getTagValsFromStreamInserterInfo(SStreamDataInserterInfo* pInserterInfo, int32_t preCols,
317,487✔
1754
                                                SArray** ppTagVals) {
1755
  int32_t code = TSDB_CODE_SUCCESS;
317,487✔
1756
  int32_t nTags = pInserterInfo->pTagVals->size;
317,487✔
1757
  *ppTagVals = taosArrayInit(nTags, sizeof(STagVal));
317,026✔
1758
  if (!ppTagVals) {
318,237✔
1759
    return terrno;
×
1760
  }
1761
  for (int32_t i = 0; i < pInserterInfo->pTagVals->size; ++i) {
857,401✔
1762
    SStreamTagInfo* pTagInfo = taosArrayGet(pInserterInfo->pTagVals, i);
539,528✔
1763
    STagVal         tagVal = {
540,011✔
1764
                .cid = preCols + i + 1,
540,011✔
1765
                .type = pTagInfo->val.data.type,
540,011✔
1766
    };
1767
    if (!pTagInfo->val.isNull) {
540,011✔
1768
      if (IS_VAR_DATA_TYPE(pTagInfo->val.data.type)) {
539,528✔
1769
        tagVal.nData = pTagInfo->val.data.nData;
371,320✔
1770
        tagVal.pData = pTagInfo->val.data.pData;
371,417✔
1771
      } else {
1772
        tagVal.i64 = pTagInfo->val.data.val;
168,208✔
1773
      }
1774

1775
      if (NULL == taosArrayPush(*ppTagVals, &tagVal)) {
1,078,789✔
1776
        code = terrno;
×
1777
        goto _end;
×
1778
      }
1779
    }
1780
  }
1781
_end:
317,851✔
1782
  if (code != TSDB_CODE_SUCCESS) {
317,851✔
1783
    taosArrayDestroy(*ppTagVals);
×
1784
    *ppTagVals = NULL;
×
1785
  }
1786
  return code;
317,368✔
1787
}
1788

1789
static int32_t buildStreamSubTableCreateReq(SStreamRunnerTask* pTask, SDataInserterHandle* pInserter,
317,450✔
1790
                                            SStreamInserterParam* pInsertParam, SStreamDataInserterInfo* pInserterInfo,
1791
                                            SSubmitTbData* tbData) {
1792
  int32_t code = TSDB_CODE_SUCCESS;
317,450✔
1793
  STag*   pTag = NULL;
317,450✔
1794
  SArray* pTagVals = NULL;
318,237✔
1795
  SArray* TagNames = NULL;
318,237✔
1796

1797
  if (pInsertParam->pTagFields == NULL) {
318,237✔
1798
    ST_TASK_ELOG("buildStreamSubTableCreateReq, pTagFields is NULL, suid:%" PRId64 ", sver:%d", pInsertParam->suid,
×
1799
                 pInsertParam->sver);
1800
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1801
  }
1802
  if (pInserterInfo->pTagVals == NULL || pInserterInfo->pTagVals->size == 0) {
318,237✔
1803
    ST_TASK_ELOG("buildStreamSubTableCreateReq, pTagVals is NULL, suid:%" PRId64 ", sver:%d", pInsertParam->suid,
386✔
1804
                 pInsertParam->sver);
1805
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1806
  }
1807
  if (pInsertParam->suid <= 0 || pInsertParam->sver <= 0) {
317,851✔
1808
    ST_TASK_ELOG("buildStreamSubTableCreateReq, suid:%" PRId64
×
1809
                 ", sver:%d"
1810
                 " must be greater than 0",
1811
                 pInsertParam->suid, pInsertParam->sver);
1812
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1813
  }
1814
  int32_t nTags = pInserterInfo->pTagVals->size;
317,851✔
1815

1816
  TagNames = taosArrayInit(nTags, TSDB_COL_NAME_LEN);
318,237✔
1817
  if (!TagNames) {
317,873✔
1818
    code = terrno;
×
1819
    goto _end;
×
1820
  }
1821
  for (int32_t i = 0; i < nTags; ++i) {
857,884✔
1822
    SFieldWithOptions* pField = taosArrayGet(pInsertParam->pTagFields, i);
539,647✔
1823
    if (NULL == taosArrayPush(TagNames, pField->name)) {
1,079,658✔
1824
      code = terrno;
×
1825
      goto _end;
×
1826
    }
1827
  }
1828

1829
  tbData->flags |= (SUBMIT_REQ_AUTO_CREATE_TABLE | SUBMIT_REQ_SCHEMA_RES);
318,237✔
1830
  tbData->uid = 0;
317,873✔
1831
  tbData->suid = pInsertParam->suid;
317,873✔
1832
  tbData->sver = pInsertParam->sver;
317,873✔
1833

1834
  tbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
317,873✔
1835
  if (NULL == tbData->pCreateTbReq) {
317,873✔
1836
    code = terrno;
×
1837
    goto _end;
×
1838
  }
1839
  tbData->pCreateTbReq->type = TSDB_CHILD_TABLE;
317,873✔
1840
  tbData->pCreateTbReq->flags |= (TD_CREATE_SUB_TB_IN_STREAM | TD_CREATE_IF_NOT_EXISTS);
317,123✔
1841

1842
  code = getTagValsFromStreamInserterInfo(pInserterInfo, pInsertParam->pFields->size, &pTagVals);
317,873✔
1843
  if (code != TSDB_CODE_SUCCESS) {
317,368✔
1844
    goto _end;
×
1845
  }
1846

1847
  code = tTagNew(pTagVals, pInsertParam->sver, false, &pTag);
317,368✔
1848
  if (code != TSDB_CODE_SUCCESS) {
317,390✔
1849
    ST_TASK_ELOG("failed to create tag, error:%s", tstrerror(code));
×
1850
    goto _end;
×
1851
  }
1852
  code = inserterBuildCreateTbReq(tbData->pCreateTbReq, pInserterInfo->tbName, pTag, tbData->suid,
317,873✔
1853
                                  pInsertParam->stbname, TagNames, nTags, TSDB_DEFAULT_TABLE_TTL);
317,390✔
1854
  if (code != TSDB_CODE_SUCCESS) {
317,390✔
1855
    ST_TASK_ELOG("failed to build create table request, error:%s", tstrerror(code));
×
1856
    goto _end;
×
1857
  }
1858

1859
_end:
317,390✔
1860
  if (code != TSDB_CODE_SUCCESS) {
317,873✔
1861
    ST_TASK_ELOG("buildStreamSubTableCreateReq failed, error:%s", tstrerror(code));
×
1862
    if (tbData->pCreateTbReq) {
×
1863
      taosMemoryFreeClear(tbData->pCreateTbReq->name);
×
1864
      taosMemoryFreeClear(tbData->pCreateTbReq);
×
1865
    }
1866
    if (TagNames) {
×
1867
      taosArrayDestroy(TagNames);
×
1868
    }
1869
  }
1870

1871
  if (pTagVals) {
317,873✔
1872
    taosArrayDestroy(pTagVals);
317,873✔
1873
  }
1874
  return code;
317,754✔
1875
}
1876

1877
static int32_t appendInsertData(SStreamInserterParam* pInsertParam, const SSDataBlock* pDataBlock,
5,024,795✔
1878
                                SSubmitTbData* tbData, STSchema* pTSchema, SBuildInsertDataInfo* dataInsertInfo) {
1879
  int32_t code = TSDB_CODE_SUCCESS;
5,024,795✔
1880
  int32_t lino = 0;
5,024,795✔
1881

1882
  int32_t rows = pDataBlock ? pDataBlock->info.rows : 0;
5,024,795✔
1883
  int32_t numOfCols = pInsertParam->pFields->size;
5,024,795✔
1884
  int32_t colNum = pDataBlock ? taosArrayGetSize(pDataBlock->pDataBlock) : 0;
5,024,392✔
1885

1886
  SArray* pVals = NULL;
5,024,795✔
1887
  if (!(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
5,024,795✔
1888
    code = terrno;
×
1889
    QUERY_CHECK_CODE(code, lino, _end);
×
1890
  }
1891

1892
  for (int32_t j = 0; j < rows; ++j) {  // iterate by row
50,776,744✔
1893
    taosArrayClear(pVals);
45,752,370✔
1894

1895
    bool tsOrPrimaryKeyIsNull = false;
45,750,678✔
1896
    for (int32_t k = 0; k < numOfCols; ++k) {  // iterate by column
155,100,206✔
1897
      int16_t colIdx = k + 1;
109,306,576✔
1898

1899
      SFieldWithOptions* pCol = taosArrayGet(pInsertParam->pFields, k);
109,306,576✔
1900
      if (PRIMARYKEY_TIMESTAMP_COL_ID != colIdx && TSDB_DATA_TYPE_NULL == pCol->type) {
109,313,819✔
1901
        SColVal cv = COL_VAL_NULL(colIdx, pCol->type);
×
1902
        if (NULL == taosArrayPush(pVals, &cv)) {
×
1903
          code = terrno;
×
1904
          QUERY_CHECK_CODE(code, lino, _end);
×
1905
        }
1906
        continue;
×
1907
      }
1908

1909
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
109,312,550✔
1910
      if (NULL == pColInfoData) {
109,267,614✔
1911
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1912
        QUERY_CHECK_CODE(code, lino, _end);
×
1913
      }
1914
      void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
109,267,614✔
1915

1916
      if (colDataIsNull_s(pColInfoData, j) && (pCol->flags & COL_IS_KEY)) {
218,725,886✔
1917
        tsOrPrimaryKeyIsNull = true;
52,292✔
1918
        qDebug("Primary key column should not be null, skip this row");
52,292✔
1919
        break;
52,292✔
1920
      }
1921
      switch (pColInfoData->info.type) {
109,363,998✔
1922
        case TSDB_DATA_TYPE_NCHAR:
6,192,365✔
1923
        case TSDB_DATA_TYPE_VARBINARY:
1924
        case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
1925
          if (pColInfoData->info.type != pCol->type) {
6,192,365✔
1926
            qError("tb:%s column:%d type:%d in block dismatch with schema col:%d type:%d", pInsertParam->tbname, k,
×
1927
                   pColInfoData->info.type, k, pCol->type);
1928
            code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1929
            QUERY_CHECK_CODE(code, lino, _end);
×
1930
          }
1931
          if (colDataIsNull_s(pColInfoData, j)) {
12,384,730✔
1932
            SColVal cv = COL_VAL_NULL(colIdx, pCol->type);
3,972✔
1933
            if (NULL == taosArrayPush(pVals, &cv)) {
3,972✔
1934
              code = terrno;
×
1935
              QUERY_CHECK_CODE(code, lino, _end);
×
1936
            }
1937
          } else {
1938
            if (pColInfoData->pData == NULL) {
6,188,393✔
1939
              qError("build insert tb:%s, column:%d data is NULL in block", pInsertParam->tbname, k);
×
1940
              code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1941
              QUERY_CHECK_CODE(code, lino, _end);
×
1942
            }
1943
            void*  data = colDataGetVarData(pColInfoData, j);
6,188,393✔
1944
            SValue sv = (SValue){
18,565,179✔
1945
                .type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)};  // address copy, no value
6,188,393✔
1946
            SColVal cv = COL_VAL_VALUE(colIdx, sv);
6,188,393✔
1947
            if (NULL == taosArrayPush(pVals, &cv)) {
6,188,393✔
1948
              code = terrno;
×
1949
              QUERY_CHECK_CODE(code, lino, _end);
×
1950
            }
1951
          }
1952
          break;
6,192,365✔
1953
        }
1954
        case TSDB_DATA_TYPE_BLOB:
×
1955
        case TSDB_DATA_TYPE_JSON:
1956
        case TSDB_DATA_TYPE_MEDIUMBLOB:
1957
          qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1958
          code = TSDB_CODE_APP_ERROR;
×
1959
          QUERY_CHECK_CODE(code, lino, _end);
×
1960
          break;
×
1961
        default:
103,174,171✔
1962
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
103,174,171✔
1963
            if (colDataIsNull_s(pColInfoData, j)) {
206,346,187✔
1964
              if (PRIMARYKEY_TIMESTAMP_COL_ID == colIdx) {
1,141,564✔
1965
                tsOrPrimaryKeyIsNull = true;
4,005✔
1966
                qDebug("Primary timestamp column should not be null, skip this row");
4,005✔
1967
                break;
4,005✔
1968
              }
1969

1970
              SColVal cv = COL_VAL_NULL(colIdx, pCol->type);  // should use pCol->type
1,137,559✔
1971
              if (NULL == taosArrayPush(pVals, &cv)) {
1,137,962✔
1972
                code = terrno;
×
1973
                QUERY_CHECK_CODE(code, lino, _end);
×
1974
              }
1975
            } else {
1976
              if (PRIMARYKEY_TIMESTAMP_COL_ID == colIdx && !dataInsertInfo->needSortMerge) {
102,030,069✔
1977
                if (*(int64_t*)var <= dataInsertInfo->lastTs) {
3,128,788✔
1978
                  dataInsertInfo->needSortMerge = true;
35,063✔
1979
                } else {
1980
                  dataInsertInfo->lastTs = *(int64_t*)var;
3,093,322✔
1981
                }
1982
              }
1983

1984
              SValue sv = {.type = pCol->type};
102,030,915✔
1985
              valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
102,005,585✔
1986
              SColVal cv = COL_VAL_VALUE(colIdx, sv);
102,011,990✔
1987
              if (NULL == taosArrayPush(pVals, &cv)) {
102,003,555✔
1988
                code = terrno;
×
1989
                QUERY_CHECK_CODE(code, lino, _end);
×
1990
              }
1991
            }
1992
          } else {
1993
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1994
            code = TSDB_CODE_APP_ERROR;
16,054✔
1995
            QUERY_CHECK_CODE(code, lino, _end);
16,054✔
1996
          }
1997
          break;
103,158,412✔
1998
      }
1999
      if (tsOrPrimaryKeyIsNull) break;  // skip remaining columns because the primary key is null
109,353,533✔
2000
    }
2001
    if (tsOrPrimaryKeyIsNull) continue;  // skip this row if primary key is null
45,849,927✔
2002
    SRow*             pRow = NULL;
45,793,630✔
2003
    SRowBuildScanInfo sinfo = {0};
45,698,611✔
2004
    if ((code = tRowBuild(pVals, pTSchema, &pRow, &sinfo)) != TSDB_CODE_SUCCESS) {
45,698,611✔
2005
      QUERY_CHECK_CODE(code, lino, _end);
×
2006
    }
2007
    if (NULL == taosArrayPush(tbData->aRowP, &pRow)) {
91,384,955✔
2008
      taosMemFree(pRow);
×
2009
      code = terrno;
×
2010
      QUERY_CHECK_CODE(code, lino, _end);
×
2011
    }
2012
  }
2013
  if (dataInsertInfo->isLastBlock) {
5,024,374✔
2014
    int32_t nRows = taosArrayGetSize(tbData->aRowP);
5,022,039✔
2015
    if (taosArrayGetSize(tbData->aRowP) == 0) {
5,022,039✔
2016
      tbData->flags |= SUBMIT_REQ_ONLY_CREATE_TABLE;
4,208,459✔
2017
      stDebug("no valid data to insert, try to only create tabale:%s", pInsertParam->tbname);
4,208,459✔
2018
    }
2019
    stDebug("appendInsertData, isLastBlock:%d, needSortMerge:%d, totalRows:%d", dataInsertInfo->isLastBlock,
5,021,613✔
2020
            dataInsertInfo->needSortMerge, nRows);
2021
    if (dataInsertInfo->needSortMerge) {
5,022,460✔
2022
      if ((tRowSort(tbData->aRowP) != TSDB_CODE_SUCCESS) ||
70,126✔
2023
          (code = tRowMerge(tbData->aRowP, (STSchema*)pTSchema, KEEP_CONSISTENCY)) != 0) {
35,063✔
2024
        QUERY_CHECK_CODE(code, lino, _end);
×
2025
      }
2026
    }
2027
    nRows = taosArrayGetSize(tbData->aRowP);
5,022,863✔
2028
    stDebug("appendInsertData, after merge, totalRows:%d", nRows);
5,022,057✔
2029
  }
2030

2031
_end:
267,920✔
2032
  taosArrayDestroy(pVals);
5,024,392✔
2033
  return code;
5,023,937✔
2034
}
2035

2036
int32_t buildStreamSubmitReqFromBlock(SStreamRunnerTask* pTask, SDataInserterHandle* pInserter,
5,024,795✔
2037
                                      SStreamDataInserterInfo* pInserterInfo, SSubmitReq2** ppReq,
2038
                                      const SSDataBlock* pDataBlock, SVgroupInfo* vgInfo,
2039
                                      SBuildInsertDataInfo* tbDataInfo) {
2040
  SSubmitReq2* pReq = *ppReq;
5,024,795✔
2041
  int32_t      numOfBlks = 0;
5,024,392✔
2042

2043
  int32_t               code = TSDB_CODE_SUCCESS;
5,024,392✔
2044
  int32_t               lino = 0;
5,024,392✔
2045
  SStreamInserterParam* pInsertParam = pInserter->pParam->streamInserterParam;
5,024,392✔
2046
  SInsertTableInfo**    ppTbInfo = NULL;
5,024,392✔
2047
  SInsertTableInfo*     pTbInfo = NULL;
5,024,392✔
2048
  STSchema*             pTSchema = NULL;
5,024,392✔
2049
  SSubmitTbData*        tbData = &tbDataInfo->pTbData;
5,024,392✔
2050
  int32_t               colNum = 0;
5,024,795✔
2051
  int32_t               rows = 0;
5,024,795✔
2052

2053
  if (NULL == pReq) {
5,024,795✔
2054
    if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
5,022,460✔
2055
      code = terrno;
×
2056
      QUERY_CHECK_CODE(code, lino, _end);
×
2057
    }
2058
    *ppReq = pReq;
5,022,460✔
2059

2060
    if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
5,022,460✔
2061
      code = terrno;
×
2062
      QUERY_CHECK_CODE(code, lino, _end);
×
2063
    }
2064
  }
2065

2066
  if (pDataBlock) {
5,024,775✔
2067
    colNum = taosArrayGetSize(pDataBlock->pDataBlock);
819,094✔
2068
    rows = pDataBlock->info.rows;
818,691✔
2069
  }
2070

2071
  tbData->flags |= SUBMIT_REQ_SCHEMA_RES;
5,024,372✔
2072

2073
  if (tbDataInfo->isFirstBlock) {
5,024,392✔
2074
    if (pInserterInfo->isAutoCreateTable) {
5,022,863✔
2075
      code = initTableInfo(pInserter, pInserterInfo);
445,105✔
2076
      QUERY_CHECK_CODE(code, lino, _end);
445,105✔
2077
      if (pInsertParam->tbType == TSDB_NORMAL_TABLE) {
445,105✔
2078
        code = buildNormalTableCreateReq(pInserter, pInsertParam, tbData);
126,868✔
2079
      } else if (pInsertParam->tbType == TSDB_SUPER_TABLE) {
318,237✔
2080
        code = buildStreamSubTableCreateReq(pTask, pInserter, pInsertParam, pInserterInfo, tbData);
318,237✔
2081
      } else {
2082
        code = TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
×
2083
        ST_TASK_ELOG("buildStreamSubmitReqFromBlock, unknown table type %d", pInsertParam->tbType);
×
2084
      }
2085
      QUERY_CHECK_CODE(code, lino, _end);
444,622✔
2086
    }
2087
  }
2088

2089
  code = getStreamInsertTableInfo(pInserterInfo->streamId, pInserterInfo->groupId, &ppTbInfo);
5,023,486✔
2090
  pTbInfo = *ppTbInfo;
5,024,431✔
2091
  if (tbDataInfo->isFirstBlock) {
5,024,431✔
2092
    if (!pInserterInfo->isAutoCreateTable) {
5,022,499✔
2093
      tstrncpy(pInserterInfo->tbName, pTbInfo->tbname, TSDB_TABLE_NAME_LEN);
4,577,758✔
2094
    }
2095

2096
    tbData->uid = pTbInfo->uid;
5,022,499✔
2097
    tbData->sver = pTbInfo->version;
5,022,499✔
2098

2099
    if (pInsertParam->tbType == TSDB_SUPER_TABLE) {
5,021,977✔
2100
      tbData->suid = pInsertParam->suid;
4,619,355✔
2101
    }
2102

2103
    pTSchema = pTbInfo->pSchema;
5,023,227✔
2104
  } else {
2105
    pTSchema = pTbInfo->pSchema;
1,932✔
2106
  }
2107

2108
  code = getTableVgInfo(pInserter, pInsertParam->dbFName, pTbInfo->tbname, vgInfo);
5,024,795✔
2109
  QUERY_CHECK_CODE(code, lino, _end);
5,024,392✔
2110

2111
  ST_TASK_DLOG("[data inserter], Handle:%p, GROUP:%" PRId64 " tbname:%s autoCreate:%d uid:%" PRId64 " suid:%" PRId64
5,024,392✔
2112
               " sver:%d vgid:%d isLastBlock:%d",
2113
               pInserter, pInserterInfo->groupId, pInserterInfo->tbName, pInserterInfo->isAutoCreateTable, tbData->uid,
2114
               tbData->suid, tbData->sver, vgInfo->vgId, tbDataInfo->isFirstBlock);
2115

2116
  code = appendInsertData(pInsertParam, pDataBlock, tbData, pTSchema, tbDataInfo);
5,024,392✔
2117
  QUERY_CHECK_CODE(code, lino, _end);
5,024,058✔
2118

2119
_end:
5,024,058✔
2120
  releaseStreamInsertTableInfo(ppTbInfo);
5,024,392✔
2121
  if (code != TSDB_CODE_SUCCESS) {
5,024,795✔
2122
    ST_TASK_ELOG("buildStreamSubmitReqFromBlock, code:0x%0x, groupId:%" PRId64 " tbname:%s autoCreate:%d", code,
×
2123
                 pInserterInfo->groupId, pInserterInfo->tbName, pInserterInfo->isAutoCreateTable);
2124
  }
2125
  return code;
5,024,795✔
2126
}
2127

2128
int32_t streamDataBlocksToSubmitReq(SStreamRunnerTask* pTask, SDataInserterHandle* pInserter,
5,021,326✔
2129
                                    SStreamDataInserterInfo* pInserterInfo, void** pMsg, int32_t* msgLen,
2130
                                    SVgroupInfo* vgInfo) {
2131
  int32_t code = 0;
5,021,326✔
2132
  int32_t lino = 0;
5,021,326✔
2133

2134
  const SArray*        pBlocks = pInserter->pDataBlocks;
5,021,326✔
2135
  int32_t              sz = taosArrayGetSize(pBlocks);
5,022,380✔
2136
  SSubmitReq2*         pReq = NULL;
5,022,380✔
2137
  SBuildInsertDataInfo tbDataInfo = {0};
5,022,380✔
2138

2139
  int32_t rows = 0;
5,021,957✔
2140
  for (int32_t i = 0; i < sz; i++) {
10,046,752✔
2141
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);
5,023,889✔
2142
    if (NULL == pDataBlock) {
5,023,889✔
2143
      stDebug("data block is NULL, just create empty table");
4,205,278✔
2144
      continue;
4,205,701✔
2145
    }
2146
    rows += pDataBlock->info.rows;
818,611✔
2147
  }
2148
  code = initInsertProcessInfo(&tbDataInfo, rows);
5,022,863✔
2149
  if (code != TSDB_CODE_SUCCESS) {
5,022,499✔
2150
    ST_TASK_ELOG("streamDataBlocksToSubmitReq, initInsertDataInfo failed, code:%d", code);
364✔
2151
    return code;
×
2152
  }
2153

2154
  for (int32_t i = 0; i < sz; i++) {
10,046,930✔
2155
    tbDataInfo.isFirstBlock = (i == 0);
5,023,681✔
2156
    tbDataInfo.isLastBlock = (i == sz - 1);
5,023,681✔
2157
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);  // pDataBlock select查询到的结果
5,023,681✔
2158
    ST_TASK_DLOG("[data inserter], Handle:%p, GROUP:%" PRId64
5,023,681✔
2159
            " tbname:%s autoCreate:%d block: %d/%d rows:%" PRId64,
2160
            pInserter, pInserterInfo->groupId, pInserterInfo->tbName,
2161
            pInserterInfo->isAutoCreateTable, i + 1, sz, (pDataBlock != NULL ? pDataBlock->info.rows : 0));
2162
    code = buildStreamSubmitReqFromBlock(pTask, pInserter, pInserterInfo, &pReq, pDataBlock, vgInfo, &tbDataInfo);
5,025,278✔
2163
    QUERY_CHECK_CODE(code, lino, _end);
5,024,795✔
2164
  }
2165

2166
  if (NULL == taosArrayPush(pReq->aSubmitTbData, &tbDataInfo.pTbData)) {
10,046,112✔
2167
    code = terrno;
×
2168
    QUERY_CHECK_CODE(code, lino, _end);
×
2169
  }
2170

2171
  code = submitReqToMsg(vgInfo->vgId, pReq, pMsg, msgLen);
5,022,863✔
2172
  tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
5,019,140✔
2173
  taosMemoryFree(pReq);
5,021,241✔
2174
  ST_TASK_DLOG("[data inserter], submit req, vgid:%d, GROUP:%" PRId64 " tbname:%s autoCreate:%d code:%d ", vgInfo->vgId,
5,018,188✔
2175
               pInserterInfo->groupId, pInserterInfo->tbName, pInserterInfo->isAutoCreateTable, code);
2176

2177
_end:
5,005,192✔
2178
  if (code != 0) {
5,022,460✔
2179
    tDestroySubmitTbData(&tbDataInfo.pTbData, TSDB_MSG_FLG_ENCODE);
×
2180
    tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
×
2181
    taosMemoryFree(pReq);
×
2182
  }
2183

2184
  return code;
5,022,465✔
2185
}
2186

2187
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
66,776✔
2188
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
66,776✔
2189
  if (!pInserter->explain) {
66,776✔
2190
    if (NULL == taosArrayPush(pInserter->pDataBlocks, &pInput->pData)) {
123,238✔
2191
      return terrno;
×
2192
    }
2193
    if (pInserter->isStbInserter) {
61,619✔
2194
      SArray* pMsgs = taosArrayInit(4, sizeof(POINTER_BYTES));
1,884✔
2195
      if (NULL == pMsgs) {
1,884✔
2196
        return terrno;
×
2197
      }
2198
      int32_t code = dataBlocksToSubmitReqArray(pInserter, pMsgs);
1,884✔
2199
      if (code) {
1,884✔
2200
        taosArrayDestroyP(pMsgs, destroySSubmitTbDataMsg);
×
2201
        return code;
×
2202
      }
2203
      taosArrayClear(pInserter->pDataBlocks);
1,884✔
2204
      for (int32_t i = 0; i < taosArrayGetSize(pMsgs); ++i) {
5,652✔
2205
        SSubmitTbDataMsg* pMsg = taosArrayGetP(pMsgs, i);
3,768✔
2206
        code = sendSubmitRequest(pInserter, NULL, pMsg->pData, pMsg->len,
7,536✔
2207
                                 pInserter->pParam->readHandle->pMsgCb->clientRpc, &pInserter->pNode->epSet);
7,536✔
2208
        taosMemoryFree(pMsg);
3,768✔
2209
        if (code) {
3,768✔
2210
          for (int j = i + 1; j < taosArrayGetSize(pMsgs); ++j) {
×
2211
            SSubmitTbDataMsg* pMsg2 = taosArrayGetP(pMsgs, j);
×
2212
            destroySSubmitTbDataMsg(pMsg2);
×
2213
          }
2214
          taosArrayDestroy(pMsgs);
×
2215
          return code;
×
2216
        }
2217
        QRY_ERR_RET(tsem_wait(&pInserter->ready));
3,768✔
2218

2219
        if (pInserter->submitRes.code) {
3,768✔
2220
          for (int j = i + 1; j < taosArrayGetSize(pMsgs); ++j) {
×
2221
            SSubmitTbDataMsg* pMsg2 = taosArrayGetP(pMsgs, j);
×
2222
            destroySSubmitTbDataMsg(pMsg2);
×
2223
          }
2224
          taosArrayDestroy(pMsgs);
×
2225
          return pInserter->submitRes.code;
×
2226
        }
2227
      }
2228

2229
      taosArrayDestroy(pMsgs);
1,884✔
2230

2231
    } else {
2232
      void*   pMsg = NULL;
59,735✔
2233
      int32_t msgLen = 0;
59,735✔
2234
      int32_t code = dataBlocksToSubmitReq(pInserter, &pMsg, &msgLen);
59,735✔
2235
      if (code) {
59,735✔
2236
        return code;
7,920✔
2237
      }
2238

2239
      taosArrayClear(pInserter->pDataBlocks);
51,815✔
2240

2241
      code = sendSubmitRequest(pInserter, NULL, pMsg, msgLen, pInserter->pParam->readHandle->pMsgCb->clientRpc,
51,815✔
2242
                               &pInserter->pNode->epSet);
51,815✔
2243
      if (code) {
51,815✔
2244
        return code;
×
2245
      }
2246

2247
      QRY_ERR_RET(tsem_wait(&pInserter->ready));
51,815✔
2248

2249
      if (pInserter->submitRes.code) {
51,815✔
2250
        return pInserter->submitRes.code;
×
2251
      }
2252
    }
2253
  }
2254

2255
  *pContinue = true;
58,856✔
2256

2257
  return TSDB_CODE_SUCCESS;
58,856✔
2258
}
2259

2260
/*
 * Refresh the schema version used by the stream inserter from the cached table info of the
 * (streamId, groupId) pair carried in pInput. The version is copied into
 * streamInserterParam->sver only when the target table type is not TSDB_NORMAL_TABLE.
 *
 * @return the lookup error if the cached info cannot be fetched, otherwise the result of
 *         releasing the cached info.
 */
static int32_t resetInserterTbVersion(SDataInserterHandle* pInserter, const SInputData* pInput) {
  SInsertTableInfo** ppCached = NULL;

  int32_t code = getStreamInsertTableInfo(pInput->pStreamDataInserterInfo->streamId,
                                          pInput->pStreamDataInserterInfo->groupId, &ppCached);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SInsertTableInfo* pCached = *ppCached;
  stDebug("resetInserterTbVersion, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbName:%s, uid:%" PRId64 ", version:%d",
          pInput->pStreamDataInserterInfo->streamId, pInput->pStreamDataInserterInfo->groupId,
          pInput->pStreamDataInserterInfo->tbName, pCached->uid, pCached->version);

  if (TSDB_NORMAL_TABLE != pInserter->pParam->streamInserterParam->tbType) {
    pInserter->pParam->streamInserterParam->sver = pCached->version;
  }

  return releaseStreamInsertTableInfo(ppCached);
}
2277

2278
/*
 * Sink "put" callback for the stream inserter.
 *
 * Queues pInput->pData, builds a submit message for the target table, sends it and waits on
 * pInserter->ready for the submit result. Result handling:
 *  - TSDB_CODE_TDB_TABLE_ALREADY_EXIST: auto-create is switched off, the table version is
 *    refreshed and the request is rebuilt and sent once more.
 *  - TSDB_CODE_VND_INVALID_VGROUP_ID while auto-creating: the cached db vgroup info is dropped
 *    and the whole put is retried.
 *  - TSDB_CODE_TDB_TABLE_NOT_EXIST without auto-create, or TSDB_CODE_VND_INVALID_VGROUP_ID:
 *    the cached db vgroup info is dropped and TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND is returned.
 *  - any other non-zero submit result is returned as is.
 * TSDB_CODE_STREAM_NO_DATA is treated as success (nothing to insert).
 *
 * The queued block list is always cleared before returning from the non-explain path.
 *
 * @param pHandle   the data sink handle (an SDataInserterHandle)
 * @param pInput    input carrying the data block, the stream task and the per-group inserter info
 * @param pContinue set to true when the data was submitted successfully
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t putStreamDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  int32_t              code = 0;
  int32_t              lino = 0;
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
  SStreamRunnerTask*   pTask = pInput->pTask;
  if (!pInserter || !pInserter->pParam || !pInserter->pParam->streamInserterParam) {
    ST_TASK_ELOG("putStreamDataBlock invalid param, pInserter: %p, pParam:%p", pInserter,
                 pInserter ? pInserter->pParam : NULL);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }
  if (pInserter->explain) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == taosArrayPush(pInserter->pDataBlocks, &pInput->pData)) {
    return terrno;
  }
  void*       pMsg = NULL;
  int32_t     msgLen = 0;
  SVgroupInfo vgInfo = {0};

  code = streamDataBlocksToSubmitReq(pTask, pInserter, pInput->pStreamDataInserterInfo, &pMsg, &msgLen, &vgInfo);
  QUERY_CHECK_CODE(code, lino, _return);

  code = sendSubmitRequest(pInserter, pInput->pStreamDataInserterInfo, pMsg, msgLen,
                           pInserter->pParam->readHandle->pMsgCb->clientRpc, &vgInfo.epSet);
  QUERY_CHECK_CODE(code, lino, _return);

  code = tsem_wait(&pInserter->ready);
  QUERY_CHECK_CODE(code, lino, _return);

  if (pInserter->submitRes.code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
    // The table was created concurrently: resend as a plain insert with the current version.
    pInput->pStreamDataInserterInfo->isAutoCreateTable = false;
    code = resetInserterTbVersion(pInserter, pInput);
    QUERY_CHECK_CODE(code, lino, _return);

    code = streamDataBlocksToSubmitReq(pTask, pInserter, pInput->pStreamDataInserterInfo, &pMsg, &msgLen, &vgInfo);
    QUERY_CHECK_CODE(code, lino, _return);

    code = sendSubmitRequest(pInserter, pInput->pStreamDataInserterInfo, pMsg, msgLen,
                             pInserter->pParam->readHandle->pMsgCb->clientRpc, &vgInfo.epSet);
    QUERY_CHECK_CODE(code, lino, _return);

    code = tsem_wait(&pInserter->ready);
    QUERY_CHECK_CODE(code, lino, _return);
  }

  if (pInput->pStreamDataInserterInfo->isAutoCreateTable &&
      pInserter->submitRes.code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
    rmDbVgInfoFromCache(pInserter->pParam->streamInserterParam->dbFName);
    ST_TASK_ILOG("putStreamDataBlock, stream inserter table info not found, groupId:%" PRId64
                 ", tbName:%s. so reset dbVgInfo and try again",
                 pInput->pStreamDataInserterInfo->groupId, pInput->pStreamDataInserterInfo->tbName);
    // The retry pushes pInput->pData again, so drop the entry queued by this attempt first;
    // otherwise the same block would sit in pDataBlocks twice.
    // NOTE(review): the retry is unbounded; consider capping it if the vgroup can stay invalid.
    taosArrayClear(pInserter->pDataBlocks);
    return putStreamDataBlock(pHandle, pInput, pContinue);
  }

  if ((pInserter->submitRes.code == TSDB_CODE_TDB_TABLE_NOT_EXIST &&
       !pInput->pStreamDataInserterInfo->isAutoCreateTable) ||
      pInserter->submitRes.code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
    rmDbVgInfoFromCache(pInserter->pParam->streamInserterParam->dbFName);
    ST_TASK_ILOG("putStreamDataBlock, stream inserter table info not found, groupId:%" PRId64
                 ", tbName:%s. so reset dbVgInfo",
                 pInput->pStreamDataInserterInfo->groupId, pInput->pStreamDataInserterInfo->tbName);
    code = TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
    QUERY_CHECK_CODE(code, lino, _return);
  }

  if (pInserter->submitRes.code) {
    code = pInserter->submitRes.code;
    ST_TASK_ELOG("submitRes err:%s, code:%0x", tstrerror(pInserter->submitRes.code), pInserter->submitRes.code);
    QUERY_CHECK_CODE(code, lino, _return);
  }

  *pContinue = true;

_return:
  taosArrayClear(pInserter->pDataBlocks);
  if (code == TSDB_CODE_STREAM_NO_DATA) {
    ST_TASK_DLOG("putStreamDataBlock, no valid data to insert, skip this block, groupID:%" PRId64,
                 pInput->pStreamDataInserterInfo->groupId);
    code = TSDB_CODE_SUCCESS;
  } else if (code) {
    ST_TASK_ELOG("submitRes err:%s, code:%0x lino:%d", tstrerror(code), code, lino);
  }
  return code;
}
2365

2366
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
58,439✔
2367
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
58,439✔
2368
  (void)taosThreadMutexLock(&pInserter->mutex);
58,439✔
2369
  pInserter->queryEnd = true;
58,439✔
2370
  pInserter->useconds = useconds;
58,439✔
2371
  (void)taosThreadMutexUnlock(&pInserter->mutex);
58,439✔
2372
}
58,439✔
2373

2374
/*
 * Sink "get length" callback. For an inserter the reported length is the number of affected
 * rows accumulated in submitRes; pRawLen and pQueryEnd are not written by this implementation.
 */
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) {
  SDataInserterHandle* pSelf = (SDataInserterHandle*)pHandle;
  int64_t              affected = pSelf->submitRes.affectedRows;

  *pLen = affected;
  qDebug("got total affectedRows %" PRId64, affected);
}
58,439✔
2379

2380
/*
 * Sink "destroy" callback: releases everything owned by the inserter handle except the handle
 * structure itself (callers in this file free the handle after calling this).
 *
 * Released here: the queued block array, column schema, the create-time parameter (including
 * the stream inserter parameter and its read handle when present), the column hash, the plan
 * node, the mutex, the sink manager, the per-inserter db vgroup map and the tag schema.
 *
 * It is also used on the error paths of the create functions, where the handle may be only
 * partially populated, so pParam is checked before it is dereferenced.
 *
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
  (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize);
  taosArrayDestroy(pInserter->pDataBlocks);
  taosMemoryFree(pInserter->pSchema);
  if (pInserter->pParam != NULL) {
    if (pInserter->pParam->streamInserterParam) {
      destroyStreamInserterParam(pInserter->pParam->streamInserterParam);
      taosMemoryFree(pInserter->pParam->readHandle);  // only for stream
    }
    taosMemoryFree(pInserter->pParam);
    pInserter->pParam = NULL;
  }
  taosHashCleanup(pInserter->pCols);
  nodesDestroyNode((SNode*)pInserter->pNode);
  pInserter->pNode = NULL;

  (void)taosThreadMutexDestroy(&pInserter->mutex);

  taosMemoryFree(pInserter->pManager);

  if (pInserter->dbVgInfoMap) {
    taosHashSetFreeFp(pInserter->dbVgInfoMap, freeUseDbOutput_tmp);
    taosHashCleanup(pInserter->dbVgInfoMap);
  }

  if (pInserter->pTagSchema) {
    taosMemoryFreeClear(pInserter->pTagSchema->pSchema);
    taosMemoryFree(pInserter->pTagSchema);
  }

  return TSDB_CODE_SUCCESS;
}
2410

2411
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
×
2412
  SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
×
2413

2414
  *size = atomic_load_64(&pDispatcher->cachedSize);
×
2415
  return TSDB_CODE_SUCCESS;
×
2416
}
2417

2418
static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) {
66,359✔
2419
  SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
66,359✔
2420

2421
  *pFlags = atomic_load_64(&pDispatcher->flags);
66,359✔
2422
  return TSDB_CODE_SUCCESS;
66,359✔
2423
}
2424

2425
/*
 * Create the data sink handle used by query-driven inserts (INSERT ... SELECT style plans).
 *
 * On success the handle takes over *ppDataSink (which is reset to NULL), pManager and pParam,
 * and is returned through *pHandle.
 *
 * The function loads the target table schema, verifies that the plan's super table id matches
 * the one reported by the meta API, and builds a colId -> slotId hash from the plan's column
 * list. A tbname pseudo column is registered under colId 0 / slotId 0. fullOrderColList stays
 * true only when the plan lists exactly the schema's columns in schema order. A super-table
 * inserter without a tbname column is rejected with TSDB_CODE_PAR_TBNAME_ERROR.
 *
 * On failure everything taken over so far is released, *ppDataSink is destroyed and set to
 * NULL, and terrno is returned.
 */
int32_t createDataInserter(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle,
                           void* pParam) {
  int32_t              code = TSDB_CODE_SUCCESS;
  int64_t              suid = 0;
  int32_t              schemaIdx = 0;
  bool                 foundTbname = false;
  SNode*               pNode = NULL;
  SQueryInserterNode*  pInserterNode = (SQueryInserterNode*)(*ppDataSink);
  SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
  if (NULL == inserter) {
    taosMemoryFree(pParam);
    goto _return;
  }

  // Wire up the sink callbacks.
  inserter->sink.fPut = putDataBlock;
  inserter->sink.fEndPut = endPut;
  inserter->sink.fGetLen = getDataLength;
  inserter->sink.fGetData = NULL;
  inserter->sink.fDestroy = destroyDataSinker;
  inserter->sink.fGetCacheSize = getCacheSize;
  inserter->sink.fGetFlags = getSinkFlags;

  // From here on the handle owns the manager, the plan node and the parameter.
  inserter->pManager = pManager;
  inserter->pNode = pInserterNode;
  inserter->pParam = pParam;
  inserter->status = DS_BUF_EMPTY;
  inserter->queryEnd = false;
  inserter->explain = pInserterNode->explain;
  *ppDataSink = NULL;

  code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId,
                                               &inserter->pSchema, &suid, &inserter->pTagSchema);
  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    goto _return;
  }

  pManager->pAPI->metaFn.getBasicInfo(inserter->pParam->readHandle->vnode, &inserter->dbFName, NULL, NULL, NULL);

  if (TSDB_SUPER_TABLE == pInserterNode->tableType) {
    inserter->isStbInserter = true;
  }

  if (suid != pInserterNode->stableId) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    goto _return;
  }

  inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  if (NULL == inserter->pDataBlocks) {
    goto _return;
  }
  QRY_ERR_JRET(taosThreadMutexInit(&inserter->mutex, NULL));

  inserter->fullOrderColList = pInserterNode->pCols->length == inserter->pSchema->numOfCols;

  inserter->pCols = taosHashInit(pInserterNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT),
                                 false, HASH_NO_LOCK);
  if (NULL == inserter->pCols) {
    goto _return;
  }

  FOREACH(pNode, pInserterNode->pCols) {
    bool isTbname =
        (QUERY_NODE_FUNCTION == pNode->type) && (FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pNode)->funcType);
    if (isTbname) {
      // The tbname pseudo column is stored under colId 0 / slotId 0 and does not consume a schema position.
      int16_t colId = 0;
      int16_t slotId = 0;
      QRY_ERR_JRET(taosHashPut(inserter->pCols, &colId, sizeof(colId), &slotId, sizeof(slotId)));
      foundTbname = true;
    } else {
      SColumnNode* pCol = (SColumnNode*)pNode;
      QRY_ERR_JRET(
          taosHashPut(inserter->pCols, &pCol->colId, sizeof(pCol->colId), &pCol->slotId, sizeof(pCol->slotId)));
      if (inserter->fullOrderColList && pCol->colId != inserter->pSchema->columns[schemaIdx].colId) {
        inserter->fullOrderColList = false;
      }
      ++schemaIdx;
    }
  }

  if (inserter->isStbInserter && !foundTbname) {
    QRY_ERR_JRET(TSDB_CODE_PAR_TBNAME_ERROR);
  }

  QRY_ERR_JRET(tsem_init(&inserter->ready, 0, 0));

  inserter->dbVgInfoMap = NULL;

  *pHandle = inserter;
  return TSDB_CODE_SUCCESS;

_return:

  if (NULL == inserter) {
    taosMemoryFree(pManager);
  } else {
    (void)destroyDataSinker((SDataSinkHandle*)inserter);
    taosMemoryFree(inserter);
  }

  nodesDestroyNode((SNode*)*ppDataSink);
  *ppDataSink = NULL;

  return terrno;
}
2527

2528
                           
2529
// Once-control passed to taosThreadOnce together with dbVgInfoMgrInitOnce to set up g_dbVgInfoMgr a single time.
static TdThreadOnce g_dbVgInfoMgrInit = PTHREAD_ONCE_INIT;
2530

2531
// Shared db vgroup-info cache: dbVgInfoMap is keyed by db full name and stores SUseDbOutput pointers;
// lock is taken as a read latch for lookups and as a write latch when an entry is removed.
SDBVgInfoMgr g_dbVgInfoMgr = {0};
2532
                           
2533
void dbVgInfoMgrInitOnce() {
27,250✔
2534
  g_dbVgInfoMgr.dbVgInfoMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
27,250✔
2535
  if (g_dbVgInfoMgr.dbVgInfoMap == NULL) {
27,250✔
2536
    stError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
×
2537
    return;
×
2538
  }
2539

2540
  taosHashSetFreeFp(g_dbVgInfoMgr.dbVgInfoMap, freeUseDbOutput_tmp);
27,250✔
2541
}
2542

2543

2544

2545
/*
 * Create the data sink handle used by stream tasks to write results into the output table.
 *
 * Makes sure the shared db vgroup cache is initialized, then allocates the handle, wires the
 * sink callbacks (putStreamDataBlock as the put function) and initializes its block array,
 * mutex and ready semaphore. On success the handle takes over pManager and pParam and is
 * returned through *pHandle.
 *
 * On failure: if the handle was allocated it is torn down with destroyDataSinker (which also
 * releases pManager and pParam); otherwise only pManager is freed.
 * NOTE(review): pParam is not released when the failure happens before the handle exists,
 * unlike createDataInserter - confirm the intended ownership with the caller.
 *
 * @return TSDB_CODE_SUCCESS or a non-zero error code
 */
int32_t createStreamDataInserter(SDataSinkManager* pManager, DataSinkHandle* pHandle, void* pParam) {
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
  // Must be initialized before the first jump to _exit, which inspects it.
  SDataInserterHandle* inserter = NULL;

  TAOS_UNUSED(taosThreadOnce(&g_dbVgInfoMgrInit, dbVgInfoMgrInitOnce));
  TSDB_CHECK_NULL(g_dbVgInfoMgr.dbVgInfoMap, code, lino, _exit, terrno);

  inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
  TSDB_CHECK_NULL(inserter, code, lino, _exit, terrno);

  inserter->sink.fPut = putStreamDataBlock;
  inserter->sink.fEndPut = endPut;
  inserter->sink.fGetLen = getDataLength;
  inserter->sink.fGetData = NULL;
  inserter->sink.fDestroy = destroyDataSinker;
  inserter->sink.fGetCacheSize = getCacheSize;
  inserter->sink.fGetFlags = getSinkFlags;
  inserter->pManager = pManager;
  inserter->pNode = NULL;
  inserter->pParam = pParam;
  inserter->status = DS_BUF_EMPTY;
  inserter->queryEnd = false;
  inserter->explain = false;

  inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  TSDB_CHECK_NULL(inserter->pDataBlocks, code, lino, _exit, terrno);

  TAOS_CHECK_EXIT(taosThreadMutexInit(&inserter->mutex, NULL));
  TAOS_CHECK_EXIT(tsem_init(&inserter->ready, 0, 0));

  inserter->dbVgInfoMap = NULL;

  *pHandle = inserter;
  return TSDB_CODE_SUCCESS;

_exit:

  if (inserter) {
    (void)destroyDataSinker((SDataSinkHandle*)inserter);
    taosMemoryFree(inserter);
  } else {
    taosMemoryFree(pManager);
  }

  // _exit is only reached on failure. terrno may still be 0 here (e.g. the one-time cache
  // initialization failed earlier and did not leave an error for this call), so never report
  // success without having produced a handle.
  if (TSDB_CODE_SUCCESS == code) {
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  stError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));

  return code;
}
2594

2595
/*
 * Return the vgroup info of database dbFName from the shared cache, loading it through
 * buildDbVgInfoMap and inserting it into the cache on a miss.
 *
 * If the insert reports TSDB_CODE_DUP_KEY (the entry appeared in the cache between the lookup
 * and the insert), the freshly built copy is discarded and the cached entry is used instead.
 *
 * @param clientRpc RPC handle forwarded to buildDbVgInfoMap
 * @param dbFName   full database name, used as the cache key
 * @param dbVgInfo  out: the cached SDBVgInfo; written only on success
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t getDbVgInfoByTbName(void* clientRpc, const char* dbFName, SDBVgInfo** dbVgInfo) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       line = 0;
  SUseDbOutput* output = NULL;

  SUseDbOutput** find = (SUseDbOutput**)taosHashGet(g_dbVgInfoMgr.dbVgInfoMap, dbFName, strlen(dbFName));

  if (find == NULL) {
    output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    if (output == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    code = buildDbVgInfoMap(clientRpc, dbFName, output);
    QUERY_CHECK_CODE(code, line, _return);

    code = taosHashPut(g_dbVgInfoMgr.dbVgInfoMap, dbFName, strlen(dbFName), &output, POINTER_BYTES);
    if (code == TSDB_CODE_DUP_KEY) {
      code = TSDB_CODE_SUCCESS;
      // another thread has put the same dbFName, so we need to free the output
      freeUseDbOutput_tmp(&output);
      // Make sure the error path below cannot release the discarded copy a second time.
      output = NULL;
      find = (SUseDbOutput**)taosHashGet(g_dbVgInfoMgr.dbVgInfoMap, dbFName, strlen(dbFName));
      if (find == NULL) {
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        QUERY_CHECK_CODE(code, line, _return);
      }
      output = *find;
    }
    QUERY_CHECK_CODE(code, line, _return);
  } else {
    output = *find;
  }

  *dbVgInfo = output->dbVgroup;
  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  // On every path reaching here a non-NULL output is a private, uncached copy.
  if (output != NULL) {
    freeUseDbOutput_tmp(&output);
  }
  return code;
}
2635

2636
/*
 * Resolve the vgroup that owns table tbName of database dbFName.
 *
 * Formats "<dbFName>.<tbName>" as the full table name, then, while holding the cache read
 * latch, fetches the database vgroup info and asks inserterGetVgInfo to fill *pVgInfo.
 *
 * @return TSDB_CODE_SUCCESS or the error of the failing step (also logged)
 */
int32_t getDbVgInfoForExec(void* clientRpc, const char* dbFName, const char* tbName, SVgroupInfo* pVgInfo) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SDBVgInfo* pDbVgroups = NULL;
  char       fullTbName[TSDB_TABLE_FNAME_LEN];

  snprintf(fullTbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbFName, tbName);

  taosRLockLatch(&g_dbVgInfoMgr.lock);

  TAOS_CHECK_EXIT(getDbVgInfoByTbName(clientRpc, dbFName, &pDbVgroups));
  TAOS_CHECK_EXIT(inserterGetVgInfo(pDbVgroups, fullTbName, pVgInfo));

_exit:

  taosRUnLockLatch(&g_dbVgInfoMgr.lock);

  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
2658

2659
void rmDbVgInfoFromCache(const char* dbFName) {
3,099✔
2660
  taosWLockLatch(&g_dbVgInfoMgr.lock);
3,099✔
2661

2662
  TAOS_UNUSED(taosHashRemove(g_dbVgInfoMgr.dbVgInfoMap, dbFName, strlen(dbFName)));
3,099✔
2663

2664
  taosWUnLockLatch(&g_dbVgInfoMgr.lock);
3,099✔
2665
}
3,099✔
2666

2667
/*
 * Serialize a drop-table batch request into a newly allocated message buffer.
 *
 * The buffer starts with an SMsgHead carrying vgId and the total length (both in network byte
 * order), followed by the encoded request.
 *
 * @param vgId  target vgroup id written into the message head
 * @param pReq  request to encode
 * @param pData out: the allocated buffer; written only on success, caller owns it
 * @param pLen  out: total buffer length including the head; written only on success
 * @return TSDB_CODE_SUCCESS, terrno on allocation failure, or the encoder's error
 */
static int32_t dropTableReqToMsg(int32_t vgId, SVDropTbBatchReq* pReq, void** pData, int32_t* pLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t len = 0;

  // First pass: compute the encoded size only.
  tEncodeSize(tEncodeSVDropTbBatchReq, pReq, len, code);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  len += sizeof(SMsgHead);
  void* pBuf = taosMemoryMalloc(len);
  if (NULL == pBuf) {
    return terrno;
  }

  SDropTbDataMsg* pHead = (SDropTbDataMsg*)pBuf;
  pHead->header.vgId = htonl(vgId);
  pHead->header.contLen = htonl(len);

  // Second pass: encode the payload right behind the head.
  SEncoder encoder;
  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
  code = tEncodeSVDropTbBatchReq(&encoder, pReq);
  tEncoderClear(&encoder);

  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pBuf);
    return code;
  }

  *pData = pBuf;
  *pLen = len;
  return code;
}
2696

2697
/*
 * Response callback for a drop-table request sent by sendDropTbRequest.
 *
 * Stores the response code in the waiting SDropTbCtx, releases the response payload and wakes
 * the waiter by posting ctx->ready.
 *
 * @param param the SDropTbCtx supplied when the request was sent
 * @param pMsg  response buffer; its pData is freed here (may be NULL)
 * @param code  result of the request
 * @return always TSDB_CODE_SUCCESS
 */
int32_t dropTbCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SDropTbCtx* pCtx = (SDropTbCtx*)param;
  if (code) {
    stError("dropTbCallback, code:%d, stream:%" PRId64 " gid:%" PRId64, code, pCtx->req->streamId, pCtx->req->gid);
  }
  pCtx->code = code;

  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pData);
  }

  // Post last: the senders in this file keep the context on their stack and leave once woken,
  // so pCtx must not be touched after this call.
  code = tsem_post(&pCtx->ready);
  if (code != TSDB_CODE_SUCCESS) {
    stError("dropTbCallback, failed to post ready semaphore, code:%d", code);
  }

  return TSDB_CODE_SUCCESS;
}
2708

2709
/*
 * Asynchronously send an encoded drop-table message (TDMT_VND_SNODE_DROP_TABLE) to pEpset.
 * The response is delivered to dropTbCallback with ctx as its parameter.
 *
 * @return terrno if the send descriptor cannot be allocated, otherwise the result of
 *         asyncSendMsgToServer
 */
static int32_t sendDropTbRequest(SDropTbCtx* ctx, void* pMsg, int32_t msgLen, void* pTransporter, SEpSet* pEpset) {
  SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pSendInfo) {
    return terrno;
  }

  pSendInfo->msgType = TDMT_VND_SNODE_DROP_TABLE;
  pSendInfo->msgInfo.pData = pMsg;
  pSendInfo->msgInfo.len = msgLen;
  pSendInfo->fp = dropTbCallback;
  pSendInfo->param = ctx;
  pSendInfo->paramFreeFp = NULL;  // ctx is not freed through the send descriptor

  return asyncSendMsgToServer(pTransporter, pEpset, NULL, pSendInfo);
}
2725

2726
/*
 * Drop the output table that a stream created for group pReq->gid.
 *
 * Looks up the cached table info of (streamId, gid), removes that entry from
 * gStreamGrpTableHash, builds a single-table drop request, sends it to the owning vgroup and
 * waits for the response.
 *
 * @param pMsgCb      message callbacks; clientRpc is used for sending
 * @param pTaskOutput SStreamRunnerTaskOutput providing the output database name
 * @param pReq        drop request identifying the stream and group
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND when no table info is
 *         cached for the group, or another error code
 */
int32_t doDropStreamTable(SMsgCb* pMsgCb, void* pTaskOutput, SSTriggerDropRequest* pReq) {
  SStreamRunnerTaskOutput* pOutput = pTaskOutput;
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SVDropTbBatchReq         req = {.nReqs = 1};
  SVDropTbReq*             pDropReq = NULL;
  int32_t                  msgLen = 0;
  tsem_t*                  pSem = NULL;
  SDropTbDataMsg*          pMsg = NULL;

  SInsertTableInfo** ppTbInfo = NULL;
  int32_t            vgId = 0;
  int64_t            key[2] = {pReq->streamId, pReq->gid};

  req.pArray = taosArrayInit_s(sizeof(SVDropTbReq), 1);
  if (!req.pArray) return terrno;

  pDropReq = taosArrayGet(req.pArray, 0);

  code = getStreamInsertTableInfo(pReq->streamId, pReq->gid, &ppTbInfo);
  if (TSDB_CODE_SUCCESS != code) {
    code = TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  pDropReq->name = taosStrdup((*ppTbInfo)->tbname);
  if (NULL == pDropReq->name) {
    // Without a name the request cannot be routed or encoded; bail out before touching the cache.
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }
  pDropReq->suid = (*ppTbInfo)->uid;
  pDropReq->uid = (*ppTbInfo)->uid;
  pDropReq->igNotExists = true;
  vgId = (*ppTbInfo)->vgid;

  TAOS_UNUSED(taosHashRemove(gStreamGrpTableHash, key, sizeof(key)));

  code = dropTableReqToMsg(vgId, &req, (void**)&pMsg, &msgLen);
  QUERY_CHECK_CODE(code, lino, _end);

  SVgroupInfo vgInfo = {0};
  code = getDbVgInfoForExec(pMsgCb->clientRpc, pOutput->outDbFName, pDropReq->name, &vgInfo);
  QUERY_CHECK_CODE(code, lino, _end);

  SDropTbCtx ctx = {.req = pReq};
  code = tsem_init(&ctx.ready, 0, 0);
  QUERY_CHECK_CODE(code, lino, _end);
  pSem = &ctx.ready;

  code = sendDropTbRequest(&ctx, pMsg, msgLen, pMsgCb->clientRpc, &vgInfo.epSet);
  QUERY_CHECK_CODE(code, lino, _end);
  pMsg = NULL;  // now owned by sendDropTbRequest

  code = tsem_wait(&ctx.ready);
  if (TSDB_CODE_SUCCESS == code) {
    // Only trust the response code when the wait itself succeeded.
    code = ctx.code;
  }
  stDebug("doDropStreamTable,  code:0x%" PRIx32 " req:%p, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq,
          pReq->streamId, pReq->gid, pDropReq ? pDropReq->name : "unknown");

_end:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND) {
    stError("doDropStreamTable, code:0x%" PRIx32 ", streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq->streamId,
            pReq->gid, (pDropReq && pDropReq->name) ? pDropReq->name : "unknown");
  }
  // pMsg is non-NULL only if it was built but never handed over to sendDropTbRequest.
  taosMemoryFreeClear(pMsg);
  if (pSem) tsem_destroy(pSem);
  if (pDropReq && pDropReq->name) taosMemoryFreeClear(pDropReq->name);
  if (ppTbInfo) releaseStreamInsertTableInfo(ppTbInfo);
  taosArrayDestroy(req.pArray);

  return code;
}
2795

2796
/*
 * Drop a stream output table identified directly by name.
 *
 * Removes the (streamId, gid) entry from gStreamGrpTableHash, resolves the vgroup that owns
 * tbName in the output database, sends a single-table drop request there and waits for the
 * response.
 *
 * @param pMsgCb      message callbacks; clientRpc is used for sending
 * @param pTaskOutput SStreamRunnerTaskOutput providing the output database name
 * @param pReq        drop request identifying the stream and group
 * @param tbName      name of the table to drop; borrowed, not freed here
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t doDropStreamTableByTbName(SMsgCb* pMsgCb, void* pTaskOutput, SSTriggerDropRequest* pReq, char* tbName) {
  SStreamRunnerTaskOutput* pOutput = pTaskOutput;
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SVDropTbBatchReq         req = {.nReqs = 1};
  SVDropTbReq*             pDropReq = NULL;
  int32_t                  msgLen = 0;
  tsem_t*                  pSem = NULL;
  SDropTbDataMsg*          pMsg = NULL;

  TAOS_UNUSED(taosThreadOnce(&g_dbVgInfoMgrInit, dbVgInfoMgrInitOnce));

  req.pArray = taosArrayInit_s(sizeof(SVDropTbReq), 1);
  if (!req.pArray) return terrno;

  pDropReq = taosArrayGet(req.pArray, 0);

  pDropReq->name = tbName;
  pDropReq->igNotExists = true;

  int64_t key[2] = {pReq->streamId, pReq->gid};
  TAOS_UNUSED(taosHashRemove(gStreamGrpTableHash, key, sizeof(key)));

  SVgroupInfo vgInfo = {0};
  code = getDbVgInfoForExec(pMsgCb->clientRpc, pOutput->outDbFName, pDropReq->name, &vgInfo);
  QUERY_CHECK_CODE(code, lino, _end);

  code = dropTableReqToMsg(vgInfo.vgId, &req, (void**)&pMsg, &msgLen);
  QUERY_CHECK_CODE(code, lino, _end);

  SDropTbCtx ctx = {.req = pReq};
  code = tsem_init(&ctx.ready, 0, 0);
  QUERY_CHECK_CODE(code, lino, _end);
  pSem = &ctx.ready;

  code = sendDropTbRequest(&ctx, pMsg, msgLen, pMsgCb->clientRpc, &vgInfo.epSet);
  QUERY_CHECK_CODE(code, lino, _end);
  pMsg = NULL;  // now owned by sendDropTbRequest

  code = tsem_wait(&ctx.ready);
  if (TSDB_CODE_SUCCESS == code) {
    // Only trust the response code when the wait itself succeeded.
    code = ctx.code;
  }
  stDebug("doDropStreamTableByTbName,  code:%d req:%p, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq,
          pReq->streamId, pReq->gid, (pDropReq && pDropReq->name) ? pDropReq->name : "unknown");

_end:
  if (code != TSDB_CODE_SUCCESS) {
    stError("doDropStreamTableByTbName, code:%d, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq->streamId,
            pReq->gid, (pDropReq && pDropReq->name) ? pDropReq->name : "unknown");
    // pMsg is non-NULL only if it was built but never handed over to sendDropTbRequest.
    if (pMsg) {
      taosMemoryFreeClear(pMsg);
    }
  }
  if (pSem) tsem_destroy(pSem);
  // pDropReq->name aliases the caller's tbName, so only the array is released.
  taosArrayDestroy(req.pArray);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc