• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3593

24 Jan 2025 08:57AM UTC coverage: 63.239% (-0.3%) from 63.546%
#3593

push

travis-ci

web-flow
Merge pull request #29638 from taosdata/docs/TS-5846-3.0

enh: TDengine modify taosBenchmark new query rule cases and add doc

140619 of 285630 branches covered (49.23%)

Branch coverage included in aggregate %.

218877 of 282844 relevant lines covered (77.38%)

19647377.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.81
/source/dnode/vnode/src/tq/tqSink.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tcommon.h"
17
#include "tq.h"
18

19
// Per-table entry stored in the sink task's table-info cache (pTask->outputInfo.tbSink.pTbInfo),
// looked up by group id through the do*SinkTableInfo*Cache helpers declared below.
typedef struct STableSinkInfo {
20
  uint64_t uid;   // table uid; assigned from the meta reader entry (mr.me.uid) when the table is resolved
21
  tstr     name;  // presumably the destination table name passed to doCreateSinkTableInfo -- TODO confirm
22
} STableSinkInfo;
23

24
static bool    hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks);
25
static int32_t tsAscendingSortFn(const void* p1, const void* p2);
26
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
27
                                  SSubmitTbData* pTableData);
28
static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
29
                                       int64_t suid);
30
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks);
31
static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
32
static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock,
33
                             int64_t earlyTs, const char* id);
34
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
35
                                        const char* dstTableName, int64_t* uid);
36

37
static bool    isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
38
static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName,
39
                                  int32_t numOfTags);
40
static int32_t createDefaultTagColName(SArray** pColNameList);
41
static int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock,
42
                                          const char* stbFullName, int64_t gid, bool newSubTableRule);
43
static int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo);
44
static int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId,
45
                                           const char* id);
46
static bool    doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo);
47
static int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id);
48
static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode);
49
static void    rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs);
50
static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode,
51
                                    int64_t earlyTs);
52
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName);
53

54
/**
 * Translate each row of a delete-result block into an SSingleDeleteReq and append it to
 * deleteReq->deleteReqs.
 *
 * For every row the start ts, end ts and group id are read from the columns at START_TS_COLUMN_INDEX,
 * END_TS_COLUMN_INDEX and GROUPID_COLUMN_INDEX. The destination table name is resolved in this order:
 *   1. the value in the TABLE_NAME_COLUMN_INDEX column, optionally extended with the group id when
 *      newSubTableRule is set and the name qualifies;
 *   2. a name derived from stbFullName and the group id;
 *   3. the name looked up in meta with the group id used as table uid (only when stbFullName is NULL).
 * Rows whose name cannot be resolved are logged and skipped.
 *
 * @param pTq             tq handle; only pTq->pVnode is used, for the meta lookup
 * @param stbFullName     full name of the destination super table, may be NULL
 * @param pDataBlock      delete-result block to convert
 * @param deleteReq       batch request that receives the generated entries
 * @param pIdStr          task id string used in log messages
 * @param newSubTableRule whether the "append group id to the sub-table name" rule is active
 * @return 0 on success, otherwise the error code of the failing step (terrno for allocation/array failures)
 */
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
                         const char* pIdStr, bool newSubTableRule) {
  int32_t          totalRows = pDataBlock->info.rows;
  SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);

  if (pStartTsCol == NULL || pEndTsCol == NULL || pGidCol == NULL || pTbNameCol == NULL) {
    return terrno;
  }

  tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName);

  for (int32_t row = 0; row < totalRows; row++) {
    int64_t skey = *(int64_t*)colDataGetData(pStartTsCol, row);
    int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row);
    int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);

    int32_t code = TSDB_CODE_SUCCESS;
    char*   name = NULL;
    char*   originName = NULL;
    void*   varTbName = NULL;
    if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) {
      varTbName = colDataGetVarData(pTbNameCol, row);
    }

    if (varTbName != NULL && varTbName != (void*)-1) {
      // reserve at least TSDB_TABLE_NAME_LEN bytes so the group id suffix can be appended in place
      size_t cap = TMAX(TSDB_TABLE_NAME_LEN, varDataLen(varTbName) + 1);
      name = taosMemoryMalloc(cap);
      if (name == NULL) {
        return terrno;
      }

      memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
      name[varDataLen(varTbName)] = '\0';
      if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name, groupId) && groupId != 0 &&
          stbFullName) {
        code = buildCtbNameAddGroupId(stbFullName, name, groupId, cap);
        if (code != TSDB_CODE_SUCCESS) {
          taosMemoryFree(name);  // the buffer allocated above is still owned by this function
          return code;
        }
      }
    } else if (stbFullName) {
      code = buildCtbNameByGroupId(stbFullName, groupId, &name);
      if (code) {
        return code;
      }
    } else {
      originName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
      if (originName == NULL) {
        return terrno;
      }

      // the looked-up name is stored as a var-string; `name` points at its payload inside originName
      if (metaGetTableNameByUid(pTq->pVnode, groupId, originName) == 0) {
        name = varDataVal(originName);
      }
    }

    if (!name || *name == '\0') {
      tqWarn("s-task:%s failed to build delete msg groupId:%" PRId64 ", skey:%" PRId64 " ekey:%" PRId64
             " since invalid tbname:%s",
             pIdStr, groupId, skey, ekey, name ? name : "NULL");
    } else {
      tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, pIdStr,
              groupId, name, skey, ekey);

      SSingleDeleteReq req = {.startTs = skey, .endTs = ekey};
      tstrncpy(req.tbname, name, TSDB_TABLE_NAME_LEN);
      if (taosArrayPush(deleteReq->deleteReqs, &req) == NULL) {
        code = terrno;  // capture before releasing the name buffer below
      }
    }

    // in the uid-lookup branch the allocation to release is originName, not the interior pointer
    if (originName) {
      name = originName;
    }

    taosMemoryFreeClear(name);

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return 0;
}
137

138
static int32_t encodeCreateChildTableForRPC(void* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) {
6,804✔
139
  int32_t ret = 0;
6,804✔
140

141
  tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret);
6,804!
142
  if (ret < 0) {
6,800!
143
    ret = -1;
×
144
    goto end;
×
145
  }
146
  *contLen += sizeof(SMsgHead);
6,800✔
147
  *pBuf = rpcMallocCont(*contLen);
6,800✔
148
  if (NULL == *pBuf) {
6,804!
149
    ret = -1;
×
150
    goto end;
×
151
  }
152
  ((SMsgHead*)(*pBuf))->vgId = vgId;
6,804✔
153
  ((SMsgHead*)(*pBuf))->contLen = htonl(*contLen);
6,804✔
154
  SEncoder coder = {0};
6,804✔
155
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
6,804✔
156
  if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) {
6,803!
157
    rpcFreeCont(*pBuf);
×
158
    *pBuf = NULL;
×
159
    *contLen = 0;
×
160
    tEncoderClear(&coder);
×
161
    ret = -1;
×
162
    goto end;
×
163
  }
164
  tEncoderClear(&coder);
6,801✔
165

166
end:
6,804✔
167
  return ret;
6,804✔
168
}
169

170
static int32_t encodeDropChildTableForRPC(void* pReqs, int32_t vgId, void** ppBuf, int32_t *contLen) {
55✔
171
  int32_t  code = 0;
55✔
172
  SEncoder ec = {0};
55✔
173
  tEncodeSize(tEncodeSVDropTbBatchReq, pReqs, *contLen, code);
55!
174
  if (code < 0) {
55!
175
    code = TSDB_CODE_INVALID_MSG;
×
176
    goto end;
×
177
  }
178
  *contLen += sizeof(SMsgHead);
55✔
179
  *ppBuf = rpcMallocCont(*contLen);
55✔
180

181
  if (!*ppBuf) {
55!
182
    code = terrno;
×
183
    goto end;
×
184
  }
185

186
  ((SMsgHead*)(*ppBuf))->vgId = vgId;
55✔
187
  ((SMsgHead*)(*ppBuf))->contLen = htonl(*contLen);
55✔
188

189
  tEncoderInit(&ec, POINTER_SHIFT(*ppBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
55✔
190
  code = tEncodeSVDropTbBatchReq(&ec, pReqs);
55✔
191
  tEncoderClear(&ec);
55✔
192
  if (code < 0) {
55!
193
    rpcFreeCont(*ppBuf);
×
194
    *ppBuf = NULL;
×
195
    *contLen = 0;
×
196
    code = TSDB_CODE_INVALID_MSG;
×
197
    goto end;
×
198
  }
199
end:
55✔
200
  return code;
55✔
201
}
202

203
/**
 * Encode a request with the supplied encoder and put the resulting message into the vnode write queue.
 *
 * This helper is shared by the create-table and drop-table paths, so its log messages report the
 * message type instead of assuming a create-table request.
 *
 * @param pVnode  vnode whose message callback receives the message
 * @param pReqs   request object handed unchanged to `encoder`
 * @param encoder serializer that allocates the buffer and reports its length
 * @param msgType message type stored in the RPC message
 * @return 0 on success, otherwise the encoder's or the queue's error code
 */
static int32_t tqPutReqToQueue(SVnode* pVnode, void* pReqs, int32_t(*encoder)(void* pReqs, int32_t vgId, void** ppBuf, int32_t *contLen), tmsg_t msgType) {
  void*   buf = NULL;
  int32_t tlen = 0;

  int32_t code = encoder(pReqs, TD_VID(pVnode), &buf, &tlen);
  if (code) {
    tqError("vgId:%d failed to encode msg, type:%d, code:%s", TD_VID(pVnode), (int32_t)msgType, tstrerror(code));
    return code;
  }

  SRpcMsg msg = {.msgType = msgType, .pCont = buf, .contLen = tlen};
  code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
  if (code) {
    tqError("vgId:%d failed to put msg, type:%d into write-queue since %s", TD_VID(pVnode), (int32_t)msgType,
            terrstr());
  }

  return code;
}
221

222
/**
 * Fill the fixed part of a child-table create request: flags, table type, super table uid,
 * super table name (the table part of stbFullName) and number of tags.
 *
 * @param pCreateTableReq request to initialize
 * @param suid            uid of the super table
 * @param stbFullName     full super table name, parsed as account + db + table
 * @param numOfTags       value stored in ctb.tagNum
 * @return 0 on success; the tNameFromString error, or terrno when the name cannot be duplicated.
 *         ctb.tagNum is assigned even when an error is returned.
 */
int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags) {
  SName   parsed = {0};
  int32_t ret = 0;

  pCreateTableReq->flags = 0;
  pCreateTableReq->type = TSDB_CHILD_TABLE;
  pCreateTableReq->ctb.suid = suid;

  // extract the plain super table name from the full name
  ret = tNameFromString(&parsed, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (ret == 0) {
    pCreateTableReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&parsed));
    if (NULL == pCreateTableReq->ctb.stbName) {
      tqError("failed to duplicate the stb name:%s, failed to init create-table msg and create req table", stbFullName);
      ret = terrno;
    }
  }

  pCreateTableReq->ctb.tagNum = numOfTags;
  return ret;
}
242

243
/**
 * Create a one-element array of tag column names containing the default tag name "group_id".
 *
 * @param pColNameList out: the new array on success, NULL on failure
 * @return TSDB_CODE_SUCCESS, or terrno when the array cannot be created or filled
 */
int32_t createDefaultTagColName(SArray** pColNameList) {
  char    defaultTag[TSDB_COL_NAME_LEN] = "group_id";
  SArray* pNames = NULL;

  *pColNameList = NULL;

  pNames = taosArrayInit(1, TSDB_COL_NAME_LEN);
  if (NULL == pNames) {
    return terrno;
  }

  if (taosArrayPush(pNames, defaultTag) != NULL) {
    *pColNameList = pNames;
    return TSDB_CODE_SUCCESS;
  }

  // push failed: drop the half-built list
  taosArrayDestroy(pNames);
  return terrno;
}
261

262
/**
 * Set pCreateTableReq->name, the name of the child table to create.
 *
 * - If the block carries no partition table name, the name is derived from stbFullName and gid.
 * - If it carries one and the new sub-table rule applies (not an auto table name, group id not yet
 *   appended, gid != 0, stbFullName available), the name is copied into a TSDB_TABLE_NAME_LEN buffer
 *   and the group id is appended.
 * - Otherwise the partition table name is duplicated as is.
 *
 * On failure of the append step the name buffer is released and pCreateTableReq->name is reset to
 * NULL, so no allocation made by this function is left in the request.
 *
 * @return 0 on success, terrno on allocation failure, or the error of the name-building helper
 */
int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
                                   int64_t gid, bool newSubTableRule) {
  if (pDataBlock->info.parTbName[0]) {
    if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) &&
        !alreadyAddGroupId(pDataBlock->info.parTbName, gid) && gid != 0 && stbFullName) {
      pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
      if (pCreateTableReq->name == NULL) {
        return terrno;
      }

      tstrncpy(pCreateTableReq->name, pDataBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
      int32_t code = buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid, TSDB_TABLE_NAME_LEN);
      if (code != TSDB_CODE_SUCCESS) {
        // do not hand a half-built name back to the caller
        taosMemoryFreeClear(pCreateTableReq->name);
        return code;
      }
    } else {
      pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
      if (pCreateTableReq->name == NULL) {
        return terrno;
      }
    }
  } else {
    int32_t code = buildCtbNameByGroupId(stbFullName, gid, &pCreateTableReq->name);
    return code;
  }

  return 0;
}
292

293
static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock,
6,804✔
294
                                            SStreamTask* pTask, int64_t suid) {
295
  STSchema*          pTSchema = pTask->outputInfo.tbSink.pTSchema;
6,804✔
296
  int32_t            rows = pDataBlock->info.rows;
6,804✔
297
  SArray*            tagArray = NULL;
6,804✔
298
  const char*        id = pTask->id.idStr;
6,804✔
299
  int32_t            vgId = pTask->pMeta->vgId;
6,804✔
300
  int32_t            code = 0;
6,804✔
301
  STableSinkInfo*    pInfo = NULL;
6,804✔
302
  SVCreateTbBatchReq reqs = {0};
6,804✔
303
  SArray*            crTblArray = NULL;
6,804✔
304

305
  tqDebug("s-task:%s build create %d table(s) msg", id, rows);
6,804✔
306

307
  tagArray = taosArrayInit(4, sizeof(STagVal));
6,804✔
308
  crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
6,804✔
309
  if ((NULL == reqs.pArray) || (tagArray == NULL)) {
6,805!
310
    tqError("s-task:%s failed to init create table msg, code:%s", id, tstrerror(terrno));
×
311
    code = terrno;
×
312
    goto _end;
×
313
  }
314

315
  for (int32_t rowId = 0; rowId < rows; rowId++) {
13,609✔
316
    SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0});
6,805✔
317

318
    int32_t size = taosArrayGetSize(pDataBlock->pDataBlock);
6,805✔
319
    int32_t numOfTags = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
6,805✔
320

321
    code = initCreateTableMsg(pCreateTbReq, suid, stbFullName, numOfTags);
6,805✔
322
    if (code) {
6,804!
323
      tqError("s-task:%s vgId:%d failed to init create table msg", id, vgId);
×
324
      continue;
×
325
    }
326

327
    taosArrayClear(tagArray);
6,804✔
328

329
    if (size == 2) {
6,803✔
330
      STagVal tagVal = {
2,390✔
331
          .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
2,390✔
332

333
      void* p = taosArrayPush(tagArray, &tagVal);
2,390✔
334
      if (p == NULL) {
2,390!
335
        return terrno;
×
336
      }
337

338
      code = createDefaultTagColName(&pCreateTbReq->ctb.tagName);
2,390✔
339
      if (code) {
2,390!
340
        return code;
×
341
      }
342
    } else {
343
      for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
47,381✔
344
        SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
42,969✔
345
        if (pTagData == NULL) {
42,960!
346
          continue;
18,688✔
347
        }
348

349
        STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type};
42,960✔
350
        void*   pData = colDataGetData(pTagData, rowId);
42,960!
351
        if (colDataIsNull_s(pTagData, rowId)) {
85,920!
352
          continue;
18,688✔
353
        } else if (IS_VAR_DATA_TYPE(pTagData->info.type)) {
24,272!
354
          tagVal.nData = varDataLen(pData);
7,721✔
355
          tagVal.pData = (uint8_t*)varDataVal(pData);
7,721✔
356
        } else {
357
          memcpy(&tagVal.i64, pData, pTagData->info.bytes);
16,551✔
358
        }
359
        void* p = taosArrayPush(tagArray, &tagVal);
24,280✔
360
        if (p == NULL) {
24,280!
361
          code = terrno;
×
362
          goto _end;
×
363
        }
364
      }
365
    }
366

367
    code = tTagNew(tagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
6,802✔
368
    taosArrayDestroy(tagArray);
6,804✔
369
    tagArray = NULL;
6,805✔
370

371
    if (pCreateTbReq->ctb.pTag == NULL || (code != 0)) {
6,805!
372
      tdDestroySVCreateTbReq(pCreateTbReq);
373
      code = TSDB_CODE_OUT_OF_MEMORY;
×
374
      goto _end;
×
375
    }
376

377
    uint64_t gid = pDataBlock->info.id.groupId;
6,805✔
378
    if (taosArrayGetSize(pDataBlock->pDataBlock) > UD_GROUPID_COLUMN_INDEX) {
6,805!
379
      SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
6,804✔
380
      if (pGpIdColInfo == NULL) {
6,805!
381
        continue;
×
382
      }
383

384
      // todo remove this
385
      void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
6,805!
386
      if (gid != *(int64_t*)pGpIdData) {
6,805!
387
        tqError("s-task:%s vgId:%d invalid groupId:%" PRId64 " actual:%" PRId64 " in sink task", id, vgId, gid,
×
388
                *(int64_t*)pGpIdData);
389
      }
390
    }
391

392
    code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, IS_NEW_SUBTB_RULE(pTask));
6,805!
393
    if (code) {
6,803!
394
      goto _end;
×
395
    }
396

397
    void* p = taosArrayPush(reqs.pArray, pCreateTbReq);
6,803✔
398
    if (p == NULL) {
6,804!
399
      code = terrno;
×
400
      goto _end;
×
401
    }
402

403
    bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, gid, &pInfo);
6,804✔
404
    if (!alreadyCached) {
6,804✔
405
      code = doCreateSinkTableInfo(pCreateTbReq->name, &pInfo);
5,648✔
406
      if (code) {
5,648!
407
        tqError("vgId:%d failed to create sink tableInfo for table:%s, s-task:%s", vgId, pCreateTbReq->name, id);
×
408
        continue;
×
409
      }
410

411
      code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pInfo, gid, id);
5,648✔
412
      if (code) {
5,648!
413
        tqError("vgId:%d failed to put sink tableInfo:%s into cache, s-task:%s", vgId, pCreateTbReq->name, id);
×
414
      }
415
    }
416

417
    tqDebug("s-task:%s build create table:%s msg complete", id, pCreateTbReq->name);
6,804✔
418
  }
419

420
  reqs.nReqs = taosArrayGetSize(reqs.pArray);
6,804✔
421
  code = tqPutReqToQueue(pVnode, &reqs, encodeCreateChildTableForRPC, TDMT_VND_CREATE_TABLE);
6,804✔
422
  if (code != TSDB_CODE_SUCCESS) {
6,806!
423
    tqError("s-task:%s failed to send create table msg", id);
×
424
  }
425

426
_end:
6,806✔
427
  taosArrayDestroy(tagArray);
6,806✔
428
  taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
6,806✔
429
  return code;
6,803✔
430
}
431

432
static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SSDataBlock* pDataBlock,
55✔
433
                                          SStreamTask* pTask, int64_t suid) {
434
  int32_t          lino = 0;
55✔
435
  int32_t          code = 0;
55✔
436
  int32_t          rows = pDataBlock->info.rows;
55✔
437
  const char*      id = pTask->id.idStr;
55✔
438
  SVDropTbBatchReq batchReq = {0};
55✔
439
  SVDropTbReq      req = {0};
55✔
440

441
  if (rows <= 0 || rows > 1 || pTask->subtableWithoutMd5 == 0) return TSDB_CODE_SUCCESS;
55!
442

443
  batchReq.pArray = taosArrayInit(rows, sizeof(SVDropTbReq));
55✔
444
  if (!batchReq.pArray) return terrno;
55!
445
  batchReq.nReqs = rows;
55✔
446
  req.suid = suid;
55✔
447
  req.igNotExists = true;
55✔
448

449
  SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
55✔
450
  char             tbName[TSDB_TABLE_NAME_LEN + 1] = {0};
55✔
451
  int32_t          i = 0;
55✔
452
  void*            pData = colDataGetVarData(pTbNameCol, i);
55✔
453
  memcpy(tbName, varDataVal(pData), varDataLen(pData));
55✔
454
  tbName[varDataLen(pData) + 1] = 0;
55✔
455
  req.name = tbName;
55✔
456
  if (taosArrayPush(batchReq.pArray, &req) == NULL) {
110!
457
    TSDB_CHECK_CODE(terrno, lino, _exit);
×
458
  }
459

460
  SMetaReader mr = {0};
55✔
461
  metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
55✔
462

463
  code = metaGetTableEntryByName(&mr, tbName);
55✔
464
  if (TSDB_CODE_SUCCESS == code && isValidDstChildTable(&mr, TD_VID(pVnode), tbName, pTask->outputInfo.tbSink.stbUid)) {
55!
465
    STableSinkInfo* pTableSinkInfo = NULL;
55✔
466
    bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, &pTableSinkInfo);
55✔
467
    if (alreadyCached) {
55✔
468
      pTableSinkInfo->uid = mr.me.uid;
28✔
469
    }
470
  }
471
  metaReaderClear(&mr);
55✔
472
  tqDebug("s-task:%s build drop %d table(s) msg", id, rows);
55!
473
  code = tqPutReqToQueue(pVnode, &batchReq, encodeDropChildTableForRPC, TDMT_VND_DROP_TABLE);
55✔
474
  TSDB_CHECK_CODE(code, lino, _exit);
55!
475

476

477
  code = doWaitForDstTableDropped(pVnode, pTask, tbName);
55✔
478
  TSDB_CHECK_CODE(code, lino, _exit);
55!
479

480
_exit:
55✔
481
  if (batchReq.pArray) {
55!
482
    taosArrayDestroy(batchReq.pArray);
55✔
483
  }
484
  return code;
55✔
485
}
486

487
/**
 * Serialize a submit request and put it into the vnode write queue, then update the sink statistics.
 *
 * @param pVnode      destination vnode
 * @param pTask       sink task; its execInfo.sink counters are updated
 * @param pReq        submit request to serialize
 * @param numOfBlocks number of source blocks merged into pReq (used for logging only)
 * @return the error of buildSubmitMsgImpl if serialization fails; otherwise TSDB_CODE_SUCCESS,
 *         even when the message could not be queued (that failure is only logged)
 */
int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks) {
  const char* id = pTask->id.idStr;
  int32_t     vgId = TD_VID(pVnode);
  void*       pMsgBuf = NULL;
  int32_t     msgLen = 0;
  // taken before the request is handed to buildSubmitMsgImpl
  int32_t     numOfFinalBlocks = taosArrayGetSize(pReq->aSubmitTbData);

  int32_t code = buildSubmitMsgImpl(pReq, vgId, &pMsgBuf, &msgLen);
  if (TSDB_CODE_SUCCESS != code) {
    tqError("s-task:%s build submit msg failed, vgId:%d, code:%s", id, vgId, tstrerror(code));
    return code;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_SUBMIT, .pCont = pMsgBuf, .contLen = msgLen};
  code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
  if (TSDB_CODE_SUCCESS != code) {
    tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
  } else {
    tqDebug("s-task:%s vgId:%d comp %d blocks into %d and send to dstTable(s) completed", id, vgId, numOfBlocks,
            numOfFinalBlocks);
  }

  // the submit counter is advanced regardless of the enqueue result
  SSinkRecorder* pStat = &pTask->execInfo.sink;
  pStat->numOfSubmit += 1;

  // emit a throughput summary once every 1000 submits
  if ((pStat->numOfSubmit % 1000) == 0) {
    double el = (taosGetTimestampMs() - pTask->execInfo.readyTs) / 1000.0;
    tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
           " submit into dst table, %.2fMiB duration:%.2f Sec.",
           id, vgId, pStat->numOfBlocks, pStat->numOfRows, pStat->numOfSubmit, SIZE_IN_MiB(pStat->dataSize), el);
  }

  return TSDB_CODE_SUCCESS;
}
521

522
// merge the new submit table block with the existed blocks
523
// if ts in the new data block overlap with existed one, replace it
524
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id) {
27,023✔
525
  int32_t oldLen = taosArrayGetSize(pExisted->aRowP);
27,023✔
526
  int32_t newLen = taosArrayGetSize(pNew->aRowP);
27,023✔
527
  int32_t numOfPk = 0;
27,023✔
528

529
  int32_t j = 0, k = 0;
27,023✔
530
  SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES);
27,023✔
531
  if (pFinal == NULL) {
27,025!
532
    tqError("s-task:%s failed to prepare merge result datablock, code:%s", id, tstrerror(terrno));
×
533
    return terrno;
×
534
  }
535

536
  while (j < newLen && k < oldLen) {
1,811,680✔
537
    SRow* pNewRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j);
1,784,661✔
538
    SRow* pOldRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k);
1,784,661✔
539

540
    if (pNewRow->ts < pOldRow->ts) {
1,784,661✔
541
      void* p = taosArrayPush(pFinal, &pNewRow);
40✔
542
      if (p == NULL) {
40!
543
        return terrno;
×
544
      }
545
      j += 1;
40✔
546
    } else if (pNewRow->ts > pOldRow->ts) {
1,784,621✔
547
      void* p = taosArrayPush(pFinal, &pOldRow);
1,757,642✔
548
      if (p == NULL) {
1,757,642!
549
        return terrno;
×
550
      }
551

552
      k += 1;
1,757,642✔
553
    } else {
554
      // check for the existance of primary key
555
      if (pNewRow->numOfPKs == 0) {
26,975!
556
        void* p = taosArrayPush(pFinal, &pNewRow);
26,976✔
557
        if (p == NULL) {
26,976!
558
          return terrno;
×
559
        }
560

561
        k += 1;
26,976✔
562
        j += 1;
26,976✔
563
        tRowDestroy(pOldRow);
26,976✔
564
      } else {
565
        numOfPk = pNewRow->numOfPKs;
×
566

567
        SRowKey kNew, kOld;
568
        tRowGetKey(pNewRow, &kNew);
×
569
        tRowGetKey(pOldRow, &kOld);
×
570

571
        int32_t ret = tRowKeyCompare(&kNew, &kOld);
×
572
        if (ret <= 0) {
×
573
          void* p = taosArrayPush(pFinal, &pNewRow);
×
574
          if (p == NULL) {
×
575
            return terrno;
×
576
          }
577

578
          j += 1;
×
579

580
          if (ret == 0) {
×
581
            k += 1;
×
582
            tRowDestroy(pOldRow);
×
583
          }
584
        } else {
585
          void* p = taosArrayPush(pFinal, &pOldRow);
×
586
          if (p == NULL) {
×
587
            return terrno;
×
588
          }
589

590
          k += 1;
×
591
        }
592
      }
593
    }
594
  }
595

596
  while (j < newLen) {
377,769✔
597
    SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j++);
350,750✔
598
    void* p = taosArrayPush(pFinal, &pRow);
350,750✔
599
    if (p == NULL) {
350,750!
600
      return terrno;
×
601
    }
602
  }
603

604
  while (k < oldLen) {
27,746✔
605
    SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k++);
727✔
606
    void* p = taosArrayPush(pFinal, &pRow);
727✔
607
    if (p == NULL) {
727!
608
      return terrno;
×
609
    }
610
  }
611

612
  taosArrayDestroy(pNew->aRowP);
27,019✔
613
  taosArrayDestroy(pExisted->aRowP);
27,023✔
614
  pExisted->aRowP = pFinal;
27,023✔
615

616
  tqTrace("s-task:%s rows merged, final rows:%d, pk:%d uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
27,023!
617
          (int32_t)taosArrayGetSize(pFinal), numOfPk, pExisted->uid, (pExisted->pCreateTbReq != NULL),
618
          (pNew->pCreateTbReq != NULL));
619

620
  tdDestroySVCreateTbReq(pNew->pCreateTbReq);
27,023!
621
  taosMemoryFree(pNew->pCreateTbReq);
27,023!
622
  return TSDB_CODE_SUCCESS;
27,023✔
623
}
624

625
// Verify that the meta entry currently loaded in pReader can be used as a sink destination:
// it must be a child table (TSDB_CHILD_TABLE) whose super table uid equals `suid`.
//
// Returns true and resets terrno to 0 when both checks pass. Otherwise an error is logged, terrno is
// set to TSDB_CODE_TDB_INVALID_TABLE_TYPE or TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE, and false is returned.
bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid) {
  int32_t status = 0;

  if (pReader->me.type != TSDB_CHILD_TABLE) {
    tqError("vgId:%d, failed to write into %s, since table type:%d incorrect", vgId, ctbName, pReader->me.type);
    status = TSDB_CODE_TDB_INVALID_TABLE_TYPE;
  } else if (pReader->me.ctbEntry.suid != suid) {
    tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64, vgId,
            ctbName, suid, pReader->me.ctbEntry.suid);
    status = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
  }

  // terrno always reflects the outcome of this check, including the success case
  terrno = status;
  return (status == 0);
}
642

643
int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock,
11,013✔
644
                                SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq) {
645
  *pReq = NULL;
11,013✔
646

647
  SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
11,013!
648
  if (pCreateTbReq == NULL) {
11,013!
649
    return terrno;
×
650
  }
651

652
  taosArrayClear(pTagArray);
11,013✔
653
  int32_t code = initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1);
11,013✔
654
  if (code != 0) {
11,013!
655
    return code;
×
656
  }
657

658
  STagVal tagVal = {.cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
11,013✔
659
  void* p = taosArrayPush(pTagArray, &tagVal);
11,013✔
660
  if (p == NULL) {
11,013!
661
    return terrno;
×
662
  }
663

664
  code = tTagNew(pTagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
11,013✔
665
  if (pCreateTbReq->ctb.pTag == NULL || (code != 0)) {
11,013!
666
    tdDestroySVCreateTbReq(pCreateTbReq);
667
    taosMemoryFreeClear(pCreateTbReq);
×
668
    return code;
×
669
  }
670

671
  code = createDefaultTagColName(&pCreateTbReq->ctb.tagName);
11,013✔
672
  if (code) {
11,013!
673
    return code;
×
674
  }
675

676
  // set table name
677
  code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule);
11,013✔
678
  if (code) {
11,013!
679
    return code;
×
680
  }
681

682
  *pReq = pCreateTbReq;
11,013✔
683
  return code;
11,013✔
684
}
685

686
// Serialize pSubmitReq into a freshly allocated rpc buffer, prefixed by an SSubmitReq2Msg header.
//
// pSubmitReq is released with tDestroySubmitReq(.., TSDB_MSG_FLG_ENCODE) on every path, success or failure.
// On success *pMsg / *msgLen receive the buffer and its total length (header included) and
// TSDB_CODE_SUCCESS is returned. On failure *msgLen is 0, *pMsg is not written, and a non-zero code is returned.
int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) {
  int32_t code = 0;
  void*   pBuf = NULL;
  *msgLen = 0;

  // compute the encoded size first
  int32_t len = 0;
  tEncodeSize(tEncodeSubmitReq, pSubmitReq, len, code);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("failed to compute the encoded size of submit req");
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  SEncoder encoder = {0};
  len += sizeof(SSubmitReq2Msg);

  pBuf = rpcMallocCont(len);
  if (NULL == pBuf) {
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
    return terrno;
  }

  ((SSubmitReq2Msg*)pBuf)->header.vgId = vgId;
  ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
  ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);

  // the payload is encoded right after the message header
  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
  if (tEncodeSubmitReq(&encoder, pSubmitReq) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    code = terrno;  // report the failure; returning 0 here would make the caller use an unset *pMsg
    tqError("failed to encode submit req, code:%s, ignore and continue", terrstr());
    tEncoderClear(&encoder);
    rpcFreeCont(pBuf);
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  tEncoderClear(&encoder);
  tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);

  *msgLen = len;
  *pMsg = pBuf;
  return TSDB_CODE_SUCCESS;
}
725

726
int32_t tsAscendingSortFn(const void* p1, const void* p2) {
47,519,120✔
727
  SRow* pRow1 = *(SRow**)p1;
47,519,120✔
728
  SRow* pRow2 = *(SRow**)p2;
47,519,120✔
729

730
  if (pRow1->ts == pRow2->ts) {
47,519,120!
731
    return 0;
×
732
  } else {
733
    return pRow1->ts > pRow2->ts ? 1 : -1;
47,519,120!
734
  }
735
}
736

737
int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, int64_t earlyTs,
90,307✔
738
                      const char* id) {
739
  int32_t numOfRows = pDataBlock->info.rows;
90,307✔
740
  int32_t code = TSDB_CODE_SUCCESS;
90,307✔
741

742
  SArray* pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
90,307✔
743
  pTableData->aRowP = taosArrayInit(numOfRows, sizeof(SRow*));
90,311✔
744

745
  if (pTableData->aRowP == NULL || pVals == NULL) {
90,315!
746
    taosArrayDestroy(pTableData->aRowP);
×
747
    pTableData->aRowP = NULL;
×
748
    taosArrayDestroy(pVals);
×
749
    code = terrno;
×
750
    tqError("s-task:%s failed to prepare write stream res blocks, code:%s", id, tstrerror(code));
×
751
    return code;
×
752
  }
753

754
  for (int32_t j = 0; j < numOfRows; j++) {
10,101,298✔
755
    taosArrayClear(pVals);
10,010,963✔
756

757
    int32_t dataIndex = 0;
10,010,853✔
758
    int64_t ts = 0;
10,010,853✔
759

760
    for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
75,152,022✔
761
      const STColumn* pCol = &pTSchema->columns[k];
65,112,245✔
762

763
      // primary timestamp column, for debug purpose
764
      if (k == 0) {
65,112,245✔
765
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
10,007,714✔
766
        if (pColData == NULL) {
10,004,848!
767
          continue;
×
768
        }
769

770
        ts = *(int64_t*)colDataGetData(pColData, j);
10,004,848!
771
        tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, ts);
10,004,848✔
772

773
        if (ts < earlyTs) {
9,996,540!
774
          tqError("s-task:%s ts:%" PRId64 " of generated results out of valid time range %" PRId64 " , discarded", id,
×
775
                  ts, earlyTs);
776
          taosArrayDestroy(pTableData->aRowP);
×
777
          pTableData->aRowP = NULL;
×
778
          taosArrayDestroy(pVals);
×
779
          return TSDB_CODE_SUCCESS;
×
780
        }
781
      }
782

783
      if (IS_SET_NULL(pCol)) {
65,101,071✔
784
        if (pCol->flags & COL_IS_KEY) {
3,691!
785
          qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts,
×
786
                 pCol->colId, pCol->type);
787
          break;
×
788
        }
789
        SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
3,691✔
790
        void* p = taosArrayPush(pVals, &cv);
3,693✔
791
        if (p == NULL) {
3,693!
792
          return terrno;
×
793
        }
794
      } else {
795
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
65,097,380✔
796
        if (pColData == NULL) {
65,091,920!
797
          continue;
×
798
        }
799

800
        if (colDataIsNull_s(pColData, j)) {
130,183,840✔
801
          if (pCol->flags & COL_IS_KEY) {
5,914,375!
802
            qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8,
×
803
                   ts, pCol->colId, pCol->type);
804
            break;
×
805
          }
806

807
          SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
5,914,375✔
808
          void* p = taosArrayPush(pVals, &cv);
5,914,422✔
809
          if (p == NULL) {
5,914,422!
810
            return terrno;
×
811
          }
812

813
          dataIndex++;
5,914,422✔
814
        } else {
815
          void* colData = colDataGetData(pColData, j);
59,177,545!
816
          if (IS_VAR_DATA_TYPE(pCol->type)) {  // address copy, no value
65,012,320!
817
            SValue sv =
5,856,996✔
818
                (SValue){.type = pCol->type, .nData = varDataLen(colData), .pData = (uint8_t*)varDataVal(colData)};
5,856,996✔
819
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
5,856,996✔
820
            void* p = taosArrayPush(pVals, &cv);
5,834,775✔
821
            if (p == NULL) {
5,834,775!
822
              return terrno;
×
823
            }
824
          } else {
825
            SValue sv = {.type = pCol->type};
53,320,549✔
826
            memcpy(&sv.val, colData, tDataTypes[pCol->type].bytes);
53,320,549✔
827
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
53,320,549✔
828
            void* p = taosArrayPush(pVals, &cv);
53,388,279✔
829
            if (p == NULL) {
53,388,279!
830
              return terrno;
×
831
            }
832
          }
833
          dataIndex++;
59,223,054✔
834
        }
835
      }
836
    }
837

838
    SRow* pRow = NULL;
10,039,777✔
839
    code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
10,039,777✔
840
    if (code != TSDB_CODE_SUCCESS) {
10,017,395!
841
      tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);
×
842
      taosArrayDestroy(pVals);
×
843
      tqError("s-task:%s build rows for submit failed, ts:%"PRId64, id, ts);
×
844
      return code;
×
845
    }
846

847
    void* p = taosArrayPush(pTableData->aRowP, &pRow);
10,017,395✔
848
    if (p == NULL) {
10,010,982!
849
      return terrno;
×
850
    }
851
  }
852

853
  taosArrayDestroy(pVals);
90,335✔
854
  return TSDB_CODE_SUCCESS;
90,316✔
855
}
856

857
// Poll the vnode meta until the destination table `dstTableName` shows up, then record its uid.
//
// The loop runs while pTableSinkInfo->uid is still 0 and re-checks every 100ms:
//  - task asked to stop            -> TSDB_CODE_STREAM_EXEC_CANCELLED
//  - table found and valid         -> *uid and pTableSinkInfo->uid are set, TSDB_CODE_SUCCESS
//  - table found but not valid     -> the error that isValidDstChildTable stored in terrno
//  - table not found               -> sleep and retry
// If pTableSinkInfo->uid is already non-zero on entry, TSDB_CODE_SUCCESS is returned and *uid is not written.
int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
                                 const char* dstTableName, int64_t* uid) {
  int32_t     vgId = TD_VID(pVnode);
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  const char* id = pTask->id.idStr;

  while (pTableSinkInfo->uid == 0) {
    if (streamTaskShouldStop(pTask)) {
      tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName);
      return TSDB_CODE_STREAM_EXEC_CANCELLED;
    }

    // wait for the table to be created
    SMetaReader mr = {0};
    metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);

    int32_t code = metaGetTableEntryByName(&mr, dstTableName);
    if (code != 0) {  // not exist, wait and retry
      metaReaderClear(&mr);
      taosMsleep(100);
      tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName);
      continue;
    }

    // table already exists, check its type and uid
    if (isValidDstChildTable(&mr, vgId, dstTableName, suid)) {
      code = TSDB_CODE_SUCCESS;
      tqDebug("s-task:%s set uid:%" PRIu64 " for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data);
      // set the destination table uid
      (*uid) = mr.me.uid;
      pTableSinkInfo->uid = mr.me.uid;
    } else {
      // capture the validation error now, before any later call gets a chance to overwrite terrno
      code = terrno;
    }

    metaReaderClear(&mr);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
894

895
// Poll the vnode meta every 100ms until the destination table `dstTableName` is gone.
//
// Returns TSDB_CODE_SUCCESS once the lookup reports TSDB_CODE_PAR_TABLE_NOT_EXIST, or once the name resolves
// to a table that is not a valid child table of this task's super table. Returns
// TSDB_CODE_STREAM_EXEC_CANCELLED when the task is asked to stop, and the lookup's own error code for any
// other lookup failure.
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName) {
  int32_t vgId = TD_VID(pVnode);
  int64_t suid = pTask->outputInfo.tbSink.stbUid;
  const char* id = pTask->id.idStr;

  while (1) {
    if (streamTaskShouldStop(pTask)) {
      tqDebug("s-task:%s task will stop, quit from waiting for table:%s drop", id, dstTableName);
      return TSDB_CODE_STREAM_EXEC_CANCELLED;
    }
    SMetaReader mr = {0};
    metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
    int32_t code = metaGetTableEntryByName(&mr, dstTableName);
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      metaReaderClear(&mr);
      break;
    } else if (TSDB_CODE_SUCCESS == code) {
      if (isValidDstChildTable(&mr, vgId, dstTableName, suid)) {
        // still there, check again after a short pause
        metaReaderClear(&mr);
        taosMsleep(100);
        tqDebug("s-task:%s wait 100ms for table:%s drop", id, dstTableName);
      } else {
        metaReaderClear(&mr);
        break;
      }
    } else {
      tqError("s-task:%s failed to wait for table:%s drop", id, dstTableName);
      metaReaderClear(&mr);
      // return the lookup's own error; terrno may be unset or overwritten at this point
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}
928

929
// Allocate a zero-filled STableSinkInfo with room for the table name plus a terminating NUL, and copy
// pDstTableName into it. The uid is left at 0.
// *pInfo receives the new object (NULL on allocation failure, in which case terrno is returned).
int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo) {
  int32_t nameLen = strlen(pDstTableName);

  STableSinkInfo* pSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1);
  (*pInfo) = pSinkInfo;
  if (pSinkInfo == NULL) {
    return terrno;
  }

  // the buffer is zero-filled, so the copied name is already NUL-terminated
  memcpy(pSinkInfo->name.data, pDstTableName, nameLen);
  pSinkInfo->name.len = nameLen;
  return TSDB_CODE_SUCCESS;
}
940

941
int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
63,292✔
942
                           SSubmitTbData* pTableData) {
943
  uint64_t        groupId = pDataBlock->info.id.groupId;
63,292✔
944
  char*           dstTableName = pDataBlock->info.parTbName;
63,292✔
945
  int32_t         numOfRows = pDataBlock->info.rows;
63,292✔
946
  const char*     id = pTask->id.idStr;
63,292✔
947
  int64_t         suid = pTask->outputInfo.tbSink.stbUid;
63,292✔
948
  STSchema*       pTSchema = pTask->outputInfo.tbSink.pTSchema;
63,292✔
949
  int32_t         vgId = TD_VID(pVnode);
63,292✔
950
  STableSinkInfo* pTableSinkInfo = NULL;
63,292✔
951
  int32_t         code = 0;
63,292✔
952

953
  bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, groupId, &pTableSinkInfo);
63,292✔
954

955
  if (alreadyCached) {
63,297✔
956
    if (dstTableName[0] == 0) {  // data block does not set the destination table name
20,124✔
957
      tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
942✔
958
      tqDebug("s-task:%s vgId:%d, gropuId:%" PRIu64 " datablock table name is null, set name:%s", id, vgId, groupId,
942!
959
              dstTableName);
960
    } else {
961
      tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
19,182✔
962
      if (pTableSinkInfo->uid != 0) {
19,182✔
963
        tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(uid:%" PRIu64 ")", id, numOfRows, groupId,
13,051✔
964
                dstTableName, pTableSinkInfo->uid);
965
      } else {
966
        tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(not set uid yet for the secondary block)",
6,131✔
967
                id, numOfRows, groupId, dstTableName);
968
      }
969
    }
970
  } else {  // this groupId has not been kept in cache yet
971
    if (dstTableName[0] == 0) {
43,173✔
972
      memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
311✔
973
      code = buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
311✔
974
      if (code) {
311!
975
        tqDebug("s-task:%s failed to build auto create table-name:%s, groupId:0x%" PRId64, id, dstTableName, groupId);
×
976
        return code;
×
977
      }
978
    } else {
979
      if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
42,862!
980
          !alreadyAddGroupId(dstTableName, groupId) && groupId != 0) {
×
981
        tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
×
982
        if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
×
983
          code = buildCtbNameAddGroupId(NULL, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
×
984
        } else if (pTask->ver >= SSTREAM_TASK_APPEND_STABLE_NAME_VER && stbFullName) {
×
985
          code = buildCtbNameAddGroupId(stbFullName, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
×
986
        }
987
        if (code != TSDB_CODE_SUCCESS) {
×
988
          return code;
×
989
        }
990
      }
991
    }
992

993
    code = doCreateSinkTableInfo(dstTableName, &pTableSinkInfo);
43,175✔
994
    if (code == 0) {
43,170!
995
      tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName);
43,170✔
996
    } else {
997
      tqDebug("s-task:%s failed to build new sinkTableInfo, dstTable:%s", id, dstTableName);
×
998
      return code;
×
999
    }
1000
  }
1001

1002
  if (alreadyCached) {
63,294✔
1003
    pTableData->uid = pTableSinkInfo->uid;
20,119✔
1004

1005
    if (pTableData->uid == 0) {
20,119✔
1006
      tqTrace("s-task:%s cached tableInfo:%s uid is invalid, acquire it from meta", id, pTableSinkInfo->name.data);
6,366✔
1007
      return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid);
6,366✔
1008
    } else {
1009
      tqTrace("s-task:%s set the dstTable uid from cache:%" PRId64, id, pTableData->uid);
13,753✔
1010
    }
1011
  } else {
1012
    // The auto-create option will always set to be open for those submit messages, which arrives during the period
1013
    // the creating of the destination table, due to the absence of the user-specified table in TSDB. When scanning
1014
    // data from WAL, those submit messages, with auto-created table option, will be discarded expect the first, for
1015
    // those mismatched table uids. Only the FIRST table has the correct table uid, and those remain all have
1016
    // randomly generated, but false table uid in the WAL.
1017
    SMetaReader mr = {0};
43,175✔
1018
    metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
43,175✔
1019

1020
    // table not in cache, let's try to extract it from tsdb meta
1021
    if (metaGetTableEntryByName(&mr, dstTableName) < 0) {
43,175✔
1022
      metaReaderClear(&mr);
11,010✔
1023

1024
      if (pTask->outputInfo.tbSink.autoCreateCtb) {
11,010!
1025
        tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName);
11,010✔
1026

1027
        SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
11,010✔
1028
        if (pTagArray == NULL) {
11,011!
1029
          tqError("s-task:%s failed to build auto create submit msg in sink, vgId:%d, due to %s", id, vgId,
×
1030
                  tstrerror(terrno));
1031
          return terrno;
×
1032
        }
1033

1034
        pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
11,011✔
1035
        code = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
22,022✔
1036
                                       IS_NEW_SUBTB_RULE(pTask), &pTableData->pCreateTbReq);
11,011!
1037
        taosArrayDestroy(pTagArray);
11,011✔
1038

1039
        if (code) {
11,011!
1040
          tqError("s-task:%s failed to build auto create dst-table req:%s, code:%s", id, dstTableName, tstrerror(code));
×
1041
          taosMemoryFree(pTableSinkInfo);
×
1042
          return code;
×
1043
        }
1044

1045
        pTableSinkInfo->uid = 0;
11,011✔
1046
        code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
11,011✔
1047
      } else {
1048
        tqError("s-task:%s vgId:%d dst-table:%s not auto-created, and not create in tsdb, discard data", id, vgId,
×
1049
                dstTableName);
1050
        return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1051
      }
1052
    } else {
1053
      bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);
32,160✔
1054
      if (!isValid) {
32,160!
1055
        metaReaderClear(&mr);
×
1056
        taosMemoryFree(pTableSinkInfo);
×
1057
        tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId,
×
1058
                dstTableName);
1059
        return terrno;
×
1060
      } else {
1061
        pTableData->uid = mr.me.uid;
32,160✔
1062
        pTableSinkInfo->uid = mr.me.uid;
32,160✔
1063

1064
        metaReaderClear(&mr);
32,160✔
1065
        code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
32,164✔
1066
      }
1067
    }
1068
  }
1069

1070
  return code;
56,928✔
1071
}
1072

1073
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
90,307✔
1074
                                 SSubmitTbData* pTableData, int64_t earlyTs, const char* id) {
1075
  int32_t numOfRows = pDataBlock->info.rows;
90,307✔
1076
  char*   dstTableName = pDataBlock->info.parTbName;
90,307✔
1077

1078
  tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, id,
90,307✔
1079
          blockIndex + 1, numOfRows, suid);
1080

1081
  // convert all rows
1082
  int32_t code = doConvertRows(pTableData, pTSchema, pDataBlock, earlyTs, id);
90,307✔
1083
  if (code != TSDB_CODE_SUCCESS) {
90,314!
1084
    tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno));
×
1085
    return code;
×
1086
  }
1087

1088
  if (pTableData->aRowP != NULL) {
90,314!
1089
    taosArraySort(pTableData->aRowP, tsAscendingSortFn);
90,314✔
1090
    tqTrace("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows);
90,312✔
1091
  }
1092

1093
  return code;
90,311✔
1094
}
1095

1096
// Make sure the sink's tag schema is loaded. When pTagSchema is still NULL, the destination super table is
// read from meta by its uid, its tag schema is cloned into the task's output info, and autoCreateCtb is
// derived from it: it is true only if the schema has exactly one tag, of type UBIGINT, named "group_id".
//
// Returns TSDB_CODE_SUCCESS (also when the schema was already present), the meta lookup error, or terrno
// when cloning the schema fails.
int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode) {
  const char*      id = pTask->id.idStr;
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
  int32_t          vgId = pTask->pMeta->vgId;

  // nothing to do once the schema has been loaded
  if (pTask->outputInfo.tbSink.pTagSchema != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SMetaReader stbReader = {0};
  metaReaderDoInit(&stbReader, pVnode->pMeta, META_READER_LOCK);

  int32_t code = metaReaderGetTableEntryByUid(&stbReader, pOutputInfo->tbSink.stbUid);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("s-task:%s vgId:%d failed to get the dst stable, failed to sink results", id, vgId);
    metaReaderClear(&stbReader);
    return code;
  }

  SSchemaWrapper* pTagSchema = tCloneSSchemaWrapper(&stbReader.me.stbEntry.schemaTag);
  pOutputInfo->tbSink.pTagSchema = pTagSchema;
  metaReaderClear(&stbReader);

  if (pTagSchema == NULL) {
    tqError("s-task:%s failed to clone tag schema, code:%s, failed to sink results", id, tstrerror(terrno));
    return terrno;
  }

  // a lone UBIGINT "group_id" tag marks a super table whose child tables may be auto-created
  bool onlyGroupIdTag = false;
  if (pTagSchema->nCols == 1) {
    const SSchema* pFirstTag = &pTagSchema->pSchema[0];
    onlyGroupIdTag = (pFirstTag->type == TSDB_DATA_TYPE_UBIGINT) && (strcmp(pFirstTag->name, "group_id") == 0);
  }

  if (onlyGroupIdTag) {
    pOutputInfo->tbSink.autoCreateCtb = true;
  } else {
    pOutputInfo->tbSink.autoCreateCtb = false;
  }

  return code;
}
1132

1133
// Sink entry point: write a batch of stream result blocks (`data`, an SArray of SSDataBlock) into the
// destination super table of pTask on the vnode `vnode`.
//
// If the batch contains delete / create-child-table blocks, or the task uses sub tables without md5, the
// blocks are handled one by one according to their type; checkpoint and notify-event blocks are skipped.
// Otherwise all blocks are merged into one submit message by rebuildAndSendMultiResBlock.
//
// Processing is best effort: a failure on one block is logged and the remaining blocks are still handled.
// The function returns early when the tag schema cannot be prepared or when the task is asked to stop.
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
  const SArray* pBlocks = (const SArray*)data;
  SVnode*       pVnode = (SVnode*)vnode;
  int64_t       suid = pTask->outputInfo.tbSink.stbUid;
  char*         stbFullName = pTask->outputInfo.tbSink.stbFullName;
  int32_t       vgId = TD_VID(pVnode);
  int32_t       numOfBlocks = taosArrayGetSize(pBlocks);
  int32_t       code = TSDB_CODE_SUCCESS;
  const char*   id = pTask->id.idStr;
  int64_t       earlyTs = tsdbGetEarliestTs(pVnode->pTsdb);

  code = checkTagSchema(pTask, pVnode);
  if (code != TSDB_CODE_SUCCESS) {
    return;
  }

  code = tqSendAllNotifyEvents(pBlocks, pTask, pVnode);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId: %d, s-task:%s failed to send all event notifications", vgId, id);
    // continue processing even if notification fails
  }

  bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
  if (!onlySubmitData || pTask->subtableWithoutMd5 == 1) {
    tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
            numOfBlocks);

    for (int32_t i = 0; i < numOfBlocks; ++i) {
      if (streamTaskShouldStop(pTask)) {
        return;
      }

      SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
      if (pDataBlock == NULL) {
        continue;
      }

      if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
        code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
      } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
        code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
      } else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
        continue;
      } else if (pDataBlock->info.type == STREAM_DROP_CHILD_TABLE && pTask->subtableWithoutMd5) {
        code = doBuildAndSendDropTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
      } else if (pDataBlock->info.type == STREAM_NOTIFY_EVENT) {
        continue;
      } else {
        code = handleResultBlockMsg(pTask, pDataBlock, i, pVnode, earlyTs);
      }

      if (code != TSDB_CODE_SUCCESS) {
        // do not drop the failure silently; the remaining blocks are still processed
        tqError("vgId:%d, s-task:%s failed to sink %dth resBlock, type:%d, code:%s", vgId, id, i + 1,
                (int32_t)pDataBlock->info.type, tstrerror(code));
      }
    }
  } else {
    tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks);
    if (streamTaskShouldStop(pTask)) {
      return;
    }

    rebuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs);
  }
}
1195

1196
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
1197
bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
14,606✔
1198
  for (int32_t i = 0; i < numOfBlocks; ++i) {
97,324✔
1199
    SSDataBlock* p = taosArrayGet(pBlocks, i);
89,140✔
1200
    if (p == NULL) {
89,141!
1201
      continue;
×
1202
    }
1203

1204
    if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
89,141✔
1205
      return false;
6,423✔
1206
    }
1207
  }
1208

1209
  return true;
8,184✔
1210
}
1211

1212
// Store the pointer pTableSinkInfo in pSinkTableMap under the key groupId.
// If the insertion fails, pTableSinkInfo is freed here, so the caller must not use it afterwards.
// Returns the result of tSimpleHashPut.
int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) {
  int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES);

  if (code == TSDB_CODE_SUCCESS) {
    tqDebug("s-task:%s new dst table:%s(uid:%" PRIu64 ") added into cache, total:%d", id, pTableSinkInfo->name.data,
            pTableSinkInfo->uid, tSimpleHashGetSize(pSinkTableMap));
    return code;
  }

  // the map did not take the entry, release it
  taosMemoryFreeClear(pTableSinkInfo);
  return code;
}
1223

1224
// Look up the sink table info cached for groupId in pTableInfoMap.
// Returns true and stores the cached pointer in *pInfo when found; returns false and leaves *pInfo
// untouched otherwise.
bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo) {
  // the map stores STableSinkInfo pointers, so a hit yields a pointer to that stored pointer
  STableSinkInfo** ppCached = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t));
  if (ppCached == NULL) {
    return false;
  }

  *pInfo = *ppCached;
  return true;
}
1233

1234
// Remove the cache entry keyed by groupId from pSinkTableMap and log the outcome.
// An empty map is treated as success without attempting a removal.
// Returns TSDB_CODE_SUCCESS or the code reported by tSimpleHashRemove.
int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id) {
  if (tSimpleHashGetSize(pSinkTableMap) == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = tSimpleHashRemove(pSinkTableMap, &groupId, sizeof(groupId));
  if (code != 0) {
    tqError("s-task:%s failed to remove table meta from hashmap, groupId:%" PRId64, id, groupId);
    return code;
  }

  tqDebug("s-task:%s remove cached table meta for groupId:%" PRId64, id, groupId);
  return code;
}
1247

1248
// Turn a delete-result block into a batch delete request, serialize it behind an SMsgHead and put it on
// the vnode's write queue as TDMT_VND_BATCH_DEL.
//
// Nothing is sent when the block produces no delete entries. A failure to enqueue the message is only
// logged; the function still returns TSDB_CODE_SUCCESS in that case.
// Returns TSDB_CODE_SUCCESS, or an error code when building, sizing, allocating or encoding the request
// fails. The temporary delete request array is released on every path.
int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
                                int64_t suid) {
  SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
  if (deleteReq.deleteReqs == NULL) {
    return terrno;
  }

  SEncoder encoder = {0};
  void*    serializedDeleteReq = NULL;
  int32_t  len = 0;

  int32_t code =
      tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr, IS_NEW_SUBTB_RULE(pTask));
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
    // nothing to delete
    goto _end;
  }

  tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
  if (code != TSDB_CODE_SUCCESS) {
    qError("s-task:%s failed to encode delete request", pTask->id.idStr);
    goto _end;
  }

  serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
  if (serializedDeleteReq == NULL) {
    code = terrno;
    goto _end;
  }

  // the payload is encoded right after the message head
  void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, len);
  code = tEncodeSBatchDeleteReq(&encoder, &deleteReq);
  tEncoderClear(&encoder);

  if (code) {
    // the buffer was never handed to the queue, so it has to be released here
    rpcFreeCont(serializedDeleteReq);
    goto _end;
  }

  ((SMsgHead*)serializedDeleteReq)->vgId = TD_VID(pVnode);

  {
    SRpcMsg msg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead)};
    if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
      tqDebug("failed to put delete req into write-queue since %s", terrstr());
    }
  }

  code = TSDB_CODE_SUCCESS;

_end:
  taosArrayDestroy(deleteReq.deleteReqs);
  return code;
}
1294

1295
/*
 * Fold a batch of stream result blocks into one submit request and send it.
 *
 * Blocks that share a group id are merged into a single SSubmitTbData entry: the first block of a
 * group resolves the destination table and appends a new entry, later blocks of the same group are
 * merged into that entry through doMergeExistedRows(). Checkpoint and notify-event blocks are
 * skipped. A block that fails at any step is dropped and processing continues with the next one.
 *
 * pTask   - sink task; supplies the sink configuration and receives the block/row counters
 * pBlocks - array of SSDataBlock to write
 * pVnode  - vnode used to resolve destination tables and to send the request
 * earlyTs - forwarded unchanged to tqSetDstTableDataPayload()
 */
void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) {
  int32_t     code = 0;
  const char* id = pTask->id.idStr;
  int32_t     vgId = pTask->pMeta->vgId;
  int32_t     numOfBlocks = taosArrayGetSize(pBlocks);
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  STSchema*   pTSchema = pTask->outputInfo.tbSink.pTSchema;
  char*       stbFullName = pTask->outputInfo.tbSink.stbFullName;

  // group id -> position of that group's entry inside submitReq.aSubmitTbData
  SHashObj* pGroupSlots =
      taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);

  SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
  if (submitReq.aSubmitTbData == NULL) {
    code = terrno;
    tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
    taosHashCleanup(pGroupSlots);
    return;
  }

  bool hasSubmit = false;
  for (int32_t blockIdx = 0; blockIdx < numOfBlocks; ++blockIdx) {
    SSDataBlock* pBlock = taosArrayGet(pBlocks, blockIdx);
    if (pBlock == NULL || pBlock->info.type == STREAM_CHECKPOINT || pBlock->info.type == STREAM_NOTIFY_EVENT) {
      continue;
    }

    hasSubmit = true;
    pTask->execInfo.sink.numOfBlocks += 1;

    uint64_t      groupId = pBlock->info.id.groupId;
    SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
    int32_t*      pSlot = taosHashGet(pGroupSlots, &groupId, sizeof(groupId));
    const bool    firstOfGroup = (pSlot == NULL);

    // only the first block of a group has to resolve the destination table
    if (firstOfGroup) {
      code = setDstTableDataUid(pVnode, pTask, pBlock, stbFullName, &tbData);
      if (code != TSDB_CODE_SUCCESS) {
        tqError("vgId:%d dst-table gid:%" PRId64 " not exist, discard stream results", vgId, groupId);
        continue;
      }
    }

    code = tqSetDstTableDataPayload(suid, pTSchema, blockIdx, pBlock, &tbData, earlyTs, id);
    if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
      if (tbData.pCreateTbReq != NULL) {
        tdDestroySVCreateTbReq(tbData.pCreateTbReq);
        if (firstOfGroup) {
          // the cached sink-table entry for this group is dropped together with the create request
          (void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, groupId, id);
        }
        tbData.pCreateTbReq = NULL;
      }
      continue;
    }

    if (firstOfGroup) {
      // append a new entry and remember where it lives for later blocks of the same group
      if (taosArrayPush(submitReq.aSubmitTbData, &tbData) == NULL) {
        tqError("vgId:%d, s-task:%s failed to build submit msg, data lost", vgId, id);
        continue;
      }

      int32_t slot = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
      code = taosHashPut(pGroupSlots, &groupId, sizeof(groupId), &slot, sizeof(slot));
      if (code) {
        tqError("vgId:%d, s-task:%s failed to put group into index map, code:%s", vgId, id, tstrerror(code));
        continue;
      }
    } else {
      SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *pSlot);
      if (pExisted == NULL) {
        continue;
      }

      code = doMergeExistedRows(pExisted, &tbData, id);
      if (code != TSDB_CODE_SUCCESS) {
        continue;
      }
    }

    // rows are counted only for blocks that made it into the request
    pTask->execInfo.sink.numOfRows += pBlock->info.rows;
  }

  taosHashCleanup(pGroupSlots);

  if (!hasSubmit) {
    // every block was skipped: release the empty request here
    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
    tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
    return;
  }

  code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks);
  if (code) {  // failure is logged only; there is no return value to report it through
    tqError("vgId:%d failed to build and send submit msg", vgId);
  }
}
1402

1403
/*
 * Write a single stream result block to its destination table.
 *
 * pTask      - sink task; supplies the sink configuration and receives the block counter
 * pDataBlock - result block to write
 * index      - forwarded unchanged to tqSetDstTableDataPayload()
 * pVnode     - vnode used to resolve the destination table and to send the request
 * earlyTs    - forwarded unchanged to tqSetDstTableDataPayload()
 *
 * Returns the code of the first failing step, or the result of doBuildAndSendSubmitMsg(). When the
 * payload step reports success but produces no rows, nothing is sent and that success code is
 * returned. Every early return releases the submit-request array allocated here.
 */
int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode, int64_t earlyTs) {
  int32_t     code = 0;
  STSchema*   pTSchema = pTask->outputInfo.tbSink.pTSchema;
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  const char* id = pTask->id.idStr;
  int32_t     vgId = TD_VID(pVnode);
  char*       stbFullName = pTask->outputInfo.tbSink.stbFullName;

  pTask->execInfo.sink.numOfBlocks += 1;

  SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
  if (submitReq.aSubmitTbData == NULL) {
    code = terrno;  // capture before logging can overwrite it
    tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
    return code;
  }

  SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
  code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d s-task:%s dst-table not exist, stb:%s discard stream results", vgId, id, stbFullName);
    taosArrayDestroy(submitReq.aSubmitTbData);  // still empty, only the array itself to release
    return code;
  }

  code = tqSetDstTableDataPayload(suid, pTSchema, index, pDataBlock, &tbData, earlyTs, id);
  if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
    if (tbData.pCreateTbReq != NULL) {
      tdDestroySVCreateTbReq(tbData.pCreateTbReq);
      (void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, id);
      tbData.pCreateTbReq = NULL;
    }

    taosArrayDestroy(submitReq.aSubmitTbData);  // nothing was pushed yet
    return code;
  }

  void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
  if (p == NULL) {
    code = terrno;  // capture before logging can overwrite it
    tqDebug("vgId:%d, s-task:%s failed to build submit msg, code:%s, data lost", vgId, id, tstrerror(code));
    // NOTE(review): tbData (aRowP / pCreateTbReq) was not stored in the array and is not released
    // here; which routine should free it is not visible from this function -- confirm.
    taosArrayDestroy(submitReq.aSubmitTbData);
    return code;
  }

  // NOTE(review): submitReq is not released after this call; presumably doBuildAndSendSubmitMsg()
  // takes ownership of it -- confirm.
  code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1);
  if (code) {  // failure is logged and reported to the caller
    tqDebug("vgId:%d, s-task:%s submit msg failed, code:%s data lost", vgId, id, tstrerror(code));
  }

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc