• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.81
/source/dnode/vnode/src/tq/tqSink.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <common/tmsg.h>
17
#include "tcommon.h"
18
#include "tmsg.h"
19
#include "tq.h"
20

21
// True when task _t has ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER and its subtableWithoutMd5 flag is not 1.
// The result is passed as the newSubTableRule argument when destination child-table names are built.
#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1))
22

23
// Per-destination-table record kept in the sink-table cache, which is looked up by group id
// (see doGetSinkTableInfoFromCache / doPutSinkTableInfoIntoCache).
typedef struct STableSinkInfo {
24
  uint64_t uid;  // table uid; assigned from the meta entry uid once the table is found in meta
25
  tstr     name;  // destination table name; doCreateSinkTableInfo builds the record from a name string
26
} STableSinkInfo;
27

28
// Forward declarations of the file-local helpers used by the stream sink code below.
static bool    hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks);
29
static int32_t tsAscendingSortFn(const void* p1, const void* p2);
30
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
31
                                  SSubmitTbData* pTableData);
32
static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
33
                                       int64_t suid);
34
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks);
35
static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
36
static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock,
37
                             int64_t earlyTs, const char* id);
38
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
39
                                        const char* dstTableName, int64_t* uid);
40

41
static bool    isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
42
static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName,
43
                                  int32_t numOfTags);
44
static int32_t createDefaultTagColName(SArray** pColNameList);
45
static int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock,
46
                                          const char* stbFullName, int64_t gid, bool newSubTableRule);
47
static int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo);
48
static int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId,
49
                                           const char* id);
50
static bool    doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo);
51
static int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id);
52
static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode);
53
static void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs);
54
static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode,
55
                                    int64_t earlyTs);
56
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName);
57

58
/**
 * Translate a delete-result data block into single-table delete requests.
 *
 * For every row the [skey, ekey] time range and the group id are read, the destination table name is
 * resolved, and one SSingleDeleteReq is appended to deleteReq->deleteReqs. The name is resolved as:
 *   1. the value of the table-name column, with the group id appended by buildCtbNameAddGroupId() when
 *      newSubTableRule is set, the name is neither auto-generated nor already suffixed, groupId != 0
 *      and stbFullName is given;
 *   2. otherwise, if stbFullName is given, the name produced by buildCtbNameByGroupId();
 *   3. otherwise the name returned by metaGetTableNameByUid() for uid == groupId.
 * Rows whose name is missing or empty are skipped with a warning.
 *
 * All per-row buffers are released on every exit path, including the error returns.
 *
 * @param pTq             tq handle; only pTq->pVnode is used, for the meta lookup in case 3
 * @param stbFullName     full name of the destination super table, may be NULL
 * @param pDataBlock      block holding the start-ts, end-ts, group-id and table-name columns
 * @param deleteReq       batch request that receives the generated single delete requests
 * @param pIdStr          task id string, used for logging only
 * @param newSubTableRule whether the group id must be appended to user-specified table names
 * @return 0 on success; terrno or the failing helper's code otherwise
 */
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
                         const char* pIdStr, bool newSubTableRule) {
  int32_t          totalRows = pDataBlock->info.rows;
  SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);

  if (pStartTsCol == NULL || pEndTsCol == NULL || pGidCol == NULL || pTbNameCol == NULL) {
    return terrno;
  }

  tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName);

  for (int32_t row = 0; row < totalRows; row++) {
    int64_t skey = *(int64_t*)colDataGetData(pStartTsCol, row);
    int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row);
    int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);

    int32_t code = TSDB_CODE_SUCCESS;
    char*   name = NULL;        // resolved table name; points into originName in case 3
    char*   originName = NULL;  // var-string buffer filled by the meta lookup (case 3 only)
    void*   varTbName = NULL;
    if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) {
      varTbName = colDataGetVarData(pTbNameCol, row);
    }

    if (varTbName != NULL && varTbName != (void*)-1) {
      // case 1: copy the name out of the var-data cell into a buffer large enough for the suffix
      size_t cap = TMAX(TSDB_TABLE_NAME_LEN, varDataLen(varTbName) + 1);
      name = taosMemoryMalloc(cap);
      if (name == NULL) {
        return terrno;
      }

      memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
      name[varDataLen(varTbName)] = '\0';
      if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name, groupId) && groupId != 0 &&
          stbFullName) {
        code = buildCtbNameAddGroupId(stbFullName, name, groupId, cap);
        if (code != TSDB_CODE_SUCCESS) {
          taosMemoryFreeClear(name);  // the buffer is owned here, do not leak it on failure
          return code;
        }
      }
    } else if (stbFullName) {
      // case 2: the helper allocates the name; nothing is owned here when it fails
      code = buildCtbNameByGroupId(stbFullName, groupId, &name);
      if (code) {
        return code;
      }
    } else {
      // case 3: look the name up in meta, the result is stored as a var-string
      originName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
      if (originName == NULL) {
        return terrno;
      }

      if (metaGetTableNameByUid(pTq->pVnode, groupId, originName) == 0) {
        name = varDataVal(originName);
      }
    }

    if (!name || *name == '\0') {
      tqWarn("s-task:%s failed to build delete msg groupId:%" PRId64 ", skey:%" PRId64 " ekey:%" PRId64
             " since invalid tbname:%s",
             pIdStr, groupId, skey, ekey, name ? name : "NULL");
    } else {
      tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, pIdStr,
              groupId, name, skey, ekey);

      SSingleDeleteReq req = {.startTs = skey, .endTs = ekey};
      tstrncpy(req.tbname, name, TSDB_TABLE_NAME_LEN);
      void* p = taosArrayPush(deleteReq->deleteReqs, &req);
      if (p == NULL) {
        code = terrno;  // capture before the cleanup below can touch terrno
        if (originName) {
          name = originName;
        }
        taosMemoryFreeClear(name);
        return code;
      }
    }

    // in case 3 name points inside originName, so the allocation start must be freed instead
    if (originName) {
      name = originName;
    }

    taosMemoryFreeClear(name);
  }

  return 0;
}
141

142
static int32_t encodeCreateChildTableForRPC(void* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) {
3,282✔
143
  int32_t ret = 0;
3,282✔
144

145
  tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret);
3,282!
146
  if (ret < 0) {
3,282!
147
    ret = -1;
×
148
    goto end;
×
149
  }
150
  *contLen += sizeof(SMsgHead);
3,282✔
151
  *pBuf = rpcMallocCont(*contLen);
3,282✔
152
  if (NULL == *pBuf) {
3,282!
153
    ret = -1;
×
154
    goto end;
×
155
  }
156
  ((SMsgHead*)(*pBuf))->vgId = vgId;
3,282✔
157
  ((SMsgHead*)(*pBuf))->contLen = htonl(*contLen);
3,282✔
158
  SEncoder coder = {0};
3,282✔
159
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
3,282✔
160
  if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) {
3,282!
161
    rpcFreeCont(*pBuf);
×
162
    *pBuf = NULL;
×
163
    *contLen = 0;
×
164
    tEncoderClear(&coder);
×
165
    ret = -1;
×
166
    goto end;
×
167
  }
168
  tEncoderClear(&coder);
3,282✔
169

170
end:
3,282✔
171
  return ret;
3,282✔
172
}
173

174
static int32_t encodeDropChildTableForRPC(void* pReqs, int32_t vgId, void** ppBuf, int32_t *contLen) {
44✔
175
  int32_t  code = 0;
44✔
176
  SEncoder ec = {0};
44✔
177
  tEncodeSize(tEncodeSVDropTbBatchReq, pReqs, *contLen, code);
44!
178
  if (code < 0) {
44!
179
    code = TSDB_CODE_INVALID_MSG;
×
180
    goto end;
×
181
  }
182
  *contLen += sizeof(SMsgHead);
44✔
183
  *ppBuf = rpcMallocCont(*contLen);
44✔
184

185
  if (!*ppBuf) {
44!
186
    code = terrno;
×
187
    goto end;
×
188
  }
189

190
  ((SMsgHead*)(*ppBuf))->vgId = vgId;
44✔
191
  ((SMsgHead*)(*ppBuf))->contLen = htonl(*contLen);
44✔
192

193
  tEncoderInit(&ec, POINTER_SHIFT(*ppBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
44✔
194
  code = tEncodeSVDropTbBatchReq(&ec, pReqs);
44✔
195
  tEncoderClear(&ec);
44✔
196
  if (code < 0) {
44!
197
    rpcFreeCont(*ppBuf);
×
198
    *ppBuf = NULL;
×
199
    *contLen = 0;
×
200
    code = TSDB_CODE_INVALID_MSG;
×
201
    goto end;
×
202
  }
203
end:
44✔
204
  return code;
44✔
205
}
206

207
/**
 * Encode a request with the supplied encoder and put the result on the vnode write queue.
 *
 * @param pVnode  vnode whose message callbacks and vgId are used
 * @param pReqs   request object handed to the encoder unchanged
 * @param encoder callback producing the rpc buffer and its length
 * @param msgType message type stored in the queued SRpcMsg
 * @return 0 on success, the encoder's or tmsgPutToQueue()'s error code otherwise
 */
static int32_t tqPutReqToQueue(SVnode* pVnode, void* pReqs, int32_t(*encoder)(void* pReqs, int32_t vgId, void** ppBuf, int32_t *contLen), tmsg_t msgType) {
  void*   pCont = NULL;
  int32_t contLen = 0;

  int32_t code = encoder(pReqs, TD_VID(pVnode), &pCont, &contLen);
  if (code != 0) {
    tqError("vgId:%d failed to encode create table msg, create table failed, code:%s", TD_VID(pVnode), tstrerror(code));
    return code;
  }

  SRpcMsg rpcMsg = {.msgType = msgType, .pCont = pCont, .contLen = contLen};
  code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
  if (code != 0) {
    tqError("failed to put into write-queue since %s", terrstr());
  }

  return code;
}
225

226
/**
 * Fill the fixed fields of a child-table create request.
 *
 * Sets flags, type, super-table uid and tag count, and stores a duplicated copy of the table-name part
 * of stbFullName in ctb.stbName.
 *
 * @return 0 on success, the tNameFromString() error code, or terrno if the name cannot be duplicated
 */
int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags) {
  pCreateTableReq->flags = 0;
  pCreateTableReq->type = TSDB_CHILD_TABLE;
  pCreateTableReq->ctb.suid = suid;
  pCreateTableReq->ctb.tagNum = numOfTags;

  // extract the super table name from its full name
  SName stbName = {0};
  int32_t code = tNameFromString(&stbName, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code != 0) {
    return code;
  }

  pCreateTableReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&stbName));
  if (pCreateTableReq->ctb.stbName != NULL) {
    return code;
  }

  tqError("failed to duplicate the stb name:%s, failed to init create-table msg and create req table", stbFullName);
  code = terrno;
  return code;
}
246

247
/**
 * Create the default tag-name list, which holds the single entry "group_id".
 *
 * @param pColNameList receives the new array on success and is set to NULL on failure
 * @return TSDB_CODE_SUCCESS, or terrno when the array cannot be created or filled
 */
int32_t createDefaultTagColName(SArray** pColNameList) {
  char    defaultTagName[TSDB_COL_NAME_LEN] = "group_id";
  SArray* pNames = NULL;

  *pColNameList = NULL;

  pNames = taosArrayInit(1, TSDB_COL_NAME_LEN);
  if (pNames == NULL) {
    return terrno;
  }

  if (taosArrayPush(pNames, defaultTagName) == NULL) {
    taosArrayDestroy(pNames);
    return terrno;
  }

  *pColNameList = pNames;
  return TSDB_CODE_SUCCESS;
}
265

266
/**
 * Set the destination table name of a create-table request.
 *
 * - When the block carries no partition table name, the name is generated by buildCtbNameByGroupId().
 * - When it does and the new sub-table rule applies (name neither auto-generated nor already suffixed,
 *   gid != 0, stbFullName given), the name is copied and the group id is appended to it.
 * - Otherwise the partition table name is duplicated as is.
 *
 * @return 0 on success, terrno on allocation failure, or the name-building helper's error code
 */
int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
                                   int64_t gid, bool newSubTableRule) {
  char* parTbName = pDataBlock->info.parTbName;

  if (!parTbName[0]) {
    return buildCtbNameByGroupId(stbFullName, gid, &pCreateTableReq->name);
  }

  bool appendGroupId =
      newSubTableRule && !isAutoTableName(parTbName) && !alreadyAddGroupId(parTbName, gid) && gid != 0 && stbFullName;

  if (!appendGroupId) {
    pCreateTableReq->name = taosStrdup(parTbName);
    if (pCreateTableReq->name == NULL) {
      return terrno;
    }
    return 0;
  }

  pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
  if (pCreateTableReq->name == NULL) {
    return terrno;
  }

  tstrncpy(pCreateTableReq->name, parTbName, TSDB_TABLE_NAME_LEN);
  int32_t code = buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid, TSDB_TABLE_NAME_LEN);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return 0;
}
296

297
static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock,
3,282✔
298
                                            SStreamTask* pTask, int64_t suid) {
299
  STSchema*          pTSchema = pTask->outputInfo.tbSink.pTSchema;
3,282✔
300
  int32_t            rows = pDataBlock->info.rows;
3,282✔
301
  SArray*            tagArray = NULL;
3,282✔
302
  const char*        id = pTask->id.idStr;
3,282✔
303
  int32_t            vgId = pTask->pMeta->vgId;
3,282✔
304
  int32_t            code = 0;
3,282✔
305
  STableSinkInfo*    pInfo = NULL;
3,282✔
306
  SVCreateTbBatchReq reqs = {0};
3,282✔
307
  SArray*            crTblArray = NULL;
3,282✔
308

309
  tqDebug("s-task:%s build create %d table(s) msg", id, rows);
3,282!
310

311
  tagArray = taosArrayInit(4, sizeof(STagVal));
3,282✔
312
  crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
3,282✔
313
  if ((NULL == reqs.pArray) || (tagArray == NULL)) {
3,282!
314
    tqError("s-task:%s failed to init create table msg, code:%s", id, tstrerror(terrno));
×
315
    code = terrno;
×
316
    goto _end;
×
317
  }
318

319
  for (int32_t rowId = 0; rowId < rows; rowId++) {
6,564✔
320
    SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0});
3,282✔
321

322
    int32_t size = taosArrayGetSize(pDataBlock->pDataBlock);
3,282✔
323
    int32_t numOfTags = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
3,282✔
324

325
    code = initCreateTableMsg(pCreateTbReq, suid, stbFullName, numOfTags);
3,282✔
326
    if (code) {
3,282!
327
      tqError("s-task:%s vgId:%d failed to init create table msg", id, vgId);
×
328
      continue;
×
329
    }
330

331
    taosArrayClear(tagArray);
3,282✔
332

333
    if (size == 2) {
3,282✔
334
      STagVal tagVal = {
612✔
335
          .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
612✔
336

337
      void* p = taosArrayPush(tagArray, &tagVal);
612✔
338
      if (p == NULL) {
612!
339
        return terrno;
×
340
      }
341

342
      code = createDefaultTagColName(&pCreateTbReq->ctb.tagName);
612✔
343
      if (code) {
612!
344
        return code;
×
345
      }
346
    } else {
347
      for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
25,242✔
348
        SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
22,572✔
349
        if (pTagData == NULL) {
22,572!
350
          continue;
7,548✔
351
        }
352

353
        STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type};
22,572✔
354
        void*   pData = colDataGetData(pTagData, rowId);
22,572!
355
        if (colDataIsNull_s(pTagData, rowId)) {
45,144!
356
          continue;
7,548✔
357
        } else if (IS_VAR_DATA_TYPE(pTagData->info.type)) {
15,024!
358
          tagVal.nData = varDataLen(pData);
5,970✔
359
          tagVal.pData = (uint8_t*)varDataVal(pData);
5,970✔
360
        } else {
361
          memcpy(&tagVal.i64, pData, pTagData->info.bytes);
9,054✔
362
        }
363
        void* p = taosArrayPush(tagArray, &tagVal);
15,024✔
364
        if (p == NULL) {
15,024!
365
          code = terrno;
×
366
          goto _end;
×
367
        }
368
      }
369
    }
370

371
    code = tTagNew(tagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
3,282✔
372
    taosArrayDestroy(tagArray);
3,282✔
373
    tagArray = NULL;
3,282✔
374

375
    if (pCreateTbReq->ctb.pTag == NULL || (code != 0)) {
3,282!
376
      tdDestroySVCreateTbReq(pCreateTbReq);
377
      code = TSDB_CODE_OUT_OF_MEMORY;
×
378
      goto _end;
×
379
    }
380

381
    uint64_t gid = pDataBlock->info.id.groupId;
3,282✔
382
    if (taosArrayGetSize(pDataBlock->pDataBlock) > UD_GROUPID_COLUMN_INDEX) {
3,282!
383
      SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
3,282✔
384
      if (pGpIdColInfo == NULL) {
3,282!
385
        continue;
×
386
      }
387

388
      // todo remove this
389
      void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
3,282!
390
      if (gid != *(int64_t*)pGpIdData) {
3,282!
391
        tqError("s-task:%s vgId:%d invalid groupId:%" PRId64 " actual:%" PRId64 " in sink task", id, vgId, gid,
×
392
                *(int64_t*)pGpIdData);
393
      }
394
    }
395

396
    code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, IS_NEW_SUBTB_RULE(pTask));
3,282!
397
    if (code) {
3,282!
398
      goto _end;
×
399
    }
400

401
    void* p = taosArrayPush(reqs.pArray, pCreateTbReq);
3,282✔
402
    if (p == NULL) {
3,282!
403
      code = terrno;
×
404
      goto _end;
×
405
    }
406

407
    bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, gid, &pInfo);
3,282✔
408
    if (!alreadyCached) {
3,282✔
409
      code = doCreateSinkTableInfo(pCreateTbReq->name, &pInfo);
2,767✔
410
      if (code) {
2,767!
411
        tqError("vgId:%d failed to create sink tableInfo for table:%s, s-task:%s", vgId, pCreateTbReq->name, id);
×
412
        continue;
×
413
      }
414

415
      code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pInfo, gid, id);
2,767✔
416
      if (code) {
2,767!
417
        tqError("vgId:%d failed to put sink tableInfo:%s into cache, s-task:%s", vgId, pCreateTbReq->name, id);
×
418
      }
419
    }
420

421
    tqDebug("s-task:%s build create table:%s msg complete", id, pCreateTbReq->name);
3,282!
422
  }
423

424
  reqs.nReqs = taosArrayGetSize(reqs.pArray);
3,282✔
425
  code = tqPutReqToQueue(pVnode, &reqs, encodeCreateChildTableForRPC, TDMT_VND_CREATE_TABLE);
3,282✔
426
  if (code != TSDB_CODE_SUCCESS) {
3,282!
427
    tqError("s-task:%s failed to send create table msg", id);
×
428
  }
429

430
_end:
3,282✔
431
  taosArrayDestroy(tagArray);
3,282✔
432
  taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
3,282✔
433
  return code;
3,282✔
434
}
435

436
static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SSDataBlock* pDataBlock,
44✔
437
                                          SStreamTask* pTask, int64_t suid) {
438
  int32_t          lino = 0;
44✔
439
  int32_t          code = 0;
44✔
440
  int32_t          rows = pDataBlock->info.rows;
44✔
441
  const char*      id = pTask->id.idStr;
44✔
442
  SVDropTbBatchReq batchReq = {0};
44✔
443
  SVDropTbReq      req = {0};
44✔
444

445
  if (rows <= 0 || rows > 1 || pTask->subtableWithoutMd5 == 0) return TSDB_CODE_SUCCESS;
44!
446

447
  batchReq.pArray = taosArrayInit(rows, sizeof(SVDropTbReq));
44✔
448
  if (!batchReq.pArray) return terrno;
44!
449
  batchReq.nReqs = rows;
44✔
450
  req.suid = suid;
44✔
451
  req.igNotExists = true;
44✔
452

453
  SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
44✔
454
  char             tbName[TSDB_TABLE_NAME_LEN + 1] = {0};
44✔
455
  int32_t          i = 0;
44✔
456
  void*            pData = colDataGetVarData(pTbNameCol, i);
44✔
457
  memcpy(tbName, varDataVal(pData), varDataLen(pData));
44✔
458
  tbName[varDataLen(pData) + 1] = 0;
44✔
459
  req.name = tbName;
44✔
460
  if (taosArrayPush(batchReq.pArray, &req) == NULL) {
88!
461
    TSDB_CHECK_CODE(terrno, lino, _exit);
×
462
  }
463

464
  SMetaReader mr = {0};
44✔
465
  metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
44✔
466

467
  code = metaGetTableEntryByName(&mr, tbName);
44✔
468
  if (TSDB_CODE_SUCCESS == code && isValidDstChildTable(&mr, TD_VID(pVnode), tbName, pTask->outputInfo.tbSink.stbUid)) {
44!
469
    STableSinkInfo* pTableSinkInfo = NULL;
44✔
470
    bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, &pTableSinkInfo);
44✔
471
    if (alreadyCached) {
44✔
472
      pTableSinkInfo->uid = mr.me.uid;
30✔
473
    }
474
  }
475
  metaReaderClear(&mr);
44✔
476
  tqDebug("s-task:%s build drop %d table(s) msg", id, rows);
44!
477
  code = tqPutReqToQueue(pVnode, &batchReq, encodeDropChildTableForRPC, TDMT_VND_DROP_TABLE);
44✔
478
  TSDB_CHECK_CODE(code, lino, _exit);
44!
479

480

481
  code = doWaitForDstTableDropped(pVnode, pTask, tbName);
44✔
482
  TSDB_CHECK_CODE(code, lino, _exit);
44!
483

484
_exit:
44✔
485
  if (batchReq.pArray) {
44!
486
    taosArrayDestroy(batchReq.pArray);
44✔
487
  }
488
  return code;
44✔
489
}
490

491
/**
 * Encode a submit request and put it on the vnode write queue, then update the sink statistics.
 *
 * A failure to build the message is returned to the caller. A failure to enqueue it is only logged:
 * the submit counter is still increased and TSDB_CODE_SUCCESS is returned. Every 1000th submit a
 * summary line is written to the log.
 *
 * @param pVnode      destination vnode
 * @param pTask       sink task; provides the statistics record and logging ids
 * @param pReq        submit request to encode
 * @param numOfBlocks number of source blocks that were merged into pReq, used for logging only
 * @return TSDB_CODE_SUCCESS, or the error code of buildSubmitMsgImpl()
 */
int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks) {
  const char* id = pTask->id.idStr;
  int32_t     vgId = TD_VID(pVnode);
  void*       pCont = NULL;
  int32_t     contLen = 0;

  // read before the request is encoded
  int32_t numOfFinalBlocks = taosArrayGetSize(pReq->aSubmitTbData);

  int32_t code = buildSubmitMsgImpl(pReq, vgId, &pCont, &contLen);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("s-task:%s build submit msg failed, vgId:%d, code:%s", id, vgId, tstrerror(code));
    return code;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_SUBMIT, .pCont = pCont, .contLen = contLen};
  code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
  } else {
    tqDebug("s-task:%s vgId:%d comp %d blocks into %d and send to dstTable(s) completed", id, vgId, numOfBlocks,
            numOfFinalBlocks);
  }

  SSinkRecorder* pRecorder = &pTask->execInfo.sink;
  pRecorder->numOfSubmit += 1;

  if ((pRecorder->numOfSubmit % 1000) == 0) {
    double elapsedSec = (taosGetTimestampMs() - pTask->execInfo.readyTs) / 1000.0;
    tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
           " submit into dst table, %.2fMiB duration:%.2f Sec.",
           id, vgId, pRecorder->numOfBlocks, pRecorder->numOfRows, pRecorder->numOfSubmit,
           SIZE_IN_MiB(pRecorder->dataSize), elapsedSec);
  }

  return TSDB_CODE_SUCCESS;
}
525

526
// merge the new submit table block with the existed blocks
527
// if ts in the new data block overlap with existed one, replace it
528
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id) {
853✔
529
  int32_t oldLen = taosArrayGetSize(pExisted->aRowP);
853✔
530
  int32_t newLen = taosArrayGetSize(pNew->aRowP);
853✔
531
  int32_t numOfPk = 0;
853✔
532

533
  int32_t j = 0, k = 0;
853✔
534
  SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES);
853✔
535
  if (pFinal == NULL) {
853!
536
    tqError("s-task:%s failed to prepare merge result datablock, code:%s", id, tstrerror(terrno));
×
537
    return terrno;
×
538
  }
539

540
  while (j < newLen && k < oldLen) {
1,174,546✔
541
    SRow* pNewRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j);
1,173,693✔
542
    SRow* pOldRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k);
1,173,693✔
543

544
    if (pNewRow->ts < pOldRow->ts) {
1,173,693✔
545
      void* p = taosArrayPush(pFinal, &pNewRow);
47✔
546
      if (p == NULL) {
47!
547
        return terrno;
×
548
      }
549
      j += 1;
47✔
550
    } else if (pNewRow->ts > pOldRow->ts) {
1,173,646✔
551
      void* p = taosArrayPush(pFinal, &pOldRow);
1,172,735✔
552
      if (p == NULL) {
1,172,735!
553
        return terrno;
×
554
      }
555

556
      k += 1;
1,172,735✔
557
    } else {
558
      // check for the existance of primary key
559
      if (pNewRow->numOfPKs == 0) {
911!
560
        void* p = taosArrayPush(pFinal, &pNewRow);
911✔
561
        if (p == NULL) {
911!
562
          return terrno;
×
563
        }
564

565
        k += 1;
911✔
566
        j += 1;
911✔
567
        tRowDestroy(pOldRow);
911✔
568
      } else {
569
        numOfPk = pNewRow->numOfPKs;
×
570

571
        SRowKey kNew, kOld;
572
        tRowGetKey(pNewRow, &kNew);
×
573
        tRowGetKey(pOldRow, &kOld);
×
574

575
        int32_t ret = tRowKeyCompare(&kNew, &kOld);
×
576
        if (ret <= 0) {
×
577
          void* p = taosArrayPush(pFinal, &pNewRow);
×
578
          if (p == NULL) {
×
579
            return terrno;
×
580
          }
581

582
          j += 1;
×
583

584
          if (ret == 0) {
×
585
            k += 1;
×
586
            tRowDestroy(pOldRow);
×
587
          }
588
        } else {
589
          void* p = taosArrayPush(pFinal, &pOldRow);
×
590
          if (p == NULL) {
×
591
            return terrno;
×
592
          }
593

594
          k += 1;
×
595
        }
596
      }
597
    }
598
  }
599

600
  while (j < newLen) {
160,703✔
601
    SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j++);
159,850✔
602
    void* p = taosArrayPush(pFinal, &pRow);
159,850✔
603
    if (p == NULL) {
159,850!
604
      return terrno;
×
605
    }
606
  }
607

608
  while (k < oldLen) {
1,890✔
609
    SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k++);
1,037✔
610
    void* p = taosArrayPush(pFinal, &pRow);
1,037✔
611
    if (p == NULL) {
1,037!
612
      return terrno;
×
613
    }
614
  }
615

616
  taosArrayDestroy(pNew->aRowP);
853✔
617
  taosArrayDestroy(pExisted->aRowP);
853✔
618
  pExisted->aRowP = pFinal;
853✔
619

620
  tqTrace("s-task:%s rows merged, final rows:%d, pk:%d uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
853!
621
          (int32_t)taosArrayGetSize(pFinal), numOfPk, pExisted->uid, (pExisted->pCreateTbReq != NULL),
622
          (pNew->pCreateTbReq != NULL));
623

624
  tdDestroySVCreateTbReq(pNew->pCreateTbReq);
853!
625
  taosMemoryFree(pNew->pCreateTbReq);
853✔
626
  return TSDB_CODE_SUCCESS;
853✔
627
}
628

629
// Verify that the table entry currently loaded in pReader can be used as a stream destination table:
// it has to be a child table, and its super table uid has to equal `suid`.
//
// Returns true and resets terrno to 0 when both conditions hold. Otherwise the reason is logged, terrno
// is set to TSDB_CODE_TDB_INVALID_TABLE_TYPE or TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE, and false is returned.
bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid) {
  bool valid = false;

  if (pReader->me.type != TSDB_CHILD_TABLE) {
    tqError("vgId:%d, failed to write into %s, since table type:%d incorrect", vgId, ctbName, pReader->me.type);
    terrno = TSDB_CODE_TDB_INVALID_TABLE_TYPE;
  } else if (pReader->me.ctbEntry.suid != suid) {
    tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64, vgId,
            ctbName, suid, pReader->me.ctbEntry.suid);
    terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
  } else {
    terrno = 0;
    valid = true;
  }

  return valid;
}
646

647
int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock,
557✔
648
                                SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq) {
649
  *pReq = NULL;
557✔
650

651
  SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
557✔
652
  if (pCreateTbReq == NULL) {
557!
653
    return terrno;
×
654
  }
655

656
  taosArrayClear(pTagArray);
557✔
657
  int32_t code = initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1);
557✔
658
  if (code != 0) {
557!
659
    return code;
×
660
  }
661

662
  STagVal tagVal = {.cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
557✔
663
  void* p = taosArrayPush(pTagArray, &tagVal);
557✔
664
  if (p == NULL) {
557!
665
    return terrno;
×
666
  }
667

668
  code = tTagNew(pTagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
557✔
669
  if (pCreateTbReq->ctb.pTag == NULL || (code != 0)) {
557!
670
    tdDestroySVCreateTbReq(pCreateTbReq);
671
    taosMemoryFreeClear(pCreateTbReq);
×
672
    return code;
×
673
  }
674

675
  code = createDefaultTagColName(&pCreateTbReq->ctb.tagName);
557✔
676
  if (code) {
557!
677
    return code;
×
678
  }
679

680
  // set table name
681
  code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule);
557✔
682
  if (code) {
557!
683
    return code;
×
684
  }
685

686
  *pReq = pCreateTbReq;
557✔
687
  return code;
557✔
688
}
689

690
// Serialize pSubmitReq into a newly allocated RPC buffer that starts with an SSubmitReq2Msg header
// (vgId, total length in network byte order, version 1) followed by the encoded request.
//
// pSubmitReq is destroyed with TSDB_MSG_FLG_ENCODE on every path, successful or not.
//
// On success returns TSDB_CODE_SUCCESS, stores the buffer in *pMsg and its total length (header
// included) in *msgLen. On failure returns a non-zero code, *msgLen is 0 and *pMsg is not written.
int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) {
  int32_t code = 0;
  void*   pBuf = NULL;
  *msgLen = 0;

  // compute the encoded size first
  int32_t len = 0;
  tEncodeSize(tEncodeSubmitReq, pSubmitReq, len, code);
  if (code != 0) {
    tqError("failed to encode submit req, code:%s, ignore and continue", tstrerror(code));
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  SEncoder encoder = {0};
  len += sizeof(SSubmitReq2Msg);

  pBuf = rpcMallocCont(len);
  if (NULL == pBuf) {
    code = terrno;  // capture before the cleanup below can touch terrno
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  ((SSubmitReq2Msg*)pBuf)->header.vgId = vgId;
  ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
  ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);

  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
  if (tEncodeSubmitReq(&encoder, pSubmitReq) < 0) {
    // report the failure to the caller; returning the earlier (successful) code here would make the
    // caller consume an unset *pMsg
    code = TSDB_CODE_OUT_OF_MEMORY;
    terrno = code;
    tqError("failed to encode submit req, code:%s, ignore and continue", terrstr());
    tEncoderClear(&encoder);
    rpcFreeCont(pBuf);
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  tEncoderClear(&encoder);
  tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);

  *msgLen = len;
  *pMsg = pBuf;
  return TSDB_CODE_SUCCESS;
}
729

730
int32_t tsAscendingSortFn(const void* p1, const void* p2) {
35,110,712✔
731
  SRow* pRow1 = *(SRow**)p1;
35,110,712✔
732
  SRow* pRow2 = *(SRow**)p2;
35,110,712✔
733

734
  if (pRow1->ts == pRow2->ts) {
35,110,712!
735
    return 0;
×
736
  } else {
737
    return pRow1->ts > pRow2->ts ? 1 : -1;
35,110,712!
738
  }
739
}
740

741
int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, int64_t earlyTs,
9,974✔
742
                      const char* id) {
743
  int32_t numOfRows = pDataBlock->info.rows;
9,974✔
744
  int32_t code = TSDB_CODE_SUCCESS;
9,974✔
745

746
  SArray* pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
9,974✔
747
  pTableData->aRowP = taosArrayInit(numOfRows, sizeof(SRow*));
9,974✔
748

749
  if (pTableData->aRowP == NULL || pVals == NULL) {
9,974!
750
    taosArrayDestroy(pTableData->aRowP);
×
751
    pTableData->aRowP = NULL;
×
752
    taosArrayDestroy(pVals);
×
753
    code = terrno;
×
754
    tqError("s-task:%s failed to prepare write stream res blocks, code:%s", id, tstrerror(code));
×
755
    return code;
×
756
  }
757

758
  for (int32_t j = 0; j < numOfRows; j++) {
6,076,324✔
759
    taosArrayClear(pVals);
6,066,373✔
760

761
    int32_t dataIndex = 0;
6,066,368✔
762
    int64_t ts = 0;
6,066,368✔
763

764
    for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
58,990,714✔
765
      const STColumn* pCol = &pTSchema->columns[k];
52,920,973✔
766

767
      // primary timestamp column, for debug purpose
768
      if (k == 0) {
52,920,973✔
769
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
6,066,281✔
770
        if (pColData == NULL) {
6,066,205!
771
          continue;
×
772
        }
773

774
        ts = *(int64_t*)colDataGetData(pColData, j);
6,066,205!
775
        tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, ts);
6,066,205✔
776

777
        if (ts < earlyTs) {
6,064,186!
778
          tqError("s-task:%s ts:%" PRId64 " of generated results out of valid time range %" PRId64 " , discarded", id,
×
779
                  ts, earlyTs);
780
          taosArrayDestroy(pTableData->aRowP);
×
781
          pTableData->aRowP = NULL;
×
782
          taosArrayDestroy(pVals);
×
783
          return TSDB_CODE_SUCCESS;
×
784
        }
785
      }
786

787
      if (IS_SET_NULL(pCol)) {
52,918,878✔
788
        if (pCol->flags & COL_IS_KEY) {
671!
789
          qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts,
×
790
                 pCol->colId, pCol->type);
791
          break;
×
792
        }
793
        SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
671✔
794
        void* p = taosArrayPush(pVals, &cv);
671✔
795
        if (p == NULL) {
671!
796
          return terrno;
×
797
        }
798
      } else {
799
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
52,918,207✔
800
        if (pColData == NULL) {
52,918,302!
801
          continue;
×
802
        }
803

804
        if (colDataIsNull_s(pColData, j)) {
105,836,604✔
805
          if (pCol->flags & COL_IS_KEY) {
5,824,032!
806
            qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8,
×
807
                   ts, pCol->colId, pCol->type);
808
            break;
×
809
          }
810

811
          SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
5,824,032✔
812
          void* p = taosArrayPush(pVals, &cv);
5,824,027✔
813
          if (p == NULL) {
5,824,027!
814
            return terrno;
×
815
          }
816

817
          dataIndex++;
5,824,027✔
818
        } else {
819
          void* colData = colDataGetData(pColData, j);
47,094,270!
820
          if (IS_VAR_DATA_TYPE(pCol->type)) {  // address copy, no value
52,910,454!
821
            SValue sv =
5,816,555✔
822
                (SValue){.type = pCol->type, .nData = varDataLen(colData), .pData = (uint8_t*)varDataVal(colData)};
5,816,555✔
823
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
5,816,555✔
824
            void* p = taosArrayPush(pVals, &cv);
5,816,184✔
825
            if (p == NULL) {
5,816,184!
826
              return terrno;
×
827
            }
828
          } else {
829
            SValue sv = {.type = pCol->type};
41,277,715✔
830
            memcpy(&sv.val, colData, tDataTypes[pCol->type].bytes);
41,277,715✔
831
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
41,277,715✔
832
            void* p = taosArrayPush(pVals, &cv);
41,283,464✔
833
            if (p == NULL) {
41,283,464!
834
              return terrno;
×
835
            }
836
          }
837
          dataIndex++;
47,099,648✔
838
        }
839
      }
840
    }
841

842
    SRow* pRow = NULL;
6,069,741✔
843
    code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
6,069,741✔
844
    if (code != TSDB_CODE_SUCCESS) {
6,067,390!
845
      tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);
×
846
      taosArrayDestroy(pVals);
×
847
      tqError("s-task:%s build rows for submit failed, ts:%"PRId64, id, ts);
×
848
      return code;
×
849
    }
850

851
    void* p = taosArrayPush(pTableData->aRowP, &pRow);
6,067,390✔
852
    if (p == NULL) {
6,066,350!
853
      return terrno;
×
854
    }
855
  }
856

857
  taosArrayDestroy(pVals);
9,951✔
858
  return TSDB_CODE_SUCCESS;
9,973✔
859
}
860

861
// Poll the vnode meta until the destination table `dstTableName` shows up, as long as the cached sink
// info still carries uid 0.
//
// Each round: give up with TSDB_CODE_STREAM_EXEC_CANCELLED when the task should stop; otherwise look the
// table up by name. If it is missing, sleep 100ms and try again. Once it is found, it is validated with
// isValidDstChildTable(); for a valid table its uid is written to both *uid and pTableSinkInfo->uid.
// The function then returns terrno as left by the validation.
//
// Returns TSDB_CODE_SUCCESS immediately when pTableSinkInfo->uid is already non-zero.
int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
                                 const char* dstTableName, int64_t* uid) {
  const char* id = pTask->id.idStr;
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  int32_t     vgId = TD_VID(pVnode);

  while (pTableSinkInfo->uid == 0) {
    if (streamTaskShouldStop(pTask)) {
      tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName);
      return TSDB_CODE_STREAM_EXEC_CANCELLED;
    }

    SMetaReader mr = {0};
    metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);

    if (metaGetTableEntryByName(&mr, dstTableName) != 0) {
      // table is not there yet: release the reader, back off, and retry
      metaReaderClear(&mr);
      taosMsleep(100);
      tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName);
      continue;
    }

    // table exists: only a valid child table of the expected super table provides the uid
    if (isValidDstChildTable(&mr, vgId, dstTableName, suid)) {
      tqDebug("s-task:%s set uid:%" PRIu64 " for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data);
      (*uid) = mr.me.uid;
      pTableSinkInfo->uid = mr.me.uid;
    }

    metaReaderClear(&mr);
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
898

899
// Poll the vnode meta until the destination table `dstTableName` is gone.
//
// Each round: return TSDB_CODE_STREAM_EXEC_CANCELLED when the task should stop; otherwise look the table
// up by name.
//   - lookup reports TSDB_CODE_PAR_TABLE_NOT_EXIST: done, TSDB_CODE_SUCCESS.
//   - lookup succeeds and the entry passes isValidDstChildTable(): sleep 100ms and poll again.
//   - lookup succeeds but the entry fails that validation: done, TSDB_CODE_SUCCESS.
//   - any other lookup result: log the failure and return terrno.
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName) {
  const char* id = pTask->id.idStr;
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  int32_t     vgId = TD_VID(pVnode);

  for (;;) {
    if (streamTaskShouldStop(pTask)) {
      tqDebug("s-task:%s task will stop, quit from waiting for table:%s drop", id, dstTableName);
      return TSDB_CODE_STREAM_EXEC_CANCELLED;
    }

    SMetaReader mr = {0};
    metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
    int32_t code = metaGetTableEntryByName(&mr, dstTableName);

    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      metaReaderClear(&mr);
      return TSDB_CODE_SUCCESS;
    }

    if (TSDB_CODE_SUCCESS != code) {
      tqError("s-task:%s failed to wait for table:%s drop", id, dstTableName);
      metaReaderClear(&mr);
      return terrno;
    }

    bool stillPresent = isValidDstChildTable(&mr, vgId, dstTableName, suid);
    metaReaderClear(&mr);
    if (!stillPresent) {
      return TSDB_CODE_SUCCESS;
    }

    taosMsleep(100);
    tqDebug("s-task:%s wait 100ms for table:%s drop", id, dstTableName);
  }
}
932

933
// Allocate a zero-filled STableSinkInfo with room for the table name plus a terminating NUL, and copy
// pDstTableName into it. The uid stays 0.
//
// *pInfo receives the new object (NULL when the allocation fails, in which case terrno is returned).
int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo) {
  int32_t         nameLen = strlen(pDstTableName);
  STableSinkInfo* pNew = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1);

  (*pInfo) = pNew;
  if (pNew == NULL) {
    return terrno;
  }

  // the buffer is zero-filled, so copying nameLen bytes leaves the name NUL-terminated
  memcpy(pNew->name.data, pDstTableName, nameLen);
  pNew->name.len = nameLen;
  return TSDB_CODE_SUCCESS;
}
944

945
int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
9,115✔
946
                           SSubmitTbData* pTableData) {
947
  uint64_t        groupId = pDataBlock->info.id.groupId;
9,115✔
948
  char*           dstTableName = pDataBlock->info.parTbName;
9,115✔
949
  int32_t         numOfRows = pDataBlock->info.rows;
9,115✔
950
  const char*     id = pTask->id.idStr;
9,115✔
951
  int64_t         suid = pTask->outputInfo.tbSink.stbUid;
9,115✔
952
  STSchema*       pTSchema = pTask->outputInfo.tbSink.pTSchema;
9,115✔
953
  int32_t         vgId = TD_VID(pVnode);
9,115✔
954
  STableSinkInfo* pTableSinkInfo = NULL;
9,115✔
955
  int32_t         code = 0;
9,115✔
956

957
  bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, groupId, &pTableSinkInfo);
9,115✔
958

959
  if (alreadyCached) {
9,115✔
960
    if (dstTableName[0] == 0) {  // data block does not set the destination table name
8,533✔
961
      tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
770✔
962
      tqDebug("s-task:%s vgId:%d, gropuId:%" PRIu64 " datablock table name is null, set name:%s", id, vgId, groupId,
770!
963
              dstTableName);
964
    } else {
965
      tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
7,763✔
966
      if (pTableSinkInfo->uid != 0) {
7,763✔
967
        tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(uid:%" PRIu64 ")", id, numOfRows, groupId,
4,850✔
968
                dstTableName, pTableSinkInfo->uid);
969
      } else {
970
        tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(not set uid yet for the secondary block)",
2,913✔
971
                id, numOfRows, groupId, dstTableName);
972
      }
973
    }
974
  } else {  // this groupId has not been kept in cache yet
975
    if (dstTableName[0] == 0) {
582✔
976
      memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
231✔
977
      code = buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
231✔
978
      if (code) {
231!
979
        tqDebug("s-task:%s failed to build auto create table-name:%s, groupId:0x%" PRId64, id, dstTableName, groupId);
×
980
        return code;
×
981
      }
982
    } else {
983
      if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
351!
984
          !alreadyAddGroupId(dstTableName, groupId) && groupId != 0) {
×
985
        tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
×
986
        if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
×
987
          code = buildCtbNameAddGroupId(NULL, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
×
988
        } else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) {
×
989
          code = buildCtbNameAddGroupId(stbFullName, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
×
990
        }
991
        if (code != TSDB_CODE_SUCCESS) {
×
992
          return code;
×
993
        }
994
      }
995
    }
996

997
    code = doCreateSinkTableInfo(dstTableName, &pTableSinkInfo);
582✔
998
    if (code == 0) {
582!
999
      tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName);
582✔
1000
    } else {
1001
      tqDebug("s-task:%s failed to build new sinkTableInfo, dstTable:%s", id, dstTableName);
×
1002
      return code;
×
1003
    }
1004
  }
1005

1006
  if (alreadyCached) {
9,115✔
1007
    pTableData->uid = pTableSinkInfo->uid;
8,533✔
1008

1009
    if (pTableData->uid == 0) {
8,533✔
1010
      tqTrace("s-task:%s cached tableInfo:%s uid is invalid, acquire it from meta", id, pTableSinkInfo->name.data);
3,108✔
1011
      return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid);
3,108✔
1012
    } else {
1013
      tqTrace("s-task:%s set the dstTable uid from cache:%" PRId64, id, pTableData->uid);
5,425✔
1014
    }
1015
  } else {
1016
    // The auto-create option will always set to be open for those submit messages, which arrives during the period
1017
    // the creating of the destination table, due to the absence of the user-specified table in TSDB. When scanning
1018
    // data from WAL, those submit messages, with auto-created table option, will be discarded expect the first, for
1019
    // those mismatched table uids. Only the FIRST table has the correct table uid, and those remain all have
1020
    // randomly generated, but false table uid in the WAL.
1021
    SMetaReader mr = {0};
582✔
1022
    metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
582✔
1023

1024
    // table not in cache, let's try to extract it from tsdb meta
1025
    if (metaGetTableEntryByName(&mr, dstTableName) < 0) {
582✔
1026
      metaReaderClear(&mr);
551✔
1027

1028
      if (pTask->outputInfo.tbSink.autoCreateCtb) {
551!
1029
        tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName);
551✔
1030

1031
        SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
551✔
1032
        if (pTagArray == NULL) {
551!
1033
          tqError("s-task:%s failed to build auto create submit msg in sink, vgId:%d, due to %s", id, vgId,
×
1034
                  tstrerror(terrno));
1035
          return terrno;
×
1036
        }
1037

1038
        pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
551✔
1039
        code = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
1,102✔
1040
                                       IS_NEW_SUBTB_RULE(pTask), &pTableData->pCreateTbReq);
551!
1041
        taosArrayDestroy(pTagArray);
551✔
1042

1043
        if (code) {
551!
1044
          tqError("s-task:%s failed to build auto create dst-table req:%s, code:%s", id, dstTableName, tstrerror(code));
×
1045
          taosMemoryFree(pTableSinkInfo);
×
1046
          return code;
×
1047
        }
1048

1049
        pTableSinkInfo->uid = 0;
551✔
1050
        code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
551✔
1051
      } else {
1052
        metaReaderClear(&mr);
×
1053

1054
        tqError("s-task:%s vgId:%d dst-table:%s not auto-created, and not create in tsdb, discard data", id, vgId,
×
1055
                dstTableName);
1056
        return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1057
      }
1058
    } else {
1059
      bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);
31✔
1060
      if (!isValid) {
31!
1061
        metaReaderClear(&mr);
×
1062
        taosMemoryFree(pTableSinkInfo);
×
1063
        tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId,
×
1064
                dstTableName);
1065
        return terrno;
×
1066
      } else {
1067
        pTableData->uid = mr.me.uid;
31✔
1068
        pTableSinkInfo->uid = mr.me.uid;
31✔
1069

1070
        metaReaderClear(&mr);
31✔
1071
        code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
31✔
1072
      }
1073
    }
1074
  }
1075

1076
  return code;
6,007✔
1077
}
1078

1079
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
9,974✔
1080
                                 SSubmitTbData* pTableData, int64_t earlyTs, const char* id) {
1081
  int32_t numOfRows = pDataBlock->info.rows;
9,974✔
1082
  char*   dstTableName = pDataBlock->info.parTbName;
9,974✔
1083

1084
  tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, id,
9,974✔
1085
          blockIndex + 1, numOfRows, suid);
1086

1087
  // convert all rows
1088
  int32_t code = doConvertRows(pTableData, pTSchema, pDataBlock, earlyTs, id);
9,974✔
1089
  if (code != TSDB_CODE_SUCCESS) {
9,973!
1090
    tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno));
×
1091
    return code;
×
1092
  }
1093

1094
  if (pTableData->aRowP != NULL) {
9,973!
1095
    taosArraySort(pTableData->aRowP, tsAscendingSortFn);
9,973✔
1096
    tqTrace("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows);
9,973✔
1097
  }
1098

1099
  return code;
9,973✔
1100
}
1101

1102
// Make sure the sink task has the tag schema of its destination super table loaded.
//
// When the tag schema is not set yet, the super table entry is read from the vnode meta by stbUid, its
// tag schema is cloned into the task's output info, and autoCreateCtb is derived from it: it is enabled
// only when the super table has exactly one tag, of type UBIGINT, named "group_id".
//
// Returns TSDB_CODE_SUCCESS (also when the schema was already present), the meta lookup error, or
// terrno when cloning the schema fails.
int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode) {
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
  const char*      id = pTask->id.idStr;
  int32_t          vgId = pTask->pMeta->vgId;

  if (pOutputInfo->tbSink.pTagSchema != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SMetaReader stbReader = {0};
  metaReaderDoInit(&stbReader, pVnode->pMeta, META_READER_LOCK);

  int32_t code = metaReaderGetTableEntryByUid(&stbReader, pOutputInfo->tbSink.stbUid);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("s-task:%s vgId:%d failed to get the dst stable, failed to sink results", id, vgId);
    metaReaderClear(&stbReader);
    return code;
  }

  // keep a private copy: the reader's entry is released right below
  SSchemaWrapper* pTagSchema = tCloneSSchemaWrapper(&stbReader.me.stbEntry.schemaTag);
  pOutputInfo->tbSink.pTagSchema = pTagSchema;
  metaReaderClear(&stbReader);

  if (pTagSchema == NULL) {
    tqError("s-task:%s failed to clone tag schema, code:%s, failed to sink results", id, tstrerror(terrno));
    return terrno;
  }

  SSchema* pFirstTag = &pTagSchema->pSchema[0];
  bool     isGroupIdOnly = (pTagSchema->nCols == 1) && (pFirstTag->type == TSDB_DATA_TYPE_UBIGINT) &&
                       (strcmp(pFirstTag->name, "group_id") == 0);
  pOutputInfo->tbSink.autoCreateCtb = isGroupIdOnly ? true : false;

  return code;
}
1138

1139
// Sink entry point: write the stream result blocks in `data` (an SArray of SSDataBlock) into the
// destination tables of vnode `vnode`.
//
// After checkTagSchema() succeeds, one of two strategies is used:
//   - all blocks pass hasOnlySubmitData() and the task does not use subtableWithoutMd5 == 1: the blocks
//     are handed to reubuildAndSendMultiResBlock() in one go;
//   - otherwise each block is dispatched on its type: delete result, create child table, drop child
//     table (only when subtableWithoutMd5 is set) or, for everything else, a regular result block.
//     Checkpoint blocks and NULL entries are skipped.
// Processing stops silently as soon as the task is asked to stop. Per-block result codes are not
// propagated (the function returns void).
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
  const SArray* pBlocks = (const SArray*)data;
  SVnode*       pVnode = (SVnode*)vnode;
  int64_t       suid = pTask->outputInfo.tbSink.stbUid;
  char*         stbFullName = pTask->outputInfo.tbSink.stbFullName;
  int32_t       vgId = TD_VID(pVnode);
  int32_t       numOfBlocks = taosArrayGetSize(pBlocks);
  const char*   id = pTask->id.idStr;
  int64_t       earlyTs = tsdbGetEarliestTs(pVnode->pTsdb);

  int32_t code = checkTagSchema(pTask, pVnode);
  if (code != TSDB_CODE_SUCCESS) {
    return;
  }

  bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
  if (onlySubmitData && pTask->subtableWithoutMd5 != 1) {
    tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks);
    if (!streamTaskShouldStop(pTask)) {
      reubuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs);
    }
    return;
  }

  tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
          numOfBlocks);

  for (int32_t blockIdx = 0; blockIdx < numOfBlocks; ++blockIdx) {
    if (streamTaskShouldStop(pTask)) {
      return;
    }

    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, blockIdx);
    if (pDataBlock == NULL) {
      continue;
    }

    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
      code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
      continue;
    }

    if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
      code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
      continue;
    }

    if (pDataBlock->info.type == STREAM_CHECKPOINT) {
      continue;
    }

    if (pDataBlock->info.type == STREAM_DROP_CHILD_TABLE && pTask->subtableWithoutMd5) {
      code = doBuildAndSendDropTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
    } else {
      code = handleResultBlockMsg(pTask, pDataBlock, blockIdx, pVnode, earlyTs);
    }
  }
}
1193

1194
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
1195
bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
7,399✔
1196
  for (int32_t i = 0; i < numOfBlocks; ++i) {
13,756✔
1197
    SSDataBlock* p = taosArrayGet(pBlocks, i);
9,378✔
1198
    if (p == NULL) {
9,378!
1199
      continue;
×
1200
    }
1201

1202
    if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
9,378✔
1203
      return false;
3,021✔
1204
    }
1205
  }
1206

1207
  return true;
4,378✔
1208
}
1209

1210
// Store the pointer pTableSinkInfo in the sink-table cache under key groupId.
// If the insert fails, pTableSinkInfo is freed here; the callee owns it from the moment of the call.
// Returns the code reported by tSimpleHashPut().
int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) {
  int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(groupId), &pTableSinkInfo, POINTER_BYTES);

  if (code == TSDB_CODE_SUCCESS) {
    tqDebug("s-task:%s new dst table:%s(uid:%" PRIu64 ") added into cache, total:%d", id, pTableSinkInfo->name.data,
            pTableSinkInfo->uid, tSimpleHashGetSize(pSinkTableMap));
    return code;
  }

  taosMemoryFreeClear(pTableSinkInfo);
  return code;
}
1221

1222
// Look up the sink-table info cached for groupId.
// On a hit *pInfo receives the cached pointer and true is returned; on a miss *pInfo is left untouched
// and false is returned.
bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo) {
  STableSinkInfo** ppCached = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(groupId));
  if (ppCached == NULL) {
    return false;
  }

  *pInfo = *ppCached;
  return true;
}
1231

1232
// Remove the cache entry stored under groupId. An empty cache is treated as success without touching
// the map. The outcome of the removal is logged and its code returned.
int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (tSimpleHashGetSize(pSinkTableMap) != 0) {
    code = tSimpleHashRemove(pSinkTableMap, &groupId, sizeof(groupId));
    if (code != 0) {
      tqError("s-task:%s failed to remove table meta from hashmap, groupId:%" PRId64, id, groupId);
    } else {
      tqDebug("s-task:%s remove cached table meta for groupId:%" PRId64, id, groupId);
    }
  }

  return code;
}
1245

1246
// Turn a delete-result block into a batch delete request and put it on the vnode write queue as a
// TDMT_VND_BATCH_DEL message.
//
// The request list is built by tqBuildDeleteReq(); an empty list means there is nothing to send and
// TSDB_CODE_SUCCESS is returned. Otherwise the request is encoded behind an SMsgHead (vgId set to this
// vnode) into an RPC buffer and queued.
//
// The temporary request list is released on every path, and the RPC buffer is released when encoding
// fails. Returns TSDB_CODE_SUCCESS, or the error of the failing step (allocation, build, encode).
// A failure to enqueue is only logged and still reported as success.
// NOTE(review): ownership of the RPC buffer after a failed tmsgPutToQueue() is not visible here, so it
// is left untouched - confirm whether the queue releases it.
int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
                                int64_t suid) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t len = 0;
  void*   serializedDeleteReq = NULL;

  SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
  if (deleteReq.deleteReqs == NULL) {
    return terrno;
  }

  code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr, IS_NEW_SUBTB_RULE(pTask));
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
    goto _end;  // nothing to delete, code is still TSDB_CODE_SUCCESS
  }

  tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
  if (code != TSDB_CODE_SUCCESS) {
    qError("s-task:%s failed to encode delete request", pTask->id.idStr);
    goto _end;
  }

  serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
  if (serializedDeleteReq == NULL) {
    code = terrno;
    goto _end;
  }

  {
    SEncoder encoder = {0};
    void*    abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
    tEncoderInit(&encoder, abuf, len);
    code = tEncodeSBatchDeleteReq(&encoder, &deleteReq);
    tEncoderClear(&encoder);
  }

  if (code) {
    rpcFreeCont(serializedDeleteReq);  // never queued, release it here
    goto _end;
  }

  ((SMsgHead*)serializedDeleteReq)->vgId = TD_VID(pVnode);

  {
    SRpcMsg msg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead)};
    if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
      tqDebug("failed to put delete req into write-queue since %s", terrstr());
    }
  }

  code = TSDB_CODE_SUCCESS;

_end:
  taosArrayDestroy(deleteReq.deleteReqs);
  return code;
}
1292

1293
/**
 * Combine several stream result blocks into a single submit request and send it.
 *
 * Blocks are grouped by pDataBlock->info.id.groupId: the first block of a group creates a new
 * SSubmitTbData entry in the request, later blocks of the same group are merged into that entry
 * via doMergeExistedRows(). NULL entries and STREAM_CHECKPOINT blocks are skipped. A block whose
 * processing fails is logged (where the original did so) and dropped; the remaining blocks are
 * still processed.
 *
 * @param pTask   sink task; provides target super table info, schema and sink counters
 * @param pBlocks array of SSDataBlock to write
 * @param pVnode  vnode that receives the submit message
 * @param earlyTs forwarded to tqSetDstTableDataPayload()
 */
void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) {
  int32_t     code = 0;
  const char* id = pTask->id.idStr;
  int32_t     vgId = pTask->pMeta->vgId;
  int32_t     numOfBlocks = taosArrayGetSize(pBlocks);
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  STSchema*   pTSchema = pTask->outputInfo.tbSink.pTSchema;
  char*       stbFullName = pTask->outputInfo.tbSink.stbFullName;

  // groupId -> index of that group's entry in submitReq.aSubmitTbData
  SHashObj* pTableIndexMap =
      taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableIndexMap == NULL) {
    // without the index map blocks of the same group cannot be merged, so give up early
    code = terrno;
    tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
    return;
  }

  SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
  if (submitReq.aSubmitTbData == NULL) {
    code = terrno;
    tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
    taosHashCleanup(pTableIndexMap);
    return;
  }

  // set once a non-checkpoint block is seen, even if that block is discarded afterwards
  bool hasSubmit = false;
  for (int32_t i = 0; i < numOfBlocks; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
    if (pDataBlock == NULL) {
      continue;
    }

    if (pDataBlock->info.type == STREAM_CHECKPOINT) {
      continue;
    }

    hasSubmit = true;
    pTask->execInfo.sink.numOfBlocks += 1;
    uint64_t groupId = pDataBlock->info.id.groupId;

    SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};

    int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
    if (index == NULL) {  // no data yet, append it
      code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
      if (code != TSDB_CODE_SUCCESS) {
        tqError("vgId:%d dst-table gid:%" PRId64 " not exist, discard stream results", vgId, groupId);
        continue;
      }

      code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
      if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
        // drop the pending create-table request together with its cached sink table info
        if (tbData.pCreateTbReq != NULL) {
          tdDestroySVCreateTbReq(tbData.pCreateTbReq);
          (void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, groupId, id);
          tbData.pCreateTbReq = NULL;
        }
        continue;
      }

      void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
      if (p == NULL) {
        // NOTE(review): tbData (rows / create-table request) is not released on this path -- confirm ownership
        tqError("vgId:%d, s-task:%s failed to build submit msg, data lost", vgId, id);
        continue;
      }

      // remember where this group's entry lives so later blocks can be merged into it
      int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
      code = taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
      if (code) {
        tqError("vgId:%d, s-task:%s failed to put group into index map, code:%s", vgId, id, tstrerror(code));
        continue;
      }
    } else {
      // this group already has an entry: build the rows, then merge them into it
      code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
      if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
        if (tbData.pCreateTbReq != NULL) {
          tdDestroySVCreateTbReq(tbData.pCreateTbReq);
          tbData.pCreateTbReq = NULL;
        }
        continue;
      }

      SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index);
      if (pExisted == NULL) {
        continue;
      }

      code = doMergeExistedRows(pExisted, &tbData, id);
      if (code != TSDB_CODE_SUCCESS) {
        continue;
      }
    }

    // only blocks that made it into the request are counted
    pTask->execInfo.sink.numOfRows += pDataBlock->info.rows;
  }

  taosHashCleanup(pTableIndexMap);

  if (hasSubmit) {
    code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks);
    if (code) {  // failed and continue
      tqError("vgId:%d failed to build and send submit msg", vgId);
    }
  } else {
    // nothing was handed to doBuildAndSendSubmitMsg(), so the request is released here
    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
    tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
  }
}
1396

1397
/**
 * Convert one stream result block into a submit request holding a single table entry and send it.
 *
 * @param pTask      sink task; provides target super table info, schema and sink counters
 * @param pDataBlock result block to write
 * @param index      forwarded to tqSetDstTableDataPayload()
 * @param pVnode     vnode that receives the submit message
 * @param earlyTs    forwarded to tqSetDstTableDataPayload()
 * @return the code of the first failing step, or the result of doBuildAndSendSubmitMsg().
 *         When the payload step succeeds but yields no rows (tbData.aRowP == NULL), nothing is
 *         sent and that step's (success) code is returned.
 */
int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode, int64_t earlyTs) {
  int32_t     code = 0;
  STSchema*   pTSchema = pTask->outputInfo.tbSink.pTSchema;
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  const char* id = pTask->id.idStr;
  int32_t     vgId = TD_VID(pVnode);
  char*       stbFullName = pTask->outputInfo.tbSink.stbFullName;

  pTask->execInfo.sink.numOfBlocks += 1;

  SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
  if (submitReq.aSubmitTbData == NULL) {
    code = terrno;  // capture before logging so the returned value matches the logged one
    tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
    return code;
  }

  SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
  code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d s-task:%s dst-table not exist, stb:%s discard stream results", vgId, id, stbFullName);
    taosArrayDestroy(submitReq.aSubmitTbData);  // still empty: nothing has been pushed yet
    return code;
  }

  code = tqSetDstTableDataPayload(suid, pTSchema, index, pDataBlock, &tbData, earlyTs, id);
  if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
    // drop the pending create-table request together with its cached sink table info
    if (tbData.pCreateTbReq != NULL) {
      tdDestroySVCreateTbReq(tbData.pCreateTbReq);
      (void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, id);
      tbData.pCreateTbReq = NULL;
    }

    taosArrayDestroy(submitReq.aSubmitTbData);  // still empty: nothing has been pushed yet
    return code;
  }

  void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
  if (p == NULL) {
    code = terrno;
    // NOTE(review): tbData (rows / create-table request) is not released on this path -- confirm ownership
    tqDebug("vgId:%d, s-task:%s failed to build submit msg, code:%s, data lost", vgId, id, tstrerror(code));
    taosArrayDestroy(submitReq.aSubmitTbData);
    return code;
  }

  code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1);
  if (code) {  // failed and continue
    tqDebug("vgId:%d, s-task:%s submit msg failed, code:%s data lost", vgId, id, tstrerror(code));
  }

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc