• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4335

20 Jun 2025 05:45AM UTC coverage: 60.571% (-2.3%) from 62.916%
#4335

push

travis-ci

web-flow
fix: compatibility ci problems. (#31430)

149119 of 315107 branches covered (47.32%)

Branch coverage included in aggregate %.

231167 of 312731 relevant lines covered (73.92%)

6342953.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.21
/source/dnode/vnode/src/tq/tqSink.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tcommon.h"
17
#include "tq.h"
18

19
// Per-destination-table record kept in the sink task's table-info cache
// (looked up by group id through doGetSinkTableInfoFromCache).
typedef struct STableSinkInfo {
20
  uint64_t uid;   // table uid; assigned from the meta entry once the table has been resolved
21
  tstr     name;  // destination table name, stored as a tstr
22
} STableSinkInfo;
23

24
static bool    hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks);
25
static int32_t tsAscendingSortFn(const void* p1, const void* p2);
26
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
27
                                  SSubmitTbData* pTableData);
28
static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
29
                                       int64_t suid);
30
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks);
31
static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
32
static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock,
33
                             int64_t earlyTs, const char* id);
34
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
35
                                        const char* dstTableName, int64_t* uid);
36

37
static bool    isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
38
static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName,
39
                                  int32_t numOfTags);
40
static int32_t createDefaultTagColName(SArray** pColNameList);
41
static int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock,
42
                                          const char* stbFullName, int64_t gid, bool newSubTableRule, const char* id);
43
static int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo);
44
static int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId,
45
                                           const char* id);
46
static bool    doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo);
47
static int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id);
48
static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode);
49
static void    rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs);
50
static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode,
51
                                    int64_t earlyTs);
52
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName);
53

54
/**
 * Translate every row of a delete-result block into an SSingleDeleteReq and append it to
 * deleteReq->deleteReqs.
 *
 * The destination table name of a row is resolved in this order:
 *   1. the table-name column of the row, optionally decorated with the group id when
 *      newSubTableRule is set;
 *   2. a name generated from stbFullName and the group id;
 *   3. a meta lookup that treats the group id as the table uid (only when stbFullName is NULL).
 * Rows whose name cannot be resolved are logged and skipped.
 *
 * @param pTq             tq handle, used for the meta lookup only
 * @param stbFullName     full name of the destination super table, may be NULL
 * @param pDataBlock      delete block holding start-ts/end-ts/group-id/table-name columns
 * @param deleteReq       batch request that receives one entry per valid row
 * @param pIdStr          task id string used in log messages
 * @param newSubTableRule whether user supplied names have to be decorated with the group id
 * @return 0 on success, otherwise an error code; the temporary name buffer of the current row
 *         is released before returning on the error paths handled here.
 */
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
                         const char* pIdStr, bool newSubTableRule) {
  int32_t          totalRows = pDataBlock->info.rows;
  SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);

  if (pStartTsCol == NULL || pEndTsCol == NULL || pGidCol == NULL || pTbNameCol == NULL) {
    return terrno;
  }

  tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName);

  for (int32_t row = 0; row < totalRows; row++) {
    int64_t skey = *(int64_t*)colDataGetData(pStartTsCol, row);
    int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row);
    int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);

    char* name = NULL;    // table name to put into the request, may point into pOwned
    char* pOwned = NULL;  // heap buffer owned by this iteration; released on every exit path below
    void* varTbName = NULL;
    if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) {
      varTbName = colDataGetVarData(pTbNameCol, row);
    }

    if (varTbName != NULL && varTbName != (void*)-1) {
      // the buffer must be able to hold the decorated name as well as the raw one
      size_t cap = TMAX(TSDB_TABLE_NAME_LEN, varDataLen(varTbName) + 1);
      pOwned = taosMemoryMalloc(cap);
      if (pOwned == NULL) {
        return terrno;
      }

      name = pOwned;
      memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
      name[varDataLen(varTbName)] = '\0';
      if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name, groupId) && groupId != 0 &&
          stbFullName) {
        int32_t code = buildCtbNameAddGroupId(stbFullName, name, groupId, cap);
        if (code != TSDB_CODE_SUCCESS) {
          taosMemoryFreeClear(pOwned);  // do not leak the name buffer on failure
          return code;
        }
      }
    } else if (stbFullName) {
      int32_t code = buildCtbNameByGroupId(stbFullName, groupId, &name);
      if (code) {
        return code;
      }
      pOwned = name;
    } else {
      pOwned = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
      if (pOwned == NULL) {
        return terrno;
      }

      // the lookup fills a var-string; the usable name starts after its header
      if (metaGetTableNameByUid(pTq->pVnode, groupId, pOwned) == 0) {
        name = varDataVal(pOwned);
      }
    }

    if (!name || *name == '\0') {
      tqWarn("s-task:%s failed to build delete msg groupId:%" PRId64 ", skey:%" PRId64 " ekey:%" PRId64
             " since invalid tbname:%s",
             pIdStr, groupId, skey, ekey, name ? name : "NULL");
    } else {
      tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, pIdStr,
              groupId, name, skey, ekey);

      SSingleDeleteReq req = {.startTs = skey, .endTs = ekey};
      tstrncpy(req.tbname, name, TSDB_TABLE_NAME_LEN);
      void* p = taosArrayPush(deleteReq->deleteReqs, &req);
      if (p == NULL) {
        int32_t code = terrno;  // capture before the release below can touch terrno
        taosMemoryFreeClear(pOwned);
        return code;
      }
    }

    taosMemoryFreeClear(pOwned);
  }

  return 0;
}
137

138
static int32_t encodeCreateChildTableForRPC(void* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) {
2,464✔
139
  int32_t ret = 0;
2,464✔
140

141
  tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret);
2,464!
142
  if (ret < 0) {
2,464!
143
    ret = -1;
×
144
    goto end;
×
145
  }
146
  *contLen += sizeof(SMsgHead);
2,464✔
147
  *pBuf = rpcMallocCont(*contLen);
2,464✔
148
  if (NULL == *pBuf) {
2,464!
149
    ret = -1;
×
150
    goto end;
×
151
  }
152
  ((SMsgHead*)(*pBuf))->vgId = vgId;
2,464✔
153
  ((SMsgHead*)(*pBuf))->contLen = htonl(*contLen);
2,464✔
154
  SEncoder coder = {0};
2,464✔
155
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
2,464✔
156
  if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) {
2,464!
157
    rpcFreeCont(*pBuf);
×
158
    *pBuf = NULL;
×
159
    *contLen = 0;
×
160
    tEncoderClear(&coder);
×
161
    ret = -1;
×
162
    goto end;
×
163
  }
164
  tEncoderClear(&coder);
2,464✔
165

166
end:
2,464✔
167
  return ret;
2,464✔
168
}
169

170
static int32_t encodeDropChildTableForRPC(void* pReqs, int32_t vgId, void** ppBuf, int32_t *contLen) {
×
171
  int32_t  code = 0;
×
172
  SEncoder ec = {0};
×
173
  tEncodeSize(tEncodeSVDropTbBatchReq, pReqs, *contLen, code);
×
174
  if (code < 0) {
×
175
    code = TSDB_CODE_INVALID_MSG;
×
176
    goto end;
×
177
  }
178
  *contLen += sizeof(SMsgHead);
×
179
  *ppBuf = rpcMallocCont(*contLen);
×
180

181
  if (!*ppBuf) {
×
182
    code = terrno;
×
183
    goto end;
×
184
  }
185

186
  ((SMsgHead*)(*ppBuf))->vgId = vgId;
×
187
  ((SMsgHead*)(*ppBuf))->contLen = htonl(*contLen);
×
188

189
  tEncoderInit(&ec, POINTER_SHIFT(*ppBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
×
190
  code = tEncodeSVDropTbBatchReq(&ec, pReqs);
×
191
  tEncoderClear(&ec);
×
192
  if (code < 0) {
×
193
    rpcFreeCont(*ppBuf);
×
194
    *ppBuf = NULL;
×
195
    *contLen = 0;
×
196
    code = TSDB_CODE_INVALID_MSG;
×
197
    goto end;
×
198
  }
199
end:
×
200
  return code;
×
201
}
202

203
/**
 * Encode a request with the supplied encoder and put the resulting message into the write
 * queue of the vnode.
 *
 * @param pVnode  vnode whose msgCb is used to enqueue the message
 * @param pReqs   request object forwarded to the encoder
 * @param encoder callback that produces the rpc buffer and its length
 * @param msgType message type placed into the rpc message
 * @return 0 on success, otherwise the error returned by the encoder or by tmsgPutToQueue
 */
static int32_t tqPutReqToQueue(SVnode* pVnode, void* pReqs, int32_t(*encoder)(void* pReqs, int32_t vgId, void** ppBuf, int32_t *contLen), tmsg_t msgType) {
  int32_t vgId = TD_VID(pVnode);
  int32_t contLen = 0;
  void*   pCont = NULL;

  int32_t code = encoder(pReqs, vgId, &pCont, &contLen);
  if (code != 0) {
    tqError("vgId:%d failed to encode create table msg, create table failed, code:%s", vgId, tstrerror(code));
    return code;
  }

  SRpcMsg rpcMsg = {.msgType = msgType, .pCont = pCont, .contLen = contLen};
  code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
  if (code != 0) {
    tqError("vgId:%d failed to put into write-queue since %s", vgId, tstrerror(code));
  }

  return code;
}
222

223
/**
 * Fill the fixed part of a child-table create request: flags, type, super table uid, number of
 * tags and a heap copy of the super table name parsed from stbFullName.
 *
 * @param pCreateTableReq request to initialize
 * @param suid            uid of the super table
 * @param stbFullName     full name (account, db and table) of the super table
 * @param numOfTags       value stored in ctb.tagNum
 * @return 0 on success, the tNameFromString error, or terrno when the name cannot be duplicated
 */
int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags) {
  SName parsedName = {0};

  pCreateTableReq->flags = 0;
  pCreateTableReq->type = TSDB_CHILD_TABLE;
  pCreateTableReq->ctb.suid = suid;
  pCreateTableReq->ctb.tagNum = numOfTags;

  // extract the bare super table name out of the full name
  int32_t code = tNameFromString(&parsedName, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code != 0) {
    return code;
  }

  pCreateTableReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&parsedName));
  if (NULL == pCreateTableReq->ctb.stbName) {
    tqError("failed to duplicate the stb name:%s, failed to init create-table msg and create req table", stbFullName);
    code = terrno;
  }

  return code;
}
243

244
/**
 * Create a one-element array that holds the default tag column name "group_id".
 *
 * @param pColNameList out: the new array on success, NULL on failure
 * @return TSDB_CODE_SUCCESS, or terrno when the array cannot be created or filled
 */
int32_t createDefaultTagColName(SArray** pColNameList) {
  char    defaultTagName[TSDB_COL_NAME_LEN] = "group_id";
  SArray* pNames = NULL;

  *pColNameList = NULL;

  pNames = taosArrayInit(1, TSDB_COL_NAME_LEN);
  if (NULL == pNames) {
    return terrno;
  }

  if (NULL == taosArrayPush(pNames, defaultTagName)) {
    taosArrayDestroy(pNames);
    return terrno;
  }

  *pColNameList = pNames;
  return TSDB_CODE_SUCCESS;
}
262

263
/**
 * Decide the name of the child table to create and store a heap copy in pCreateTableReq->name.
 *
 *  - When the block carries a partition table name it is used, decorated with the group id if
 *    newSubTableRule applies (not an auto name, group id not appended yet, gid != 0 and a super
 *    table name is available).
 *  - Otherwise the name is generated from stbFullName and gid.
 *
 * @param pCreateTableReq request that receives the name
 * @param pDataBlock      result block; info.parTbName is read
 * @param stbFullName     full name of the destination super table
 * @param gid             group id of the block
 * @param newSubTableRule whether the group id has to be appended to user supplied names
 * @param id              task id string used in log messages
 * @return 0 on success, otherwise an error code. When decorating the name fails, the buffer
 *         allocated here is released and pCreateTableReq->name is reset to NULL.
 */
int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
                                   int64_t gid, bool newSubTableRule, const char* id) {
  if (pDataBlock->info.parTbName[0]) {
    if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) &&
        !alreadyAddGroupId(pDataBlock->info.parTbName, gid) && gid != 0 && stbFullName) {
      pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
      if (pCreateTableReq->name == NULL) {
        return terrno;
      }

      tstrncpy(pCreateTableReq->name, pDataBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
      int32_t code = buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid, TSDB_TABLE_NAME_LEN);
      if (code != TSDB_CODE_SUCCESS) {
        // the buffer was allocated above and nobody else owns it yet
        taosMemoryFreeClear(pCreateTableReq->name);
        return code;
      }
      tqDebug("s-task:%s gen name from:%s blockdata", id, pDataBlock->info.parTbName);
    } else {
      pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
      if (pCreateTableReq->name == NULL) {
        return terrno;
      }
      tqDebug("s-task:%s copy name:%s from blockdata", id, pDataBlock->info.parTbName);
    }
  } else {
    int32_t code = buildCtbNameByGroupId(stbFullName, gid, &pCreateTableReq->name);
    if (code != TSDB_CODE_SUCCESS) {
      // do not format a name that may not have been produced
      tqError("s-task:%s no name in blockdata, failed to auto-create table name, code:%s", id, tstrerror(code));
      return code;
    }
    tqDebug("s-task:%s no name in blockdata, auto-created table name:%s", id, pCreateTableReq->name);
    return code;
  }

  return 0;
}
294

295
static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock,
2,464✔
296
                                            SStreamTask* pTask, int64_t suid) {
297
  STSchema*          pTSchema = pTask->outputInfo.tbSink.pTSchema;
2,464✔
298
  int32_t            rows = pDataBlock->info.rows;
2,464✔
299
  SArray*            tagArray = NULL;
2,464✔
300
  const char*        id = pTask->id.idStr;
2,464✔
301
  int32_t            vgId = pTask->pMeta->vgId;
2,464✔
302
  int32_t            code = 0;
2,464✔
303
  STableSinkInfo*    pInfo = NULL;
2,464✔
304
  SVCreateTbBatchReq reqs = {0};
2,464✔
305
  SArray*            crTblArray = NULL;
2,464✔
306

307
  tqDebug("s-task:%s build create %d table(s) msg", id, rows);
2,464!
308

309
  tagArray = taosArrayInit(4, sizeof(STagVal));
2,464✔
310
  crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
2,464✔
311
  if ((NULL == reqs.pArray) || (tagArray == NULL)) {
2,464!
312
    tqError("s-task:%s failed to init create table msg, code:%s", id, tstrerror(terrno));
×
313
    code = terrno;
×
314
    goto _end;
×
315
  }
316

317
  for (int32_t rowId = 0; rowId < rows; rowId++) {
4,928✔
318
    SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0});
2,464✔
319

320
    int32_t size = taosArrayGetSize(pDataBlock->pDataBlock);
2,464✔
321
    int32_t numOfTags = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
2,464✔
322

323
    code = initCreateTableMsg(pCreateTbReq, suid, stbFullName, numOfTags);
2,464✔
324
    if (code) {
2,464!
325
      tqError("s-task:%s vgId:%d failed to init create table msg", id, vgId);
×
326
      continue;
×
327
    }
328

329
    taosArrayClear(tagArray);
2,464✔
330

331
    if (size == 2) {
2,464✔
332
      STagVal tagVal = {
581✔
333
          .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
581✔
334

335
      void* p = taosArrayPush(tagArray, &tagVal);
581✔
336
      if (p == NULL) {
581!
337
        return terrno;
×
338
      }
339

340
      code = createDefaultTagColName(&pCreateTbReq->ctb.tagName);
581✔
341
      if (code) {
581!
342
        return code;
×
343
      }
344
    } else {
345
      for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
18,516✔
346
        SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
16,633✔
347
        if (pTagData == NULL) {
16,633!
348
          continue;
7,621✔
349
        }
350

351
        STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type};
16,633✔
352
        void*   pData = colDataGetData(pTagData, rowId);
16,633!
353
        if (colDataIsNull_s(pTagData, rowId)) {
33,266!
354
          continue;
7,621✔
355
        } else if (IS_VAR_DATA_TYPE(pTagData->info.type)) {
9,012!
356
          tagVal.nData = varDataLen(pData);
3,431✔
357
          tagVal.pData = (uint8_t*)varDataVal(pData);
3,431✔
358
        } else {
359
          memcpy(&tagVal.i64, pData, pTagData->info.bytes);
5,581✔
360
        }
361
        void* p = taosArrayPush(tagArray, &tagVal);
9,012✔
362
        if (p == NULL) {
9,012!
363
          code = terrno;
×
364
          goto _end;
×
365
        }
366
      }
367
    }
368

369
    code = tTagNew(tagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
2,464✔
370
    taosArrayDestroy(tagArray);
2,464✔
371
    tagArray = NULL;
2,464✔
372

373
    if (pCreateTbReq->ctb.pTag == NULL || (code != 0)) {
2,464!
374
      tdDestroySVCreateTbReq(pCreateTbReq);
375
      code = TSDB_CODE_OUT_OF_MEMORY;
×
376
      goto _end;
×
377
    }
378

379
    uint64_t gid = pDataBlock->info.id.groupId;
2,464✔
380
    if (taosArrayGetSize(pDataBlock->pDataBlock) > UD_GROUPID_COLUMN_INDEX) {
2,464!
381
      SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
2,464✔
382
      if (pGpIdColInfo == NULL) {
2,464!
383
        continue;
×
384
      }
385

386
      // todo remove this
387
      void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
2,464!
388
      if (gid != *(int64_t*)pGpIdData) {
2,464!
389
        tqError("s-task:%s vgId:%d invalid groupId:%" PRId64 " actual:%" PRId64 " in sink task", id, vgId, gid,
×
390
                *(int64_t*)pGpIdData);
391
      }
392
    }
393

394
    code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, IS_NEW_SUBTB_RULE(pTask),
2,464!
395
                                      pTask->id.idStr);
396
    if (code) {
2,464!
397
      goto _end;
×
398
    }
399

400
    void* p = taosArrayPush(reqs.pArray, pCreateTbReq);
2,464✔
401
    if (p == NULL) {
2,464!
402
      code = terrno;
×
403
      goto _end;
×
404
    }
405

406
    bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, gid, &pInfo);
2,464✔
407
    if (!alreadyCached) {
2,464✔
408
      code = doCreateSinkTableInfo(pCreateTbReq->name, &pInfo);
1,898✔
409
      if (code) {
1,898!
410
        tqError("vgId:%d failed to create sink tableInfo for table:%s, s-task:%s", vgId, pCreateTbReq->name, id);
×
411
        continue;
×
412
      }
413

414
      code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pInfo, gid, id);
1,898✔
415
      if (code) {
1,898!
416
        tqError("vgId:%d failed to put sink tableInfo:%s into cache, s-task:%s", vgId, pCreateTbReq->name, id);
×
417
      }
418
    }
419

420
    tqDebug("s-task:%s build create table:%s msg complete", id, pCreateTbReq->name);
2,464!
421
  }
422

423
  reqs.nReqs = taosArrayGetSize(reqs.pArray);
2,464✔
424
  code = tqPutReqToQueue(pVnode, &reqs, encodeCreateChildTableForRPC, TDMT_VND_CREATE_TABLE);
2,464✔
425
  if (code != TSDB_CODE_SUCCESS) {
2,464!
426
    tqError("s-task:%s failed to send create table msg, code:%s", id, tstrerror(code));
×
427
  }
428

429
_end:
2,464✔
430
  taosArrayDestroy(tagArray);
2,464✔
431
  taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
2,464✔
432
  return code;
2,464✔
433
}
434

435
static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SSDataBlock* pDataBlock,
×
436
                                          SStreamTask* pTask, int64_t suid) {
437
  int32_t          lino = 0;
×
438
  int32_t          code = 0;
×
439
  int32_t          rows = pDataBlock->info.rows;
×
440
  const char*      id = pTask->id.idStr;
×
441
  SVDropTbBatchReq batchReq = {0};
×
442
  SVDropTbReq      req = {0};
×
443

444
  if (rows <= 0 || rows > 1 || pTask->subtableWithoutMd5 == 0) return TSDB_CODE_SUCCESS;
×
445

446
  batchReq.pArray = taosArrayInit(rows, sizeof(SVDropTbReq));
×
447
  if (!batchReq.pArray) return terrno;
×
448
  batchReq.nReqs = rows;
×
449
  req.suid = suid;
×
450
  req.igNotExists = true;
×
451

452
  SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
×
453
  char             tbName[TSDB_TABLE_NAME_LEN + 1] = {0};
×
454
  int32_t          i = 0;
×
455
  void*            pData = colDataGetVarData(pTbNameCol, i);
×
456
  memcpy(tbName, varDataVal(pData), varDataLen(pData));
×
457
  tbName[varDataLen(pData) + 1] = 0;
×
458
  req.name = tbName;
×
459
  if (taosArrayPush(batchReq.pArray, &req) == NULL) {
×
460
    TSDB_CHECK_CODE(terrno, lino, _exit);
×
461
  }
462

463
  SMetaReader mr = {0};
×
464
  metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
×
465

466
  code = metaGetTableEntryByName(&mr, tbName);
×
467
  if (TSDB_CODE_SUCCESS == code && isValidDstChildTable(&mr, TD_VID(pVnode), tbName, pTask->outputInfo.tbSink.stbUid)) {
×
468
    STableSinkInfo* pTableSinkInfo = NULL;
×
469
    bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, &pTableSinkInfo);
×
470
    if (alreadyCached) {
×
471
      pTableSinkInfo->uid = mr.me.uid;
×
472
    }
473
  }
474
  metaReaderClear(&mr);
×
475
  tqDebug("s-task:%s build drop %d table(s) msg", id, rows);
×
476
  code = tqPutReqToQueue(pVnode, &batchReq, encodeDropChildTableForRPC, TDMT_VND_DROP_TABLE);
×
477
  TSDB_CHECK_CODE(code, lino, _exit);
×
478

479

480
  code = doWaitForDstTableDropped(pVnode, pTask, tbName);
×
481
  TSDB_CHECK_CODE(code, lino, _exit);
×
482

483
_exit:
×
484
  if (batchReq.pArray) {
×
485
    taosArrayDestroy(batchReq.pArray);
×
486
  }
487
  return code;
×
488
}
489

490
/**
 * Encode a submit request, put it into the write queue of the vnode and update the sink
 * statistics of the task.
 *
 * @param pVnode      destination vnode
 * @param pTask       sink task; execInfo.sink is updated
 * @param pReq        submit request to encode
 * @param numOfBlocks number of source blocks, used for logging only
 * @return the encode error when building the message fails; TSDB_CODE_SUCCESS otherwise, even
 *         when the message could not be queued (that failure is only logged)
 */
int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks) {
  const char* id = pTask->id.idStr;
  int32_t     vgId = TD_VID(pVnode);
  void*       pCont = NULL;
  int32_t     contLen = 0;

  // read the block count before the request is handed to the encoder
  int32_t numOfFinalBlocks = taosArrayGetSize(pReq->aSubmitTbData);

  int32_t code = buildSubmitMsgImpl(pReq, vgId, &pCont, &contLen);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("s-task:%s build submit msg failed, vgId:%d, code:%s", id, vgId, tstrerror(code));
    return code;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_SUBMIT, .pCont = pCont, .contLen = contLen};
  code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
  } else {
    tqDebug("s-task:%s vgId:%d comp %d blocks into %d and send to dstTable(s) completed", id, vgId, numOfBlocks,
            numOfFinalBlocks);
  }

  // report the accumulated statistics once every 1000 submits
  SSinkRecorder* pRecorder = &pTask->execInfo.sink;
  pRecorder->numOfSubmit += 1;
  if ((pRecorder->numOfSubmit % 1000) == 0) {
    double el = (taosGetTimestampMs() - pTask->execInfo.readyTs) / 1000.0;
    tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
           " submit into dst table, %.2fMiB duration:%.2f Sec.",
           id, vgId, pRecorder->numOfBlocks, pRecorder->numOfRows, pRecorder->numOfSubmit,
           SIZE_IN_MiB(pRecorder->dataSize), el);
  }

  return TSDB_CODE_SUCCESS;
}
524

525
// merge the new submit table block with the existed blocks
526
// if ts in the new data block overlap with existed one, replace it
527
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id) {
483✔
528
  int32_t oldLen = taosArrayGetSize(pExisted->aRowP);
483✔
529
  int32_t newLen = taosArrayGetSize(pNew->aRowP);
483✔
530
  int32_t numOfPk = 0;
483✔
531

532
  int32_t j = 0, k = 0;
483✔
533
  SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES);
483✔
534
  if (pFinal == NULL) {
483!
535
    tqError("s-task:%s failed to prepare merge result datablock, code:%s", id, tstrerror(terrno));
×
536
    return terrno;
×
537
  }
538

539
  while (j < newLen && k < oldLen) {
1,645,183✔
540
    SRow* pNewRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j);
1,644,700✔
541
    SRow* pOldRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k);
1,644,700✔
542

543
    if (pNewRow->ts < pOldRow->ts) {
1,644,700✔
544
      void* p = taosArrayPush(pFinal, &pNewRow);
37✔
545
      if (p == NULL) {
37!
546
        return terrno;
×
547
      }
548
      j += 1;
37✔
549
    } else if (pNewRow->ts > pOldRow->ts) {
1,644,663✔
550
      void* p = taosArrayPush(pFinal, &pOldRow);
1,644,167✔
551
      if (p == NULL) {
1,644,167!
552
        return terrno;
×
553
      }
554

555
      k += 1;
1,644,167✔
556
    } else {
557
      // check for the existance of primary key
558
      if (pNewRow->numOfPKs == 0) {
496!
559
        void* p = taosArrayPush(pFinal, &pNewRow);
496✔
560
        if (p == NULL) {
496!
561
          return terrno;
×
562
        }
563

564
        k += 1;
496✔
565
        j += 1;
496✔
566
        tRowDestroy(pOldRow);
496✔
567
      } else {
568
        numOfPk = pNewRow->numOfPKs;
×
569

570
        SRowKey kNew, kOld;
571
        tRowGetKey(pNewRow, &kNew);
×
572
        tRowGetKey(pOldRow, &kOld);
×
573

574
        int32_t ret = tRowKeyCompare(&kNew, &kOld);
×
575
        if (ret <= 0) {
×
576
          void* p = taosArrayPush(pFinal, &pNewRow);
×
577
          if (p == NULL) {
×
578
            return terrno;
×
579
          }
580

581
          j += 1;
×
582

583
          if (ret == 0) {
×
584
            k += 1;
×
585
            tRowDestroy(pOldRow);
×
586
          }
587
        } else {
588
          void* p = taosArrayPush(pFinal, &pOldRow);
×
589
          if (p == NULL) {
×
590
            return terrno;
×
591
          }
592

593
          k += 1;
×
594
        }
595
      }
596
    }
597
  }
598

599
  while (j < newLen) {
270,347✔
600
    SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j++);
269,864✔
601
    void* p = taosArrayPush(pFinal, &pRow);
269,864✔
602
    if (p == NULL) {
269,864!
603
      return terrno;
×
604
    }
605
  }
606

607
  while (k < oldLen) {
805✔
608
    SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k++);
322✔
609
    void* p = taosArrayPush(pFinal, &pRow);
322✔
610
    if (p == NULL) {
322!
611
      return terrno;
×
612
    }
613
  }
614

615
  taosArrayDestroy(pNew->aRowP);
483✔
616
  taosArrayDestroy(pExisted->aRowP);
483✔
617
  pExisted->aRowP = pFinal;
483✔
618

619
  tqTrace("s-task:%s rows merged, final rows:%d, pk:%d uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
483!
620
          (int32_t)taosArrayGetSize(pFinal), numOfPk, pExisted->uid, (pExisted->pCreateTbReq != NULL),
621
          (pNew->pCreateTbReq != NULL));
622

623
  tdDestroySVCreateTbReq(pNew->pCreateTbReq);
483!
624
  taosMemoryFree(pNew->pCreateTbReq);
483!
625
  return TSDB_CODE_SUCCESS;
483✔
626
}
627

628
// Check that the table entry currently loaded in pReader can be used as the sink
// destination: it has to be a child table and it has to belong to super table `suid`.
//
// Returns true and sets terrno to 0 when both checks pass. Otherwise logs the reason,
// stores TSDB_CODE_TDB_INVALID_TABLE_TYPE or TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE in
// terrno, and returns false.
bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid) {
  int32_t result = 0;

  if (pReader->me.type == TSDB_CHILD_TABLE) {
    if (pReader->me.ctbEntry.suid != suid) {
      tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64, vgId,
              ctbName, suid, pReader->me.ctbEntry.suid);
      result = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
    }
  } else {
    tqError("vgId:%d, failed to write into %s, since table type:%d incorrect", vgId, ctbName, pReader->me.type);
    result = TSDB_CODE_TDB_INVALID_TABLE_TYPE;
  }

  // terrno is written last so that it always reflects the outcome of this check
  terrno = result;
  return (result == 0);
}
645

646
int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock,
846✔
647
                                SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq, const char* id) {
648
  *pReq = NULL;
846✔
649

650
  SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
846!
651
  if (pCreateTbReq == NULL) {
846!
652
    return terrno;
×
653
  }
654

655
  taosArrayClear(pTagArray);
846✔
656
  int32_t code = initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1);
846✔
657
  if (code != 0) {
846!
658
    return code;
×
659
  }
660

661
  STagVal tagVal = {.cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
846✔
662
  void* p = taosArrayPush(pTagArray, &tagVal);
846✔
663
  if (p == NULL) {
846!
664
    return terrno;
×
665
  }
666

667
  code = tTagNew(pTagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
846✔
668
  if (pCreateTbReq->ctb.pTag == NULL || (code != 0)) {
845!
669
    tdDestroySVCreateTbReq(pCreateTbReq);
670
    taosMemoryFreeClear(pCreateTbReq);
×
671
    return code;
×
672
  }
673

674
  code = createDefaultTagColName(&pCreateTbReq->ctb.tagName);
845✔
675
  if (code) {
846!
676
    return code;
×
677
  }
678

679
  // set table name
680
  code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule,
846✔
681
                                    id);
682
  if (code) {
846!
683
    return code;
×
684
  }
685

686
  *pReq = pCreateTbReq;
846✔
687
  return code;
846✔
688
}
689

690
// Serialize pSubmitReq into a freshly allocated rpc buffer prefixed by SSubmitReq2Msg.
//
// On success *pMsg points to the buffer, *msgLen holds its total length (header
// included) and TSDB_CODE_SUCCESS is returned. On failure *msgLen is 0, *pMsg is left
// untouched and a non-zero error code is returned.
//
// pSubmitReq is destroyed with TSDB_MSG_FLG_ENCODE on every path, success or failure.
int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) {
  int32_t code = 0;
  void*   pBuf = NULL;
  *msgLen = 0;

  // compute the encoded size first
  int32_t len = 0;
  tEncodeSize(tEncodeSubmitReq, pSubmitReq, len, code);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("failed to calculate the encoded size of submit req");
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  SEncoder encoder = {0};
  len += sizeof(SSubmitReq2Msg);

  pBuf = rpcMallocCont(len);
  if (NULL == pBuf) {
    code = terrno;  // capture before the cleanup below has a chance to touch terrno
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  ((SSubmitReq2Msg*)pBuf)->header.vgId = vgId;
  ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
  ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);

  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
  if (tEncodeSubmitReq(&encoder, pSubmitReq) < 0) {
    // report the failure through the return value as well, not only through terrno
    code = TSDB_CODE_OUT_OF_MEMORY;
    terrno = code;
    tqError("failed to encode submit req, code:%s, ignore and continue", terrstr());
    tEncoderClear(&encoder);
    rpcFreeCont(pBuf);
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  tEncoderClear(&encoder);
  tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);

  *msgLen = len;
  *pMsg = pBuf;
  return TSDB_CODE_SUCCESS;
}
729

730
int32_t tsAscendingSortFn(const void* p1, const void* p2) {
35,726,855✔
731
  SRow* pRow1 = *(SRow**)p1;
35,726,855✔
732
  SRow* pRow2 = *(SRow**)p2;
35,726,855✔
733

734
  if (pRow1->ts == pRow2->ts) {
35,726,855!
735
    return 0;
×
736
  } else {
737
    return pRow1->ts > pRow2->ts ? 1 : -1;
35,726,855!
738
  }
739
}
740

741
// Convert every row of pDataBlock into an SRow laid out according to pTSchema and
// collect the rows in pTableData->aRowP (allocated here).
//
// Outcomes:
//  - success: aRowP holds one SRow* per block row, TSDB_CODE_SUCCESS is returned.
//  - a row timestamp is older than earlyTs: the whole block is discarded, the rows
//    built so far are destroyed, aRowP is set to NULL and TSDB_CODE_SUCCESS is
//    returned. Callers must therefore check aRowP for NULL.
//  - allocation or row-build failure: a non-zero error code is returned.
//
// The scratch value array is released on every exit path.
int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, int64_t earlyTs,
                      const char* id) {
  int32_t numOfRows = pDataBlock->info.rows;
  int32_t code = TSDB_CODE_SUCCESS;

  SArray* pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
  pTableData->aRowP = taosArrayInit(numOfRows, sizeof(SRow*));

  if (pTableData->aRowP == NULL || pVals == NULL) {
    taosArrayDestroy(pTableData->aRowP);
    pTableData->aRowP = NULL;
    taosArrayDestroy(pVals);
    code = terrno;
    tqError("s-task:%s failed to prepare write stream res blocks, code:%s", id, tstrerror(code));
    return code;
  }

  for (int32_t j = 0; j < numOfRows; j++) {
    taosArrayClear(pVals);

    // dataIndex walks the block columns; it only advances for schema columns that are
    // actually backed by a block column (i.e. not the IS_SET_NULL ones)
    int32_t dataIndex = 0;
    int64_t ts = 0;

    for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
      const STColumn* pCol = &pTSchema->columns[k];

      // primary timestamp column, for debug purpose
      if (k == 0) {
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
        if (pColData == NULL) {
          continue;
        }

        ts = *(int64_t*)colDataGetData(pColData, j);
        tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, ts);

        if (ts < earlyTs) {
          tqError("s-task:%s ts:%" PRId64 " of generated results out of valid time range %" PRId64 " , discarded", id,
                  ts, earlyTs);

          // the array only stores pointers: destroy the rows built for earlier
          // iterations before dropping the array, otherwise they are leaked
          int32_t numOfBuilt = (int32_t)taosArrayGetSize(pTableData->aRowP);
          for (int32_t r = 0; r < numOfBuilt; ++r) {
            tRowDestroy(*(SRow**)TARRAY_GET_ELEM(pTableData->aRowP, r));
          }

          taosArrayDestroy(pTableData->aRowP);
          pTableData->aRowP = NULL;
          taosArrayDestroy(pVals);
          return TSDB_CODE_SUCCESS;
        }
      }

      if (IS_SET_NULL(pCol)) {
        if (pCol->flags & COL_IS_KEY) {
          qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts,
                 pCol->colId, pCol->type);
          break;
        }
        SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
        void* p = taosArrayPush(pVals, &cv);
        if (p == NULL) {
          code = terrno;
          goto _failed;
        }
      } else {
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
        if (pColData == NULL) {
          continue;
        }

        if (colDataIsNull_s(pColData, j)) {
          if (pCol->flags & COL_IS_KEY) {
            qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8,
                   ts, pCol->colId, pCol->type);
            break;
          }

          SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
          void* p = taosArrayPush(pVals, &cv);
          if (p == NULL) {
            code = terrno;
            goto _failed;
          }

          dataIndex++;
        } else {
          void* colData = colDataGetData(pColData, j);
          if (IS_VAR_DATA_TYPE(pCol->type)) {  // address copy, no value
            SValue sv =
                (SValue){.type = pCol->type, .nData = varDataLen(colData), .pData = (uint8_t*)varDataVal(colData)};
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
            void* p = taosArrayPush(pVals, &cv);
            if (p == NULL) {
              code = terrno;
              goto _failed;
            }
          } else {
            SValue sv = {.type = pCol->type};
            valueSetDatum(&sv, pCol->type, colData, tDataTypes[pCol->type].bytes);
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
            void* p = taosArrayPush(pVals, &cv);
            if (p == NULL) {
              code = terrno;
              goto _failed;
            }
          }
          dataIndex++;
        }
      }
    }

    SRow* pRow = NULL;
    code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
    if (code != TSDB_CODE_SUCCESS) {
      tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);
      taosArrayDestroy(pVals);
      tqError("s-task:%s build rows for submit failed, ts:%"PRId64, id, ts);
      return code;
    }

    void* p = taosArrayPush(pTableData->aRowP, &pRow);
    if (p == NULL) {
      code = terrno;
      // the row never made it into aRowP, so nobody else can release it
      tRowDestroy(pRow);
      goto _failed;
    }
  }

  taosArrayDestroy(pVals);
  return TSDB_CODE_SUCCESS;

_failed:
  // allocation failure while collecting values/rows: drop the scratch array and
  // report the error; aRowP is left for the caller to dispose of, as before
  taosArrayDestroy(pVals);
  return code;
}
860

861
// Poll the vnode meta until the destination table `dstTableName` exists, then record
// its uid in both *uid and pTableSinkInfo->uid.
//
// The loop runs while pTableSinkInfo->uid is still 0 and ends in one of these ways:
//  - the task is asked to stop              -> TSDB_CODE_STREAM_EXEC_CANCELLED
//  - more than 60 seconds have elapsed      -> TSDB_CODE_PAR_TABLE_NOT_EXIST
//  - the table is found                     -> terrno as left by isValidDstChildTable
//                                              (0 when the table is a valid child table)
// While the table is missing it sleeps 100ms between lookups.
int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
                                 const char* dstTableName, int64_t* uid) {
  const char* id = pTask->id.idStr;
  int32_t     vgId = TD_VID(pVnode);
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  int32_t     timeout = 60;  // 1min
  int64_t     start = taosGetTimestampSec();

  while (pTableSinkInfo->uid == 0) {
    if (streamTaskShouldStop(pTask)) {
      tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName);
      return TSDB_CODE_STREAM_EXEC_CANCELLED;
    }

    int64_t elapsed = taosGetTimestampSec() - start;
    if (elapsed > timeout) {
      tqError("s-task:%s wait for table-creating:%s more than %dsec, failed", id, dstTableName, timeout);
      return TSDB_CODE_PAR_TABLE_NOT_EXIST;
    }

    // look the table up by name; the reader is re-created for every attempt
    SMetaReader reader = {0};
    metaReaderDoInit(&reader, pVnode->pMeta, META_READER_LOCK);

    if (metaGetTableEntryByName(&reader, dstTableName) != 0) {
      // not exist yet, wait and retry
      metaReaderClear(&reader);
      taosMsleep(100);
      tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName);
      continue;
    }

    // table already exists, check its type and owner super table
    if (isValidDstChildTable(&reader, vgId, dstTableName, suid)) {
      tqDebug("s-task:%s set uid:%" PRIu64 " for dstTable:%s from meta", id, reader.me.uid, pTableSinkInfo->name.data);
      // valid child table: publish its uid to the caller and to the cached info
      (*uid) = reader.me.uid;
      pTableSinkInfo->uid = reader.me.uid;
    }

    metaReaderClear(&reader);
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
906

907
// Poll the vnode meta until the destination table `dstTableName` is gone.
//
// Returns TSDB_CODE_SUCCESS once the lookup reports TSDB_CODE_PAR_TABLE_NOT_EXIST, or
// once a table with that name exists but is not a valid child table of the sink's
// super table. Returns TSDB_CODE_STREAM_EXEC_CANCELLED when the task is asked to stop,
// and a non-zero error code when the meta lookup fails for any other reason.
// While the valid child table still exists it sleeps 100ms between lookups; there is
// no timeout.
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName) {
  int32_t vgId = TD_VID(pVnode);
  int64_t suid = pTask->outputInfo.tbSink.stbUid;
  const char* id = pTask->id.idStr;

  while (1) {
    if (streamTaskShouldStop(pTask)) {
      tqDebug("s-task:%s task will stop, quit from waiting for table:%s drop", id, dstTableName);
      return TSDB_CODE_STREAM_EXEC_CANCELLED;
    }
    SMetaReader mr = {0};
    metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
    int32_t code = metaGetTableEntryByName(&mr, dstTableName);
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      metaReaderClear(&mr);
      break;
    } else if (TSDB_CODE_SUCCESS == code) {
      if (isValidDstChildTable(&mr, vgId, dstTableName, suid)) {
        metaReaderClear(&mr);
        taosMsleep(100);
        tqDebug("s-task:%s wait 100ms for table:%s drop", id, dstTableName);
      } else {
        metaReaderClear(&mr);
        break;
      }
    } else {
      tqError("s-task:%s failed to wait for table:%s drop", id, dstTableName);
      metaReaderClear(&mr);
      // the lookup failed: never report success here, even if terrno happens to be 0
      return (terrno != TSDB_CODE_SUCCESS) ? terrno : code;
    }
  }
  return TSDB_CODE_SUCCESS;
}
940

941
// Allocate a zero-filled STableSinkInfo with room for the table name plus a
// terminating NUL, and copy pDstTableName into it. The uid is left at 0.
//
// *pInfo receives the new object (NULL on allocation failure, in which case terrno is
// returned). The caller owns the returned memory.
int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo) {
  int32_t nameLen = strlen(pDstTableName);

  STableSinkInfo* pNew = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1);
  (*pInfo) = pNew;
  if (pNew == NULL) {
    return terrno;
  }

  // the buffer is zeroed by calloc, so copying nameLen bytes leaves the name NUL-terminated
  memcpy(pNew->name.data, pDstTableName, nameLen);
  pNew->name.len = nameLen;
  return TSDB_CODE_SUCCESS;
}
952

953
int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
7,921✔
954
                           SSubmitTbData* pTableData) {
955
  uint64_t        groupId = pDataBlock->info.id.groupId;
7,921✔
956
  char*           dstTableName = pDataBlock->info.parTbName;
7,921✔
957
  int32_t         numOfRows = pDataBlock->info.rows;
7,921✔
958
  const char*     id = pTask->id.idStr;
7,921✔
959
  int64_t         suid = pTask->outputInfo.tbSink.stbUid;
7,921✔
960
  STSchema*       pTSchema = pTask->outputInfo.tbSink.pTSchema;
7,921✔
961
  int32_t         vgId = TD_VID(pVnode);
7,921✔
962
  STableSinkInfo* pTableSinkInfo = NULL;
7,921✔
963
  int32_t         code = 0;
7,921✔
964

965
  bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, groupId, &pTableSinkInfo);
7,921✔
966

967
  if (alreadyCached) {
7,922✔
968
    if (dstTableName[0] == 0) {  // data block does not set the destination table name
7,026✔
969
      tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
1,012✔
970
      tqDebug("s-task:%s vgId:%d, gropuId:%" PRIu64 " datablock table name is null, set name:%s", id, vgId, groupId,
1,012!
971
              dstTableName);
972
    } else {
973
      tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
6,014✔
974
      if (pTableSinkInfo->uid != 0) {
6,014✔
975
        tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(uid:%" PRIu64 ")", id, numOfRows, groupId,
3,871✔
976
                dstTableName, pTableSinkInfo->uid);
977
      } else {
978
        tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(not set uid yet for the secondary block)",
2,143✔
979
                id, numOfRows, groupId, dstTableName);
980
      }
981
    }
982
  } else {  // this groupId has not been kept in cache yet
983
    if (dstTableName[0] == 0) {
896✔
984
      memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
313✔
985
      code = buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
313✔
986
      if (code) {
313!
987
        tqDebug("s-task:%s failed to build auto create table-name:%s, groupId:0x%" PRId64, id, dstTableName, groupId);
×
988
        return code;
×
989
      } else {
990
        tqDebug("s-task:%s no table name given, generated sub-table-name:%s, groupId:0x%" PRId64, id, dstTableName, groupId);
313!
991
      }
992
    } else {
993
      if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
583!
994
          !alreadyAddGroupId(dstTableName, groupId) && groupId != 0) {
×
995
        tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
×
996
        if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
×
997
          code = buildCtbNameAddGroupId(NULL, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
×
998
        } else if (pTask->ver >= SSTREAM_TASK_APPEND_STABLE_NAME_VER && stbFullName) {
×
999
          code = buildCtbNameAddGroupId(stbFullName, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
×
1000
        }
1001
        if (code != TSDB_CODE_SUCCESS) {
×
1002
          return code;
×
1003
        }
1004
      }
1005
    }
1006

1007
    code = doCreateSinkTableInfo(dstTableName, &pTableSinkInfo);
896✔
1008
    if (code == 0) {
896!
1009
      tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s, groupId:%" PRId64, id, dstTableName, groupId);
896✔
1010
    } else {
1011
      tqDebug("s-task:%s failed to build new sinkTableInfo, dstTable:%s, groupId:%" PRId64, id, dstTableName, groupId);
×
1012
      return code;
×
1013
    }
1014
  }
1015

1016
  if (alreadyCached) {
7,922✔
1017
    pTableData->uid = pTableSinkInfo->uid;
7,026✔
1018

1019
    if (pTableData->uid == 0) {
7,026✔
1020
      tqTrace("s-task:%s cached tableInfo:%s uid is invalid, acquire it from meta", id, pTableSinkInfo->name.data);
2,341✔
1021
      return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid);
2,341✔
1022
    } else {
1023
      tqTrace("s-task:%s set the dstTable uid from cache:%" PRId64, id, pTableData->uid);
4,685✔
1024
    }
1025
  } else {
1026
    // The auto-create option will always set to be open for those submit messages, which arrives during the period
1027
    // the creating of the destination table, due to the absence of the user-specified table in TSDB. When scanning
1028
    // data from WAL, those submit messages, with auto-created table option, will be discarded expect the first, for
1029
    // those mismatched table uids. Only the FIRST table has the correct table uid, and those remain all have
1030
    // randomly generated, but false table uid in the WAL.
1031
    SMetaReader mr = {0};
896✔
1032
    metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
896✔
1033

1034
    // table not in cache, let's try to extract it from tsdb meta
1035
    if (metaGetTableEntryByName(&mr, dstTableName) < 0) {
896✔
1036
      metaReaderClear(&mr);
846✔
1037

1038
      if (pTask->outputInfo.tbSink.autoCreateCtb) {
846!
1039
        tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName);
846✔
1040

1041
        SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
846✔
1042
        if (pTagArray == NULL) {
846!
1043
          tqError("s-task:%s failed to build auto create submit msg in sink, vgId:%d, due to %s", id, vgId,
×
1044
                  tstrerror(terrno));
1045
          return terrno;
×
1046
        }
1047

1048
        pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
846✔
1049
        code = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
1,692✔
1050
                                       IS_NEW_SUBTB_RULE(pTask), &pTableData->pCreateTbReq, id);
846!
1051
        taosArrayDestroy(pTagArray);
846✔
1052

1053
        if (code) {
846!
1054
          tqError("s-task:%s failed to build auto create dst-table req:%s, code:%s", id, dstTableName, tstrerror(code));
×
1055
          taosMemoryFree(pTableSinkInfo);
×
1056
          return code;
×
1057
        }
1058

1059
        pTableSinkInfo->uid = 0;
846✔
1060
        code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
846✔
1061
      } else {
1062
        tqError("s-task:%s vgId:%d dst-table:%s not auto-created, and not create in tsdb, discard data", id, vgId,
×
1063
                dstTableName);
1064
        return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1065
      }
1066
    } else {
1067
      bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);
50✔
1068
      if (!isValid) {
50!
1069
        metaReaderClear(&mr);
×
1070
        taosMemoryFree(pTableSinkInfo);
×
1071
        tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId,
×
1072
                dstTableName);
1073
        return terrno;
×
1074
      } else {
1075
        pTableData->uid = mr.me.uid;
50✔
1076
        pTableSinkInfo->uid = mr.me.uid;
50✔
1077

1078
        metaReaderClear(&mr);
50✔
1079
        code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
50✔
1080
      }
1081
    }
1082
  }
1083

1084
  return code;
5,581✔
1085
}
1086

1087
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
8,401✔
1088
                                 SSubmitTbData* pTableData, int64_t earlyTs, const char* id) {
1089
  int32_t numOfRows = pDataBlock->info.rows;
8,401✔
1090
  char*   dstTableName = pDataBlock->info.parTbName;
8,401✔
1091

1092
  tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, id,
8,401✔
1093
          blockIndex + 1, numOfRows, suid);
1094

1095
  // convert all rows
1096
  int32_t code = doConvertRows(pTableData, pTSchema, pDataBlock, earlyTs, id);
8,401✔
1097
  if (code != TSDB_CODE_SUCCESS) {
8,401!
1098
    tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno));
×
1099
    return code;
×
1100
  }
1101

1102
  if (pTableData->aRowP != NULL) {
8,401✔
1103
    taosArraySort(pTableData->aRowP, tsAscendingSortFn);
8,399✔
1104
    tqTrace("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows);
8,399✔
1105
  }
1106

1107
  return code;
8,399✔
1108
}
1109

1110
// Make sure the sink's tag schema is loaded, fetching it from the destination super
// table on first use.
//
// When the schema is already present nothing happens. Otherwise the super table entry
// is read by uid, its tag schema is cloned into the output info, and autoCreateCtb is
// set to true only if the schema has exactly one UBIGINT tag named "group_id".
// Returns TSDB_CODE_SUCCESS, the meta lookup error, or terrno if cloning fails.
int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode) {
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
  const char*      id = pTask->id.idStr;
  int32_t          vgId = pTask->pMeta->vgId;

  if (pOutputInfo->tbSink.pTagSchema != NULL) {
    return TSDB_CODE_SUCCESS;  // already resolved by an earlier call
  }

  SMetaReader reader = {0};
  metaReaderDoInit(&reader, pVnode->pMeta, META_READER_LOCK);

  int32_t code = metaReaderGetTableEntryByUid(&reader, pOutputInfo->tbSink.stbUid);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("s-task:%s vgId:%d failed to get the dst stable, failed to sink results", id, vgId);
    metaReaderClear(&reader);
    return code;
  }

  // clone before releasing the reader, since the schema lives in the reader's entry
  pOutputInfo->tbSink.pTagSchema = tCloneSSchemaWrapper(&reader.me.stbEntry.schemaTag);
  metaReaderClear(&reader);

  SSchemaWrapper* pTagSchema = pOutputInfo->tbSink.pTagSchema;
  if (pTagSchema == NULL) {
    tqError("s-task:%s failed to clone tag schema, code:%s, failed to sink results", id, tstrerror(terrno));
    return terrno;
  }

  SSchema* pFirstTag = &pTagSchema->pSchema[0];
  if (pTagSchema->nCols != 1 || pFirstTag->type != TSDB_DATA_TYPE_UBIGINT || strcmp(pFirstTag->name, "group_id") != 0) {
    pOutputInfo->tbSink.autoCreateCtb = false;
  } else {
    pOutputInfo->tbSink.autoCreateCtb = true;
  }

  return code;
}
1146

1147
// Entry point of the table sink: write the stream result blocks in `data` (an SArray
// of SSDataBlock) into the destination tables of vnode `vnode`.
//
// After the tag schema check and the event notifications, one of two paths is taken:
//  - the batch contains delete/create-table blocks, or the task uses
//    subtableWithoutMd5: blocks are handled one by one according to their type;
//  - otherwise: all blocks are merged into a single submit message.
// A failure on one block is logged and does not stop the remaining blocks. The
// function returns early when the task is asked to stop.
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
  const SArray* pBlocks = (const SArray*)data;
  SVnode*       pVnode = (SVnode*)vnode;
  int64_t       suid = pTask->outputInfo.tbSink.stbUid;
  char*         stbFullName = pTask->outputInfo.tbSink.stbFullName;
  int32_t       vgId = TD_VID(pVnode);
  int32_t       numOfBlocks = taosArrayGetSize(pBlocks);
  int32_t       code = TSDB_CODE_SUCCESS;
  const char*   id = pTask->id.idStr;
  int64_t       earlyTs = tsdbGetEarliestTs(pVnode->pTsdb);

  code = checkTagSchema(pTask, pVnode);
  if (code != TSDB_CODE_SUCCESS) {
    return;
  }

  code = tqSendAllNotifyEvents(pBlocks, pTask, pVnode);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId: %d, s-task:%s failed to send all event notifications", vgId, id);
    // continue processing even if notification fails
  }

  bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
  if (!onlySubmitData || pTask->subtableWithoutMd5 == 1) {
    tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has other type block, submit one-by-one", vgId,
            id, numOfBlocks);

    for (int32_t i = 0; i < numOfBlocks; ++i) {
      if (streamTaskShouldStop(pTask)) {
        return;
      }

      SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
      if (pDataBlock == NULL) {
        continue;
      }

      if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
        code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
      } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
        code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
      } else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
        continue;
      } else if (pDataBlock->info.type == STREAM_DROP_CHILD_TABLE && pTask->subtableWithoutMd5) {
        code = doBuildAndSendDropTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
      } else if (pDataBlock->info.type == STREAM_NOTIFY_EVENT) {
        continue;
      } else {
        code = handleResultBlockMsg(pTask, pDataBlock, i, pVnode, earlyTs);
      }

      // best effort: report the failure instead of dropping it silently, then go on
      if (code != TSDB_CODE_SUCCESS) {
        tqError("vgId:%d, s-task:%s failed to sink %dth resBlock, type:%d, code:%s", vgId, id, i + 1,
                (int32_t)pDataBlock->info.type, tstrerror(code));
      }
    }
  } else {
    tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks);
    if (streamTaskShouldStop(pTask)) {
      return;
    }

    rebuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs);
  }
}
1209

1210
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
1211
bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
5,858✔
1212
  for (int32_t i = 0; i < numOfBlocks; ++i) {
11,357✔
1213
    SSDataBlock* p = taosArrayGet(pBlocks, i);
7,663✔
1214
    if (p == NULL) {
7,663!
1215
      continue;
×
1216
    }
1217

1218
    if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
7,663✔
1219
      return false;
2,164✔
1220
    }
1221
  }
1222

1223
  return true;
3,694✔
1224
}
1225

1226
// Store the pointer pTableSinkInfo in pSinkTableMap under key groupId.
//
// The map stores the pointer value itself (POINTER_BYTES). If the insertion fails the
// sink info is freed here, so the caller must not use it afterwards.
// Returns the code of tSimpleHashPut.
int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) {
  int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES);

  if (code == TSDB_CODE_SUCCESS) {
    tqDebug("s-task:%s new dst table:%s(uid:%" PRIu64 ") added into cache, total:%d", id, pTableSinkInfo->name.data,
            pTableSinkInfo->uid, tSimpleHashGetSize(pSinkTableMap));
    return code;
  }

  // not cached: release the info so that it does not leak
  taosMemoryFreeClear(pTableSinkInfo);
  return code;
}
1237

1238
// Look up the sink table info cached for groupId.
// On a hit *pInfo receives the cached pointer and true is returned; on a miss *pInfo
// is left untouched and false is returned.
bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo) {
  // the map stores STableSinkInfo pointers, so the lookup yields a pointer to one
  STableSinkInfo** ppCached = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t));
  if (ppCached == NULL) {
    return false;
  }

  *pInfo = *ppCached;
  return true;
}
1247

1248
// Remove the cache entry stored under groupId from pSinkTableMap.
// An empty map is treated as success without touching it. Otherwise the code of
// tSimpleHashRemove is returned and the outcome is logged.
int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id) {
  if (tSimpleHashGetSize(pSinkTableMap) != 0) {
    int32_t code = tSimpleHashRemove(pSinkTableMap, &groupId, sizeof(groupId));
    if (code != 0) {
      tqError("s-task:%s failed to remove table meta from hashmap, groupId:%" PRId64, id, groupId);
    } else {
      tqDebug("s-task:%s remove cached table meta for groupId:%" PRId64, id, groupId);
    }
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
1261

1262
// Turn a delete-result block into a TDMT_VND_BATCH_DEL message and put it on the
// vnode's write queue.
//
// Returns TSDB_CODE_SUCCESS when the message was queued, when there was nothing to
// delete, or when queuing failed (that failure is only logged). Returns an error code
// when building or encoding the request fails; in that case every buffer allocated
// here has been released.
int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
                                int64_t suid) {
  SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
  if (deleteReq.deleteReqs == NULL) {
    return terrno;
  }

  int32_t code =
      tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr, IS_NEW_SUBTB_RULE(pTask));
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(deleteReq.deleteReqs);
    return code;
  }

  if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
    taosArrayDestroy(deleteReq.deleteReqs);
    return TSDB_CODE_SUCCESS;
  }

  int32_t len = 0;
  tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
  if (code != TSDB_CODE_SUCCESS) {
    qError("s-task:%s failed to encode delete request", pTask->id.idStr);
    taosArrayDestroy(deleteReq.deleteReqs);
    return code;
  }

  SEncoder encoder = {0};
  void*    serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
  if (serializedDeleteReq == NULL) {
    // never encode into a NULL buffer
    code = terrno;
    taosArrayDestroy(deleteReq.deleteReqs);
    return code;
  }

  // the payload follows the message head inside the same rpc buffer
  void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, len);
  code = tEncodeSBatchDeleteReq(&encoder, &deleteReq);
  tEncoderClear(&encoder);
  taosArrayDestroy(deleteReq.deleteReqs);

  if (code) {
    // the buffer was never handed to the queue, so it has to be released here
    rpcFreeCont(serializedDeleteReq);
    return code;
  }

  ((SMsgHead*)serializedDeleteReq)->vgId = TD_VID(pVnode);

  SRpcMsg msg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead)};
  if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
    tqDebug("failed to put delete req into write-queue since %s", terrstr());
  }

  return TSDB_CODE_SUCCESS;
}
1308

1309
/*
 * Pack several result blocks into one submit request and send it.
 *
 * Every block in pBlocks, except those of type STREAM_CHECKPOINT or STREAM_NOTIFY_EVENT, is converted into an
 * SSubmitTbData. The first block seen for a groupId creates a new entry in the submit request; later blocks with
 * the same groupId are merged into that entry with doMergeExistedRows(). A block that fails at any step is
 * discarded and the loop moves on to the next one.
 *
 * @param pTask    sink task; supplies the destination table info and receives the numOfBlocks/numOfRows counters
 * @param pBlocks  array of SSDataBlock to write
 * @param pVnode   destination vnode
 * @param earlyTs  forwarded unchanged to tqSetDstTableDataPayload()
 */
void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) {
  int32_t     code = 0;
  const char* id = pTask->id.idStr;
  int32_t     vgId = pTask->pMeta->vgId;
  int32_t     numOfBlocks = taosArrayGetSize(pBlocks);
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  STSchema*   pTSchema = pTask->outputInfo.tbSink.pTSchema;
  char*       stbFullName = pTask->outputInfo.tbSink.stbFullName;

  // groupId -> index of the matching entry in submitReq.aSubmitTbData
  SHashObj* pTableIndexMap =
      taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableIndexMap == NULL) {
    // without the index map blocks of the same group can not be merged, so give up before allocating anything else
    code = terrno;
    tqError("s-task:%s vgId:%d failed to init table index map in sink task, code:%s", id, vgId, tstrerror(code));
    return;
  }

  SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
  if (submitReq.aSubmitTbData == NULL) {
    code = terrno;
    tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
    taosHashCleanup(pTableIndexMap);
    return;
  }

  // set as soon as one data-carrying block is seen, even if that block is discarded afterwards
  bool hasSubmit = false;
  for (int32_t i = 0; i < numOfBlocks; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
    if (pDataBlock == NULL) {
      continue;
    }

    if (pDataBlock->info.type == STREAM_CHECKPOINT) {
      continue;
    }

    if (pDataBlock->info.type == STREAM_NOTIFY_EVENT) {
      continue;
    }

    hasSubmit = true;
    pTask->execInfo.sink.numOfBlocks += 1;
    uint64_t groupId = pDataBlock->info.id.groupId;

    SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};

    int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
    if (index == NULL) {  // no data yet, append it
      code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
      if (code != TSDB_CODE_SUCCESS) {
        tqError("vgId:%d dst-table gid:%" PRId64 " not exist, discard stream results", vgId, groupId);
        continue;
      }

      code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
      if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
        if (tbData.pCreateTbReq != NULL) {
          // the pending create-table request is dropped, so the cached sink table info is removed as well
          tdDestroySVCreateTbReq(tbData.pCreateTbReq);
          (void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, groupId, id);
          tbData.pCreateTbReq = NULL;
        }
        continue;
      }

      void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
      if (p == NULL) {
        // NOTE(review): tbData (aRowP / pCreateTbReq) is not released on this path - looks like a leak, confirm
        tqError("vgId:%d, s-task:%s failed to build submit msg, data lost", vgId, id);
        continue;
      }

      // remember where this group landed so that later blocks of the same group are merged into it
      int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
      code = taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
      if (code) {
        tqError("vgId:%d, s-task:%s failed to put group into index map, code:%s", vgId, id, tstrerror(code));
        continue;
      }
    } else {
      code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
      if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
        if (tbData.pCreateTbReq != NULL) {
          tdDestroySVCreateTbReq(tbData.pCreateTbReq);
          tbData.pCreateTbReq = NULL;
        }
        continue;
      }

      SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index);
      if (pExisted == NULL) {
        continue;
      }

      code = doMergeExistedRows(pExisted, &tbData, id);
      if (code != TSDB_CODE_SUCCESS) {
        continue;
      }
    }

    // only blocks that made it into the submit request are counted
    pTask->execInfo.sink.numOfRows += pDataBlock->info.rows;
  }

  taosHashCleanup(pTableIndexMap);

  if (hasSubmit) {
    code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks);
    if (code) {  // failed and continue
      tqError("vgId:%d failed to build and send submit msg", vgId);
    }
  } else {
    // no data block at all, release the unused request
    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
    tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
  }
}
1416

1417
/*
 * Write one result block to its destination table as a stand-alone submit request.
 *
 * @param pTask       sink task; supplies the destination table info and receives the numOfBlocks counter
 * @param pDataBlock  result block to write
 * @param index       index of the block, forwarded to tqSetDstTableDataPayload()
 * @param pVnode      destination vnode
 * @param earlyTs     forwarded unchanged to tqSetDstTableDataPayload()
 *
 * @return the code of the first failing step, or the result of doBuildAndSendSubmitMsg(). When the payload is
 *         built without error but contains no rows (aRowP == NULL), nothing is sent and the success code of the
 *         payload step is returned.
 */
int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode, int64_t earlyTs) {
  int32_t     code = 0;
  STSchema*   pTSchema = pTask->outputInfo.tbSink.pTSchema;
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  const char* id = pTask->id.idStr;
  int32_t     vgId = TD_VID(pVnode);
  char*       stbFullName = pTask->outputInfo.tbSink.stbFullName;

  pTask->execInfo.sink.numOfBlocks += 1;

  SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
  if (submitReq.aSubmitTbData == NULL) {
    // capture terrno before logging, so the logged and the returned code are guaranteed to be the same
    code = terrno;
    tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
    return code;
  }

  SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
  code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d s-task:%s dst-table not exist, stb:%s discard stream results", vgId, id, stbFullName);
    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  code = tqSetDstTableDataPayload(suid, pTSchema, index, pDataBlock, &tbData, earlyTs, id);
  if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
    if (tbData.pCreateTbReq != NULL) {
      // the pending create-table request is dropped, so the cached sink table info is removed as well
      tdDestroySVCreateTbReq(tbData.pCreateTbReq);
      (void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, id);
      tbData.pCreateTbReq = NULL;
    }

    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
  if (p == NULL) {
    // capture terrno first: the cleanup below may overwrite it and turn the failure into a bogus return value
    code = terrno;
    tqDebug("vgId:%d, s-task:%s failed to build submit msg, code:%s, data lost", vgId, id, tstrerror(code));
    // NOTE(review): tbData was never added to submitReq, so its aRowP / pCreateTbReq are presumably not
    // released by tDestroySubmitReq() - looks like a leak, confirm
    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1);
  if (code) {  // failed and continue
    tqDebug("vgId:%d, s-task:%s submit msg failed, code:%s data lost", vgId, id, tstrerror(code));
  }

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc