• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3608

12 Feb 2025 05:57AM UTC coverage: 63.066% (+1.4%) from 61.715%
#3608

push

travis-ci

web-flow
Merge pull request #29746 from taosdata/merge/mainto3.02

merge: from main to 3.0 branch

140199 of 286257 branches covered (48.98%)

Branch coverage included in aggregate %.

89 of 161 new or added lines in 18 files covered. (55.28%)

3211 existing lines in 190 files now uncovered.

218998 of 283298 relevant lines covered (77.3%)

5949310.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.0
/source/dnode/vnode/src/tq/tqSink.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tcommon.h"
17
#include "tq.h"
18

19
typedef struct STableSinkInfo {
20
  uint64_t uid;
21
  tstr     name;
22
} STableSinkInfo;
23

24
static bool    hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks);
25
static int32_t tsAscendingSortFn(const void* p1, const void* p2);
26
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
27
                                  SSubmitTbData* pTableData);
28
static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
29
                                       int64_t suid);
30
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks);
31
static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
32
static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock,
33
                             int64_t earlyTs, const char* id);
34
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
35
                                        const char* dstTableName, int64_t* uid);
36

37
static bool    isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
38
static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName,
39
                                  int32_t numOfTags);
40
static int32_t createDefaultTagColName(SArray** pColNameList);
41
static int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock,
42
                                          const char* stbFullName, int64_t gid, bool newSubTableRule);
43
static int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo);
44
static int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId,
45
                                           const char* id);
46
static bool    doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo);
47
static int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id);
48
static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode);
49
static void    rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs);
50
static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode,
51
                                    int64_t earlyTs);
52
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName);
53

54
/*
 * Translate every row of a delete-result block into an SSingleDeleteReq and append it to
 * deleteReq->deleteReqs.
 *
 * For each row the destination table name is resolved in this order:
 *   1. the table-name column of the row, optionally extended with the group id when
 *      newSubTableRule is set and the name does not already carry it;
 *   2. a name generated from stbFullName and the group id;
 *   3. when stbFullName is NULL, a meta lookup that treats the group id as a table uid.
 * Rows whose name cannot be resolved (NULL or empty) are logged and skipped.
 *
 * @param pTq             tq handle; only pTq->pVnode is used (for the uid -> name lookup)
 * @param stbFullName     full name of the destination super table, may be NULL
 * @param pDataBlock      block holding start-ts / end-ts / group-id / table-name columns
 * @param deleteReq       batch request that receives the generated entries
 * @param pIdStr          task id string, used for logging only
 * @param newSubTableRule whether the group id has to be appended to user supplied table names
 * @return 0 on success, otherwise the error code of the failing step. Every temporary name buffer
 *         allocated in this function is released on all paths, including the error returns.
 */
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
                         const char* pIdStr, bool newSubTableRule) {
  int32_t          totalRows = pDataBlock->info.rows;
  SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);

  if (pStartTsCol == NULL || pEndTsCol == NULL || pGidCol == NULL || pTbNameCol == NULL) {
    return terrno;
  }

  tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName);

  for (int32_t row = 0; row < totalRows; row++) {
    int64_t skey = *(int64_t*)colDataGetData(pStartTsCol, row);
    int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row);
    int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);

    char* name = NULL;
    char* originName = NULL;
    void* varTbName = NULL;
    if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) {
      varTbName = colDataGetVarData(pTbNameCol, row);
    }

    if (varTbName != NULL && varTbName != (void*)-1) {
      // reserve room for the longest table name, since the group id may be appended in place
      size_t cap = TMAX(TSDB_TABLE_NAME_LEN, varDataLen(varTbName) + 1);
      name = taosMemoryMalloc(cap);
      if (name == NULL) {
        return terrno;
      }

      memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
      name[varDataLen(varTbName)] = '\0';
      if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name, groupId) && groupId != 0 &&
          stbFullName) {
        int32_t code = buildCtbNameAddGroupId(stbFullName, name, groupId, cap);
        if (code != TSDB_CODE_SUCCESS) {
          taosMemoryFree(name);  // do not leak the copied name on failure
          return code;
        }
      }
    } else if (stbFullName) {
      int32_t code = buildCtbNameByGroupId(stbFullName, groupId, &name);
      if (code) {
        return code;
      }
    } else {
      originName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
      if (originName == NULL) {
        return terrno;
      }

      if (metaGetTableNameByUid(pTq->pVnode, groupId, originName) == 0) {
        name = varDataVal(originName);
      }
    }

    // in the uid-lookup case `name` points into `originName`, so the latter is the buffer to release
    char* pOwnedBuf = (originName != NULL) ? originName : name;

    if (!name || *name == '\0') {
      tqWarn("s-task:%s failed to build delete msg groupId:%" PRId64 ", skey:%" PRId64 " ekey:%" PRId64
             " since invalid tbname:%s",
             pIdStr, groupId, skey, ekey, name ? name : "NULL");
    } else {
      tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, pIdStr,
              groupId, name, skey, ekey);

      SSingleDeleteReq req = {.startTs = skey, .endTs = ekey};
      tstrncpy(req.tbname, name, TSDB_TABLE_NAME_LEN);
      void* p = taosArrayPush(deleteReq->deleteReqs, &req);
      if (p == NULL) {
        int32_t code = terrno;  // capture before releasing memory
        taosMemoryFree(pOwnedBuf);
        return code;
      }
    }

    taosMemoryFree(pOwnedBuf);
  }

  return 0;
}
137

138
static int32_t encodeCreateChildTableForRPC(void* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) {
3,584✔
139
  int32_t ret = 0;
3,584✔
140

141
  tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret);
3,584!
142
  if (ret < 0) {
3,584!
143
    ret = -1;
×
144
    goto end;
×
145
  }
146
  *contLen += sizeof(SMsgHead);
3,584✔
147
  *pBuf = rpcMallocCont(*contLen);
3,584✔
148
  if (NULL == *pBuf) {
3,584!
149
    ret = -1;
×
150
    goto end;
×
151
  }
152
  ((SMsgHead*)(*pBuf))->vgId = vgId;
3,584✔
153
  ((SMsgHead*)(*pBuf))->contLen = htonl(*contLen);
3,584✔
154
  SEncoder coder = {0};
3,584✔
155
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
3,584✔
156
  if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) {
3,584!
157
    rpcFreeCont(*pBuf);
×
158
    *pBuf = NULL;
×
159
    *contLen = 0;
×
160
    tEncoderClear(&coder);
×
161
    ret = -1;
×
162
    goto end;
×
163
  }
164
  tEncoderClear(&coder);
3,584✔
165

166
end:
3,584✔
167
  return ret;
3,584✔
168
}
169

170
static int32_t encodeDropChildTableForRPC(void* pReqs, int32_t vgId, void** ppBuf, int32_t *contLen) {
55✔
171
  int32_t  code = 0;
55✔
172
  SEncoder ec = {0};
55✔
173
  tEncodeSize(tEncodeSVDropTbBatchReq, pReqs, *contLen, code);
55!
174
  if (code < 0) {
55!
175
    code = TSDB_CODE_INVALID_MSG;
×
176
    goto end;
×
177
  }
178
  *contLen += sizeof(SMsgHead);
55✔
179
  *ppBuf = rpcMallocCont(*contLen);
55✔
180

181
  if (!*ppBuf) {
55!
182
    code = terrno;
×
183
    goto end;
×
184
  }
185

186
  ((SMsgHead*)(*ppBuf))->vgId = vgId;
55✔
187
  ((SMsgHead*)(*ppBuf))->contLen = htonl(*contLen);
55✔
188

189
  tEncoderInit(&ec, POINTER_SHIFT(*ppBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
55✔
190
  code = tEncodeSVDropTbBatchReq(&ec, pReqs);
55✔
191
  tEncoderClear(&ec);
55✔
192
  if (code < 0) {
55!
193
    rpcFreeCont(*ppBuf);
×
194
    *ppBuf = NULL;
×
195
    *contLen = 0;
×
196
    code = TSDB_CODE_INVALID_MSG;
×
197
    goto end;
×
198
  }
199
end:
55✔
200
  return code;
55✔
201
}
202

203
/*
 * Encode a request with the supplied encoder and put the resulting message into the vnode
 * write queue.
 *
 * @param pVnode   vnode whose message callbacks are used
 * @param pReqs    request object handed to the encoder untouched
 * @param encoder  callback producing the rpc buffer and its length
 * @param msgType  message type of the generated rpc message
 * @return 0 on success, otherwise the encoder or queue error code
 */
static int32_t tqPutReqToQueue(SVnode* pVnode, void* pReqs, int32_t(*encoder)(void* pReqs, int32_t vgId, void** ppBuf, int32_t *contLen), tmsg_t msgType) {
  void*   pCont = NULL;
  int32_t contLen = 0;

  int32_t code = encoder(pReqs, TD_VID(pVnode), &pCont, &contLen);
  if (code != 0) {
    tqError("vgId:%d failed to encode create table msg, create table failed, code:%s", TD_VID(pVnode), tstrerror(code));
    return code;
  }

  SRpcMsg rpcMsg = {.msgType = msgType, .pCont = pCont, .contLen = contLen};

  code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
  if (code != 0) {
    tqError("failed to put into write-queue since %s", terrstr());
  }

  return code;
}
221

222
/*
 * Fill the fixed part of a child-table create request: flags, table type, super table uid,
 * tag count and a heap copy of the super table name taken from stbFullName.
 *
 * @return 0 on success, the tNameFromString() error when stbFullName cannot be parsed, or
 *         terrno when the super table name cannot be duplicated
 */
int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags) {
  SName stbName = {0};

  pCreateTableReq->flags = 0;
  pCreateTableReq->type = TSDB_CHILD_TABLE;
  pCreateTableReq->ctb.suid = suid;
  pCreateTableReq->ctb.tagNum = numOfTags;

  // extract the bare super table name from the full name
  int32_t code = tNameFromString(&stbName, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code != 0) {
    return code;
  }

  pCreateTableReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&stbName));
  if (pCreateTableReq->ctb.stbName != NULL) {
    return code;
  }

  // the duplication failure is reported to the caller through terrno
  tqError("failed to duplicate the stb name:%s, failed to init create-table msg and create req table", stbFullName);
  return terrno;
}
242

243
/*
 * Create a one-element array holding the default tag column name "group_id".
 *
 * @param pColNameList [out] the new array on success, NULL on failure; elements are fixed-size
 *                     buffers of TSDB_COL_NAME_LEN bytes
 * @return TSDB_CODE_SUCCESS or terrno when the array cannot be created or filled
 */
int32_t createDefaultTagColName(SArray** pColNameList) {
  char defaultTagName[TSDB_COL_NAME_LEN] = "group_id";

  *pColNameList = NULL;

  SArray* pNames = taosArrayInit(1, TSDB_COL_NAME_LEN);
  if (pNames == NULL) {
    return terrno;
  }

  if (taosArrayPush(pNames, defaultTagName) == NULL) {
    taosArrayDestroy(pNames);
    return terrno;
  }

  *pColNameList = pNames;
  return TSDB_CODE_SUCCESS;
}
261

262
/*
 * Set pCreateTableReq->name for the child table that is about to be created.
 *
 *  - no partition table name in the block: the name is generated from stbFullName and gid;
 *  - partition table name present and the group id has to be appended (newSubTableRule is set,
 *    the name is not an auto generated one, it does not carry the group id yet, gid != 0 and
 *    stbFullName is available): the name is copied into a TSDB_TABLE_NAME_LEN buffer and extended
 *    in place;
 *  - otherwise the partition table name is duplicated as is.
 *
 * @return 0 on success, terrno on allocation failure, or the error of the name builder
 */
int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
                                   int64_t gid, bool newSubTableRule) {
  char* parTbName = pDataBlock->info.parTbName;

  if (parTbName[0] == 0) {
    return buildCtbNameByGroupId(stbFullName, gid, &pCreateTableReq->name);
  }

  bool appendGroupId = newSubTableRule && !isAutoTableName(parTbName) && !alreadyAddGroupId(parTbName, gid) &&
                       gid != 0 && (stbFullName != NULL);
  if (!appendGroupId) {
    pCreateTableReq->name = taosStrdup(parTbName);
    return (pCreateTableReq->name == NULL) ? terrno : 0;
  }

  pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
  if (pCreateTableReq->name == NULL) {
    return terrno;
  }

  tstrncpy(pCreateTableReq->name, parTbName, TSDB_TABLE_NAME_LEN);

  int32_t code = buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid, TSDB_TABLE_NAME_LEN);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return 0;
}
292

293
static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock,
3,584✔
294
                                            SStreamTask* pTask, int64_t suid) {
295
  STSchema*          pTSchema = pTask->outputInfo.tbSink.pTSchema;
3,584✔
296
  int32_t            rows = pDataBlock->info.rows;
3,584✔
297
  SArray*            tagArray = NULL;
3,584✔
298
  const char*        id = pTask->id.idStr;
3,584✔
299
  int32_t            vgId = pTask->pMeta->vgId;
3,584✔
300
  int32_t            code = 0;
3,584✔
301
  STableSinkInfo*    pInfo = NULL;
3,584✔
302
  SVCreateTbBatchReq reqs = {0};
3,584✔
303
  SArray*            crTblArray = NULL;
3,584✔
304

305
  tqDebug("s-task:%s build create %d table(s) msg", id, rows);
3,584!
306

307
  tagArray = taosArrayInit(4, sizeof(STagVal));
3,584✔
308
  crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
3,584✔
309
  if ((NULL == reqs.pArray) || (tagArray == NULL)) {
3,584!
310
    tqError("s-task:%s failed to init create table msg, code:%s", id, tstrerror(terrno));
×
311
    code = terrno;
×
312
    goto _end;
×
313
  }
314

315
  for (int32_t rowId = 0; rowId < rows; rowId++) {
7,168✔
316
    SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0});
3,584✔
317

318
    int32_t size = taosArrayGetSize(pDataBlock->pDataBlock);
3,584✔
319
    int32_t numOfTags = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
3,584✔
320

321
    code = initCreateTableMsg(pCreateTbReq, suid, stbFullName, numOfTags);
3,584✔
322
    if (code) {
3,584!
323
      tqError("s-task:%s vgId:%d failed to init create table msg", id, vgId);
×
324
      continue;
×
325
    }
326

327
    taosArrayClear(tagArray);
3,584✔
328

329
    if (size == 2) {
3,584✔
330
      STagVal tagVal = {
599✔
331
          .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
599✔
332

333
      void* p = taosArrayPush(tagArray, &tagVal);
599✔
334
      if (p == NULL) {
599!
335
        return terrno;
×
336
      }
337

338
      code = createDefaultTagColName(&pCreateTbReq->ctb.tagName);
599✔
339
      if (code) {
599!
340
        return code;
×
341
      }
342
    } else {
343
      for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
27,410✔
344
        SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
24,425✔
345
        if (pTagData == NULL) {
24,425!
346
          continue;
7,767✔
347
        }
348

349
        STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type};
24,425✔
350
        void*   pData = colDataGetData(pTagData, rowId);
24,425!
351
        if (colDataIsNull_s(pTagData, rowId)) {
48,850!
352
          continue;
7,767✔
353
        } else if (IS_VAR_DATA_TYPE(pTagData->info.type)) {
16,658!
354
          tagVal.nData = varDataLen(pData);
6,689✔
355
          tagVal.pData = (uint8_t*)varDataVal(pData);
6,689✔
356
        } else {
357
          memcpy(&tagVal.i64, pData, pTagData->info.bytes);
9,969✔
358
        }
359
        void* p = taosArrayPush(tagArray, &tagVal);
16,658✔
360
        if (p == NULL) {
16,658!
361
          code = terrno;
×
362
          goto _end;
×
363
        }
364
      }
365
    }
366

367
    code = tTagNew(tagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
3,584✔
368
    taosArrayDestroy(tagArray);
3,584✔
369
    tagArray = NULL;
3,584✔
370

371
    if (pCreateTbReq->ctb.pTag == NULL || (code != 0)) {
3,584!
372
      tdDestroySVCreateTbReq(pCreateTbReq);
373
      code = TSDB_CODE_OUT_OF_MEMORY;
×
374
      goto _end;
×
375
    }
376

377
    uint64_t gid = pDataBlock->info.id.groupId;
3,584✔
378
    if (taosArrayGetSize(pDataBlock->pDataBlock) > UD_GROUPID_COLUMN_INDEX) {
3,584!
379
      SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
3,584✔
380
      if (pGpIdColInfo == NULL) {
3,584!
381
        continue;
×
382
      }
383

384
      // todo remove this
385
      void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
3,584!
386
      if (gid != *(int64_t*)pGpIdData) {
3,584!
387
        tqError("s-task:%s vgId:%d invalid groupId:%" PRId64 " actual:%" PRId64 " in sink task", id, vgId, gid,
×
388
                *(int64_t*)pGpIdData);
389
      }
390
    }
391

392
    code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, IS_NEW_SUBTB_RULE(pTask));
3,584!
393
    if (code) {
3,584!
394
      goto _end;
×
395
    }
396

397
    void* p = taosArrayPush(reqs.pArray, pCreateTbReq);
3,584✔
398
    if (p == NULL) {
3,584!
399
      code = terrno;
×
400
      goto _end;
×
401
    }
402

403
    bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, gid, &pInfo);
3,584✔
404
    if (!alreadyCached) {
3,584✔
405
      code = doCreateSinkTableInfo(pCreateTbReq->name, &pInfo);
3,008✔
406
      if (code) {
3,008!
407
        tqError("vgId:%d failed to create sink tableInfo for table:%s, s-task:%s", vgId, pCreateTbReq->name, id);
×
408
        continue;
×
409
      }
410

411
      code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pInfo, gid, id);
3,008✔
412
      if (code) {
3,008!
413
        tqError("vgId:%d failed to put sink tableInfo:%s into cache, s-task:%s", vgId, pCreateTbReq->name, id);
×
414
      }
415
    }
416

417
    tqDebug("s-task:%s build create table:%s msg complete", id, pCreateTbReq->name);
3,584!
418
  }
419

420
  reqs.nReqs = taosArrayGetSize(reqs.pArray);
3,584✔
421
  code = tqPutReqToQueue(pVnode, &reqs, encodeCreateChildTableForRPC, TDMT_VND_CREATE_TABLE);
3,584✔
422
  if (code != TSDB_CODE_SUCCESS) {
3,584!
423
    tqError("s-task:%s failed to send create table msg", id);
×
424
  }
425

426
_end:
3,584✔
427
  taosArrayDestroy(tagArray);
3,584✔
428
  taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
3,584✔
429
  return code;
3,584✔
430
}
431

432
static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SSDataBlock* pDataBlock,
55✔
433
                                          SStreamTask* pTask, int64_t suid) {
434
  int32_t          lino = 0;
55✔
435
  int32_t          code = 0;
55✔
436
  int32_t          rows = pDataBlock->info.rows;
55✔
437
  const char*      id = pTask->id.idStr;
55✔
438
  SVDropTbBatchReq batchReq = {0};
55✔
439
  SVDropTbReq      req = {0};
55✔
440

441
  if (rows <= 0 || rows > 1 || pTask->subtableWithoutMd5 == 0) return TSDB_CODE_SUCCESS;
55!
442

443
  batchReq.pArray = taosArrayInit(rows, sizeof(SVDropTbReq));
55✔
444
  if (!batchReq.pArray) return terrno;
55!
445
  batchReq.nReqs = rows;
55✔
446
  req.suid = suid;
55✔
447
  req.igNotExists = true;
55✔
448

449
  SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
55✔
450
  char             tbName[TSDB_TABLE_NAME_LEN + 1] = {0};
55✔
451
  int32_t          i = 0;
55✔
452
  void*            pData = colDataGetVarData(pTbNameCol, i);
55✔
453
  memcpy(tbName, varDataVal(pData), varDataLen(pData));
55✔
454
  tbName[varDataLen(pData) + 1] = 0;
55✔
455
  req.name = tbName;
55✔
456
  if (taosArrayPush(batchReq.pArray, &req) == NULL) {
110!
457
    TSDB_CHECK_CODE(terrno, lino, _exit);
×
458
  }
459

460
  SMetaReader mr = {0};
55✔
461
  metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
55✔
462

463
  code = metaGetTableEntryByName(&mr, tbName);
55✔
464
  if (TSDB_CODE_SUCCESS == code && isValidDstChildTable(&mr, TD_VID(pVnode), tbName, pTask->outputInfo.tbSink.stbUid)) {
55!
465
    STableSinkInfo* pTableSinkInfo = NULL;
55✔
466
    bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, &pTableSinkInfo);
55✔
467
    if (alreadyCached) {
55✔
468
      pTableSinkInfo->uid = mr.me.uid;
30✔
469
    }
470
  }
471
  metaReaderClear(&mr);
55✔
472
  tqDebug("s-task:%s build drop %d table(s) msg", id, rows);
55!
473
  code = tqPutReqToQueue(pVnode, &batchReq, encodeDropChildTableForRPC, TDMT_VND_DROP_TABLE);
55✔
474
  TSDB_CHECK_CODE(code, lino, _exit);
55!
475

476

477
  code = doWaitForDstTableDropped(pVnode, pTask, tbName);
55✔
478
  TSDB_CHECK_CODE(code, lino, _exit);
55!
479

480
_exit:
55✔
481
  if (batchReq.pArray) {
55!
482
    taosArrayDestroy(batchReq.pArray);
55✔
483
  }
484
  return code;
55✔
485
}
486

487
/*
 * Encode a submit request and put it into the vnode write queue, then update the sink statistics
 * of the task.
 *
 * @param pVnode      destination vnode
 * @param pTask       sink task (id for logging, execInfo.sink counters)
 * @param pReq        submit request to encode
 * @param numOfBlocks number of source blocks merged into pReq, used for logging only
 * @return the encoding error if the message cannot be built; TSDB_CODE_SUCCESS otherwise. A
 *         failure to enqueue the message is only logged.
 */
int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks) {
  const char* id = pTask->id.idStr;
  int32_t     vgId = TD_VID(pVnode);
  void*       pCont = NULL;
  int32_t     contLen = 0;
  int32_t     numOfFinalBlocks = taosArrayGetSize(pReq->aSubmitTbData);

  int32_t code = buildSubmitMsgImpl(pReq, vgId, &pCont, &contLen);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("s-task:%s build submit msg failed, vgId:%d, code:%s", id, vgId, tstrerror(code));
    return code;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_SUBMIT, .pCont = pCont, .contLen = contLen};

  code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
  } else {
    tqDebug("s-task:%s vgId:%d comp %d blocks into %d and send to dstTable(s) completed", id, vgId, numOfBlocks,
            numOfFinalBlocks);
  }

  SSinkRecorder* pRecorder = &pTask->execInfo.sink;
  pRecorder->numOfSubmit += 1;

  // emit a statistics line once every 1000 submits
  if ((pRecorder->numOfSubmit % 1000) == 0) {
    double elapsedSec = (taosGetTimestampMs() - pTask->execInfo.readyTs) / 1000.0;
    tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
           " submit into dst table, %.2fMiB duration:%.2f Sec.",
           id, vgId, pRecorder->numOfBlocks, pRecorder->numOfRows, pRecorder->numOfSubmit,
           SIZE_IN_MiB(pRecorder->dataSize), elapsedSec);
  }

  return TSDB_CODE_SUCCESS;
}
521

522
// merge the new submit table block with the existed blocks
523
// if ts in the new data block overlap with existed one, replace it
524
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id) {
535✔
525
  int32_t oldLen = taosArrayGetSize(pExisted->aRowP);
535✔
526
  int32_t newLen = taosArrayGetSize(pNew->aRowP);
535✔
527
  int32_t numOfPk = 0;
535✔
528

529
  int32_t j = 0, k = 0;
535✔
530
  SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES);
535✔
531
  if (pFinal == NULL) {
535!
532
    tqError("s-task:%s failed to prepare merge result datablock, code:%s", id, tstrerror(terrno));
×
533
    return terrno;
×
534
  }
535

536
  while (j < newLen && k < oldLen) {
1,629,854✔
537
    SRow* pNewRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j);
1,629,319✔
538
    SRow* pOldRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k);
1,629,319✔
539

540
    if (pNewRow->ts < pOldRow->ts) {
1,629,319✔
541
      void* p = taosArrayPush(pFinal, &pNewRow);
36✔
542
      if (p == NULL) {
36!
543
        return terrno;
×
544
      }
545
      j += 1;
36✔
546
    } else if (pNewRow->ts > pOldRow->ts) {
1,629,283✔
547
      void* p = taosArrayPush(pFinal, &pOldRow);
1,628,647✔
548
      if (p == NULL) {
1,628,647!
549
        return terrno;
×
550
      }
551

552
      k += 1;
1,628,647✔
553
    } else {
554
      // check for the existance of primary key
555
      if (pNewRow->numOfPKs == 0) {
635!
556
        void* p = taosArrayPush(pFinal, &pNewRow);
636✔
557
        if (p == NULL) {
636!
558
          return terrno;
×
559
        }
560

561
        k += 1;
636✔
562
        j += 1;
636✔
563
        tRowDestroy(pOldRow);
636✔
564
      } else {
565
        numOfPk = pNewRow->numOfPKs;
×
566

567
        SRowKey kNew, kOld;
568
        tRowGetKey(pNewRow, &kNew);
×
569
        tRowGetKey(pOldRow, &kOld);
×
570

571
        int32_t ret = tRowKeyCompare(&kNew, &kOld);
×
572
        if (ret <= 0) {
×
573
          void* p = taosArrayPush(pFinal, &pNewRow);
×
574
          if (p == NULL) {
×
575
            return terrno;
×
576
          }
577

578
          j += 1;
×
579

580
          if (ret == 0) {
×
581
            k += 1;
×
582
            tRowDestroy(pOldRow);
×
583
          }
584
        } else {
585
          void* p = taosArrayPush(pFinal, &pOldRow);
×
586
          if (p == NULL) {
×
587
            return terrno;
×
588
          }
589

590
          k += 1;
×
591
        }
592
      }
593
    }
594
  }
595

596
  while (j < newLen) {
269,768✔
597
    SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j++);
269,233✔
598
    void* p = taosArrayPush(pFinal, &pRow);
269,233✔
599
    if (p == NULL) {
269,233!
600
      return terrno;
×
601
    }
602
  }
603

604
  while (k < oldLen) {
962✔
605
    SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k++);
427✔
606
    void* p = taosArrayPush(pFinal, &pRow);
427✔
607
    if (p == NULL) {
427!
608
      return terrno;
×
609
    }
610
  }
611

612
  taosArrayDestroy(pNew->aRowP);
535✔
613
  taosArrayDestroy(pExisted->aRowP);
535✔
614
  pExisted->aRowP = pFinal;
535✔
615

616
  tqTrace("s-task:%s rows merged, final rows:%d, pk:%d uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
535!
617
          (int32_t)taosArrayGetSize(pFinal), numOfPk, pExisted->uid, (pExisted->pCreateTbReq != NULL),
618
          (pNew->pCreateTbReq != NULL));
619

620
  tdDestroySVCreateTbReq(pNew->pCreateTbReq);
535!
621
  taosMemoryFree(pNew->pCreateTbReq);
535!
622
  return TSDB_CODE_SUCCESS;
535✔
623
}
624

625
// Check that the table entry currently loaded in pReader is a child table that belongs to the
// super table identified by suid.
//   pReader  meta reader already positioned on the candidate table entry
//   vgId     vnode group id, used for logging only
//   ctbName  name of the candidate table, used for logging only
//   suid     uid of the expected super table
// Returns true when both checks pass. terrno is always written: 0 on success,
// TSDB_CODE_TDB_INVALID_TABLE_TYPE or TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE on rejection.
bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid) {
  bool valid = true;

  if (pReader->me.type != TSDB_CHILD_TABLE) {
    // the name is occupied by something that is not a child table
    tqError("vgId:%d, failed to write into %s, since table type:%d incorrect", vgId, ctbName, pReader->me.type);
    terrno = TSDB_CODE_TDB_INVALID_TABLE_TYPE;
    valid = false;
  } else if (pReader->me.ctbEntry.suid != suid) {
    // a child table, but created under a different super table
    tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64, vgId,
            ctbName, suid, pReader->me.ctbEntry.suid);
    terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
    valid = false;
  } else {
    terrno = 0;
  }

  return valid;
}
642

643
int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock,
783✔
644
                                SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq) {
645
  *pReq = NULL;
783✔
646

647
  SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
783!
648
  if (pCreateTbReq == NULL) {
783!
649
    return terrno;
×
650
  }
651

652
  taosArrayClear(pTagArray);
783✔
653
  int32_t code = initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1);
783✔
654
  if (code != 0) {
783!
655
    return code;
×
656
  }
657

658
  STagVal tagVal = {.cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
783✔
659
  void* p = taosArrayPush(pTagArray, &tagVal);
783✔
660
  if (p == NULL) {
783!
661
    return terrno;
×
662
  }
663

664
  code = tTagNew(pTagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
783✔
665
  if (pCreateTbReq->ctb.pTag == NULL || (code != 0)) {
783!
666
    tdDestroySVCreateTbReq(pCreateTbReq);
667
    taosMemoryFreeClear(pCreateTbReq);
×
668
    return code;
×
669
  }
670

671
  code = createDefaultTagColName(&pCreateTbReq->ctb.tagName);
783✔
672
  if (code) {
783!
673
    return code;
×
674
  }
675

676
  // set table name
677
  code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule);
783✔
678
  if (code) {
783!
679
    return code;
×
680
  }
681

682
  *pReq = pCreateTbReq;
783✔
683
  return code;
783✔
684
}
685

686
// Serialize a submit request into a newly allocated rpc buffer, prefixed with a SSubmitReq2Msg header.
//   pSubmitReq  request to encode; it is destroyed (TSDB_MSG_FLG_ENCODE) on every path, success or failure
//   vgId        vnode group id written into the message header
//   pMsg        out: the rpc buffer; written only on success
//   msgLen      out: total length of the buffer including the header; 0 on failure
// Returns TSDB_CODE_SUCCESS, or an error code when sizing, allocation or encoding fails.
int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) {
  int32_t code = 0;
  void*   pBuf = NULL;
  *msgLen = 0;

  // first pass: compute the encoded size only
  int32_t len = 0;
  tEncodeSize(tEncodeSubmitReq, pSubmitReq, len, code);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("failed to calculate the encoded size of submit req");
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  SEncoder encoder = {0};
  len += sizeof(SSubmitReq2Msg);

  pBuf = rpcMallocCont(len);
  if (NULL == pBuf) {
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
    return terrno;
  }

  ((SSubmitReq2Msg*)pBuf)->header.vgId = vgId;
  ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
  ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);

  // second pass: encode the payload right after the header
  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
  if (tEncodeSubmitReq(&encoder, pSubmitReq) < 0) {
    // report the failure to the caller; returning 0 here would hand back success without a message
    code = TSDB_CODE_OUT_OF_MEMORY;
    terrno = code;
    tqError("failed to encode submit req, code:%s, ignore and continue", terrstr());
    tEncoderClear(&encoder);
    rpcFreeCont(pBuf);
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  tEncoderClear(&encoder);
  tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);

  *msgLen = len;
  *pMsg = pBuf;
  return TSDB_CODE_SUCCESS;
}
725

726
int32_t tsAscendingSortFn(const void* p1, const void* p2) {
35,753,036✔
727
  SRow* pRow1 = *(SRow**)p1;
35,753,036✔
728
  SRow* pRow2 = *(SRow**)p2;
35,753,036✔
729

730
  if (pRow1->ts == pRow2->ts) {
35,753,036!
731
    return 0;
×
732
  } else {
733
    return pRow1->ts > pRow2->ts ? 1 : -1;
35,753,036!
734
  }
735
}
736

737
// Convert every row of a result data block into a SRow and collect the rows in pTableData->aRowP.
//   pTableData  destination; aRowP is (re)created here
//   pTSchema    schema of the destination table
//   pDataBlock  result block to convert
//   earlyTs     rows with a timestamp below this value invalidate the whole block
//   id          task id string, used for logging only
// Returns TSDB_CODE_SUCCESS or an error code. When a row older than earlyTs is found, the rows
// converted so far are released, aRowP is set to NULL and TSDB_CODE_SUCCESS is returned, so the
// caller must check aRowP before using it.
// The scratch array of column values is released on every exit path. A row that could not be
// appended to aRowP is destroyed here; rows already stored in aRowP stay there on such a failure.
int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, int64_t earlyTs,
                      const char* id) {
  int32_t numOfRows = pDataBlock->info.rows;
  int32_t code = TSDB_CODE_SUCCESS;

  SArray* pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
  pTableData->aRowP = taosArrayInit(numOfRows, sizeof(SRow*));

  if (pTableData->aRowP == NULL || pVals == NULL) {
    taosArrayDestroy(pTableData->aRowP);
    pTableData->aRowP = NULL;
    taosArrayDestroy(pVals);
    code = terrno;
    tqError("s-task:%s failed to prepare write stream res blocks, code:%s", id, tstrerror(code));
    return code;
  }

  for (int32_t j = 0; j < numOfRows; j++) {
    taosArrayClear(pVals);

    // index of the next column to read from the data block; only advances for columns that are
    // actually present in the block
    int32_t dataIndex = 0;
    int64_t ts = 0;

    for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
      const STColumn* pCol = &pTSchema->columns[k];

      // primary timestamp column, for debug purpose
      if (k == 0) {
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
        if (pColData == NULL) {
          continue;
        }

        ts = *(int64_t*)colDataGetData(pColData, j);
        tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, ts);

        if (ts < earlyTs) {
          tqError("s-task:%s ts:%" PRId64 " of generated results out of valid time range %" PRId64 " , discarded", id,
                  ts, earlyTs);

          // the array only stores pointers, so the rows built so far must be released one by one
          int32_t numOfBuilt = (int32_t)taosArrayGetSize(pTableData->aRowP);
          for (int32_t r = 0; r < numOfBuilt; ++r) {
            SRow* pBuilt = *(SRow**)TARRAY_GET_ELEM(pTableData->aRowP, r);
            tRowDestroy(pBuilt);
          }

          taosArrayDestroy(pTableData->aRowP);
          pTableData->aRowP = NULL;
          code = TSDB_CODE_SUCCESS;
          goto _exit;
        }
      }

      if (IS_SET_NULL(pCol)) {
        if (pCol->flags & COL_IS_KEY) {
          qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts,
                 pCol->colId, pCol->type);
          break;
        }
        SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
        void*   p = taosArrayPush(pVals, &cv);
        if (p == NULL) {
          code = terrno;
          goto _exit;
        }
      } else {
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
        if (pColData == NULL) {
          continue;
        }

        if (colDataIsNull_s(pColData, j)) {
          if (pCol->flags & COL_IS_KEY) {
            qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8,
                   ts, pCol->colId, pCol->type);
            break;
          }

          SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
          void*   p = taosArrayPush(pVals, &cv);
          if (p == NULL) {
            code = terrno;
            goto _exit;
          }

          dataIndex++;
        } else {
          void* colData = colDataGetData(pColData, j);
          if (IS_VAR_DATA_TYPE(pCol->type)) {  // address copy, no value
            SValue sv =
                (SValue){.type = pCol->type, .nData = varDataLen(colData), .pData = (uint8_t*)varDataVal(colData)};
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
            void*   p = taosArrayPush(pVals, &cv);
            if (p == NULL) {
              code = terrno;
              goto _exit;
            }
          } else {
            SValue sv = {.type = pCol->type};
            memcpy(&sv.val, colData, tDataTypes[pCol->type].bytes);
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
            void*   p = taosArrayPush(pVals, &cv);
            if (p == NULL) {
              code = terrno;
              goto _exit;
            }
          }
          dataIndex++;
        }
      }
    }

    SRow* pRow = NULL;
    code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
    if (code != TSDB_CODE_SUCCESS) {
      tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);
      tqError("s-task:%s build rows for submit failed, ts:%" PRId64, id, ts);
      goto _exit;
    }

    void* p = taosArrayPush(pTableData->aRowP, &pRow);
    if (p == NULL) {
      code = terrno;
      // the row is not owned by the array yet, release it here
      tRowDestroy(pRow);
      goto _exit;
    }
  }

_exit:
  // pVals is a local scratch array and is released on every path
  taosArrayDestroy(pVals);
  return code;
}
856

857
// Poll the vnode meta until the destination table shows up, then record its uid.
//   pVnode          vnode whose meta is queried
//   pTask           stream task; polling stops as soon as the task should stop
//   pTableSinkInfo  cached sink info; its uid is filled once a valid table is found
//   dstTableName    name of the table that is being waited for
//   uid             out: uid of the table when it is found and valid
// Returns TSDB_CODE_STREAM_EXEC_CANCELLED when the task is stopping. Once the table exists the
// function returns terrno as left by isValidDstChildTable(): 0 for a valid child table, an error
// code otherwise. Returns TSDB_CODE_SUCCESS immediately if the cached uid is already set.
int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
                                 const char* dstTableName, int64_t* uid) {
  const char* id = pTask->id.idStr;
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  int32_t     vgId = TD_VID(pVnode);

  for (; pTableSinkInfo->uid == 0;) {
    if (streamTaskShouldStop(pTask)) {
      tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName);
      return TSDB_CODE_STREAM_EXEC_CANCELLED;
    }

    SMetaReader mr = {0};
    metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);

    if (metaGetTableEntryByName(&mr, dstTableName) != 0) {
      // the table has not been created yet: release the reader, sleep and try again
      metaReaderClear(&mr);
      taosMsleep(100);
      tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName);
      continue;
    }

    // the table exists; accept its uid only if it is a child table of the expected super table
    if (isValidDstChildTable(&mr, vgId, dstTableName, suid)) {
      tqDebug("s-task:%s set uid:%" PRIu64 " for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data);
      (*uid) = mr.me.uid;
      pTableSinkInfo->uid = mr.me.uid;
    }

    metaReaderClear(&mr);
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
894

895
// Poll the vnode meta until the destination table no longer exists.
//   pVnode        vnode whose meta is queried
//   pTask         stream task; polling stops as soon as the task should stop
//   dstTableName  name of the table that is expected to disappear
// Returns TSDB_CODE_SUCCESS once the lookup reports TSDB_CODE_PAR_TABLE_NOT_EXIST, or once the
// name resolves to a table that is not a valid child table of the sink super table.
// Returns TSDB_CODE_STREAM_EXEC_CANCELLED when the task is stopping, and terrno when the meta
// lookup fails with any other code.
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName) {
  const char* id = pTask->id.idStr;
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  int32_t     vgId = TD_VID(pVnode);
  bool        dropped = false;

  while (!dropped) {
    if (streamTaskShouldStop(pTask)) {
      tqDebug("s-task:%s task will stop, quit from waiting for table:%s drop", id, dstTableName);
      return TSDB_CODE_STREAM_EXEC_CANCELLED;
    }

    SMetaReader mr = {0};
    metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
    int32_t code = metaGetTableEntryByName(&mr, dstTableName);

    if (code != TSDB_CODE_PAR_TABLE_NOT_EXIST && code != TSDB_CODE_SUCCESS) {
      // unexpected lookup failure
      tqError("s-task:%s failed to wait for table:%s drop", id, dstTableName);
      metaReaderClear(&mr);
      return terrno;
    }

    // keep waiting only while the name still resolves to a valid child table of the sink stable
    bool stillExists = (code != TSDB_CODE_PAR_TABLE_NOT_EXIST) && isValidDstChildTable(&mr, vgId, dstTableName, suid);
    metaReaderClear(&mr);

    if (stillExists) {
      taosMsleep(100);
      tqDebug("s-task:%s wait 100ms for table:%s drop", id, dstTableName);
    } else {
      dropped = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}
928

929
// Allocate a zeroed STableSinkInfo with room for the table name plus a terminating 0, and copy
// the name into it. The uid is left as 0.
//   pDstTableName  0-terminated destination table name
//   pInfo          out: the new object, or NULL when the allocation fails
// Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails.
int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo) {
  int32_t len = strlen(pDstTableName);

  STableSinkInfo* pNew = taosMemoryCalloc(1, sizeof(STableSinkInfo) + len + 1);
  *pInfo = pNew;
  if (pNew == NULL) {
    return terrno;
  }

  // the buffer is zero-filled, so the byte after the copied name already terminates it
  pNew->name.len = len;
  memcpy(pNew->name.data, pDstTableName, len);
  return TSDB_CODE_SUCCESS;
}
940

941
int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
9,655✔
942
                           SSubmitTbData* pTableData) {
943
  uint64_t        groupId = pDataBlock->info.id.groupId;
9,655✔
944
  char*           dstTableName = pDataBlock->info.parTbName;
9,655✔
945
  int32_t         numOfRows = pDataBlock->info.rows;
9,655✔
946
  const char*     id = pTask->id.idStr;
9,655✔
947
  int64_t         suid = pTask->outputInfo.tbSink.stbUid;
9,655✔
948
  STSchema*       pTSchema = pTask->outputInfo.tbSink.pTSchema;
9,655✔
949
  int32_t         vgId = TD_VID(pVnode);
9,655✔
950
  STableSinkInfo* pTableSinkInfo = NULL;
9,655✔
951
  int32_t         code = 0;
9,655✔
952

953
  bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, groupId, &pTableSinkInfo);
9,655✔
954

955
  if (alreadyCached) {
9,655✔
956
    if (dstTableName[0] == 0) {  // data block does not set the destination table name
8,822✔
957
      tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
1,108✔
958
      tqDebug("s-task:%s vgId:%d, gropuId:%" PRIu64 " datablock table name is null, set name:%s", id, vgId, groupId,
1,108!
959
              dstTableName);
960
    } else {
961
      tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
7,714✔
962
      if (pTableSinkInfo->uid != 0) {
7,714✔
963
        tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(uid:%" PRIu64 ")", id, numOfRows, groupId,
4,484✔
964
                dstTableName, pTableSinkInfo->uid);
965
      } else {
966
        tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(not set uid yet for the secondary block)",
3,230✔
967
                id, numOfRows, groupId, dstTableName);
968
      }
969
    }
970
  } else {  // this groupId has not been kept in cache yet
971
    if (dstTableName[0] == 0) {
833✔
972
      memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
315✔
973
      code = buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
315✔
974
      if (code) {
315!
975
        tqDebug("s-task:%s failed to build auto create table-name:%s, groupId:0x%" PRId64, id, dstTableName, groupId);
×
976
        return code;
×
977
      }
978
    } else {
979
      if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
518!
980
          !alreadyAddGroupId(dstTableName, groupId) && groupId != 0) {
×
981
        tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
×
982
        if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
×
983
          code = buildCtbNameAddGroupId(NULL, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
×
984
        } else if (pTask->ver >= SSTREAM_TASK_APPEND_STABLE_NAME_VER && stbFullName) {
×
985
          code = buildCtbNameAddGroupId(stbFullName, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
×
986
        }
987
        if (code != TSDB_CODE_SUCCESS) {
×
988
          return code;
×
989
        }
990
      }
991
    }
992

993
    code = doCreateSinkTableInfo(dstTableName, &pTableSinkInfo);
833✔
994
    if (code == 0) {
833!
995
      tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName);
833✔
996
    } else {
997
      tqDebug("s-task:%s failed to build new sinkTableInfo, dstTable:%s", id, dstTableName);
×
998
      return code;
×
999
    }
1000
  }
1001

1002
  if (alreadyCached) {
9,655✔
1003
    pTableData->uid = pTableSinkInfo->uid;
8,822✔
1004

1005
    if (pTableData->uid == 0) {
8,822✔
1006
      tqTrace("s-task:%s cached tableInfo:%s uid is invalid, acquire it from meta", id, pTableSinkInfo->name.data);
3,470✔
1007
      return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid);
3,470✔
1008
    } else {
1009
      tqTrace("s-task:%s set the dstTable uid from cache:%" PRId64, id, pTableData->uid);
5,352✔
1010
    }
1011
  } else {
1012
    // The auto-create option will always set to be open for those submit messages, which arrives during the period
1013
    // the creating of the destination table, due to the absence of the user-specified table in TSDB. When scanning
1014
    // data from WAL, those submit messages, with auto-created table option, will be discarded expect the first, for
1015
    // those mismatched table uids. Only the FIRST table has the correct table uid, and those remain all have
1016
    // randomly generated, but false table uid in the WAL.
1017
    SMetaReader mr = {0};
833✔
1018
    metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
833✔
1019

1020
    // table not in cache, let's try to extract it from tsdb meta
1021
    if (metaGetTableEntryByName(&mr, dstTableName) < 0) {
833✔
1022
      metaReaderClear(&mr);
781✔
1023

1024
      if (pTask->outputInfo.tbSink.autoCreateCtb) {
781!
1025
        tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName);
781✔
1026

1027
        SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
781✔
1028
        if (pTagArray == NULL) {
781!
1029
          tqError("s-task:%s failed to build auto create submit msg in sink, vgId:%d, due to %s", id, vgId,
×
1030
                  tstrerror(terrno));
1031
          return terrno;
×
1032
        }
1033

1034
        pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
781✔
1035
        code = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
1,562✔
1036
                                       IS_NEW_SUBTB_RULE(pTask), &pTableData->pCreateTbReq);
781!
1037
        taosArrayDestroy(pTagArray);
781✔
1038

1039
        if (code) {
781!
1040
          tqError("s-task:%s failed to build auto create dst-table req:%s, code:%s", id, dstTableName, tstrerror(code));
×
1041
          taosMemoryFree(pTableSinkInfo);
×
1042
          return code;
×
1043
        }
1044

1045
        pTableSinkInfo->uid = 0;
781✔
1046
        code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
781✔
1047
      } else {
1048
        tqError("s-task:%s vgId:%d dst-table:%s not auto-created, and not create in tsdb, discard data", id, vgId,
×
1049
                dstTableName);
1050
        return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1051
      }
1052
    } else {
1053
      bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);
52✔
1054
      if (!isValid) {
52!
1055
        metaReaderClear(&mr);
×
1056
        taosMemoryFree(pTableSinkInfo);
×
1057
        tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId,
×
1058
                dstTableName);
1059
        return terrno;
×
1060
      } else {
1061
        pTableData->uid = mr.me.uid;
52✔
1062
        pTableSinkInfo->uid = mr.me.uid;
52✔
1063

1064
        metaReaderClear(&mr);
52✔
1065
        code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
52✔
1066
      }
1067
    }
1068
  }
1069

1070
  return code;
6,185✔
1071
}
1072

1073
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
10,192✔
1074
                                 SSubmitTbData* pTableData, int64_t earlyTs, const char* id) {
1075
  int32_t numOfRows = pDataBlock->info.rows;
10,192✔
1076
  char*   dstTableName = pDataBlock->info.parTbName;
10,192✔
1077

1078
  tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, id,
10,192✔
1079
          blockIndex + 1, numOfRows, suid);
1080

1081
  // convert all rows
1082
  int32_t code = doConvertRows(pTableData, pTSchema, pDataBlock, earlyTs, id);
10,192✔
1083
  if (code != TSDB_CODE_SUCCESS) {
10,192!
1084
    tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno));
×
1085
    return code;
×
1086
  }
1087

1088
  if (pTableData->aRowP != NULL) {
10,192!
1089
    taosArraySort(pTableData->aRowP, tsAscendingSortFn);
10,192✔
1090
    tqTrace("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows);
10,192✔
1091
  }
1092

1093
  return code;
10,192✔
1094
}
1095

1096
// Make sure the tag schema of the destination super table is cached in the task output info,
// loading it from the vnode meta on first use, and derive tbSink.autoCreateCtb from it.
// autoCreateCtb is set to true only when the super table has exactly one tag, of type UBIGINT,
// named "group_id".
//   pTask   sink task whose outputInfo.tbSink is updated
//   pVnode  vnode whose meta holds the super table
// Returns TSDB_CODE_SUCCESS when the schema is already cached or was loaded, the meta lookup
// code when the super table cannot be read, or terrno when cloning the schema fails.
int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode) {
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
  const char*      id = pTask->id.idStr;
  int32_t          vgId = pTask->pMeta->vgId;
  int32_t          code = TSDB_CODE_SUCCESS;

  if (pTask->outputInfo.tbSink.pTagSchema != NULL) {
    // already resolved by an earlier call
    return code;
  }

  SMetaReader mer1 = {0};
  metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK);

  code = metaReaderGetTableEntryByUid(&mer1, pOutputInfo->tbSink.stbUid);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("s-task:%s vgId:%d failed to get the dst stable, failed to sink results", id, vgId);
    metaReaderClear(&mer1);
    return code;
  }

  // keep a private copy, the reader is released right after
  pOutputInfo->tbSink.pTagSchema = tCloneSSchemaWrapper(&mer1.me.stbEntry.schemaTag);
  metaReaderClear(&mer1);

  SSchemaWrapper* pTagSchema = pOutputInfo->tbSink.pTagSchema;
  if (pTagSchema == NULL) {
    tqError("s-task:%s failed to clone tag schema, code:%s, failed to sink results", id, tstrerror(terrno));
    return terrno;
  }

  SSchema* pFirstTag = &pTagSchema->pSchema[0];
  if (pTagSchema->nCols == 1 && pFirstTag->type == TSDB_DATA_TYPE_UBIGINT && strcmp(pFirstTag->name, "group_id") == 0) {
    pOutputInfo->tbSink.autoCreateCtb = true;
  } else {
    pOutputInfo->tbSink.autoCreateCtb = false;
  }

  return code;
}
1132

1133
// Entry point of the table sink: write a batch of stream result blocks into the destination tables.
//   pTask  sink task
//   vnode  the SVnode that owns the destination tables
//   data   SArray of SSDataBlock holding the results to write
// A batch is written block by block when hasOnlySubmitData() reports false for it, or when
// pTask->subtableWithoutMd5 is 1. Otherwise all blocks are merged into a single submit message.
// Errors of the individual send helpers are not propagated; the function returns nothing.
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
  SVnode*       pVnode = (SVnode*)vnode;
  const SArray* pBlocks = (const SArray*)data;
  const char*   id = pTask->id.idStr;
  char*         stbFullName = pTask->outputInfo.tbSink.stbFullName;
  int64_t       suid = pTask->outputInfo.tbSink.stbUid;
  int32_t       vgId = TD_VID(pVnode);
  int32_t       numOfBlocks = taosArrayGetSize(pBlocks);
  int64_t       earlyTs = tsdbGetEarliestTs(pVnode->pTsdb);
  int32_t       code = TSDB_CODE_SUCCESS;

  code = checkTagSchema(pTask, pVnode);
  if (code != TSDB_CODE_SUCCESS) {
    return;
  }

  code = tqSendAllNotifyEvents(pBlocks, pTask, pVnode);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId: %d, s-task:%s failed to send all event notifications", vgId, id);
    // continue processing even if notification fails
  }

  bool mergeSubmit = hasOnlySubmitData(pBlocks, numOfBlocks) && (pTask->subtableWithoutMd5 != 1);
  if (mergeSubmit) {
    tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks);
    if (streamTaskShouldStop(pTask)) {
      return;
    }

    rebuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs);
    return;
  }

  tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
          numOfBlocks);

  for (int32_t i = 0; i < numOfBlocks; ++i) {
    if (streamTaskShouldStop(pTask)) {
      return;
    }

    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
    if (pDataBlock == NULL) {
      continue;
    }

    switch (pDataBlock->info.type) {
      case STREAM_DELETE_RESULT:
        code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
        break;

      case STREAM_CREATE_CHILD_TABLE:
        code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
        break;

      case STREAM_CHECKPOINT:
      case STREAM_NOTIFY_EVENT:
        // nothing to write for these block types
        break;

      case STREAM_DROP_CHILD_TABLE:
        if (pTask->subtableWithoutMd5) {
          code = doBuildAndSendDropTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
        } else {
          // without the flag a drop block goes through the generic result path
          code = handleResultBlockMsg(pTask, pDataBlock, i, pVnode, earlyTs);
        }
        break;

      default:
        code = handleResultBlockMsg(pTask, pDataBlock, i, pVnode, earlyTs);
        break;
    }
  }
}
1195

1196
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
1197
bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
7,385✔
1198
  for (int32_t i = 0; i < numOfBlocks; ++i) {
13,357✔
1199
    SSDataBlock* p = taosArrayGet(pBlocks, i);
9,137✔
1200
    if (p == NULL) {
9,137!
1201
      continue;
×
1202
    }
1203

1204
    if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
9,137✔
1205
      return false;
3,165✔
1206
    }
1207
  }
1208

1209
  return true;
4,220✔
1210
}
1211

1212
// Store the pointer pTableSinkInfo in the groupId -> STableSinkInfo* map.
//   pSinkTableMap   the cache map
//   pTableSinkInfo  entry to store; it is freed here when the insert fails
//   groupId         key of the entry
//   id              task id string, used for logging only
// Returns the code of tSimpleHashPut().
int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) {
  int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES);

  if (code == TSDB_CODE_SUCCESS) {
    tqDebug("s-task:%s new dst table:%s(uid:%" PRIu64 ") added into cache, total:%d", id, pTableSinkInfo->name.data,
            pTableSinkInfo->uid, tSimpleHashGetSize(pSinkTableMap));
    return code;
  }

  // the map did not take the entry, so nobody else will release it
  taosMemoryFreeClear(pTableSinkInfo);
  return code;
}
1223

1224
// Look up the cached sink table info of a group.
//   pTableInfoMap  the groupId -> STableSinkInfo* map
//   groupId        key to look up
//   pInfo          out: the cached entry; written only when the key is found
// Returns true when the key is present, false otherwise.
bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo) {
  // the map stores the pointer itself, so a hit yields the address of that stored pointer
  STableSinkInfo** ppCached = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t));
  if (ppCached == NULL) {
    return false;
  }

  *pInfo = *ppCached;
  return true;
}
1233

1234
// Remove the cached sink table info of a group from the map.
//   pSinkTableMap  the cache map
//   groupId        key of the entry to remove
//   id             task id string, used for logging only
// Returns TSDB_CODE_SUCCESS when the map is empty, otherwise the code of tSimpleHashRemove().
int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id) {
  if (tSimpleHashGetSize(pSinkTableMap) == 0) {
    // nothing cached, nothing to remove
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = tSimpleHashRemove(pSinkTableMap, &groupId, sizeof(groupId));
  if (code != 0) {
    tqError("s-task:%s failed to remove table meta from hashmap, groupId:%" PRId64, id, groupId);
    return code;
  }

  tqDebug("s-task:%s remove cached table meta for groupId:%" PRId64, id, groupId);
  return code;
}
1247

1248
// Build a batch-delete request from a delete result block and put it into the vnode write queue.
//   pVnode       vnode that receives the delete request
//   stbFullName  full name of the destination super table
//   pDataBlock   delete result block
//   pTask        sink task, used for its id string and sub-table naming rule
//   suid         uid of the destination super table
// Returns TSDB_CODE_SUCCESS when the block produced no delete request or once the message has been
// handed to the queue (a failed enqueue is only logged). Returns an error code when building,
// sizing, allocating or encoding the request fails. The temporary request array and the rpc
// buffer are released on every failure path.
int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
                                int64_t suid) {
  SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
  if (deleteReq.deleteReqs == NULL) {
    return terrno;
  }

  int32_t code =
      tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr, IS_NEW_SUBTB_RULE(pTask));
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(deleteReq.deleteReqs);
    return code;
  }

  if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
    taosArrayDestroy(deleteReq.deleteReqs);
    return TSDB_CODE_SUCCESS;
  }

  // first pass: compute the encoded size only
  int32_t len = 0;
  tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
  if (code != TSDB_CODE_SUCCESS) {
    qError("s-task:%s failed to encode delete request", pTask->id.idStr);
    taosArrayDestroy(deleteReq.deleteReqs);
    return code;
  }

  void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
  if (serializedDeleteReq == NULL) {
    code = terrno;
    taosArrayDestroy(deleteReq.deleteReqs);
    return code;
  }

  // second pass: encode the payload right after the message head
  SEncoder encoder = {0};
  void*    abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, len);
  code = tEncodeSBatchDeleteReq(&encoder, &deleteReq);
  tEncoderClear(&encoder);
  taosArrayDestroy(deleteReq.deleteReqs);

  if (code) {
    // the buffer was never handed to the queue, release it here
    rpcFreeCont(serializedDeleteReq);
    return code;
  }

  ((SMsgHead*)serializedDeleteReq)->vgId = TD_VID(pVnode);

  SRpcMsg msg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead)};
  if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
    tqDebug("failed to put delete req into write-queue since %s", terrstr());
  }

  return TSDB_CODE_SUCCESS;
}
1294

1295
/*
 * Merge several result blocks into one submit request and send it.
 *
 * Blocks of type STREAM_CHECKPOINT and STREAM_NOTIFY_EVENT are skipped. For
 * every other block a SSubmitTbData is built; blocks that share a groupId are
 * merged into the entry created for the first block of that group (the
 * groupId -> array index mapping is kept in a temporary hash map).
 * Failures on a single block are logged and that block is skipped; the
 * function itself reports nothing to the caller.
 *
 * @param pTask    sink task providing schema, super table info and counters
 * @param pBlocks  array of SSDataBlock to write
 * @param pVnode   destination vnode
 * @param earlyTs  forwarded to tqSetDstTableDataPayload()
 */
void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) {
  int32_t     code = 0;
  const char* id = pTask->id.idStr;
  int32_t     vgId = pTask->pMeta->vgId;
  int32_t     numOfBlocks = taosArrayGetSize(pBlocks);
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  STSchema*   pTSchema = pTask->outputInfo.tbSink.pTSchema;
  char*       stbFullName = pTask->outputInfo.tbSink.stbFullName;

  SHashObj* pTableIndexMap =
      taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableIndexMap == NULL) {
    // without the index map blocks of the same group can not be merged, so give up early
    code = terrno;
    tqError("s-task:%s vgId:%d failed to init table index map in sink task, code:%s", id, vgId, tstrerror(code));
    return;
  }

  SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
  if (submitReq.aSubmitTbData == NULL) {
    code = terrno;
    tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
    taosHashCleanup(pTableIndexMap);
    return;
  }

  bool hasSubmit = false;
  for (int32_t i = 0; i < numOfBlocks; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
    if (pDataBlock == NULL) {
      continue;
    }

    if (pDataBlock->info.type == STREAM_CHECKPOINT) {
      continue;
    }

    if (pDataBlock->info.type == STREAM_NOTIFY_EVENT) {
      continue;
    }

    // set as soon as one data block is seen, even if building its payload fails below
    hasSubmit = true;
    pTask->execInfo.sink.numOfBlocks += 1;
    uint64_t groupId = pDataBlock->info.id.groupId;

    SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};

    int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
    if (index == NULL) {  // no data yet, append it
      code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
      if (code != TSDB_CODE_SUCCESS) {
        tqError("vgId:%d dst-table gid:%" PRId64 " not exist, discard stream results", vgId, groupId);
        continue;
      }

      code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
      if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
        if (tbData.pCreateTbReq != NULL) {
          tdDestroySVCreateTbReq(tbData.pCreateTbReq);
          (void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, groupId, id);
          tbData.pCreateTbReq = NULL;
        }
        continue;
      }

      void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
      if (p == NULL) {
        // NOTE(review): tbData (rows / create-table request) does not appear to be released here - confirm
        tqError("vgId:%d, s-task:%s failed to build submit msg, data lost", vgId, id);
        continue;
      }

      // remember where this group's entry lives so later blocks of the same group are merged into it
      int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
      code = taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
      if (code) {
        tqError("vgId:%d, s-task:%s failed to put group into index map, code:%s", vgId, id, tstrerror(code));
        continue;
      }
    } else {
      code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
      if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
        if (tbData.pCreateTbReq != NULL) {
          tdDestroySVCreateTbReq(tbData.pCreateTbReq);
          tbData.pCreateTbReq = NULL;
        }
        continue;
      }

      SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index);
      if (pExisted == NULL) {
        continue;
      }

      code = doMergeExistedRows(pExisted, &tbData, id);
      if (code != TSDB_CODE_SUCCESS) {
        continue;
      }
    }

    // only blocks that made it into the submit request are counted
    pTask->execInfo.sink.numOfRows += pDataBlock->info.rows;
  }

  taosHashCleanup(pTableIndexMap);

  if (hasSubmit) {
    // NOTE(review): submitReq is not destroyed on this path; presumably doBuildAndSendSubmitMsg releases it - confirm
    code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks);
    if (code) {  // failed and continue
      tqError("vgId:%d failed to build and send submit msg", vgId);
    }
  } else {
    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
    tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
  }
}
1402

1403
/*
 * Write a single result block to the destination table.
 *
 * Resolves the destination table via setDstTableDataUid(), converts the block
 * into a SSubmitTbData payload, wraps it in a one-entry submit request and
 * sends it with doBuildAndSendSubmitMsg().
 *
 * @param pTask       sink task providing schema, super table info and counters
 * @param pDataBlock  result block to write
 * @param index       forwarded to tqSetDstTableDataPayload()
 * @param pVnode      destination vnode
 * @param earlyTs     forwarded to tqSetDstTableDataPayload()
 * @return the code of the first failing step, or the result of
 *         doBuildAndSendSubmitMsg(). When the payload step succeeds but
 *         produces no rows, its success code is returned and nothing is sent.
 */
int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode, int64_t earlyTs) {
  int32_t     code = 0;
  STSchema*   pTSchema = pTask->outputInfo.tbSink.pTSchema;
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  const char* id = pTask->id.idStr;
  int32_t     vgId = TD_VID(pVnode);
  char*       stbFullName = pTask->outputInfo.tbSink.stbFullName;

  pTask->execInfo.sink.numOfBlocks += 1;

  SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
  if (submitReq.aSubmitTbData == NULL) {
    // capture terrno before logging so the returned value matches the logged one
    code = terrno;
    tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
    return code;
  }

  SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
  code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d s-task:%s dst-table not exist, stb:%s discard stream results", vgId, id, stbFullName);
    // the (still empty) request was allocated above and must not outlive this call
    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  code = tqSetDstTableDataPayload(suid, pTSchema, index, pDataBlock, &tbData, earlyTs, id);
  if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
    if (tbData.pCreateTbReq != NULL) {
      tdDestroySVCreateTbReq(tbData.pCreateTbReq);
      (void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, id);
      tbData.pCreateTbReq = NULL;
    }

    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
  if (p == NULL) {
    code = terrno;
    tqDebug("vgId:%d, s-task:%s failed to build submit msg, code:%s, data lost", vgId, id, tstrerror(code));
    // NOTE(review): tbData itself (rows / create-table request) does not appear to be released here - confirm
    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  // NOTE(review): submitReq is not destroyed after this call; presumably doBuildAndSendSubmitMsg releases it - confirm
  code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1);
  if (code) {  // failed and continue
    tqDebug("vgId:%d, s-task:%s submit msg failed, code:%s data lost", vgId, id, tstrerror(code));
  }

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc