• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4926

13 Jan 2026 05:43AM UTC coverage: 66.053% (-0.05%) from 66.107%
#4926

push

travis-ci

web-flow
feat: [6654385780] show snap progress (#34203)

48 of 59 new or added lines in 7 files covered. (81.36%)

582 existing lines in 124 files now uncovered.

200362 of 303334 relevant lines covered (66.05%)

132283104.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.79
/source/libs/executor/src/dataInserter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <stdint.h>
17
#include <stdlib.h>
18
#include <string.h>
19
#include "dataSinkInt.h"
20
#include "dataSinkMgt.h"
21
#include "executor.h"
22
#include "executorInt.h"
23
#include "functionMgt.h"
24
#include "libs/new-stream/stream.h"
25
#include "osAtomic.h"
26
#include "osMemPool.h"
27
#include "osMemory.h"
28
#include "osSemaphore.h"
29
#include "planner.h"
30
#include "query.h"
31
#include "querytask.h"
32
#include "storageapi.h"
33
#include "taoserror.h"
34
#include "tarray.h"
35
#include "tcompression.h"
36
#include "tdatablock.h"
37
#include "tdataformat.h"
38
#include "tglobal.h"
39
#include "thash.h"
40
#include "tmsg.h"
41
#include "tqueue.h"
42

43
extern SDataSinkStat gDataSinkStat;
44
SHashObj*            gStreamGrpTableHash = NULL;
45
typedef struct SSubmitRes {
46
  int64_t      affectedRows;
47
  int32_t      code;
48
  SSubmitRsp2* pRsp;
49
} SSubmitRes;
50

51
/* A serialized per-table submit payload together with its target vgroup id. */
typedef struct SSubmitTbDataMsg {
  int32_t vgId;
  int32_t len;
  void*   pData;  // heap buffer; released by destroySSubmitTbDataMsg
} SSubmitTbDataMsg;
56

57
typedef struct SSubmitTbDataSendInfo {
58
  SSubmitTbDataMsg msg;
59
  SEpSet epSet;
60
} SSubmitTbDataSendInfo;
61

62
typedef struct SSubmitReqSendInfo {
63
  SSubmitReq2* msg;
64
  SEpSet epSet;
65
} SSubmitReqSendInfo;
66

67
static void destroySSubmitTbDataMsg(void* p) {
×
68
  if (p == NULL) return;
×
69
  SSubmitTbDataMsg* pVg = p;
×
70
  taosMemoryFree(pVg->pData);
×
71
  taosMemoryFree(pVg);
×
72
}
73

74
static void destroySSubmitTbDataSendInfo(void* p) {
×
75
  if (p == NULL) return;
×
76
  SSubmitTbDataSendInfo* pSendInfo = p;
×
77
  taosMemoryFree(pSendInfo->msg.pData);
×
78
  taosMemoryFree(pSendInfo);
×
79
}
80

81
typedef struct SDataInserterHandle {
82
  SDataSinkHandle     sink;
83
  SDataSinkManager*   pManager;
84
  STSchema*           pSchema;
85
  SQueryInserterNode* pNode;
86
  SSubmitRes          submitRes;
87
  SInserterParam*     pParam;
88
  SArray*             pDataBlocks;
89
  SHashObj*           pCols;
90
  int32_t             status;
91
  bool                queryEnd;
92
  bool                fullOrderColList;
93
  uint64_t            useconds;
94
  uint64_t            cachedSize;
95
  uint64_t            flags;
96
  TdThreadMutex       mutex;
97
  tsem_t              ready;
98
  bool                explain;
99
  bool                isStbInserter;
100
  SSchemaWrapper*     pTagSchema;
101
  const char*         dbFName;
102
  SHashObj*           dbVgInfoMap;
103
  SUseDbRsp*          pRsp;
104
} SDataInserterHandle;
105

106
typedef struct SSubmitRspParam {
107
  SDataInserterHandle* pInserter;
108
  void*                putParam;
109
} SSubmitRspParam;
110

111
typedef struct SBuildInsertDataInfo {
112
  SSubmitTbData  pTbData;
113
  bool           isFirstBlock;
114
  bool           isLastBlock;
115
  int64_t        lastTs;
116
  bool           needSortMerge;
117
} SBuildInsertDataInfo;
118

119
typedef struct SDropTbCtx {
120
  SSTriggerDropRequest* req;
121
  tsem_t                ready;
122
  int32_t               code;
123
} SDropTbCtx;
124
typedef struct SDropTbDataMsg {
125
  SMsgHead header;
126
  void*    pData;
127
} SDropTbDataMsg;
128

129
typedef struct SRunnerDropTableInfo {
130
  SSTriggerDropRequest* pReq;
131
  int32_t               code;
132
} SRunnerDropTableInfo;
133

134
/*
 * Reset the build state to its initial values and allocate the row-pointer
 * array with room for `rows` entries.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the array cannot be allocated
 * (the flag fields are still reset in that case).
 */
static int32_t initInsertProcessInfo(SBuildInsertDataInfo* pBuildInsertDataInfo, int32_t rows) {
  pBuildInsertDataInfo->isFirstBlock = true;
  pBuildInsertDataInfo->isLastBlock = false;
  pBuildInsertDataInfo->needSortMerge = false;
  pBuildInsertDataInfo->lastTs = TSKEY_MIN;

  pBuildInsertDataInfo->pTbData.aRowP = taosArrayInit(rows, sizeof(SRow*));
  if (pBuildInsertDataInfo->pTbData.aRowP == NULL) {
    return terrno;
  }
  return TSDB_CODE_SUCCESS;
}
146

147
static void freeCacheTbInfo(void* pp) {
232,982✔
148
  if (pp == NULL || *(SInsertTableInfo**)pp == NULL) {
232,982✔
149
    return;
×
150
  }
151
  SInsertTableInfo* pTbInfo = *(SInsertTableInfo**)pp;
232,982✔
152
  if (pTbInfo->tbname) {
232,982✔
153
    taosMemFree(pTbInfo->tbname);
232,982✔
154
    pTbInfo->tbname = NULL;
232,982✔
155
  }
156
  if (pTbInfo->pSchema) {
232,982✔
157
    tDestroyTSchema(pTbInfo->pSchema);
232,982✔
158
    pTbInfo->pSchema = NULL;
232,982✔
159
  }
160
  taosMemoryFree(pTbInfo);
232,982✔
161
}
162

163
int32_t initInserterGrpInfo() {
573,473✔
164
  gStreamGrpTableHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
573,473✔
165
  if (NULL == gStreamGrpTableHash) {
573,473✔
166
    qError("failed to create stream group table hash");
×
167
    return terrno;
×
168
  }
169
  taosHashSetFreeFp(gStreamGrpTableHash, freeCacheTbInfo);
573,473✔
170
  return TSDB_CODE_SUCCESS;
573,473✔
171
}
172

173
void destroyInserterGrpInfo() {
573,473✔
174
  static int8_t destoryGrpInfo = 0;
175
  int8_t        flag = atomic_val_compare_exchange_8(&destoryGrpInfo, 0, 1);
573,473✔
176
  if (flag != 0) {
573,473✔
177
    return;
×
178
  }
179
  if (NULL != gStreamGrpTableHash) {
573,473✔
180
    taosHashCleanup(gStreamGrpTableHash);
573,473✔
181
    gStreamGrpTableHash = NULL;
573,473✔
182
  }
183
}
184

185
/*
 * Validate the create-table part of a submit response and refresh the cached
 * table info from it.
 *
 * Only the first entry of pRsp->aCreateTbRsp is inspected. On success
 * res->vgid and res->uid are overwritten from the response meta, and
 * *pSchemaChaned reports whether the response carries a non-zero schema
 * version different from res->version.
 *
 * Returns TSDB_CODE_SUCCESS; TSDB_CODE_MND_STREAM_INTERNAL_ERROR when the
 * response, its create-table list, or the table meta/uid is missing; or the
 * create-table error code when it is neither 0 nor
 * TSDB_CODE_TDB_TABLE_ALREADY_EXIST.
 */
static int32_t checkResAndResetTableInfo(const SSubmitRes* pSubmitRes, SInsertTableInfo* res,
                                         bool* pSchemaChaned) {
  if (!pSubmitRes->pRsp) {
    stError("create table response is NULL");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }
  // taosArrayGetSize is used instead of ->size so that a NULL list is rejected rather than dereferenced.
  if (taosArrayGetSize(pSubmitRes->pRsp->aCreateTbRsp) < 1) {
    stError("create table response size is less than 1");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }
  SVCreateTbRsp* pCreateTbRsp = taosArrayGet(pSubmitRes->pRsp->aCreateTbRsp, 0);
  if (pCreateTbRsp == NULL) {
    stError("create table response size is less than 1");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }
  if (pCreateTbRsp->code != 0 && pCreateTbRsp->code != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
    stError("create table failed, code:%d", pCreateTbRsp->code);
    return pCreateTbRsp->code;
  }
  if (!pCreateTbRsp->pMeta || pCreateTbRsp->pMeta->tuid == 0) {
    stError("create table can not get tuid");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  *pSchemaChaned = false;
  res->vgid = pCreateTbRsp->pMeta->vgId;
  res->uid = pCreateTbRsp->pMeta->tuid;

  // A zero sversion in the response is never treated as a schema change.
  if (pCreateTbRsp->pMeta->sversion != 0 && res->version != pCreateTbRsp->pMeta->sversion) {
    *pSchemaChaned = true;
  }

  stDebug("inserter callback, uid:%" PRId64 "  vgid: %" PRId64 ", version: %d", res->uid, res->vgid, res->version);

  return TSDB_CODE_SUCCESS;
}
218

219
/*
 * Build a fresh SInsertTableInfo from the first create-table response entry,
 * reusing the table name of the previous cache entry.
 *
 * On success *ppNewInsertTbInfo receives a heap object owned by the caller
 * (releasable with freeCacheTbInfo). On failure nothing is stored in
 * *ppNewInsertTbInfo and everything allocated here is released.
 *
 * Returns TSDB_CODE_SUCCESS, the create-table error code, terrno on
 * allocation/schema-build failure, or TSDB_CODE_MND_STREAM_INTERNAL_ERROR when
 * the response entry or its meta is missing.
 */
static int32_t createNewInsertTbInfo(const SSubmitRes* pSubmitRes, SInsertTableInfo* pOldInsertTbInfo,
                                     SInsertTableInfo** ppNewInsertTbInfo) {
  SVCreateTbRsp* pCreateTbRsp = taosArrayGet(pSubmitRes->pRsp->aCreateTbRsp, 0);
  if (pCreateTbRsp == NULL || pCreateTbRsp->pMeta == NULL) {
    stError("create table can not get tuid");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }
  if (pCreateTbRsp->code != 0 && pCreateTbRsp->code != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
    stError("create table failed, code:%d", pCreateTbRsp->code);
    return pCreateTbRsp->code;
  }

  SInsertTableInfo* res = taosMemoryCalloc(1, sizeof(SInsertTableInfo));
  if (res == NULL) {
    return terrno;
  }
  res->tbname = taosStrdup(pOldInsertTbInfo->tbname);
  if (res->tbname == NULL) {
    int32_t code = terrno;  // capture before free/log can disturb terrno
    taosMemoryFree(res);
    stError("failed to allocate memory for table name");
    return code;
  }

  res->uid = pCreateTbRsp->pMeta->tuid;
  res->vgid = pCreateTbRsp->pMeta->vgId;
  res->version = pCreateTbRsp->pMeta->sversion;
  res->pSchema = tBuildTSchema(pCreateTbRsp->pMeta->pSchemas, pCreateTbRsp->pMeta->numOfColumns, res->version);
  if (res->pSchema == NULL) {
    int32_t code = terrno;
    stError("failed to build schema for table:%s, uid:%" PRId64 ", vgid:%" PRId64 ", version:%d", res->tbname, res->uid,
            res->vgid, res->version);
    // Previously `res` and its table name leaked on this path.
    freeCacheTbInfo(&res);
    return code;
  }

  *ppNewInsertTbInfo = res;
  return TSDB_CODE_SUCCESS;
}
253

254
/*
 * Refresh the cached table info of one (streamId, groupId) pair from a submit
 * response.
 *
 * The cache entry is acquired from gStreamGrpTableHash, its vgid/uid are
 * updated by checkResAndResetTableInfo, and when the schema version changed
 * the entry is replaced by a newly built one. A concurrent insert of the same
 * key (TSDB_CODE_DUP_KEY) is not an error: the local candidate is dropped.
 *
 * The acquired entry is released on every path; previously the reference was
 * dropped only on the error/dup-key path.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_MND_STREAM_INTERNAL_ERROR when an
 * argument is NULL or no cache entry exists, or the failure code of the
 * validation / rebuild / hash-put step.
 */
static int32_t updateInsertGrpTableInfo(SStreamDataInserterInfo* pInserterInfo, const SSubmitRes* pSubmitRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pInserterInfo == NULL || pSubmitRes == NULL) {
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  int64_t            key[2] = {pInserterInfo->streamId, pInserterInfo->groupId};
  SInsertTableInfo** ppTbRes = taosHashAcquire(gStreamGrpTableHash, key, sizeof(key));
  if (NULL == ppTbRes) {
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }
  if (*ppTbRes == NULL) {
    taosHashRelease(gStreamGrpTableHash, ppTbRes);
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  bool schemaChanged = false;
  code = checkResAndResetTableInfo(pSubmitRes, *ppTbRes, &schemaChanged);
  QUERY_CHECK_CODE(code, lino, _exit);

  if (schemaChanged) {
    SInsertTableInfo* pNewInfo = NULL;
    code = createNewInsertTbInfo(pSubmitRes, *ppTbRes, &pNewInfo);
    QUERY_CHECK_CODE(code, lino, _exit);

    // The old entry stays referenced by ppTbRes until the release at _exit.
    TAOS_UNUSED(taosHashRemove(gStreamGrpTableHash, key, sizeof(key)));

    code = taosHashPut(gStreamGrpTableHash, key, sizeof(key), &pNewInfo, sizeof(SInsertTableInfo*));

    if (code == TSDB_CODE_DUP_KEY) {
      // Someone else re-inserted the key first; keep theirs.
      freeCacheTbInfo(&pNewInfo);
      code = TSDB_CODE_SUCCESS;
      goto _exit;
    } else if (code != TSDB_CODE_SUCCESS) {
      freeCacheTbInfo(&pNewInfo);
      stError("failed to put new insert tbInfo for streamId:%" PRIx64 ", groupId:%" PRIx64 ", code:%d",
              pInserterInfo->streamId, pInserterInfo->groupId, code);
      QUERY_CHECK_CODE(code, lino, _exit);
    }

    stInfo("update table info for streamId:%" PRIx64 ", groupId:%" PRIx64 ", uid:%" PRId64 ", vgid:%" PRId64
           ", version:%d",
           pInserterInfo->streamId, pInserterInfo->groupId, pNewInfo->uid, pNewInfo->vgid, pNewInfo->version);
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    stError("failed to check and reset table info for streamId:%" PRIx64 ", groupId:%" PRIx64 ", code:%d",
            pInserterInfo->streamId, pInserterInfo->groupId, code);
  }
  taosHashRelease(gStreamGrpTableHash, ppTbRes);
  return code;
}
301

302
static int32_t buildTSchmaFromInserter(SStreamInserterParam* pInsertParam, STSchema** ppTSchema);
303
/*
 * Make sure gStreamGrpTableHash holds a table-info entry for the
 * (streamId, groupId) pair of pInserterInfo.
 *
 * An existing non-NULL entry is left untouched. Otherwise a new entry is built
 * (uid 0; version 1 for normal tables, the inserter's `sver` otherwise; a copy
 * of the table name; a schema from buildTSchmaFromInserter) and put into the
 * hash. Losing the insertion race (TSDB_CODE_DUP_KEY) counts as success.
 *
 * Returns TSDB_CODE_SUCCESS, terrno on allocation failure, or the failure code
 * of the schema build / hash put.
 */
static int32_t initTableInfo(SDataInserterHandle* pInserter, SStreamDataInserterInfo* pInserterInfo) {
  int64_t key[2] = {pInserterInfo->streamId, pInserterInfo->groupId};

  // Fast path: nothing to allocate when the entry is already cached.
  SInsertTableInfo** ppCached = taosHashGet(gStreamGrpTableHash, key, sizeof(key));
  if (ppCached != NULL && *ppCached != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SInsertTableInfo* pInfo = taosMemoryCalloc(1, sizeof(SInsertTableInfo));
  if (pInfo == NULL) {
    return terrno;
  }

  SStreamInserterParam* pStreamParam = pInserter->pParam->streamInserterParam;
  pInfo->uid = 0;
  pInfo->version = (pStreamParam->tbType == TSDB_NORMAL_TABLE) ? 1 : pStreamParam->sver;

  pInfo->tbname = taosStrdup(pInserterInfo->tbName);
  if (pInfo->tbname == NULL) {
    taosMemoryFree(pInfo);
    stError("failed to allocate memory for table name");
    return terrno;
  }

  int32_t code = buildTSchmaFromInserter(pStreamParam, &pInfo->pSchema);
  if (code == TSDB_CODE_SUCCESS) {
    code = taosHashPut(gStreamGrpTableHash, key, sizeof(key), &pInfo, sizeof(SInsertTableInfo*));
    if (code == TSDB_CODE_DUP_KEY) {
      // Another thread cached the same key first; discard our copy.
      freeCacheTbInfo(&pInfo);
      return TSDB_CODE_SUCCESS;
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    stError("failed to build table info for streamId:%" PRIx64 ", groupId:%" PRIx64 ", code:%d",
            pInserterInfo->streamId, pInserterInfo->groupId, code);
    freeCacheTbInfo(&pInfo);
  }
  return code;
}
351

352
static bool colsIsSupported(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
22,296✔
353
  SArray* pCreatingFields = pInserterParam->pFields;
22,296✔
354

355
  for (int32_t i = 0; i < pCreatingFields->size; ++i) {
125,951✔
356
    SFieldWithOptions* pField = taosArrayGet(pCreatingFields, i);
103,655✔
357
    if (NULL == pField) {
103,655✔
358
      stError("isSupportedSTableSchema: failed to get field from array");
×
359
      return false;
×
360
    }
361

362
    for (int j = 0; j < pTableMetaRsp->numOfColumns; ++j) {
311,526✔
363
      if (strncmp(pTableMetaRsp->pSchemas[j].name, pField->name, TSDB_COL_NAME_LEN) == 0) {
309,806✔
364
        if (pTableMetaRsp->pSchemas[j].type == pField->type && pTableMetaRsp->pSchemas[j].bytes == pField->bytes) {
101,935✔
365
          break;
366
        } else {
367
          return false;
×
368
        }
369
      }
370
    }
371
  }
372
  return true;
22,296✔
373
}
374

375
static bool TagsIsSupported(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
860✔
376
  SArray* pCreatingTags = pInserterParam->pTagFields;
860✔
377

378
  int32_t            tagIndexOffset = -1;
860✔
379
  SFieldWithOptions* pField = taosArrayGet(pCreatingTags, 0);
860✔
380
  if (NULL == pField) {
860✔
381
    stError("isSupportedSTableSchema: failed to get field from array");
×
382
    return false;
×
383
  }
384
  for (int32_t i = 0; i < pTableMetaRsp->numOfColumns + pTableMetaRsp->numOfTags; ++i) {
860✔
385
    if (strncmp(pTableMetaRsp->pSchemas[i].name, pField->name, TSDB_COL_NAME_LEN) != 0) {
430✔
386
      tagIndexOffset = i;
430✔
387
      break;
430✔
388
    }
389
  }
390
  if (tagIndexOffset == -1) {
860✔
391
    stError("isSupportedSTableSchema: failed to get tag index");
430✔
392
    return false;
430✔
393
  }
394

395
  for (int32_t i = 0; i < pTableMetaRsp->numOfTags; ++i) {
860✔
396
    int32_t            index = i + tagIndexOffset;
430✔
397
    SFieldWithOptions* pField = taosArrayGet(pCreatingTags, i);
430✔
398
    if (NULL == pField) {
430✔
399
      stError("isSupportedSTableSchema: failed to get field from array");
×
400
      return false;
×
401
    }
402

403
    for(int32_t j = 0; j < pTableMetaRsp->numOfTags; ++j) {
860✔
404
      if (strncmp(pTableMetaRsp->pSchemas[index].name, pField->name, TSDB_COL_NAME_LEN) == 0) {
430✔
405
        if (pTableMetaRsp->pSchemas[index].type == pField->type &&
×
406
            pTableMetaRsp->pSchemas[index].bytes == pField->bytes) {
×
407
          break;
408
        } else {
409
          return false;
×
410
        }
411
      }
412
    }
413
  }
414
  return true;
430✔
415
}
416

417
/*
 * Schema check for super/child tables: the columns must pass colsIsSupported
 * and then the tags must pass TagsIsSupported (the tag check is skipped when
 * the column check already failed).
 */
static bool isSupportedSTableSchema(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
  return colsIsSupported(pTableMetaRsp, pInserterParam) && TagsIsSupported(pTableMetaRsp, pInserterParam);
}
426

427
/* Schema check for normal tables: only the columns are compared. */
static bool isSupportedNTableSchema(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
  bool columnsMatch = colsIsSupported(pTableMetaRsp, pInserterParam);
  return columnsMatch;
}
430

431
static int32_t checkAndSaveCreateGrpTableInfo(SDataInserterHandle*     pInserthandle,
22,296✔
432
                                              SStreamDataInserterInfo* pInserterInfo) {
433
  int32_t     code = TSDB_CODE_SUCCESS;
22,296✔
434
  SSubmitRes* pSubmitRes = &pInserthandle->submitRes;
22,296✔
435
  int8_t      tbType = pInserthandle->pParam->streamInserterParam->tbType;
22,296✔
436

437
  SVCreateTbRsp*        pCreateTbRsp = taosArrayGet(pSubmitRes->pRsp->aCreateTbRsp, 0);
22,296✔
438
  SSchema*              pExistRow = pCreateTbRsp->pMeta->pSchemas;
22,296✔
439
  SStreamInserterParam* pInserterParam = pInserthandle->pParam->streamInserterParam;
22,296✔
440

441
  if (tbType == TSDB_CHILD_TABLE || tbType == TSDB_SUPER_TABLE) {
22,296✔
442
    if (!isSupportedSTableSchema(pCreateTbRsp->pMeta, pInserterParam)) {
860✔
443
      stError("create table failed, schema is not supported");
430✔
444
      return TSDB_CODE_STREAM_INSERT_SCHEMA_NOT_MATCH;
430✔
445
    }
446
  } else if (tbType == TSDB_NORMAL_TABLE) {
21,436✔
447
    if (!isSupportedNTableSchema(pCreateTbRsp->pMeta, pInserterParam)) {
21,436✔
448
      stError("create table failed, schema is not supported");
×
449
      return TSDB_CODE_STREAM_INSERT_SCHEMA_NOT_MATCH;
×
450
    }
451
  } else {
452
    stError("checkAndSaveCreateGrpTableInfo failed, tbType:%d is not supported", tbType);
×
453
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
×
454
  }
455

456
  return updateInsertGrpTableInfo(pInserterInfo, pSubmitRes);
21,866✔
457
}
458

459
/*
 * RPC completion callback for a submit request issued by sendSubmitRequest.
 *
 * param : SSubmitRspParam* (inserter handle plus optional put parameter).
 * pMsg  : response buffer; pMsg->pData and pMsg->pEpSet are freed here.
 * code  : transport/server result code of the request.
 *
 * Behaviour:
 *  - code == 0: the response is decoded; a failing create-table entry turns
 *    into submitRes.code; for auto-create stream inserts the cached table info
 *    is refreshed; affectedRows is accumulated into submitRes.affectedRows.
 *  - TABLE_ALREADY_EXIST (auto-create stream insert) or
 *    INVALID_TABLE_SCHEMA_VER: submitRes.code is set to
 *    TSDB_CODE_TDB_TABLE_ALREADY_EXIST and the existing table's schema is
 *    checked / cached via checkAndSaveCreateGrpTableInfo.
 *  - any other code: stored in submitRes.code unchanged.
 *
 * Failures of the table-info refresh are only logged. The decoded response is
 * always destroyed before returning and submitRes.pRsp is reset to NULL, so no
 * dangling pointer is left on the handle. The handle's `ready` semaphore is
 * posted exactly once; the handle is not touched after the post.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SSubmitRspParam*         pParam = (SSubmitRspParam*)param;
  SDataInserterHandle*     pInserter = pParam->pInserter;
  SStreamDataInserterInfo* pStreamInfo = (SStreamDataInserterInfo*)pParam->putParam;
  bool                     isAutoCreate = (pStreamInfo != NULL) && pStreamInfo->isAutoCreateTable;
  SSubmitRsp2*             pRsp = NULL;  // response allocated by THIS invocation, if any
  SDecoder                 coder = {0};
  int32_t                  code2 = 0;  // table-info refresh status; logged only

  pInserter->submitRes.code = code;

  if (code == TSDB_CODE_SUCCESS) {
    pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2));
    pInserter->submitRes.pRsp = pRsp;
    if (NULL == pRsp) {
      pInserter->submitRes.code = terrno;
      goto _return;
    }

    tDecoderInit(&coder, pMsg->pData, pMsg->len);
    code = tDecodeSSubmitRsp2(&coder, pRsp);
    if (code) {
      pInserter->submitRes.code = code;
      goto _return;
    }

    if (pRsp->affectedRows > 0) {
      SArray* pCreateTbList = pRsp->aCreateTbRsp;
      int32_t numOfTables = taosArrayGetSize(pCreateTbList);

      for (int32_t i = 0; i < numOfTables; ++i) {
        SVCreateTbRsp* pTbRsp = taosArrayGet(pCreateTbList, i);
        if (NULL == pTbRsp) {
          // The response used to leak on this path; it is now freed at _return.
          pInserter->submitRes.code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
          goto _return;
        }
        if (TSDB_CODE_SUCCESS != pTbRsp->code) {
          pInserter->submitRes.code = pTbRsp->code;
          goto _return;
        }
      }
    }

    if (isAutoCreate) {
      code2 = updateInsertGrpTableInfo(pStreamInfo, &pInserter->submitRes);
    }

    pInserter->submitRes.affectedRows += pRsp->affectedRows;
    qDebug("submit rsp received, affectedRows:%d, total:%" PRId64, pRsp->affectedRows,
           pInserter->submitRes.affectedRows);
  } else if ((TSDB_CODE_TDB_TABLE_ALREADY_EXIST == code && isAutoCreate) ||
             TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER == code) {
    pInserter->submitRes.code = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
    pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2));
    pInserter->submitRes.pRsp = pRsp;
    if (NULL == pRsp) {
      code2 = terrno;
      goto _return;
    }

    tDecoderInit(&coder, pMsg->pData, pMsg->len);
    code2 = tDecodeSSubmitRsp2(&coder, pRsp);
    if (code2 == TSDB_CODE_SUCCESS) {
      // The schema-version branch does not require putParam, so it may be NULL here.
      if (pStreamInfo != NULL) {
        code2 = checkAndSaveCreateGrpTableInfo(pInserter, pStreamInfo);
      } else {
        code2 = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
    }
  }

_return:
  if (pRsp != NULL) {
    tDestroySSubmitRsp2(pRsp, TSDB_MSG_FLG_DECODE);
    taosMemoryFree(pRsp);
    pInserter->submitRes.pRsp = NULL;
  }

  if (code2) {
    qError("update inserter table info failed, error:%s", tstrerror(code2));
  }
  tDecoderClear(&coder);
  TAOS_UNUSED(tsem_post(&pInserter->ready));

  taosMemoryFree(pMsg->pData);
  // Same cleanup as processUseDbRspForInserter; freeing NULL is a no-op.
  taosMemoryFree(pMsg->pEpSet);

  return TSDB_CODE_SUCCESS;
}
547

548
void freeUseDbOutput_tmp(void* ppOutput) {
10,915✔
549
  SUseDbOutput* pOut = *(SUseDbOutput**)ppOutput;
10,915✔
550
  if (NULL == ppOutput) {
10,915✔
551
    return;
×
552
  }
553

554
  if (pOut->dbVgroup) {
10,915✔
555
    freeVgInfo(pOut->dbVgroup);
10,915✔
556
  }
557
  taosMemFree(pOut);
10,915✔
558
  *(SUseDbOutput**)ppOutput = NULL;
10,915✔
559
}
560

561
/*
 * RPC completion callback for the use-db request sent by buildDbVgInfoMap.
 *
 * param : SDBVgInfoReq* owned by the waiting thread.
 * pMsg  : response buffer; pMsg->pData and pMsg->pEpSet are freed here.
 * code  : transport/server result code.
 *
 * On success the decoded response is stored in pVgInfoReq->pRsp. On any
 * failure pVgInfoReq->pRsp is left NULL, which is the same state the waiter
 * already sees for a failed request. The response object is zero-allocated so
 * that a partially decoded response can be released safely.
 *
 * The waiter's `ready` semaphore is posted in every case.
 * Returns `code` (0 on success).
 */
static int32_t processUseDbRspForInserter(void* param, SDataBuf* pMsg, int32_t code) {
  int32_t       lino = 0;
  SDBVgInfoReq* pVgInfoReq = (SDBVgInfoReq*)param;

  if (TSDB_CODE_SUCCESS != code) {
    goto _return;
  }

  pVgInfoReq->pRsp = taosMemoryCalloc(1, sizeof(SUseDbRsp));
  QUERY_CHECK_NULL(pVgInfoReq->pRsp, code, lino, _return, terrno);

  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pVgInfoReq->pRsp);
  if (code != TSDB_CODE_SUCCESS) {
    lino = __LINE__;
    // Do not hand a half-decoded response to the waiter.
    tFreeSUsedbRsp(pVgInfoReq->pRsp);
    taosMemoryFreeClear(pVgInfoReq->pRsp);
  }

_return:
  taosMemoryFreeClear(pMsg->pData);
  taosMemoryFreeClear(pMsg->pEpSet);
  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  int ret = tsem_post(&pVgInfoReq->ready);
  if (ret != 0) {
    qError("%s failed code: %d", __func__, ret);
  }
  return code;
}
594

595

596
int inserterVgInfoComp(const void* lp, const void* rp) {
58,982✔
597
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
58,982✔
598
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
58,982✔
599
  if (pLeft->hashBegin < pRight->hashBegin) {
58,982✔
600
    return -1;
49,943✔
601
  } else if (pLeft->hashBegin > pRight->hashBegin) {
9,039✔
602
    return 1;
9,039✔
603
  }
604

605
  return 0;
×
606
}
607

608
/*
 * Fetch the vgroup layout of database `dbFName` from the mnode and fill
 * `output` with it.
 *
 * A use-db request is sent asynchronously and this function blocks on a
 * semaphore until processUseDbRspForInserter has handled the response. The
 * response is converted with queryBuildUseDbOutput, and the vgroup hash is
 * additionally flattened into output->dbVgroup->vgArray, sorted by hashBegin.
 *
 * Fixes relative to the previous version:
 *  - a failed serialization or a failed getCurrentMnodeEpset() now sets `code`
 *    (before, the function jumped to cleanup with code == 0 and reported
 *    success while leaking the request buffer and the send-info object);
 *  - the request is zero-initialised, so only well-defined bytes are
 *    serialized (NOTE(review): confirm that zero is the right default for the
 *    SUseDbReq fields other than `db`);
 *  - a missing response after the wait is reported as an error instead of
 *    being passed on as NULL.
 *
 * NOTE(review): the request context lives on this stack frame; if tsem_wait
 * itself fails, the callback may still run later - confirm this cannot happen.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. On failure `output` may be
 * partially filled; the caller releases it.
 */
static int32_t buildDbVgInfoMap(void* clientRpc, const char* dbFName, SUseDbOutput* output) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  char*         buf1 = NULL;
  SUseDbReq*    pReq = NULL;
  SMsgSendInfo* pMsgSendInfo = NULL;
  SDBVgInfoReq  dbVgInfoReq = {0};

  code = tsem_init(&dbVgInfoReq.ready, 0, 0);
  if (code != TSDB_CODE_SUCCESS) {
    qError("tsem_init failed, error:%s", tstrerror(code));
    return code;
  }

  pReq = taosMemoryCalloc(1, sizeof(SUseDbReq));
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno);

  tstrncpy(pReq->db, dbFName, TSDB_DB_FNAME_LEN);

  // First pass computes the encoded size, second pass writes into the buffer.
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
  if (contLen < 0) {
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    QUERY_CHECK_CODE(code, lino, _return);
  }
  buf1 = taosMemoryCalloc(1, contLen);
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno);

  if (tSerializeSUseDbReq(buf1, contLen, pReq) < 0) {
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    QUERY_CHECK_CODE(code, lino, _return);
  }

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno);

  SEpSet pEpSet = {0};
  code = getCurrentMnodeEpset(&pEpSet);
  QUERY_CHECK_CODE(code, lino, _return);

  pMsgSendInfo->param = &dbVgInfoReq;
  pMsgSendInfo->msgInfo.pData = buf1;
  buf1 = NULL;  // the buffer now belongs to the send info
  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
  pMsgSendInfo->fp = processUseDbRspForInserter;

  code = asyncSendMsgToServer(clientRpc, &pEpSet, NULL, pMsgSendInfo);
  // As before, the send info is never touched again after the send call,
  // whether it succeeded or not.
  pMsgSendInfo = NULL;
  QUERY_CHECK_CODE(code, lino, _return);

  code = tsem_wait(&dbVgInfoReq.ready);
  QUERY_CHECK_CODE(code, lino, _return);

  if (NULL == dbVgInfoReq.pRsp) {
    // The callback leaves pRsp NULL when the request or its decoding failed.
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    QUERY_CHECK_CODE(code, lino, _return);
  }

  code = queryBuildUseDbOutput(output, dbVgInfoReq.pRsp);
  QUERY_CHECK_CODE(code, lino, _return);

  output->dbVgroup->vgArray = taosArrayInit(dbVgInfoReq.pRsp->vgNum, sizeof(SVgroupInfo));
  if (NULL == output->dbVgroup->vgArray) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _return);
  }

  void* pIter = taosHashIterate(output->dbVgroup->vgHash, NULL);
  while (pIter) {
    if (NULL == taosArrayPush(output->dbVgroup->vgArray, pIter)) {
      taosHashCancelIterate(output->dbVgroup->vgHash, pIter);
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _return);
    }

    pIter = taosHashIterate(output->dbVgroup->vgHash, pIter);
  }

  taosArraySort(output->dbVgroup->vgArray, inserterVgInfoComp);

_return:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  taosMemoryFree(buf1);          // non-NULL only if ownership was never handed over
  taosMemoryFree(pMsgSendInfo);  // non-NULL only if the message was never sent
  taosMemoryFree(pReq);
  TAOS_UNUSED(tsem_destroy(&dbVgInfoReq.ready));
  if (dbVgInfoReq.pRsp) {
    tFreeSUsedbRsp(dbVgInfoReq.pRsp);
    taosMemoryFreeClear(dbVgInfoReq.pRsp);
  }
  return code;
}
691

692
int32_t inserterBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
225,287✔
693
                                 SArray* tagName, uint8_t tagNum, int32_t ttl) {
694
  pTbReq->type = TD_CHILD_TABLE;
225,287✔
695
  pTbReq->ctb.pTag = (uint8_t*)pTag;
225,287✔
696
  pTbReq->name = taosStrdup(tname);
225,287✔
697
  if (!pTbReq->name) return terrno;
225,287✔
698
  pTbReq->ctb.suid = suid;
225,287✔
699
  pTbReq->ctb.tagNum = tagNum;
225,287✔
700
  if (sname) {
225,287✔
701
    pTbReq->ctb.stbName = taosStrdup(sname);
225,287✔
702
    if (!pTbReq->ctb.stbName) {
225,287✔
703
      taosMemoryFree(pTbReq->name);
×
704
      return terrno;
×
705
    }
706
  }
707
  pTbReq->ctb.tagName = tagName;
225,287✔
708
  pTbReq->ttl = ttl;
225,287✔
709
  pTbReq->commentLen = -1;
225,287✔
710

711
  return TSDB_CODE_SUCCESS;
225,066✔
712
}
713

714
int32_t inserterHashValueComp(void const* lp, void const* rp) {
3,808,362✔
715
  uint32_t*    key = (uint32_t*)lp;
3,808,362✔
716
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
3,808,362✔
717

718
  if (*key < pVg->hashBegin) {
3,808,362✔
719
    return -1;
942,738✔
720
  } else if (*key > pVg->hashEnd) {
2,865,408✔
721
    return 1;
194,197✔
722
  }
723

724
  return 0;
2,671,427✔
725
}
726

727

728
/*
 * Resolve the vgroup that owns table `tbName` within database `dbInfo`.
 *
 * The table name is hashed with the database's hash method/prefix/suffix and
 * the sorted vgArray is searched for the range containing that hash. On
 * success the matching vgroup is copied into *pVgInfo.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when
 * dbInfo is NULL, has no vgArray, or no range covers the hash value.
 */
int32_t inserterGetVgInfo(SDBVgInfo* dbInfo, char* tbName, SVgroupInfo* pVgInfo) {
  if (dbInfo == NULL) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }
  if (dbInfo->vgArray == NULL) {
    qError("empty db vgArray, hashSize:%d", taosHashGetSize(dbInfo->vgHash));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t  nameLen = (int32_t)strlen(tbName);
  uint32_t hashValue = taosGetTbHashVal(tbName, nameLen, dbInfo->hashMethod, dbInfo->hashPrefix, dbInfo->hashSuffix);

  SVgroupInfo* pFound = taosArraySearch(dbInfo->vgArray, &hashValue, inserterHashValueComp, TD_EQ);
  if (pFound == NULL) {
    qError("no hash range found for hash value [%u], table:%s, numOfVgId:%d", hashValue, tbName,
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  *pVgInfo = *pFound;
  qDebug("insert get vgInfo, tbName:%s vgId:%d epset(%s:%d)", tbName, pVgInfo->vgId, pVgInfo->epSet.eps[0].fqdn,
         pVgInfo->epSet.eps[0].port);

  return TSDB_CODE_SUCCESS;
}
753

754
/*
 * Convenience wrapper around inserterGetVgInfo that yields only the vgroup id.
 * *vgId is written only on success; on failure the error code is logged and
 * returned.
 */
int32_t inserterGetVgId(SDBVgInfo* dbInfo, char* tbName, int32_t* vgId) {
  SVgroupInfo resolved = {0};

  int32_t code = inserterGetVgInfo(dbInfo, tbName, &resolved);
  if (code == TSDB_CODE_SUCCESS) {
    *vgId = resolved.vgId;
    return TSDB_CODE_SUCCESS;
  }

  qError("inserterGetVgId failed, code:%d", code);
  return code;
}
765

766
/*
 * Return the vgroup layout of database `dbFName`, fetching and caching it on
 * first use.
 *
 * The per-inserter cache (pInserter->dbVgInfoMap, keyed by db full name) is
 * created lazily. On a cache miss the layout is fetched with buildDbVgInfoMap
 * and stored in the cache, which then owns it.
 *
 * The new SUseDbOutput is zero-allocated: when the fetch fails before
 * `dbVgroup` has been set, the cleanup below must see NULL there instead of
 * uninitialised memory.
 *
 * Returns TSDB_CODE_SUCCESS with *dbVgInfo set, TSDB_CODE_OUT_OF_MEMORY on
 * allocation failure, or the error of the fetch / cache insertion (in which
 * case *dbVgInfo is not written).
 */
int32_t inserterGetDbVgInfo(SDataInserterHandle* pInserter, const char* dbFName, SDBVgInfo** dbVgInfo) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       line = 0;
  SUseDbOutput* output = NULL;

  if (pInserter->dbVgInfoMap == NULL) {
    pInserter->dbVgInfoMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (pInserter->dbVgInfoMap == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  SUseDbOutput** find = (SUseDbOutput**)taosHashGet(pInserter->dbVgInfoMap, dbFName, strlen(dbFName));

  if (find == NULL) {
    output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    if (output == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    code = buildDbVgInfoMap(pInserter->pParam->readHandle->pMsgCb->clientRpc, dbFName, output);
    QUERY_CHECK_CODE(code, line, _return);

    code = taosHashPut(pInserter->dbVgInfoMap, dbFName, strlen(dbFName), &output, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
  } else {
    output = *find;
  }

  *dbVgInfo = output->dbVgroup;
  return code;

_return:
  // Reached only for a freshly allocated `output` that never entered the cache.
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  freeUseDbOutput_tmp(&output);
  return code;
}
807

808
/*
 * Look up the vgroup of table `tbName` in database `dbFName` by delegating to
 * getDbVgInfoForExec with the inserter's client RPC handle. The result of
 * that call is returned unchanged.
 */
int32_t getTableVgInfo(SDataInserterHandle* pInserter, const char* dbFName,
                       const char* tbName, SVgroupInfo* pVgInfo) {
  void* pClientRpc = pInserter->pParam->readHandle->pMsgCb->clientRpc;
  return getDbVgInfoForExec(pClientRpc, dbFName, tbName, pVgInfo);
}
813

814
/*
 * Wrap an encoded submit message into an SMsgSendInfo and hand it to
 * asyncSendMsgToServer(); the response is routed to inserterCallback.
 *
 * If either bookkeeping allocation fails, pMsg is released here and terrno is
 * returned. Otherwise the return value is that of asyncSendMsgToServer().
 */
static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* putParam, void* pMsg, int32_t msgLen,
                                 void* pTransporter, SEpSet* pEpset) {
  SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pSendInfo == NULL) {
    taosMemoryFreeClear(pMsg);
    return terrno;
  }

  SSubmitRspParam* pRspParam = taosMemoryCalloc(1, sizeof(SSubmitRspParam));
  if (pRspParam == NULL) {
    taosMemoryFreeClear(pMsg);
    taosMemoryFreeClear(pSendInfo);
    return terrno;
  }

  // Context handed back to the response callback.
  pRspParam->pInserter = pInserter;
  pRspParam->putParam = putParam;

  // Describe the outgoing message.
  pSendInfo->msgType = TDMT_VND_SUBMIT;
  pSendInfo->msgInfo.pData = pMsg;
  pSendInfo->msgInfo.len = msgLen;
  pSendInfo->param = pRspParam;
  pSendInfo->paramFreeFp = taosAutoMemoryFree;
  pSendInfo->fp = inserterCallback;

  return asyncSendMsgToServer(pTransporter, pEpset, NULL, pSendInfo);
}
841

842
/*
 * Serialise `pReq` into a freshly allocated buffer laid out as
 * [SSubmitReq2Msg header][encoded SSubmitReq2].
 *
 * The header carries vgId and the total length in network byte order and a
 * big-endian version of 1. On success *pData/*pLen receive the buffer and its
 * total size (ownership passes to the caller); on failure the buffer is freed
 * and the outputs are left untouched.
 */
static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int32_t* pLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t len = 0;
  void*   pMsgBuf = NULL;

  // First pass: compute the encoded payload size only.
  tEncodeSize(tEncodeSubmitReq, pReq, len, code);
  if (TSDB_CODE_SUCCESS == code) {
    len += sizeof(SSubmitReq2Msg);
    pMsgBuf = taosMemoryMalloc(len);
    if (NULL == pMsgBuf) {
      return terrno;
    }

    SSubmitReq2Msg* pHead = (SSubmitReq2Msg*)pMsgBuf;
    pHead->header.vgId = htonl(vgId);
    pHead->header.contLen = htonl(len);
    pHead->version = htobe64(1);

    // Second pass: encode the payload right behind the header.
    SEncoder encoder;
    tEncoderInit(&encoder, POINTER_SHIFT(pMsgBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
    code = tEncodeSubmitReq(&encoder, pReq);
    tEncoderClear(&encoder);
  }

  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pMsgBuf);
    return code;
  }

  *pData = pMsgBuf;
  *pLen = len;
  return code;
}
871

872
int32_t buildSubmitReqFromStbBlock(SDataInserterHandle* pInserter, SHashObj* pHash, const SSDataBlock* pDataBlock,
3,504✔
873
                                   const STSchema* pTSchema, int64_t uid, int32_t vgId, tb_uid_t suid) {
874
  SArray* pVals = NULL;
3,504✔
875
  SArray* pTagVals = NULL;
3,504✔
876
  SSubmitReqSendInfo** ppSendInfo = NULL;
3,504✔
877
  int32_t numOfBlks = 0;
3,504✔
878

879
  terrno = TSDB_CODE_SUCCESS;
3,504✔
880

881
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
3,504✔
882
  int32_t rows = pDataBlock->info.rows;
3,504✔
883

884
  if (!pTagVals && !(pTagVals = taosArrayInit(colNum, sizeof(STagVal)))) {
3,504✔
885
    goto _end;
×
886
  }
887

888
  if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
3,504✔
889
    goto _end;
×
890
  }
891

892
  SDBVgInfo* dbInfo = NULL;
3,504✔
893
  int32_t    code = inserterGetDbVgInfo(pInserter, pInserter->dbFName, &dbInfo);
3,504✔
894
  if (code != TSDB_CODE_SUCCESS) {
3,504✔
895
    terrno = code;
×
896
    goto _end;
×
897
  }
898

899
  for (int32_t j = 0; j < rows; ++j) {
10,512✔
900
    SSubmitTbData tbData = {0};
7,008✔
901
    if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
7,008✔
902
      goto _end;
×
903
    }
904
    tbData.suid = suid;
7,008✔
905
    tbData.uid = uid;
7,008✔
906
    tbData.sver = pTSchema->version;
7,008✔
907

908
    int64_t lastTs = TSKEY_MIN;
7,008✔
909

910
    taosArrayClear(pVals);
7,008✔
911

912
    int32_t offset = 0;
7,008✔
913
    taosArrayClear(pTagVals);
7,008✔
914
    tbData.uid = 0;
7,008✔
915
    tbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
7,008✔
916
    if (NULL == tbData.pCreateTbReq) {
7,008✔
917
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
918
      goto _end;
×
919
    }
920
    tbData.flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
7,008✔
921

922
    SColumnInfoData* tbname = taosArrayGet(pDataBlock->pDataBlock, 0);
7,008✔
923
    if (NULL == tbname) {
7,008✔
924
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
925
      qError("Insert into stable must have tbname column");
×
926
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
927
      goto _end;
×
928
    }
929
    if (tbname->info.type != TSDB_DATA_TYPE_BINARY) {
7,008✔
930
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
931
      qError("tbname column must be binary");
×
932
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
933
      goto _end;
×
934
    }
935

936
    if (colDataIsNull_s(tbname, j)) {
14,016✔
937
      qError("insert into stable tbname column is null");
×
938
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
939
      goto _end;
×
940
    }
941
    void*   data = colDataGetVarData(tbname, j);
7,008✔
942
    SValue  sv = (SValue){TSDB_DATA_TYPE_VARCHAR, .nData = varDataLen(data), .pData = varDataVal(data)};
7,008✔
943
    SColVal cv = COL_VAL_VALUE(0, sv);
7,008✔
944

945
    char tbFullName[TSDB_TABLE_FNAME_LEN];
7,008✔
946
    char tableName[TSDB_TABLE_FNAME_LEN];
7,008✔
947
    memcpy(tableName, sv.pData, sv.nData);
7,008✔
948
    tableName[sv.nData] = '\0';
7,008✔
949

950
    int32_t len = snprintf(tbFullName, TSDB_TABLE_FNAME_LEN, "%s.%s", pInserter->dbFName, tableName);
7,008✔
951
    if (len >= TSDB_TABLE_FNAME_LEN) {
7,008✔
952
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
953
      qError("table name too long after format, len:%d, maxLen:%d", len, TSDB_TABLE_FNAME_LEN);
×
954
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
955
      goto _end;
×
956
    }
957
    SVgroupInfo vgInfo = {0};
7,008✔
958
    code = inserterGetVgInfo(dbInfo, tbFullName, &vgInfo);
7,008✔
959
    if (code != TSDB_CODE_SUCCESS) {
7,008✔
960
      terrno = code;
×
961
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
962
      goto _end;
×
963
    }
964
    SSubmitReq2* pReq = NULL;
7,008✔
965
    ppSendInfo = taosHashGet(pHash, &vgInfo.vgId, sizeof(int32_t));
7,008✔
966
    if (ppSendInfo == NULL) {
7,008✔
967
      SSubmitReqSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SSubmitReqSendInfo));
6,424✔
968
      if (NULL == pSendInfo) {
6,424✔
969
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
970
        goto _end;
×
971
      }
972
      
973
      pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2));
6,424✔
974
      if (NULL == pReq) {
6,424✔
975
        taosMemoryFree(pSendInfo);
×
976
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
977
        goto _end;
×
978
      }
979

980
      if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
6,424✔
981
        taosMemoryFree(pReq);
×
982
        taosMemoryFree(pSendInfo);
×
983
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
984
        goto _end;
×
985
      }
986
      
987
      pSendInfo->msg = pReq;
6,424✔
988
      pSendInfo->epSet = vgInfo.epSet;
6,424✔
989
      code = taosHashPut(pHash, &vgInfo.vgId, sizeof(int32_t), &pSendInfo, POINTER_BYTES);
6,424✔
990
      if (code != TSDB_CODE_SUCCESS) {
6,424✔
991
        tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
×
992
        taosMemoryFree(pReq);
×
993
        taosMemoryFree(pSendInfo);
×
994
        terrno = code;
×
995
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
996
        goto _end;
×
997
      }
998
    } else {
999
      pReq = (*ppSendInfo)->msg;
584✔
1000
    }
1001
    SArray* TagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
7,008✔
1002
    if (!TagNames) {
7,008✔
1003
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1004
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1005
      goto _end;
×
1006
    }
1007
    for (int32_t i = 0; i < pInserter->pTagSchema->nCols; ++i) {
18,688✔
1008
      SSchema* tSchema = &pInserter->pTagSchema->pSchema[i];
11,680✔
1009
      int16_t  colIdx = tSchema->colId;
11,680✔
1010
      int16_t* slotId = taosHashGet(pInserter->pCols, &colIdx, sizeof(colIdx));
11,680✔
1011
      if (NULL == slotId) {
11,680✔
1012
        continue;
7,008✔
1013
      }
1014
      if (NULL == taosArrayPush(TagNames, tSchema->name)) {
9,344✔
1015
        taosArrayDestroy(TagNames);
×
1016
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1017
        goto _end;
×
1018
      }
1019

1020
      colIdx = *slotId;
4,672✔
1021
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
4,672✔
1022
      if (NULL == pColInfoData) {
4,672✔
1023
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1024
        taosArrayDestroy(TagNames);
×
1025
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1026
        goto _end;
×
1027
      }
1028
      // void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
1029
      switch (pColInfoData->info.type) {
4,672✔
1030
        case TSDB_DATA_TYPE_NCHAR:
3,504✔
1031
        case TSDB_DATA_TYPE_VARBINARY:
1032
        case TSDB_DATA_TYPE_VARCHAR: {
1033
          if (pColInfoData->info.type != tSchema->type) {
3,504✔
1034
            qError("tag:%d type:%d in block dismatch with schema tag:%d type:%d", colIdx, pColInfoData->info.type, i,
×
1035
                   tSchema->type);
1036
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1037
            taosArrayDestroy(TagNames);
×
1038
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1039
            goto _end;
×
1040
          }
1041
          if (colDataIsNull_s(pColInfoData, j)) {
7,008✔
1042
            continue;
×
1043
          } else {
1044
            void*   data = colDataGetVarData(pColInfoData, j);
3,504✔
1045
            STagVal tv = (STagVal){
3,504✔
1046
                .cid = tSchema->colId, .type = tSchema->type, .nData = varDataLen(data), .pData = varDataVal(data)};
3,504✔
1047
            if (NULL == taosArrayPush(pTagVals, &tv)) {
3,504✔
1048
              taosArrayDestroy(TagNames);
×
1049
              tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1050
              goto _end;
×
1051
            }
1052
          }
1053
          break;
3,504✔
1054
        }
1055
        case TSDB_DATA_TYPE_BLOB:
×
1056
        case TSDB_DATA_TYPE_JSON:
1057
        case TSDB_DATA_TYPE_MEDIUMBLOB:
1058
          qError("the tag type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1059
          terrno = TSDB_CODE_APP_ERROR;
×
1060
          taosArrayDestroy(TagNames);
×
1061
          tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1062
          goto _end;
×
1063
          break;
1064
        default:
1,168✔
1065
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
1,168✔
1066
            if (colDataIsNull_s(pColInfoData, j)) {
2,336✔
1067
              continue;
×
1068
            } else {
1069
              void*   data = colDataGetData(pColInfoData, j);
1,168✔
1070
              STagVal tv = {.cid = tSchema->colId, .type = tSchema->type};
1,168✔
1071
              memcpy(&tv.i64, data, tSchema->bytes);
1,168✔
1072
              if (NULL == taosArrayPush(pTagVals, &tv)) {
1,168✔
1073
                taosArrayDestroy(TagNames);
×
1074
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1075
                goto _end;
×
1076
              }
1077
            }
1078
          } else {
1079
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1080
            terrno = TSDB_CODE_APP_ERROR;
×
1081
            taosArrayDestroy(TagNames);
×
1082
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1083
            goto _end;
×
1084
          }
1085
          break;
1,168✔
1086
      }
1087
    }
1088
    STag* pTag = NULL;
7,008✔
1089
    code = tTagNew(pTagVals, 1, false, &pTag);
7,008✔
1090
    if (code != TSDB_CODE_SUCCESS) {
7,008✔
1091
      terrno = code;
×
1092
      qError("failed to create tag, error:%s", tstrerror(code));
×
1093
      taosArrayDestroy(TagNames);
×
1094
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1095
      goto _end;
×
1096
    }
1097

1098
    code = inserterBuildCreateTbReq(tbData.pCreateTbReq, tableName, pTag, suid, pInserter->pNode->tableName, TagNames,
7,008✔
1099
                                    pInserter->pTagSchema->nCols, TSDB_DEFAULT_TABLE_TTL);
7,008✔
1100
    if (code != TSDB_CODE_SUCCESS) {
7,008✔
1101
      terrno = code;
×
1102
      qError("failed to build create table request, error:%s", tstrerror(code));
×
1103
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1104
      goto _end;
×
1105
    }
1106

1107
    for (int32_t k = 0; k < pTSchema->numOfCols; ++k) {
39,712✔
1108
      int16_t         colIdx = k;
32,704✔
1109
      const STColumn* pCol = &pTSchema->columns[k];
32,704✔
1110
      int16_t*        slotId = taosHashGet(pInserter->pCols, &pCol->colId, sizeof(pCol->colId));
32,704✔
1111
      if (NULL == slotId) {
32,704✔
1112
        continue;
15,184✔
1113
      }
1114
      colIdx = *slotId;
17,520✔
1115

1116
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
17,520✔
1117
      if (NULL == pColInfoData) {
17,520✔
1118
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1119
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1120
        goto _end;
×
1121
      }
1122
      void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
17,520✔
1123

1124
      switch (pColInfoData->info.type) {
17,520✔
1125
        case TSDB_DATA_TYPE_NCHAR:
×
1126
        case TSDB_DATA_TYPE_VARBINARY:
1127
        case TSDB_DATA_TYPE_VARCHAR: {
1128
          if (pColInfoData->info.type != pCol->type) {
×
1129
            qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k,
×
1130
                   pCol->type);
1131
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1132
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1133
            goto _end;
×
1134
          }
1135
          if (colDataIsNull_s(pColInfoData, j)) {
×
1136
            SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
×
1137
            if (NULL == taosArrayPush(pVals, &cv)) {
×
1138
              tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1139
              goto _end;
×
1140
            }
1141
          } else {
1142
            void*   data = colDataGetVarData(pColInfoData, j);
×
1143
            SValue  sv = (SValue){.type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)};
×
1144
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
×
1145
            if (NULL == taosArrayPush(pVals, &cv)) {
×
1146
              tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1147
              goto _end;
×
1148
            }
1149
          }
1150
          break;
×
1151
        }
1152
        case TSDB_DATA_TYPE_BLOB:
×
1153
        case TSDB_DATA_TYPE_JSON:
1154
        case TSDB_DATA_TYPE_MEDIUMBLOB:
1155
          qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1156
          terrno = TSDB_CODE_APP_ERROR;
×
1157
          tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1158
          goto _end;
×
1159
          break;
1160
        default:
17,520✔
1161
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
17,520✔
1162
            if (colDataIsNull_s(pColInfoData, j)) {
35,040✔
1163
              if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
×
1164
                qError("Primary timestamp column should not be null");
×
1165
                terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
×
1166
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1167
                goto _end;
×
1168
              }
1169

1170
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
×
1171
              if (NULL == taosArrayPush(pVals, &cv)) {
×
1172
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1173
                goto _end;
×
1174
              }
1175
            } else {
1176
              // if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && !needSortMerge) {
1177
              //   if (*(int64_t*)var <= lastTs) {
1178
              //     needSortMerge = true;
1179
              //   } else {
1180
              //     lastTs = *(int64_t*)var;
1181
              //   }
1182
              // }
1183

1184
              SValue sv = {.type = pCol->type};
17,520✔
1185
              valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
17,520✔
1186
              SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
17,520✔
1187
              if (NULL == taosArrayPush(pVals, &cv)) {
17,520✔
1188
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1189
                goto _end;
×
1190
              }
1191
            }
1192
          } else {
1193
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1194
            terrno = TSDB_CODE_APP_ERROR;
×
1195
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1196
            goto _end;
×
1197
          }
1198
          break;
17,520✔
1199
      }
1200
    }
1201

1202
    SRow* pRow = NULL;
7,008✔
1203
    SRowBuildScanInfo sinfo = {0};
7,008✔
1204
    if ((terrno = tRowBuild(pVals, pTSchema, &pRow, &sinfo)) < 0) {
7,008✔
1205
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1206
      goto _end;
×
1207
    }
1208
    if (NULL == taosArrayPush(tbData.aRowP, &pRow)) {
14,016✔
1209
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1210
      goto _end;
×
1211
    }
1212

1213
    if (NULL == taosArrayPush(pReq->aSubmitTbData, &tbData)) {
14,016✔
1214
      goto _end;
×
1215
    }
1216
  }
1217

1218
_end:
3,504✔
1219
  taosArrayDestroy(pTagVals);
3,504✔
1220
  taosArrayDestroy(pVals);
3,504✔
1221

1222
  return terrno;
3,504✔
1223
}
1224

1225
int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** ppReq, const SSDataBlock* pDataBlock,
1,979,068✔
1226
                                const STSchema* pTSchema, int64_t* uid, int32_t* vgId, tb_uid_t* suid) {
1227
  SSubmitReq2* pReq = *ppReq;
1,979,068✔
1228
  SArray*      pVals = NULL;
1,979,068✔
1229
  SArray*      pTagVals = NULL;
1,979,068✔
1230
  int32_t      numOfBlks = 0;
1,979,068✔
1231
  char*        tableName = NULL;
1,979,068✔
1232
  int32_t      code = 0, lino = 0;
1,979,068✔
1233

1234
  terrno = TSDB_CODE_SUCCESS;
1,979,068✔
1235

1236
  if (NULL == pReq) {
1,979,068✔
1237
    if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
1,979,068✔
1238
      goto _end;
×
1239
    }
1240

1241
    if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
1,979,068✔
1242
      goto _end;
×
1243
    }
1244
  }
1245

1246
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
1,979,068✔
1247
  int32_t rows = pDataBlock->info.rows;
1,979,068✔
1248

1249
  SSubmitTbData tbData = {0};
1,979,068✔
1250
  if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
1,979,068✔
1251
    goto _end;
×
1252
  }
1253
  tbData.suid = *suid;
1,979,068✔
1254
  tbData.uid = *uid;
1,979,068✔
1255
  tbData.sver = pTSchema->version;
1,979,068✔
1256

1257
  if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
1,979,068✔
1258
    taosArrayDestroy(tbData.aRowP);
×
1259
    goto _end;
×
1260
  }
1261

1262
  if (pInserter->isStbInserter) {
1,979,068✔
1263
    if (!pTagVals && !(pTagVals = taosArrayInit(colNum, sizeof(STagVal)))) {
×
1264
      taosArrayDestroy(tbData.aRowP);
×
1265
      goto _end;
×
1266
    }
1267
  }
1268

1269
  int64_t lastTs = TSKEY_MIN;
1,979,068✔
1270
  bool    needSortMerge = false;
1,979,068✔
1271

1272
  for (int32_t j = 0; j < rows; ++j) {  // iterate by row
1,929,253,130✔
1273
    taosArrayClear(pVals);
1,927,281,514✔
1274

1275
    int32_t offset = 0;
1,927,281,514✔
1276
    // 处理超级表的tbname和tags
1277
    if (pInserter->isStbInserter) {
1,927,281,514✔
1278
      taosArrayClear(pTagVals);
×
1279
      tbData.uid = 0;
×
1280
      *uid = 0;
×
1281
      tbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
×
1282
      tbData.flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
×
1283

1284
      SColumnInfoData* tbname = taosArrayGet(pDataBlock->pDataBlock, 0);
×
1285
      if (NULL == tbname) {
×
1286
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1287
        qError("Insert into stable must have tbname column");
×
1288
        goto _end;
×
1289
      }
1290
      if (tbname->info.type != TSDB_DATA_TYPE_BINARY) {
×
1291
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1292
        qError("tbname column must be binary");
×
1293
        goto _end;
×
1294
      }
1295

1296
      if (colDataIsNull_s(tbname, j)) {
×
1297
        qError("insert into stable tbname column is null");
×
1298
        goto _end;
×
1299
      }
1300
      void*   data = colDataGetVarData(tbname, j);
×
1301
      SValue  sv = (SValue){TSDB_DATA_TYPE_VARCHAR, .nData = varDataLen(data),
×
1302
                            .pData = varDataVal(data)};  // address copy, no value
×
1303
      SColVal cv = COL_VAL_VALUE(0, sv);
×
1304

1305
      // 获取子表vgId
1306
      SDBVgInfo* dbInfo = NULL;
×
1307
      code = inserterGetDbVgInfo(pInserter, pInserter->dbFName, &dbInfo);
×
1308
      if (code != TSDB_CODE_SUCCESS) {
×
1309
        goto _end;
×
1310
      }
1311

1312
      char tbFullName[TSDB_TABLE_FNAME_LEN];
×
1313
      taosMemoryFreeClear(tableName);
×
1314
      tableName = taosMemoryCalloc(1, sv.nData + 1);
×
1315
      TSDB_CHECK_NULL(tableName, code, lino, _end, terrno);
×
1316
      tstrncpy(tableName, sv.pData, sv.nData);
×
1317
      tableName[sv.nData] = '\0';
×
1318

1319
      int32_t len = snprintf(tbFullName, TSDB_TABLE_FNAME_LEN, "%s.%s", pInserter->dbFName, tableName);
×
1320
      if (len >= TSDB_TABLE_FNAME_LEN) {
×
1321
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1322
        qError("table name too long after format, len:%d, maxLen:%d", len, TSDB_TABLE_FNAME_LEN);
×
1323
        goto _end;
×
1324
      }
1325
      code = inserterGetVgId(dbInfo, tbFullName, vgId);
×
1326
      if (code != TSDB_CODE_SUCCESS) {
×
1327
        terrno = code;
×
1328
        goto _end;
×
1329
      }
1330
      // 解析tag
1331
      SArray* TagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
×
1332
      if (!TagNames) {
×
1333
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1334
        goto _end;
×
1335
      }
1336
      for (int32_t i = 0; i < pInserter->pTagSchema->nCols; ++i) {
×
1337
        SSchema* tSchema = &pInserter->pTagSchema->pSchema[i];
×
1338
        int16_t  colIdx = tSchema->colId;
×
1339
        if (NULL == taosArrayPush(TagNames, tSchema->name)) {
×
1340
          goto _end;
×
1341
        }
1342
        int16_t* slotId = taosHashGet(pInserter->pCols, &colIdx, sizeof(colIdx));
×
1343
        if (NULL == slotId) {
×
1344
          continue;
×
1345
        }
1346

1347
        colIdx = *slotId;
×
1348
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
×
1349
        if (NULL == pColInfoData) {
×
1350
          terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1351
          goto _end;
×
1352
        }
1353
        // void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
1354
        switch (pColInfoData->info.type) {
×
1355
          case TSDB_DATA_TYPE_NCHAR:
×
1356
          case TSDB_DATA_TYPE_VARBINARY:
1357
          case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
1358
            if (pColInfoData->info.type != tSchema->type) {
×
1359
              qError("tag:%d type:%d in block dismatch with schema tag:%d type:%d", colIdx, pColInfoData->info.type, i,
×
1360
                     tSchema->type);
1361
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1362
              goto _end;
×
1363
            }
1364
            if (colDataIsNull_s(pColInfoData, j)) {
×
1365
              continue;
×
1366
            } else {
1367
              void*   data = colDataGetVarData(pColInfoData, j);
×
1368
              STagVal tv = (STagVal){.cid = tSchema->colId,
×
1369
                                     .type = tSchema->type,
×
1370
                                     .nData = varDataLen(data),
×
1371
                                     .pData = varDataVal(data)};  // address copy, no value
×
1372
              if (NULL == taosArrayPush(pTagVals, &tv)) {
×
1373
                goto _end;
×
1374
              }
1375
            }
1376
            break;
×
1377
          }
1378
          case TSDB_DATA_TYPE_BLOB:
×
1379
          case TSDB_DATA_TYPE_JSON:
1380
          case TSDB_DATA_TYPE_MEDIUMBLOB:
1381
            qError("the tag type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1382
            terrno = TSDB_CODE_APP_ERROR;
×
1383
            goto _end;
×
1384
            break;
1385
          default:
×
1386
            if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
×
1387
              if (colDataIsNull_s(pColInfoData, j)) {
×
1388
                continue;
×
1389
              } else {
1390
                void*   data = colDataGetData(pColInfoData, j);
×
1391
                STagVal tv = {.cid = tSchema->colId, .type = tSchema->type};
×
1392
                memcpy(&tv.i64, data, tSchema->bytes);
×
1393
                if (NULL == taosArrayPush(pTagVals, &tv)) {
×
1394
                  goto _end;
×
1395
                }
1396
              }
1397
            } else {
1398
              uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1399
              terrno = TSDB_CODE_APP_ERROR;
×
1400
              goto _end;
×
1401
            }
1402
            break;
×
1403
        }
1404
      }
1405
      STag* pTag = NULL;
×
1406
      code = tTagNew(pTagVals, 1, false, &pTag);
×
1407
      if (code != TSDB_CODE_SUCCESS) {
×
1408
        terrno = code;
×
1409
        qError("failed to create tag, error:%s", tstrerror(code));
×
1410
        goto _end;
×
1411
      }
1412

1413
      code = inserterBuildCreateTbReq(tbData.pCreateTbReq, tableName, pTag, *suid, pInserter->pNode->tableName, TagNames,
×
1414
                               pInserter->pTagSchema->nCols, TSDB_DEFAULT_TABLE_TTL);
×
1415
    }
1416

1417
    for (int32_t k = 0; k < pTSchema->numOfCols; ++k) {  // iterate by column
2,147,483,647✔
1418
      int16_t         colIdx = k;
2,147,483,647✔
1419
      const STColumn* pCol = &pTSchema->columns[k];
2,147,483,647✔
1420
      if (!pInserter->fullOrderColList) {
2,147,483,647✔
1421
        int16_t* slotId = taosHashGet(pInserter->pCols, &pCol->colId, sizeof(pCol->colId));
132,720✔
1422
        if (NULL == slotId) {
132,720✔
1423
          continue;
32,304✔
1424
        }
1425

1426
        colIdx = *slotId;
100,416✔
1427
      }
1428

1429
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
2,147,483,647✔
1430
      if (NULL == pColInfoData) {
2,147,483,647✔
1431
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1432
        goto _end;
×
1433
      }
1434
      void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
2,147,483,647✔
1435

1436
      switch (pColInfoData->info.type) {
2,147,483,647✔
1437
        case TSDB_DATA_TYPE_NCHAR:
2,147,483,647✔
1438
        case TSDB_DATA_TYPE_VARBINARY:
1439
        case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
1440
          if (pColInfoData->info.type != pCol->type) {
2,147,483,647✔
1441
            qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k,
×
1442
                   pCol->type);
1443
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1444
            goto _end;
×
1445
          }
1446
          if (colDataIsNull_s(pColInfoData, j)) {
2,147,483,647✔
1447
            SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
2,484✔
1448
            if (NULL == taosArrayPush(pVals, &cv)) {
2,484✔
1449
              goto _end;
×
1450
            }
1451
          } else {
1452
            void*  data = colDataGetVarData(pColInfoData, j);
2,147,483,647✔
1453
            SValue sv = (SValue){
2,147,483,647✔
1454
                .type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)};  // address copy, no value
2,147,483,647✔
1455
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
2,147,483,647✔
1456
            if (NULL == taosArrayPush(pVals, &cv)) {
2,147,483,647✔
1457
              goto _end;
×
1458
            }
1459
          }
1460
          break;
2,147,483,647✔
1461
        }
1462
        case TSDB_DATA_TYPE_BLOB:
×
1463
        case TSDB_DATA_TYPE_MEDIUMBLOB:
1464
        case TSDB_DATA_TYPE_JSON:
1465
          qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1466
          terrno = TSDB_CODE_APP_ERROR;
×
1467
          goto _end;
×
1468
          break;
1469
        default:
2,147,483,647✔
1470
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
2,147,483,647✔
1471
            if (colDataIsNull_s(pColInfoData, j)) {
2,147,483,647✔
1472
              if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
20,928✔
1473
                qError("Primary timestamp column should not be null");
×
1474
                terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
×
1475
                goto _end;
×
1476
              }
1477

1478
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);  // should use pCol->type
20,928✔
1479
              if (NULL == taosArrayPush(pVals, &cv)) {
20,928✔
1480
                goto _end;
×
1481
              }
1482
            } else {
1483
              if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && !needSortMerge) {
2,147,483,647✔
1484
                if (*(int64_t*)var <= lastTs) {
1,927,206,174✔
1485
                  needSortMerge = true;
11,378✔
1486
                } else {
1487
                  lastTs = *(int64_t*)var;
1,927,194,796✔
1488
                }
1489
              }
1490

1491
              SValue sv = {.type = pCol->type};
2,147,483,647✔
1492
              valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
2,147,483,647✔
1493
              SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
2,147,483,647✔
1494
              if (NULL == taosArrayPush(pVals, &cv)) {
2,147,483,647✔
1495
                goto _end;
×
1496
              }
1497
            }
1498
          } else {
1499
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1500
            terrno = TSDB_CODE_APP_ERROR;
×
1501
            goto _end;
×
1502
          }
1503
          break;
2,147,483,647✔
1504
      }
1505
    }
1506

1507
    SRow*             pRow = NULL;
1,927,281,514✔
1508
    SRowBuildScanInfo sinfo = {0};
1,927,281,514✔
1509
    if ((terrno = tRowBuild(pVals, pTSchema, &pRow, &sinfo)) < 0) {
1,927,281,514✔
1510
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
7,452✔
1511
      goto _end;
7,452✔
1512
    }
1513
    if (NULL == taosArrayPush(tbData.aRowP, &pRow)) {
2,147,483,647✔
1514
      goto _end;
×
1515
    }
1516
  }
1517

1518
  if (needSortMerge) {
1,971,616✔
1519
    if ((tRowSort(tbData.aRowP) != TSDB_CODE_SUCCESS) ||
11,378✔
1520
        (terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, KEEP_CONSISTENCY)) != 0) {
11,378✔
1521
      goto _end;
×
1522
    }
1523
  }
1524

1525
  if (NULL == taosArrayPush(pReq->aSubmitTbData, &tbData)) {
3,943,232✔
1526
    goto _end;
×
1527
  }
1528

1529
_end:
1,979,068✔
1530

1531
  taosMemoryFreeClear(tableName);
1,979,068✔
1532

1533
  taosArrayDestroy(pTagVals);
1,979,068✔
1534
  taosArrayDestroy(pVals);
1,979,068✔
1535

1536
  if (terrno != 0) {
1,979,068✔
1537
    *ppReq = NULL;
7,452✔
1538
    if (pReq) {
7,452✔
1539
      tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
7,452✔
1540
      taosMemoryFree(pReq);
7,452✔
1541
    }
1542

1543
    return terrno;
7,452✔
1544
  }
1545
  *ppReq = pReq;
1,971,616✔
1546

1547
  return TSDB_CODE_SUCCESS;
1,971,616✔
1548
}
1549

1550
static void destroySubmitReqWrapper(void* p) {
6,424✔
1551
  SSubmitReqSendInfo* pSendInfo = *(SSubmitReqSendInfo**)p;
6,424✔
1552
  if (pSendInfo != NULL) {
6,424✔
1553
    if (pSendInfo->msg != NULL) {
6,424✔
1554
      tDestroySubmitReq(pSendInfo->msg, TSDB_MSG_FLG_ENCODE);
6,424✔
1555
      taosMemoryFree(pSendInfo->msg);
6,424✔
1556
    }
1557
    taosMemoryFree(pSendInfo);
6,424✔
1558
  }
1559
}
6,424✔
1560

1561
/*
 * Convert every data block queued on the inserter into encoded submit
 * messages, one per entry of a temporary hash.
 *
 * Each block is handed to buildSubmitReqFromStbBlock(), which accumulates
 * SSubmitReqSendInfo objects in the hash; the hash key is read back here as an
 * int32_t vgroup id. Every entry is then serialized with submitReqToMsg() and
 * appended to pMsgs as a heap-allocated SSubmitTbDataSendInfo*.
 *
 * The temporary hash (and, through destroySubmitReqWrapper, everything stored
 * in it) is released on every exit path.
 *
 * @param pInserter  inserter handle supplying the blocks, schema and target ids.
 * @param pMsgs      output array of SSubmitTbDataSendInfo*; entries appended
 *                   before a failure stay in the array for the caller to free.
 * @return TSDB_CODE_SUCCESS, or the first error encountered.
 */
int32_t dataBlocksToSubmitReqArray(SDataInserterHandle* pInserter, SArray* pMsgs) {
  const SArray*   pBlocks = pInserter->pDataBlocks;
  const STSchema* pTSchema = pInserter->pSchema;
  int64_t         uid = pInserter->pNode->tableId;
  int64_t         suid = pInserter->pNode->stableId;
  int32_t         vgId = pInserter->pNode->vgId;
  int32_t         sz = taosArrayGetSize(pBlocks);
  int32_t         code = 0;

  SHashObj* pHash = NULL;
  void*     iterator = NULL;
  size_t    keyLen = 0;

  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);  // result rows produced by the SELECT part
    if (NULL == pDataBlock) {
      // go through _end so that a hash created for an earlier block is not leaked
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _end;
    }
    if (pHash == NULL) {
      pHash = taosHashInit(sz * pDataBlock->info.rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false,
                           HASH_ENTRY_LOCK);
      if (NULL == pHash) {
        code = terrno;
        goto _end;
      }
      taosHashSetFreeFp(pHash, destroySubmitReqWrapper);
    }
    code = buildSubmitReqFromStbBlock(pInserter, pHash, pDataBlock, pTSchema, uid, vgId, suid);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }
  }

  // pHash is still NULL when no block was queued; there is nothing to serialize then
  while (pHash != NULL && (iterator = taosHashIterate(pHash, iterator))) {
    SSubmitReqSendInfo* pReqSendInfo = *(SSubmitReqSendInfo**)iterator;
    int32_t*            ctbVgId = taosHashGetKey(iterator, &keyLen);

    SSubmitTbDataSendInfo* pTbSendInfo = taosMemoryCalloc(1, sizeof(SSubmitTbDataSendInfo));
    if (NULL == pTbSendInfo) {
      code = terrno;
      goto _end;
    }
    code = submitReqToMsg(*ctbVgId, pReqSendInfo->msg, &pTbSendInfo->msg.pData, &pTbSendInfo->msg.len);
    if (code != TSDB_CODE_SUCCESS) {
      taosMemoryFree(pTbSendInfo);
      goto _end;
    }
    pTbSendInfo->epSet = pReqSendInfo->epSet;
    if (NULL == taosArrayPush(pMsgs, &pTbSendInfo)) {
      code = terrno;
      taosMemoryFree(pTbSendInfo->msg.pData);
      taosMemoryFree(pTbSendInfo);
      goto _end;
    }
  }

_end:
  if (pHash != NULL) {
    if (iterator != NULL) {
      // the loop was left before the iteration finished: hand the iterator back first
      taosHashCancelIterate(pHash, iterator);
    }
    taosHashCleanup(pHash);
  }

  return code;
}
1623

1624
/*
 * Build a single submit request from all data blocks queued on the inserter
 * and serialize it into a message buffer.
 *
 * Every block is appended to the same SSubmitReq2 by buildSubmitReqFromBlock();
 * the request is then encoded with submitReqToMsg(). The in-memory request is
 * always released before returning, whatever the outcome.
 *
 * @param pInserter  inserter handle supplying the blocks, schema and target ids.
 * @param pMsg       receives the encoded message buffer.
 * @param msgLen     receives the length of the encoded message.
 * @return TSDB_CODE_SUCCESS, or the first error encountered.
 */
int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32_t* msgLen) {
  const SArray*   pBlocks = pInserter->pDataBlocks;
  const STSchema* pTSchema = pInserter->pSchema;
  int64_t         uid = pInserter->pNode->tableId;
  int64_t         suid = pInserter->pNode->stableId;
  int32_t         vgId = pInserter->pNode->vgId;
  int32_t         sz = taosArrayGetSize(pBlocks);
  int32_t         code = 0;
  SSubmitReq2*    pReq = NULL;

  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);  // result rows produced by the SELECT part
    if (NULL == pDataBlock) {
      // a request built from earlier blocks must still be released
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _end;
    }
    code = buildSubmitReqFromBlock(pInserter, &pReq, pDataBlock, pTSchema, &uid, &vgId, &suid);
    if (code) {
      goto _end;
    }
  }

  code = submitReqToMsg(vgId, pReq, pMsg, msgLen);

_end:
  if (pReq) {
    tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
    taosMemoryFree(pReq);
  }

  return code;
}
1656

1657
/*
 * Look up the insert-table info registered for a (streamId, groupId) pair in
 * gStreamGrpTableHash and hand out the acquired hash slot.
 *
 * On success the slot stays acquired; the caller gives it back with
 * taosHashRelease() on gStreamGrpTableHash. On failure nothing is left
 * acquired and *ppTbInfo is not written.
 *
 * @param streamId  stream identifier, first half of the hash key.
 * @param groupId   group identifier, second half of the hash key.
 * @param ppTbInfo  receives the acquired slot (pointer to SInsertTableInfo*).
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND when
 *         no entry exists or the entry holds a NULL pointer.
 */
int32_t getStreamInsertTableInfo(int64_t streamId, int64_t groupId, SInsertTableInfo*** ppTbInfo) {
  int64_t            key[2] = {streamId, groupId};
  SInsertTableInfo** pTmp = taosHashAcquire(gStreamGrpTableHash, key, sizeof(key));
  if (NULL == pTmp) {
    return TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
  }
  if (NULL == *pTmp) {
    // the slot exists but is empty: give the reference back before reporting the miss
    taosHashRelease(gStreamGrpTableHash, pTmp);
    return TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
  }

  *ppTbInfo = pTmp;
  return TSDB_CODE_SUCCESS;
}
1667

1668
/*
 * Give back a slot of gStreamGrpTableHash obtained from
 * getStreamInsertTableInfo().
 *
 * @param ppTbInfo  acquired slot; NULL (nothing was acquired) is a no-op.
 * @return always TSDB_CODE_SUCCESS.
 */
static int32_t releaseStreamInsertTableInfo(SInsertTableInfo** ppTbInfo) {
  // callers reach this from error paths taken before the slot was acquired
  if (ppTbInfo != NULL) {
    taosHashRelease(gStreamGrpTableHash, ppTbInfo);
  }
  return TSDB_CODE_SUCCESS;
}
1672

1673
int32_t buildNormalTableCreateReq(SDataInserterHandle* pInserter, SStreamInserterParam* pInsertParam,
96,401✔
1674
                                  SSubmitTbData* tbData) {
1675
  int32_t code = TSDB_CODE_SUCCESS;
96,401✔
1676

1677
  tbData->suid = 0;
96,401✔
1678

1679
  tbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
96,401✔
1680
  if (NULL == tbData->pCreateTbReq) {
96,401✔
1681
    goto _end;
×
1682
  }
1683
  tbData->flags |= (SUBMIT_REQ_AUTO_CREATE_TABLE | SUBMIT_REQ_SCHEMA_RES);
96,401✔
1684
  tbData->pCreateTbReq->type = TSDB_NORMAL_TABLE;
96,401✔
1685
  tbData->pCreateTbReq->flags |= (TD_CREATE_NORMAL_TB_IN_STREAM | TD_CREATE_IF_NOT_EXISTS);
96,401✔
1686
  tbData->pCreateTbReq->uid = 0;
96,401✔
1687
  tbData->sver = pInsertParam->sver;
96,401✔
1688

1689
  tbData->pCreateTbReq->name = taosStrdup(pInsertParam->tbname);
96,401✔
1690
  if (!tbData->pCreateTbReq->name) return terrno;
96,401✔
1691

1692
  int32_t numOfCols = pInsertParam->pFields->size;
96,401✔
1693
  tbData->pCreateTbReq->ntb.schemaRow.nCols = numOfCols;
96,401✔
1694
  tbData->pCreateTbReq->ntb.schemaRow.version = 1;
96,401✔
1695

1696
  tbData->pCreateTbReq->ntb.schemaRow.pSchema = taosMemoryCalloc(numOfCols, sizeof(SSchema));
96,401✔
1697
  if (NULL == tbData->pCreateTbReq->ntb.schemaRow.pSchema) {
96,401✔
1698
    goto _end;
×
1699
  }
1700
  for (int32_t i = 0; i < numOfCols; ++i) {
543,581✔
1701
    SFieldWithOptions* pField = taosArrayGet(pInsertParam->pFields, i);
447,180✔
1702
    if (NULL == pField) {
447,180✔
1703
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1704
      goto _end;
×
1705
    }
1706
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].colId = i + 1;
447,180✔
1707
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].type = pField->type;
447,180✔
1708
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].bytes = pField->bytes;
447,180✔
1709
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].flags = pField->flags;
447,180✔
1710
    if (i == 0 && pField->type != TSDB_DATA_TYPE_TIMESTAMP) {
447,180✔
1711
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1712
      qError("buildNormalTableCreateReq, the first column must be timestamp.");
×
1713
      goto _end;
×
1714
    }
1715
    snprintf(tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].name, TSDB_COL_NAME_LEN, "%s", pField->name);
447,180✔
1716
    if (IS_DECIMAL_TYPE(pField->type)) {
447,180✔
1717
      if (!tbData->pCreateTbReq->pExtSchemas) {
×
1718
        tbData->pCreateTbReq->pExtSchemas = taosMemoryCalloc(numOfCols, sizeof(SExtSchema));
×
1719
        if (NULL == tbData->pCreateTbReq->pExtSchemas) {
×
1720
          tdDestroySVCreateTbReq(tbData->pCreateTbReq);
×
1721
          tbData->pCreateTbReq = NULL;
×
1722
          return terrno;
×
1723
        }
1724
      }
1725
      tbData->pCreateTbReq->pExtSchemas[i].typeMod = pField->typeMod;
×
1726
    }
1727
  }
1728
  return TSDB_CODE_SUCCESS;
96,401✔
1729
_end:
×
1730
  return code;
×
1731
}
1732

1733
// reference tBuildTSchema funciton
1734
static int32_t buildTSchmaFromInserter(SStreamInserterParam* pInsertParam, STSchema** ppTSchema) {
232,552✔
1735
  int32_t code = TSDB_CODE_SUCCESS;
232,552✔
1736

1737
  int32_t   numOfCols = pInsertParam->pFields->size;
232,552✔
1738
  STSchema* pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn) * numOfCols);
232,552✔
1739
  if (NULL == pTSchema) {
232,336✔
1740
    return terrno;
×
1741
  }
1742
  if (pInsertParam->tbType == TSDB_NORMAL_TABLE) {
232,336✔
1743
    pTSchema->version =
75,073✔
1744
        1;  // normal table version start from 1, if has exist table, it will be reset by resetInserterTbVersion
1745
  } else {
1746
    pTSchema->version = pInsertParam->sver;
157,479✔
1747
  }
1748
  pTSchema->numOfCols = numOfCols;
232,552✔
1749

1750
  SFieldWithOptions* pField = taosArrayGet(pInsertParam->pFields, 0);
232,552✔
1751
  if (NULL == pField) {
232,552✔
1752
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1753
    goto _end;
×
1754
  }
1755
  pTSchema->columns[0].colId = PRIMARYKEY_TIMESTAMP_COL_ID;
232,552✔
1756
  pTSchema->columns[0].type = pField->type;
232,552✔
1757
  pTSchema->columns[0].flags = pField->flags;
232,336✔
1758
  pTSchema->columns[0].bytes = TYPE_BYTES[pField->type];
232,336✔
1759
  pTSchema->columns[0].offset = -1;
232,552✔
1760

1761
  pTSchema->tlen = 0;
232,552✔
1762
  pTSchema->flen = 0;
232,552✔
1763
  for (int32_t i = 1; i < numOfCols; ++i) {
1,105,471✔
1764
    SFieldWithOptions* pField = taosArrayGet(pInsertParam->pFields, i);
872,919✔
1765
    if (NULL == pField) {
872,919✔
1766
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1767
      goto _end;
×
1768
    }
1769
    pTSchema->columns[i].colId = i + 1;
872,919✔
1770
    pTSchema->columns[i].type = pField->type;
872,919✔
1771
    pTSchema->columns[i].flags = pField->flags;
872,698✔
1772
    pTSchema->columns[i].bytes = pField->bytes;
872,919✔
1773
    pTSchema->columns[i].offset = pTSchema->flen;
872,919✔
1774

1775
    if (IS_VAR_DATA_TYPE(pField->type)) {
872,919✔
1776
      pTSchema->columns[i].bytes = pField->bytes;
13,100✔
1777
      pTSchema->tlen += (TYPE_BYTES[pField->type] + pField->bytes);
13,100✔
1778
    } else {
1779
      pTSchema->columns[i].bytes = TYPE_BYTES[pField->type];
859,819✔
1780
      pTSchema->tlen += TYPE_BYTES[pField->type];
859,819✔
1781
    }
1782

1783
    pTSchema->flen += TYPE_BYTES[pField->type];
872,919✔
1784
  }
1785

1786
#if 1
1787
  pTSchema->tlen += (int32_t)TD_BITMAP_BYTES(numOfCols);
232,552✔
1788
#endif
1789

1790
_end:
232,552✔
1791
  if (code != TSDB_CODE_SUCCESS) {
232,552✔
1792
    taosMemoryFree(pTSchema);
×
1793
    *ppTSchema = NULL;
×
1794
  } else {
1795
    *ppTSchema = pTSchema;
232,552✔
1796
  }
1797
  return code;
232,552✔
1798
}
1799

1800
/*
 * Collect the non-null tag values of a stream inserter into a new array of
 * STagVal.
 *
 * The column id of tag i is taken from pTagCids when that array is given and
 * is preCols + i + 1 otherwise. Variable-length values are referenced by
 * pointer (pData/nData are copied, the bytes are not); other values are copied
 * into i64. Tags flagged as null are left out of the result.
 *
 * @param pInserterInfo  inserter info whose pTagVals are read.
 * @param preCols        number of data columns preceding the tags.
 * @param ppTagVals      receives the new array; set to NULL on failure.
 * @param pTagCids       optional array of col_id_t, one per tag; may be NULL.
 * @return TSDB_CODE_SUCCESS, or an error code.
 */
static int32_t getTagValsFromStreamInserterInfo(SStreamDataInserterInfo* pInserterInfo, int32_t preCols,
                                                SArray** ppTagVals, SArray* pTagCids) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t nTags = pInserterInfo->pTagVals->size;

  *ppTagVals = taosArrayInit(nTags, sizeof(STagVal));
  if (NULL == *ppTagVals) {  // check the allocation result, not the out-parameter itself
    return terrno;
  }

  for (int32_t i = 0; i < nTags; ++i) {
    SStreamTagInfo* pTagInfo = taosArrayGet(pInserterInfo->pTagVals, i);
    if (NULL == pTagInfo) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _end;
    }

    col_id_t cid = preCols + i + 1;
    if (pTagCids) {
      col_id_t* pCid = taosArrayGet(pTagCids, i);
      if (NULL == pCid) {
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        goto _end;
      }
      cid = *pCid;
    }

    if (pTagInfo->val.isNull) {
      continue;  // null tags are not part of the encoded tag set
    }

    STagVal tagVal = {
        .cid = cid,
        .type = pTagInfo->val.data.type,
    };
    if (IS_VAR_DATA_TYPE(pTagInfo->val.data.type)) {
      tagVal.nData = pTagInfo->val.data.nData;
      tagVal.pData = pTagInfo->val.data.pData;
    } else {
      tagVal.i64 = pTagInfo->val.data.val;
    }

    if (NULL == taosArrayPush(*ppTagVals, &tagVal)) {
      code = terrno;
      goto _end;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(*ppTagVals);
    *ppTagVals = NULL;
  }
  return code;
}
1835

1836
static int32_t buildStreamSubTableCreateReq(SStreamRunnerTask* pTask, SDataInserterHandle* pInserter,
218,279✔
1837
                                            SStreamInserterParam* pInsertParam, SStreamDataInserterInfo* pInserterInfo,
1838
                                            SSubmitTbData* tbData) {
1839
  int32_t code = TSDB_CODE_SUCCESS;
218,279✔
1840
  STag*   pTag = NULL;
218,279✔
1841
  SArray* pTagVals = NULL;
218,279✔
1842
  SArray* TagNames = NULL;
218,279✔
1843

1844
  if (pInsertParam->pTagFields == NULL) {
218,279✔
1845
    ST_TASK_ELOG("buildStreamSubTableCreateReq, pTagFields is NULL, suid:%" PRId64 ", sver:%d", pInsertParam->suid,
×
1846
                 pInsertParam->sver);
1847
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1848
  }
1849
  if (pInserterInfo->pTagVals == NULL || pInserterInfo->pTagVals->size == 0) {
218,279✔
1850
    ST_TASK_ELOG("buildStreamSubTableCreateReq, pTagVals is NULL, suid:%" PRId64 ", sver:%d", pInsertParam->suid,
221✔
1851
                 pInsertParam->sver);
1852
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1853
  }
1854
  if (pInsertParam->suid <= 0 || pInsertParam->sver <= 0) {
218,058✔
1855
    ST_TASK_ELOG("buildStreamSubTableCreateReq, suid:%" PRId64
×
1856
                 ", sver:%d"
1857
                 " must be greater than 0",
1858
                 pInsertParam->suid, pInsertParam->sver);
1859
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1860
  }
1861
  int32_t nTags = pInserterInfo->pTagVals->size;
218,279✔
1862

1863
  TagNames = taosArrayInit(nTags, TSDB_COL_NAME_LEN);
218,279✔
1864
  if (!TagNames) {
218,279✔
1865
    code = terrno;
×
1866
    goto _end;
×
1867
  }
1868
  for (int32_t i = 0; i < nTags; ++i) {
577,657✔
1869
    SFieldWithOptions* pField = taosArrayGet(pInsertParam->pTagFields, i);
359,378✔
1870
    if (NULL == taosArrayPush(TagNames, pField->name)) {
718,756✔
1871
      code = terrno;
×
1872
      goto _end;
×
1873
    }
1874
  }
1875

1876
  tbData->flags |= (SUBMIT_REQ_AUTO_CREATE_TABLE | SUBMIT_REQ_SCHEMA_RES);
218,279✔
1877
  tbData->uid = 0;
218,279✔
1878
  tbData->suid = pInsertParam->suid;
218,279✔
1879
  tbData->sver = pInsertParam->sver;
218,279✔
1880

1881
  tbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
218,063✔
1882
  if (NULL == tbData->pCreateTbReq) {
218,279✔
1883
    code = terrno;
×
1884
    goto _end;
×
1885
  }
1886
  tbData->pCreateTbReq->type = TSDB_CHILD_TABLE;
218,279✔
1887
  tbData->pCreateTbReq->flags |= (TD_CREATE_SUB_TB_IN_STREAM | TD_CREATE_IF_NOT_EXISTS);
218,279✔
1888

1889
  code = getTagValsFromStreamInserterInfo(pInserterInfo, pInsertParam->pFields->size, &pTagVals, pInsertParam->tagCids);
218,279✔
1890
  if (code != TSDB_CODE_SUCCESS) {
218,279✔
1891
    goto _end;
×
1892
  }
1893

1894
  code = tTagNew(pTagVals, pInsertParam->sver, false, &pTag);
218,279✔
1895
  if (code != TSDB_CODE_SUCCESS) {
218,058✔
1896
    ST_TASK_ELOG("failed to create tag, error:%s", tstrerror(code));
×
1897
    goto _end;
×
1898
  }
1899
  code = inserterBuildCreateTbReq(tbData->pCreateTbReq, pInserterInfo->tbName, pTag, tbData->suid,
218,279✔
1900
                                  pInsertParam->stbname, TagNames, nTags, TSDB_DEFAULT_TABLE_TTL);
218,058✔
1901
  if (code != TSDB_CODE_SUCCESS) {
217,955✔
1902
    ST_TASK_ELOG("failed to build create table request, error:%s", tstrerror(code));
×
1903
    goto _end;
×
1904
  }
1905

1906
_end:
217,955✔
1907
  if (code != TSDB_CODE_SUCCESS) {
218,279✔
1908
    ST_TASK_ELOG("buildStreamSubTableCreateReq failed, error:%s", tstrerror(code));
×
1909
    if (tbData->pCreateTbReq) {
×
1910
      taosMemoryFreeClear(tbData->pCreateTbReq->name);
×
1911
      taosMemoryFreeClear(tbData->pCreateTbReq);
×
1912
    }
1913
    if (TagNames) {
×
1914
      taosArrayDestroy(TagNames);
×
1915
    }
1916
  }
1917

1918
  if (pTagVals) {
218,279✔
1919
    taosArrayDestroy(pTagVals);
218,279✔
1920
  }
1921
  return code;
218,279✔
1922
}
1923

1924
static int32_t appendInsertData(SStreamInserterParam* pInsertParam, const SSDataBlock* pDataBlock,
2,664,173✔
1925
                                SSubmitTbData* tbData, STSchema* pTSchema, SBuildInsertDataInfo* dataInsertInfo) {
1926
  int32_t code = TSDB_CODE_SUCCESS;
2,664,173✔
1927
  int32_t lino = 0;
2,664,173✔
1928

1929
  int32_t rows = pDataBlock ? pDataBlock->info.rows : 0;
2,664,173✔
1930
  int32_t numOfCols = pInsertParam->pFields->size;
2,664,173✔
1931
  int32_t colNum = pDataBlock ? taosArrayGetSize(pDataBlock->pDataBlock) : 0;
2,664,173✔
1932

1933
  SArray* pVals = NULL;
2,663,957✔
1934
  if (!(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
2,663,957✔
1935
    code = terrno;
×
1936
    QUERY_CHECK_CODE(code, lino, _end);
×
1937
  }
1938

1939
  for (int32_t j = 0; j < rows; ++j) {  // iterate by row
34,920,051✔
1940
    taosArrayClear(pVals);
32,256,300✔
1941

1942
    bool tsOrPrimaryKeyIsNull = false;
32,258,268✔
1943
    for (int32_t k = 0; k < numOfCols; ++k) {  // iterate by column
131,250,392✔
1944
      int16_t colIdx = k + 1;
99,056,806✔
1945

1946
      SFieldWithOptions* pCol = taosArrayGet(pInsertParam->pFields, k);
99,056,806✔
1947
      if (PRIMARYKEY_TIMESTAMP_COL_ID != colIdx && TSDB_DATA_TYPE_NULL == pCol->type) {
98,988,263✔
1948
        SColVal cv = COL_VAL_NULL(colIdx, pCol->type);
×
1949
        if (NULL == taosArrayPush(pVals, &cv)) {
×
1950
          code = terrno;
×
1951
          QUERY_CHECK_CODE(code, lino, _end);
×
1952
        }
1953
        continue;
×
1954
      }
1955

1956
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
98,983,084✔
1957
      if (NULL == pColInfoData) {
98,905,038✔
1958
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1959
        QUERY_CHECK_CODE(code, lino, _end);
×
1960
      }
1961
      void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
98,905,038✔
1962

1963
      if (colDataIsNull_s(pColInfoData, j) && (pCol->flags & COL_IS_KEY)) {
198,151,376✔
1964
        tsOrPrimaryKeyIsNull = true;
44,220✔
1965
        qDebug("Primary key column should not be null, skip this row");
44,220✔
1966
        break;
44,220✔
1967
      }
1968
      switch (pColInfoData->info.type) {
99,197,250✔
1969
        case TSDB_DATA_TYPE_NCHAR:
3,144,850✔
1970
        case TSDB_DATA_TYPE_VARBINARY:
1971
        case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
1972
          if (pColInfoData->info.type != pCol->type) {
3,144,850✔
1973
            qError("tb:%s column:%d type:%d in block dismatch with schema col:%d type:%d", pInsertParam->tbname, k,
×
1974
                   pColInfoData->info.type, k, pCol->type);
1975
            code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1976
            QUERY_CHECK_CODE(code, lino, _end);
×
1977
          }
1978
          if (colDataIsNull_s(pColInfoData, j)) {
6,289,700✔
1979
            SColVal cv = COL_VAL_NULL(colIdx, pCol->type);
3,017✔
1980
            if (NULL == taosArrayPush(pVals, &cv)) {
3,017✔
1981
              code = terrno;
×
1982
              QUERY_CHECK_CODE(code, lino, _end);
×
1983
            }
1984
          } else {
1985
            if (pColInfoData->pData == NULL) {
3,141,833✔
1986
              qError("build insert tb:%s, column:%d data is NULL in block", pInsertParam->tbname, k);
×
1987
              code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1988
              QUERY_CHECK_CODE(code, lino, _end);
×
1989
            }
1990
            void*  data = colDataGetVarData(pColInfoData, j);
3,141,833✔
1991
            SValue sv = (SValue){
9,425,499✔
1992
                .type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)};  // address copy, no value
3,141,833✔
1993
            SColVal cv = COL_VAL_VALUE(colIdx, sv);
3,141,833✔
1994
            if (NULL == taosArrayPush(pVals, &cv)) {
3,141,833✔
1995
              code = terrno;
×
1996
              QUERY_CHECK_CODE(code, lino, _end);
×
1997
            }
1998
          }
1999
          break;
3,144,850✔
2000
        }
2001
        case TSDB_DATA_TYPE_BLOB:
×
2002
        case TSDB_DATA_TYPE_JSON:
2003
        case TSDB_DATA_TYPE_MEDIUMBLOB:
2004
          qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
2005
          code = TSDB_CODE_APP_ERROR;
×
2006
          QUERY_CHECK_CODE(code, lino, _end);
×
2007
          break;
×
2008
        default:
95,953,325✔
2009
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
95,953,325✔
2010
            if (colDataIsNull_s(pColInfoData, j)) {
192,024,872✔
2011
              if (PRIMARYKEY_TIMESTAMP_COL_ID == colIdx) {
1,024,804✔
2012
                tsOrPrimaryKeyIsNull = true;
2,190✔
2013
                qDebug("Primary timestamp column should not be null, skip this row");
2,190✔
2014
                break;
2,190✔
2015
              }
2016

2017
              SColVal cv = COL_VAL_NULL(colIdx, pCol->type);  // should use pCol->type
1,022,614✔
2018
              if (NULL == taosArrayPush(pVals, &cv)) {
1,022,614✔
2019
                code = terrno;
×
2020
                QUERY_CHECK_CODE(code, lino, _end);
×
2021
              }
2022
            } else {
2023
              if (PRIMARYKEY_TIMESTAMP_COL_ID == colIdx && !dataInsertInfo->needSortMerge) {
95,037,969✔
2024
                if (*(int64_t*)var <= dataInsertInfo->lastTs) {
9,014,582✔
2025
                  dataInsertInfo->needSortMerge = true;
20,937✔
2026
                } else {
2027
                  dataInsertInfo->lastTs = *(int64_t*)var;
8,992,801✔
2028
                }
2029
              }
2030

2031
              SValue sv = {.type = pCol->type};
95,038,495✔
2032
              valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
94,940,694✔
2033
              SColVal cv = COL_VAL_VALUE(colIdx, sv);
94,912,041✔
2034
              if (NULL == taosArrayPush(pVals, &cv)) {
94,873,729✔
2035
                code = terrno;
×
2036
                QUERY_CHECK_CODE(code, lino, _end);
2,367✔
2037
              }
2038
            }
2039
          } else {
2040
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
503✔
2041
            code = TSDB_CODE_APP_ERROR;
3,549✔
2042
            QUERY_CHECK_CODE(code, lino, _end);
3,549✔
2043
          }
2044
          break;
95,860,590✔
2045
      }
2046
      if (tsOrPrimaryKeyIsNull) break;  // skip remaining columns because the primary key is null
98,994,314✔
2047
    }
2048
    if (tsOrPrimaryKeyIsNull) continue;  // skip this row if primary key is null
32,239,996✔
2049
    SRow*             pRow = NULL;
32,193,586✔
2050
    SRowBuildScanInfo sinfo = {0};
32,211,133✔
2051
    if ((code = tRowBuild(pVals, pTSchema, &pRow, &sinfo)) != TSDB_CODE_SUCCESS) {
32,210,734✔
2052
      QUERY_CHECK_CODE(code, lino, _end);
×
2053
    }
2054
    if (NULL == taosArrayPush(tbData->aRowP, &pRow)) {
64,412,165✔
2055
      taosMemFree(pRow);
×
2056
      code = terrno;
×
2057
      QUERY_CHECK_CODE(code, lino, _end);
×
2058
    }
2059
  }
2060
  if (dataInsertInfo->isLastBlock) {
2,663,751✔
2061
    int32_t nRows = taosArrayGetSize(tbData->aRowP);
2,662,750✔
2062
    if (taosArrayGetSize(tbData->aRowP) == 0) {
2,662,750✔
2063
      tbData->flags |= SUBMIT_REQ_ONLY_CREATE_TABLE;
2,056,097✔
2064
      stDebug("no valid data to insert, try to only create tabale:%s", pInsertParam->tbname);
2,056,097✔
2065
    }
2066
    stDebug("appendInsertData, isLastBlock:%d, needSortMerge:%d, totalRows:%d", dataInsertInfo->isLastBlock,
2,662,750✔
2067
            dataInsertInfo->needSortMerge, nRows);
2068
    if (dataInsertInfo->needSortMerge) {
2,662,750✔
2069
      if ((tRowSort(tbData->aRowP) != TSDB_CODE_SUCCESS) ||
41,874✔
2070
          (code = tRowMerge(tbData->aRowP, (STSchema*)pTSchema, KEEP_CONSISTENCY)) != 0) {
20,937✔
2071
        QUERY_CHECK_CODE(code, lino, _end);
×
2072
      }
2073
    }
2074
    nRows = taosArrayGetSize(tbData->aRowP);
2,662,750✔
2075
    stDebug("appendInsertData, after merge, totalRows:%d", nRows);
2,662,750✔
2076
  }
2077

2078
_end:
147,753✔
2079
  taosArrayDestroy(pVals);
2,662,930✔
2080
  return code;
2,664,173✔
2081
}
2082

2083
int32_t buildStreamSubmitReqFromBlock(SStreamRunnerTask* pTask, SDataInserterHandle* pInserter,
2,664,173✔
2084
                                      SStreamDataInserterInfo* pInserterInfo, SSubmitReq2** ppReq,
2085
                                      const SSDataBlock* pDataBlock, SVgroupInfo* vgInfo,
2086
                                      SBuildInsertDataInfo* tbDataInfo) {
2087
  SSubmitReq2* pReq = *ppReq;
2,664,173✔
2088
  int32_t      numOfBlks = 0;
2,664,173✔
2089

2090
  int32_t               code = TSDB_CODE_SUCCESS;
2,664,173✔
2091
  int32_t               lino = 0;
2,664,173✔
2092
  SStreamInserterParam* pInsertParam = pInserter->pParam->streamInserterParam;
2,664,173✔
2093
  SInsertTableInfo**    ppTbInfo = NULL;
2,664,173✔
2094
  SInsertTableInfo*     pTbInfo = NULL;
2,664,173✔
2095
  STSchema*             pTSchema = NULL;
2,664,173✔
2096
  SSubmitTbData*        tbData = &tbDataInfo->pTbData;
2,664,173✔
2097
  int32_t               colNum = 0;
2,664,173✔
2098
  int32_t               rows = 0;
2,664,173✔
2099

2100
  if (NULL == pReq) {
2,664,173✔
2101
    if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
2,662,750✔
2102
      code = terrno;
×
2103
      QUERY_CHECK_CODE(code, lino, _end);
×
2104
    }
2105
    *ppReq = pReq;
2,662,750✔
2106

2107
    if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
2,662,750✔
2108
      code = terrno;
×
2109
      QUERY_CHECK_CODE(code, lino, _end);
×
2110
    }
2111
  }
2112

2113
  if (pDataBlock) {
2,664,173✔
2114
    colNum = taosArrayGetSize(pDataBlock->pDataBlock);
610,266✔
2115
    rows = pDataBlock->info.rows;
610,266✔
2116
  }
2117

2118
  tbData->flags |= SUBMIT_REQ_SCHEMA_RES;
2,664,173✔
2119

2120
  if (tbDataInfo->isFirstBlock) {
2,664,173✔
2121
    if (pInserterInfo->isAutoCreateTable) {
2,662,750✔
2122
      code = initTableInfo(pInserter, pInserterInfo);
314,680✔
2123
      QUERY_CHECK_CODE(code, lino, _end);
314,680✔
2124
      if (pInsertParam->tbType == TSDB_NORMAL_TABLE) {
314,680✔
2125
        code = buildNormalTableCreateReq(pInserter, pInsertParam, tbData);
96,401✔
2126
      } else if (pInsertParam->tbType == TSDB_SUPER_TABLE) {
218,279✔
2127
        code = buildStreamSubTableCreateReq(pTask, pInserter, pInsertParam, pInserterInfo, tbData);
218,279✔
2128
      } else {
2129
        code = TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
×
2130
        ST_TASK_ELOG("buildStreamSubmitReqFromBlock, unknown table type %d", pInsertParam->tbType);
×
2131
      }
2132
      QUERY_CHECK_CODE(code, lino, _end);
314,680✔
2133
    }
2134
  }
2135

2136
  code = getStreamInsertTableInfo(pInserterInfo->streamId, pInserterInfo->groupId, &ppTbInfo);
2,664,173✔
2137
  QUERY_CHECK_CODE(code, lino, _end);
2,664,173✔
2138

2139
  pTbInfo =  *ppTbInfo;
2,664,173✔
2140
  if (tbDataInfo->isFirstBlock) {
2,664,173✔
2141
    if (!pInserterInfo->isAutoCreateTable) {
2,662,750✔
2142
      tstrncpy(pInserterInfo->tbName, pTbInfo->tbname, TSDB_TABLE_NAME_LEN);
2,348,070✔
2143
    }
2144

2145
    tbData->uid = pTbInfo->uid;
2,662,750✔
2146
    tbData->sver = pTbInfo->version;
2,662,750✔
2147

2148
    if (pInsertParam->tbType == TSDB_SUPER_TABLE) {
2,662,750✔
2149
      tbData->suid = pInsertParam->suid;
2,310,632✔
2150
    }
2151

2152
    pTSchema = pTbInfo->pSchema;
2,662,750✔
2153
  } else {
2154
    pTSchema = pTbInfo->pSchema;
1,423✔
2155
  }
2156

2157
  code = getTableVgInfo(pInserter, pInsertParam->dbFName, pTbInfo->tbname, vgInfo);
2,664,173✔
2158
  QUERY_CHECK_CODE(code, lino, _end);
2,664,173✔
2159

2160
  ST_TASK_DLOG("[data inserter], Handle:%p, GROUP:%" PRId64 " tbname:%s autoCreate:%d uid:%" PRId64 " suid:%" PRId64
2,664,173✔
2161
               " sver:%d vgid:%d isLastBlock:%d",
2162
               pInserter, pInserterInfo->groupId, pInserterInfo->tbName, pInserterInfo->isAutoCreateTable, tbData->uid,
2163
               tbData->suid, tbData->sver, vgInfo->vgId, tbDataInfo->isFirstBlock);
2164

2165
  code = appendInsertData(pInsertParam, pDataBlock, tbData, pTSchema, tbDataInfo);
2,664,173✔
2166
  QUERY_CHECK_CODE(code, lino, _end);
2,664,173✔
2167

2168
_end:
2,664,173✔
2169
  releaseStreamInsertTableInfo(ppTbInfo);
2,664,173✔
2170
  if (code != TSDB_CODE_SUCCESS) {
2,664,173✔
2171
    ST_TASK_ELOG("buildStreamSubmitReqFromBlock, code:0x%0x, groupId:%" PRId64 " tbname:%s autoCreate:%d", code,
×
2172
                 pInserterInfo->groupId, pInserterInfo->tbName, pInserterInfo->isAutoCreateTable);
2173
  }
2174
  return code;
2,664,173✔
2175
}
2176

2177
int32_t streamDataBlocksToSubmitReq(SStreamRunnerTask* pTask, SDataInserterHandle* pInserter,
2,662,750✔
2178
                                    SStreamDataInserterInfo* pInserterInfo, void** pMsg, int32_t* msgLen,
2179
                                    SVgroupInfo* vgInfo) {
2180
  int32_t code = 0;
2,662,750✔
2181
  int32_t lino = 0;
2,662,750✔
2182

2183
  const SArray*        pBlocks = pInserter->pDataBlocks;
2,662,750✔
2184
  int32_t              sz = taosArrayGetSize(pBlocks);
2,662,750✔
2185
  SSubmitReq2*         pReq = NULL;
2,662,750✔
2186
  SBuildInsertDataInfo tbDataInfo = {0};
2,662,750✔
2187

2188
  int32_t rows = 0;
2,662,750✔
2189
  for (int32_t i = 0; i < sz; i++) {
5,326,923✔
2190
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);
2,664,173✔
2191
    if (NULL == pDataBlock) {
2,664,173✔
2192
      stDebug("data block is NULL, just create empty table");
2,053,907✔
2193
      continue;
2,053,907✔
2194
    }
2195
    rows += pDataBlock->info.rows;
610,266✔
2196
  }
2197
  code = initInsertProcessInfo(&tbDataInfo, rows);
2,662,750✔
2198
  if (code != TSDB_CODE_SUCCESS) {
2,662,502✔
2199
    ST_TASK_ELOG("streamDataBlocksToSubmitReq, initInsertDataInfo failed, code:%d", code);
×
2200
    return code;
×
2201
  }
2202

2203
  for (int32_t i = 0; i < sz; i++) {
5,326,675✔
2204
    tbDataInfo.isFirstBlock = (i == 0);
2,663,925✔
2205
    tbDataInfo.isLastBlock = (i == sz - 1);
2,663,925✔
2206
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);  // pDataBlock select查询到的结果
2,663,925✔
2207
    ST_TASK_DLOG("[data inserter], Handle:%p, GROUP:%" PRId64
2,664,173✔
2208
            " tbname:%s autoCreate:%d block: %d/%d rows:%" PRId64,
2209
            pInserter, pInserterInfo->groupId, pInserterInfo->tbName,
2210
            pInserterInfo->isAutoCreateTable, i + 1, sz, (pDataBlock != NULL ? pDataBlock->info.rows : 0));
2211
    code = buildStreamSubmitReqFromBlock(pTask, pInserter, pInserterInfo, &pReq, pDataBlock, vgInfo, &tbDataInfo);
2,664,173✔
2212
    QUERY_CHECK_CODE(code, lino, _end);
2,664,173✔
2213
  }
2214

2215
  if (NULL == taosArrayPush(pReq->aSubmitTbData, &tbDataInfo.pTbData)) {
5,325,500✔
2216
    code = terrno;
×
2217
    QUERY_CHECK_CODE(code, lino, _end);
×
2218
  }
2219

2220
  code = submitReqToMsg(vgInfo->vgId, pReq, pMsg, msgLen);
2,662,750✔
2221
  tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
2,662,537✔
2222
  taosMemoryFree(pReq);
2,662,502✔
2223
  ST_TASK_DLOG("[data inserter], submit req, vgid:%d, GROUP:%" PRId64 " tbname:%s autoCreate:%d code:%d ", vgInfo->vgId,
2,662,340✔
2224
               pInserterInfo->groupId, pInserterInfo->tbName, pInserterInfo->isAutoCreateTable, code);
2225

2226
_end:
2,660,668✔
2227
  if (code != 0) {
2,662,750✔
2228
    tDestroySubmitTbData(&tbDataInfo.pTbData, TSDB_MSG_FLG_ENCODE);
×
2229
    tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
×
2230
    taosMemoryFree(pReq);
×
2231
  }
2232

2233
  return code;
2,662,750✔
2234
}
2235

2236
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
1,987,054✔
2237
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
1,987,054✔
2238
  if (!pInserter->explain) {
1,987,054✔
2239
    if (NULL == taosArrayPush(pInserter->pDataBlocks, &pInput->pData)) {
3,965,144✔
2240
      return terrno;
×
2241
    }
2242
    if (pInserter->isStbInserter) {
1,982,572✔
2243
      SArray* pMsgs = taosArrayInit(4, sizeof(POINTER_BYTES));
3,504✔
2244
      if (NULL == pMsgs) {
3,504✔
2245
        return terrno;
×
2246
      }
2247
      int32_t code = dataBlocksToSubmitReqArray(pInserter, pMsgs);
3,504✔
2248
      if (code) {
3,504✔
2249
        taosArrayDestroyP(pMsgs, destroySSubmitTbDataSendInfo);
×
2250
        return code;
×
2251
      }
2252
      taosArrayClear(pInserter->pDataBlocks);
3,504✔
2253
      for (int32_t i = 0; i < taosArrayGetSize(pMsgs); ++i) {
9,928✔
2254
        SSubmitTbDataSendInfo* pSendInfo = taosArrayGetP(pMsgs, i);
6,424✔
2255
        code = sendSubmitRequest(pInserter, NULL, pSendInfo->msg.pData, pSendInfo->msg.len,
12,848✔
2256
                                 pInserter->pParam->readHandle->pMsgCb->clientRpc, &pSendInfo->epSet);
6,424✔
2257
        taosMemoryFree(pSendInfo);
6,424✔
2258
        if (code) {
6,424✔
2259
          for (int j = i + 1; j < taosArrayGetSize(pMsgs); ++j) {
×
2260
            SSubmitTbDataSendInfo* pSendInfo2 = taosArrayGetP(pMsgs, j);
×
2261
            destroySSubmitTbDataSendInfo(pSendInfo2);
×
2262
          }
2263
          taosArrayDestroy(pMsgs);
×
2264
          return code;
×
2265
        }
2266
        QRY_ERR_RET(tsem_wait(&pInserter->ready));
6,424✔
2267

2268
        if (pInserter->submitRes.code) {
6,424✔
2269
          for (int j = i + 1; j < taosArrayGetSize(pMsgs); ++j) {
×
2270
            SSubmitTbDataSendInfo* pSendInfo2 = taosArrayGetP(pMsgs, j);
×
2271
            destroySSubmitTbDataSendInfo(pSendInfo2);
×
2272
          }
2273
          taosArrayDestroy(pMsgs);
×
2274
          return pInserter->submitRes.code;
×
2275
        }
2276
      }
2277

2278
      taosArrayDestroy(pMsgs);
3,504✔
2279

2280
    } else {
2281
      void*   pMsg = NULL;
1,979,068✔
2282
      int32_t msgLen = 0;
1,979,068✔
2283
      int32_t code = dataBlocksToSubmitReq(pInserter, &pMsg, &msgLen);
1,979,068✔
2284
      if (code) {
1,979,068✔
2285
        return code;
7,452✔
2286
      }
2287

2288
      taosArrayClear(pInserter->pDataBlocks);
1,971,616✔
2289

2290
      code = sendSubmitRequest(pInserter, NULL, pMsg, msgLen, pInserter->pParam->readHandle->pMsgCb->clientRpc,
1,971,616✔
2291
                               &pInserter->pNode->epSet);
1,971,616✔
2292
      if (code) {
1,971,616✔
2293
        return code;
×
2294
      }
2295

2296
      QRY_ERR_RET(tsem_wait(&pInserter->ready));
1,971,616✔
2297

2298
      if (pInserter->submitRes.code) {
1,971,616✔
2299
        return pInserter->submitRes.code;
×
2300
      }
2301
    }
2302
  }
2303

2304
  *pContinue = true;
1,979,602✔
2305

2306
  return TSDB_CODE_SUCCESS;
1,979,602✔
2307
}
2308

2309
// Looks up the cached insert-table info for the stream/group carried by pInput
// and, unless the stream target is a normal table, copies the cached table
// version into streamInserterParam->sver. The looked-up info is released again
// before returning; the release result is the return value.
static int32_t resetInserterTbVersion(SDataInserterHandle* pInserter, const SInputData* pInput) {
  SInsertTableInfo** ppInfo = NULL;

  int32_t ret = getStreamInsertTableInfo(pInput->pStreamDataInserterInfo->streamId,
                                         pInput->pStreamDataInserterInfo->groupId, &ppInfo);
  if (TSDB_CODE_SUCCESS != ret) {
    return ret;
  }

  SInsertTableInfo* pInfo = *ppInfo;
  stDebug("resetInserterTbVersion, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbName:%s, uid:%" PRId64 ", version:%d",
          pInput->pStreamDataInserterInfo->streamId, pInput->pStreamDataInserterInfo->groupId,
          pInput->pStreamDataInserterInfo->tbName, pInfo->uid, pInfo->version);

  bool targetIsNormalTable = (pInserter->pParam->streamInserterParam->tbType == TSDB_NORMAL_TABLE);
  if (!targetIsNormalTable) {
    pInserter->pParam->streamInserterParam->sver = pInfo->version;
  }

  return releaseStreamInsertTableInfo(ppInfo);
}
2326

2327
// fPut callback of the stream inserter sink.
//
// Queues pInput->pData, builds one submit message for the target table
// described by pInput->pStreamDataInserterInfo, sends it and waits for the
// response (pInserter->ready / pInserter->submitRes).
//
// Response handling:
//  - TSDB_CODE_TDB_TABLE_ALREADY_EXIST: auto-create is switched off, the cached
//    table version is refreshed and the request is rebuilt and sent once more.
//  - TSDB_CODE_VND_INVALID_VGROUP_ID while auto-creating: the cached vgroup
//    info of the target db is dropped and the whole put is retried.
//  - TSDB_CODE_TDB_TABLE_NOT_EXIST (without auto-create) or
//    TSDB_CODE_VND_INVALID_VGROUP_ID: the cached vgroup info is dropped and
//    TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND is returned.
//  - any other non-zero submit code is returned as is.
//
// TSDB_CODE_STREAM_NO_DATA is treated as success. *pContinue is set to true
// only when the data was submitted successfully. The queued blocks are always
// cleared before returning from the non-explain path.
static int32_t putStreamDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  int32_t              code = 0;
  int32_t              lino = 0;
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
  SStreamRunnerTask*   pTask = pInput->pTask;  // referenced by the ST_TASK_* log macros
  if (!pInserter || !pInserter->pParam || !pInserter->pParam->streamInserterParam) {
    ST_TASK_ELOG("putStreamDataBlock invalid param, pInserter: %p, pParam:%p", pInserter,
                 pInserter ? pInserter->pParam : NULL);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }
  if (!pInserter->explain) {
    code = TSDB_CODE_SUCCESS;
    if (NULL == taosArrayPush(pInserter->pDataBlocks, &pInput->pData)) {
      return terrno;
    }
    void*       pMsg = NULL;
    int32_t     msgLen = 0;
    SVgroupInfo vgInfo = {0};

    code = streamDataBlocksToSubmitReq(pTask, pInserter, pInput->pStreamDataInserterInfo, &pMsg, &msgLen, &vgInfo);
    QUERY_CHECK_CODE(code, lino, _return);

    code = sendSubmitRequest(pInserter, pInput->pStreamDataInserterInfo, pMsg, msgLen,
                             pInserter->pParam->readHandle->pMsgCb->clientRpc, &vgInfo.epSet);
    QUERY_CHECK_CODE(code, lino, _return);

    code = tsem_wait(&pInserter->ready);
    QUERY_CHECK_CODE(code, lino, _return);

    if (pInserter->submitRes.code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
      // The table already exists: stop auto-creating, pick up its current
      // version and submit the same blocks again.
      pInput->pStreamDataInserterInfo->isAutoCreateTable = false;
      code = resetInserterTbVersion(pInserter, pInput);
      QUERY_CHECK_CODE(code, lino, _return);

      code = streamDataBlocksToSubmitReq(pTask, pInserter, pInput->pStreamDataInserterInfo, &pMsg, &msgLen, &vgInfo);
      QUERY_CHECK_CODE(code, lino, _return);

      code = sendSubmitRequest(pInserter, pInput->pStreamDataInserterInfo, pMsg, msgLen,
                               pInserter->pParam->readHandle->pMsgCb->clientRpc, &vgInfo.epSet);
      QUERY_CHECK_CODE(code, lino, _return);

      code = tsem_wait(&pInserter->ready);
      QUERY_CHECK_CODE(code, lino, _return);
    }

    if (pInput->pStreamDataInserterInfo->isAutoCreateTable &&
        pInserter->submitRes.code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
      rmDbVgInfoFromCache(pInserter->pParam->streamInserterParam->dbFName);
      ST_TASK_ILOG("putStreamDataBlock, stream inserter table info not found, groupId:%" PRId64
                   ", tbName:%s. so reset dbVgInfo and try again",
                   pInput->pStreamDataInserterInfo->groupId, pInput->pStreamDataInserterInfo->tbName);
      // The retry pushes pInput->pData again, so the copy queued by this
      // attempt has to be dropped first; otherwise the same block would be
      // submitted twice in the retried request.
      taosArrayClear(pInserter->pDataBlocks);
      return putStreamDataBlock(pHandle, pInput, pContinue);
    }

    if ((pInserter->submitRes.code == TSDB_CODE_TDB_TABLE_NOT_EXIST &&
         !pInput->pStreamDataInserterInfo->isAutoCreateTable) ||
        pInserter->submitRes.code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
      rmDbVgInfoFromCache(pInserter->pParam->streamInserterParam->dbFName);
      ST_TASK_ILOG("putStreamDataBlock, stream inserter table info not found, groupId:%" PRId64
                   ", tbName:%s. so reset dbVgInfo",
                   pInput->pStreamDataInserterInfo->groupId, pInput->pStreamDataInserterInfo->tbName);
      code = TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
      QUERY_CHECK_CODE(code, lino, _return);
    }

    if (pInserter->submitRes.code) {
      code = pInserter->submitRes.code;
      ST_TASK_ELOG("submitRes err:%s, code:%0x", tstrerror(pInserter->submitRes.code), pInserter->submitRes.code);
      QUERY_CHECK_CODE(code, lino, _return);
    }

    *pContinue = true;

  _return:
    taosArrayClear(pInserter->pDataBlocks);
    if (code == TSDB_CODE_STREAM_NO_DATA) {
      ST_TASK_DLOG("putStreamDataBlock, no valid data to insert, skip this block, groupID:%" PRId64,
                   pInput->pStreamDataInserterInfo->groupId);
      code = TSDB_CODE_SUCCESS;
    } else if (code) {
      ST_TASK_ELOG("submitRes err:%s, code:%0x lino:%d", tstrerror(code), code, lino);
      return code;
    }
    return code;
  }
  return TSDB_CODE_SUCCESS;
}
2414

2415
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
253,578✔
2416
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
253,578✔
2417
  (void)taosThreadMutexLock(&pInserter->mutex);
253,578✔
2418
  pInserter->queryEnd = true;
253,578✔
2419
  pInserter->useconds = useconds;
253,578✔
2420
  (void)taosThreadMutexUnlock(&pInserter->mutex);
253,578✔
2421
}
253,578✔
2422

2423
// fGetLen callback: reports the affected-row count of the last submit result
// through pLen. pRawLen and pQueryEnd are not written by this sink.
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) {
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;

  *pLen = pInserter->submitRes.affectedRows;
  qDebug("got total affectedRows %" PRId64, *pLen);
}
253,578✔
2428

2429
// fDestroy callback: releases everything the inserter handle owns (queued block
// array, schemas, parameters, column map, plan node, mutex, manager and the
// per-inserter vgroup map). The handle structure itself is NOT freed here.
//
// Also used on the error paths of the create functions, where only part of the
// handle may have been set up, so members that can still be NULL are guarded.
//
// Always returns TSDB_CODE_SUCCESS.
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
  (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize);
  taosArrayDestroy(pInserter->pDataBlocks);
  taosMemoryFree(pInserter->pSchema);
  // pParam may be NULL (putStreamDataBlock explicitly checks for that), so it
  // must not be dereferenced unconditionally.
  if (pInserter->pParam && pInserter->pParam->streamInserterParam) {
    destroyStreamInserterParam(pInserter->pParam->streamInserterParam);
    taosMemoryFree(pInserter->pParam->readHandle);  // only for stream
  }
  taosMemoryFree(pInserter->pParam);
  taosHashCleanup(pInserter->pCols);
  nodesDestroyNode((SNode*)pInserter->pNode);
  pInserter->pNode = NULL;

  (void)taosThreadMutexDestroy(&pInserter->mutex);

  taosMemoryFree(pInserter->pManager);

  if (pInserter->dbVgInfoMap) {
    taosHashSetFreeFp(pInserter->dbVgInfoMap, freeUseDbOutput_tmp);
    taosHashCleanup(pInserter->dbVgInfoMap);
  }

  if (pInserter->pTagSchema) {
    taosMemoryFreeClear(pInserter->pTagSchema->pSchema);
    taosMemoryFree(pInserter->pTagSchema);
  }

  return TSDB_CODE_SUCCESS;
}
2459

2460
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
×
2461
  SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
×
2462

2463
  *size = atomic_load_64(&pDispatcher->cachedSize);
×
2464
  return TSDB_CODE_SUCCESS;
×
2465
}
2466

2467
static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) {
261,030✔
2468
  SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
261,030✔
2469

2470
  *pFlags = atomic_load_64(&pDispatcher->flags);
261,030✔
2471
  return TSDB_CODE_SUCCESS;
261,030✔
2472
}
2473

2474
// Creates the data sink used by INSERT ... SELECT style queries.
//
// On success *pHandle receives the new inserter, which takes over pManager,
// pParam and the sink node (*ppDataSink is reset to NULL).
// On failure everything acquired so far is released (including pManager, pParam
// and the sink node) and terrno is returned.
int32_t createDataInserter(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle,
                           void* pParam) {
  SDataSinkNode*       pSinkNode = *ppDataSink;
  SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
  if (NULL == inserter) {
    taosMemoryFree(pParam);
    goto _return;
  }

  SQueryInserterNode* pInsNode = (SQueryInserterNode*)pSinkNode;

  // sink callbacks
  inserter->sink.fPut = putDataBlock;
  inserter->sink.fEndPut = endPut;
  inserter->sink.fGetLen = getDataLength;
  inserter->sink.fGetData = NULL;
  inserter->sink.fDestroy = destroyDataSinker;
  inserter->sink.fGetCacheSize = getCacheSize;
  inserter->sink.fGetFlags = getSinkFlags;

  // state handed over to the inserter; from here on the handle owns the node
  inserter->pManager = pManager;
  inserter->pNode = pInsNode;
  inserter->pParam = pParam;
  inserter->status = DS_BUF_EMPTY;
  inserter->queryEnd = false;
  inserter->explain = pInsNode->explain;
  *ppDataSink = NULL;

  int64_t suid = 0;
  int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInsNode->tableId,
                                                       &inserter->pSchema, &suid, &inserter->pTagSchema);
  if (code != 0) {
    terrno = code;
    goto _return;
  }

  pManager->pAPI->metaFn.getBasicInfo(inserter->pParam->readHandle->vnode, &inserter->dbFName, NULL, NULL, NULL);

  if (TSDB_SUPER_TABLE == pInsNode->tableType) {
    inserter->isStbInserter = true;
  }

  // the super-table id in the plan must match the one stored with the schema
  if (suid != pInsNode->stableId) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    goto _return;
  }

  inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  if (NULL == inserter->pDataBlocks) {
    goto _return;
  }
  QRY_ERR_JRET(taosThreadMutexInit(&inserter->mutex, NULL));

  // Starts out true only if the plan lists as many columns as the schema has;
  // the loop below clears it on the first column id that is out of order.
  inserter->fullOrderColList = pInsNode->pCols->length == inserter->pSchema->numOfCols;

  inserter->pCols = taosHashInit(pInsNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT),
                                 false, HASH_NO_LOCK);
  if (NULL == inserter->pCols) {
    goto _return;
  }

  // Build the colId -> slotId map from the plan's column list.
  SNode*  pColNode = NULL;
  int32_t schemaIdx = 0;
  bool    hasTbnameCol = false;
  FOREACH(pColNode, pInsNode->pCols) {
    if (pColNode->type == QUERY_NODE_FUNCTION && ((SFunctionNode*)pColNode)->funcType == FUNCTION_TYPE_TBNAME) {
      // the tbname pseudo column is registered as colId 0 -> slotId 0
      int16_t tbnameColId = 0;
      int16_t tbnameSlotId = 0;
      QRY_ERR_JRET(taosHashPut(inserter->pCols, &tbnameColId, sizeof(tbnameColId), &tbnameSlotId, sizeof(tbnameSlotId)));
      hasTbnameCol = true;
    } else {
      SColumnNode* pCol = (SColumnNode*)pColNode;
      QRY_ERR_JRET(taosHashPut(inserter->pCols, &pCol->colId, sizeof(pCol->colId), &pCol->slotId, sizeof(pCol->slotId)));
      if (inserter->fullOrderColList && pCol->colId != inserter->pSchema->columns[schemaIdx].colId) {
        inserter->fullOrderColList = false;
      }
      ++schemaIdx;
    }
  }

  // a super-table insert needs the tbname column to route rows
  if (inserter->isStbInserter && !hasTbnameCol) {
    QRY_ERR_JRET(TSDB_CODE_PAR_TBNAME_ERROR);
  }

  QRY_ERR_JRET(tsem_init(&inserter->ready, 0, 0));

  inserter->dbVgInfoMap = NULL;

  *pHandle = inserter;
  return TSDB_CODE_SUCCESS;

_return:

  if (inserter) {
    (void)destroyDataSinker((SDataSinkHandle*)inserter);
    taosMemoryFree(inserter);
  } else {
    taosMemoryFree(pManager);
  }

  // still non-NULL only when the handle allocation itself failed
  nodesDestroyNode((SNode*)*ppDataSink);
  *ppDataSink = NULL;

  return terrno;
}
2576

2577
                           
2578
static TdThreadOnce g_dbVgInfoMgrInit = PTHREAD_ONCE_INIT;
2579

2580
SDBVgInfoMgr g_dbVgInfoMgr = {0};
2581
                           
2582
void dbVgInfoMgrInitOnce() {
14,509✔
2583
  g_dbVgInfoMgr.dbVgInfoMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
14,509✔
2584
  if (g_dbVgInfoMgr.dbVgInfoMap == NULL) {
14,509✔
2585
    stError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
×
2586
    return;
×
2587
  }
2588

2589
  taosHashSetFreeFp(g_dbVgInfoMgr.dbVgInfoMap, freeUseDbOutput_tmp);
14,509✔
2590
}
2591

2592

2593

2594
// Creates the data sink used by stream tasks to write results into the target
// table.
//
// Makes sure the global vgroup-info cache exists, allocates the inserter and
// wires the sink callbacks (fPut is putStreamDataBlock). On success *pHandle
// receives the inserter, which keeps pManager and pParam.
//
// On failure a non-zero error code is returned. If the handle was already
// allocated it is torn down with destroyDataSinker (which also releases pParam
// and pManager); otherwise only pManager is freed.
// NOTE(review): in that second case pParam is left untouched - confirm the
// caller releases it.
int32_t createStreamDataInserter(SDataSinkManager* pManager, DataSinkHandle* pHandle, void* pParam) {
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
  // Must be initialised before the first `goto _exit`: the cleanup code tests
  // it, and jumping over an initialised declaration leaves the variable
  // indeterminate.
  SDataInserterHandle* inserter = NULL;

  TAOS_UNUSED(taosThreadOnce(&g_dbVgInfoMgrInit, dbVgInfoMgrInitOnce));
  TSDB_CHECK_NULL(g_dbVgInfoMgr.dbVgInfoMap, code, lino, _exit, terrno);

  inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
  TSDB_CHECK_NULL(inserter, code, lino, _exit, terrno);

  inserter->sink.fPut = putStreamDataBlock;
  inserter->sink.fEndPut = endPut;
  inserter->sink.fGetLen = getDataLength;
  inserter->sink.fGetData = NULL;
  inserter->sink.fDestroy = destroyDataSinker;
  inserter->sink.fGetCacheSize = getCacheSize;
  inserter->sink.fGetFlags = getSinkFlags;
  inserter->pManager = pManager;
  inserter->pNode = NULL;
  inserter->pParam = pParam;
  inserter->status = DS_BUF_EMPTY;
  inserter->queryEnd = false;
  inserter->explain = false;

  inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  TSDB_CHECK_NULL(inserter->pDataBlocks, code, lino, _exit, terrno);

  TAOS_CHECK_EXIT(taosThreadMutexInit(&inserter->mutex, NULL));
  TAOS_CHECK_EXIT(tsem_init(&inserter->ready, 0, 0));

  inserter->dbVgInfoMap = NULL;

  *pHandle = inserter;
  return TSDB_CODE_SUCCESS;

_exit:

  // Only failures reach this label. terrno may be 0 here (e.g. when the cache
  // initialisation failed earlier, in another call), so never report success
  // without having produced a handle.
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  if (inserter) {
    (void)destroyDataSinker((SDataSinkHandle*)inserter);
    taosMemoryFree(inserter);
  } else {
    taosMemoryFree(pManager);
  }

  stError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));

  return code;
}
2643

2644
// Returns, through *dbVgInfo, the vgroup info of database dbFName from the
// global cache g_dbVgInfoMgr. On a cache miss the info is built with
// buildDbVgInfoMap (using clientRpc) and inserted into the cache; if another
// thread inserted the same key in the meantime, the freshly built copy is
// dropped and the cached one is used.
//
// Returns TSDB_CODE_SUCCESS or an error code; on error *dbVgInfo is not written.
int32_t getDbVgInfoByTbName(void* clientRpc, const char* dbFName, SDBVgInfo** dbVgInfo) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       line = 0;
  SUseDbOutput* output = NULL;
  bool          ownsOutput = false;  // true while `output` is a local allocation not yet owned by the cache
  size_t        nameLen = strlen(dbFName);

  SUseDbOutput** find = (SUseDbOutput**)taosHashGet(g_dbVgInfoMgr.dbVgInfoMap, dbFName, nameLen);

  if (find == NULL) {
    output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    if (output == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    ownsOutput = true;

    code = buildDbVgInfoMap(clientRpc, dbFName, output);
    QUERY_CHECK_CODE(code, line, _return);

    code = taosHashPut(g_dbVgInfoMgr.dbVgInfoMap, dbFName, nameLen, &output, POINTER_BYTES);
    if (code == TSDB_CODE_DUP_KEY) {
      code = TSDB_CODE_SUCCESS;
      // another thread has put the same dbFName, so we need to free the output
      freeUseDbOutput_tmp(&output);
      ownsOutput = false;  // released: the error path below must not free it again
      output = NULL;

      find = (SUseDbOutput**)taosHashGet(g_dbVgInfoMgr.dbVgInfoMap, dbFName, nameLen);
      if (find == NULL) {
        // Assign first, then check: an assignment inside the macro argument
        // depends on how the macro parenthesises its parameter.
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        QUERY_CHECK_CODE(code, line, _return);
      }
      output = *find;
    }
    QUERY_CHECK_CODE(code, line, _return);
    ownsOutput = false;  // the cache holds the entry now
  } else {
    output = *find;
  }

  *dbVgInfo = output->dbVgroup;
  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  if (ownsOutput) {
    freeUseDbOutput_tmp(&output);
  }
  return code;
}
2684

2685
// Resolves the vgroup that table tbName of database dbFName belongs to and
// stores it in *pVgInfo. The lookup runs under the read latch of the global
// vgroup cache (g_dbVgInfoMgr.lock). The table is looked up by its full name
// "<dbFName>.<tbName>".
int32_t getDbVgInfoForExec(void* clientRpc, const char* dbFName, const char* tbName, SVgroupInfo* pVgInfo) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SDBVgInfo* pDbVgInfo = NULL;
  char       fullTbName[TSDB_TABLE_FNAME_LEN];

  snprintf(fullTbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbFName, tbName);

  taosRLockLatch(&g_dbVgInfoMgr.lock);

  TAOS_CHECK_EXIT(getDbVgInfoByTbName(clientRpc, dbFName, &pDbVgInfo));
  TAOS_CHECK_EXIT(inserterGetVgInfo(pDbVgInfo, fullTbName, pVgInfo));

_exit:

  // reached on success as well as on failure
  taosRUnLockLatch(&g_dbVgInfoMgr.lock);

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
2707

2708
void rmDbVgInfoFromCache(const char* dbFName) {
2,050✔
2709
  taosWLockLatch(&g_dbVgInfoMgr.lock);
2,050✔
2710

2711
  TAOS_UNUSED(taosHashRemove(g_dbVgInfoMgr.dbVgInfoMap, dbFName, strlen(dbFName)));
2,050✔
2712

2713
  taosWUnLockLatch(&g_dbVgInfoMgr.lock);
2,050✔
2714
}
2,050✔
2715

2716
// Serialises a drop-table batch request into a newly allocated message:
// an SMsgHead (vgId and total length, both in network byte order) followed by
// the encoded request.
//
// On success *pData / *pLen receive the buffer and its total length and the
// caller becomes responsible for the buffer. On failure nothing is returned
// through the out parameters.
static int32_t dropTableReqToMsg(int32_t vgId, SVDropTbBatchReq* pReq, void** pData, int32_t* pLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t len = 0;
  void*   pBuf = NULL;

  // first pass: compute the encoded size of the request
  tEncodeSize(tEncodeSVDropTbBatchReq, pReq, len, code);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  len += sizeof(SMsgHead);
  pBuf = taosMemoryMalloc(len);
  if (NULL == pBuf) {
    return terrno;
  }

  SDropTbDataMsg* pHead = (SDropTbDataMsg*)pBuf;
  pHead->header.vgId = htonl(vgId);
  pHead->header.contLen = htonl(len);

  // second pass: encode the request right behind the header
  SEncoder encoder;
  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
  code = tEncodeSVDropTbBatchReq(&encoder, pReq);
  tEncoderClear(&encoder);

  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pBuf);
    return code;
  }

  *pData = pBuf;
  *pLen = len;
  return code;
}
2745

2746
// Response callback of the drop-table request: records the result code in the
// waiting SDropTbCtx, wakes the waiter through ctx->ready and frees the
// response payload. The result of posting the semaphore is not checked.
// Always returns TSDB_CODE_SUCCESS.
int32_t dropTbCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SDropTbCtx* pCtx = (SDropTbCtx*)param;

  if (code != 0) {
    stError("dropTbCallback, code:%d, stream:%" PRId64 " gid:%" PRId64, code, pCtx->req->streamId, pCtx->req->gid);
  }

  pCtx->code = code;
  (void)tsem_post(&pCtx->ready);

  taosMemoryFree(pMsg->pData);
  return TSDB_CODE_SUCCESS;
}
2757

2758
// Wraps an encoded drop-table message into an SMsgSendInfo and sends it
// asynchronously to pEpset; dropTbCallback is invoked with `ctx` when the
// response arrives. Returns terrno if the send-info cannot be allocated,
// otherwise the result of asyncSendMsgToServer.
static int32_t sendDropTbRequest(SDropTbCtx* ctx, void* pMsg, int32_t msgLen, void* pTransporter, SEpSet* pEpset) {
  SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pSendInfo == NULL) {
    return terrno;
  }

  // payload
  pSendInfo->msgType = TDMT_VND_SNODE_DROP_TABLE;
  pSendInfo->msgInfo.pData = pMsg;
  pSendInfo->msgInfo.len = msgLen;

  // completion handling; ctx is not freed with the send-info
  pSendInfo->fp = dropTbCallback;
  pSendInfo->param = ctx;
  pSendInfo->paramFreeFp = NULL;

  return asyncSendMsgToServer(pTransporter, pEpset, NULL, pSendInfo);
}
2774

2775
// Drops the output table that a stream created for (pReq->streamId, pReq->gid).
//
// The table name, uid and vgroup id are taken from the cached insert-table
// info of that stream group; the cache entry is removed from
// gStreamGrpTableHash, a drop-table batch request with a single entry is
// encoded and sent to the vgroup owning the table, and the function blocks
// until dropTbCallback reports the result.
//
// Returns the code reported by the callback, or the first local error.
// TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND is returned when no table info is
// cached for the group.
int32_t doDropStreamTable(SMsgCb* pMsgCb, void* pTaskOutput, SSTriggerDropRequest* pReq) {
  SStreamRunnerTaskOutput* pOutput = pTaskOutput;
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SVDropTbBatchReq         req = {.nReqs = 1};
  SVDropTbReq*             pDropReq = NULL;
  int32_t                  msgLen = 0;
  tsem_t*                  pSem = NULL;
  SDropTbDataMsg*          pMsg = NULL;

  SInsertTableInfo** ppTbInfo = NULL;
  int32_t            vgId = 0;

  // getDbVgInfoForExec below works on the global vgroup cache; make sure it is
  // initialised, as doDropStreamTableByTbName does.
  TAOS_UNUSED(taosThreadOnce(&g_dbVgInfoMgrInit, dbVgInfoMgrInitOnce));

  req.pArray = taosArrayInit_s(sizeof(SVDropTbReq), 1);
  if (!req.pArray) return terrno;

  pDropReq = taosArrayGet(req.pArray, 0);

  code = getStreamInsertTableInfo(pReq->streamId, pReq->gid, &ppTbInfo);
  if (TSDB_CODE_SUCCESS == code) {
    pDropReq->name = taosStrdup((*ppTbInfo)->tbname);
    if (NULL == pDropReq->name) {
      // Without a name the request cannot be routed; bail out before the
      // cache entry is removed.
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      QUERY_CHECK_CODE(code, lino, _end);
    }
    pDropReq->suid = (*ppTbInfo)->uid;
    pDropReq->uid = (*ppTbInfo)->uid;
    pDropReq->igNotExists = true;
    vgId = (*ppTbInfo)->vgid;

    int64_t key[2] = {pReq->streamId, pReq->gid};
    TAOS_UNUSED(taosHashRemove(gStreamGrpTableHash, key, sizeof(key)));
  } else {
    code = TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
  }
  QUERY_CHECK_CODE(code, lino, _end);

  code = dropTableReqToMsg(vgId, &req, (void**)&pMsg, &msgLen);
  QUERY_CHECK_CODE(code, lino, _end);

  SVgroupInfo vgInfo = {0};
  code = getDbVgInfoForExec(pMsgCb->clientRpc, pOutput->outDbFName, pDropReq->name, &vgInfo);
  QUERY_CHECK_CODE(code, lino, _end);

  SDropTbCtx ctx = {.req = pReq};
  code = tsem_init(&ctx.ready, 0, 0);
  QUERY_CHECK_CODE(code, lino, _end);
  pSem = &ctx.ready;

  code = sendDropTbRequest(&ctx, pMsg, msgLen, pMsgCb->clientRpc, &vgInfo.epSet);
  QUERY_CHECK_CODE(code, lino, _end);
  pMsg = NULL;  // now owned by sendDropTbRequest

  // the result of the drop is whatever the callback stored in ctx.code
  (void)tsem_wait(&ctx.ready);
  code = ctx.code;
  stDebug("doDropStreamTable,  code:0x%" PRIx32 " req:%p, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq,
          pReq->streamId, pReq->gid, (pDropReq && pDropReq->name) ? pDropReq->name : "unknown");

_end:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND) {
    stError("doDropStreamTable, code:0x%" PRIx32 ", streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq->streamId,
            pReq->gid, (pDropReq && pDropReq->name) ? pDropReq->name : "unknown");
    if (pMsg) {
      taosMemoryFreeClear(pMsg);
    }
  }
  if (pSem) tsem_destroy(pSem);
  if (pDropReq && pDropReq->name) taosMemoryFreeClear(pDropReq->name);
  if (ppTbInfo) releaseStreamInsertTableInfo(ppTbInfo);
  taosArrayDestroy(req.pArray);

  return code;
}
2844

2845
// Drops a stream output table identified directly by name.
//
// Removes the (streamId, gid) entry from gStreamGrpTableHash, resolves the
// vgroup of tbName in the stream's output database, sends a drop-table batch
// request with a single entry and blocks until dropTbCallback reports the
// result. tbName is only borrowed; it is not freed here.
//
// Returns the code reported by the callback, or the first local error.
int32_t doDropStreamTableByTbName(SMsgCb* pMsgCb, void* pTaskOutput, SSTriggerDropRequest* pReq, char* tbName) {
  SStreamRunnerTaskOutput* pOutput = pTaskOutput;
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SVDropTbBatchReq         batchReq = {.nReqs = 1};
  SVDropTbReq*             pDropReq = NULL;
  SDropTbDataMsg*          pEncoded = NULL;
  int32_t                  encodedLen = 0;
  tsem_t*                  pReadySem = NULL;
  SVgroupInfo              vgInfo = {0};
  SDropTbCtx               ctx = {.req = pReq};

  TAOS_UNUSED(taosThreadOnce(&g_dbVgInfoMgrInit, dbVgInfoMgrInitOnce));

  batchReq.pArray = taosArrayInit_s(sizeof(SVDropTbReq), 1);
  if (NULL == batchReq.pArray) {
    return terrno;
  }

  pDropReq = taosArrayGet(batchReq.pArray, 0);
  pDropReq->name = tbName;
  pDropReq->igNotExists = true;

  // forget the cached insert-table info of this stream group
  int64_t key[2] = {pReq->streamId, pReq->gid};
  TAOS_UNUSED(taosHashRemove(gStreamGrpTableHash, key, sizeof(key)));

  code = getDbVgInfoForExec(pMsgCb->clientRpc, pOutput->outDbFName, pDropReq->name, &vgInfo);
  QUERY_CHECK_CODE(code, lino, _end);

  code = dropTableReqToMsg(vgInfo.vgId, &batchReq, (void**)&pEncoded, &encodedLen);
  QUERY_CHECK_CODE(code, lino, _end);

  code = tsem_init(&ctx.ready, 0, 0);
  QUERY_CHECK_CODE(code, lino, _end);
  pReadySem = &ctx.ready;

  code = sendDropTbRequest(&ctx, pEncoded, encodedLen, pMsgCb->clientRpc, &vgInfo.epSet);
  QUERY_CHECK_CODE(code, lino, _end);
  pEncoded = NULL;  // now owned by sendDropTbRequest

  // the outcome is the code stored by the callback, not the wait result
  code = tsem_wait(&ctx.ready);
  code = ctx.code;
  stDebug("doDropStreamTableByTbName,  code:%d req:%p, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq,
          pReq->streamId, pReq->gid, pDropReq ? pDropReq->name : "unknown");

_end:
  if (TSDB_CODE_SUCCESS != code) {
    stError("doDropStreamTableByTbName, code:%d, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq->streamId,
            pReq->gid, pDropReq ? pDropReq->name : "unknown");
    if (pEncoded) {
      taosMemoryFreeClear(pEncoded);
    }
  }
  if (pReadySem) {
    tsem_destroy(pReadySem);
  }
  taosArrayDestroy(batchReq.pArray);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc