• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3842

07 Apr 2025 11:21AM UTC coverage: 62.696% (-0.3%) from 63.027%
#3842

push

travis-ci

web-flow
merge: from main to 3.0 branch (#30679)

154855 of 315075 branches covered (49.15%)

Branch coverage included in aggregate %.

6 of 8 new or added lines in 5 files covered. (75.0%)

2309 existing lines in 130 files now uncovered.

240176 of 314995 relevant lines covered (76.25%)

19119980.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.81
/source/dnode/vnode/src/tq/tqSink.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tcommon.h"
17
#include "tq.h"
18

19
typedef struct STableSinkInfo {
20
  uint64_t uid;
21
  tstr     name;
22
} STableSinkInfo;
23

24
static bool    hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks);
25
static int32_t tsAscendingSortFn(const void* p1, const void* p2);
26
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
27
                                  SSubmitTbData* pTableData);
28
static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
29
                                       int64_t suid);
30
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks);
31
static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
32
static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock,
33
                             int64_t earlyTs, const char* id);
34
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
35
                                        const char* dstTableName, int64_t* uid);
36

37
static bool    isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
38
static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName,
39
                                  int32_t numOfTags);
40
static int32_t createDefaultTagColName(SArray** pColNameList);
41
static int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock,
42
                                          const char* stbFullName, int64_t gid, bool newSubTableRule, const char* id);
43
static int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo);
44
static int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId,
45
                                           const char* id);
46
static bool    doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo);
47
static int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id);
48
static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode);
49
static void    rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs);
50
static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode,
51
                                    int64_t earlyTs);
52
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName);
53

54
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
2,549✔
55
                         const char* pIdStr, bool newSubTableRule) {
56
  int32_t          totalRows = pDataBlock->info.rows;
2,549✔
57
  SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
2,549✔
58
  SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX);
2,549✔
59
  SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
2,549✔
60
  SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
2,549✔
61

62
  if (pStartTsCol == NULL || pEndTsCol == NULL || pGidCol == NULL || pTbNameCol == NULL) {
2,548!
63
    return terrno;
×
64
  }
65

66
  tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName);
2,548✔
67

68
  for (int32_t row = 0; row < totalRows; row++) {
6,110✔
69
    int64_t skey = *(int64_t*)colDataGetData(pStartTsCol, row);
3,563!
70
    int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row);
3,563!
71
    int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
3,563!
72

73
    char* name = NULL;
3,563✔
74
    char* originName = NULL;
3,563✔
75
    void* varTbName = NULL;
3,563✔
76
    if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) {
7,126✔
77
      varTbName = colDataGetVarData(pTbNameCol, row);
1,929✔
78
    }
79

80
    if (varTbName != NULL && varTbName != (void*)-1) {
5,477!
81
      size_t cap = TMAX(TSDB_TABLE_NAME_LEN, varDataLen(varTbName) + 1);
1,914✔
82
      name = taosMemoryMalloc(cap);
1,914!
83
      if (name == NULL) {
1,914!
84
        return terrno;
×
85
      }
86

87
      memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
1,914✔
88
      name[varDataLen(varTbName)] = '\0';
1,914✔
89
      if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name, groupId) && groupId != 0 &&
1,914!
90
          stbFullName) {
91
        int32_t code = buildCtbNameAddGroupId(stbFullName, name, groupId, cap);
1,751✔
92
        if (code != TSDB_CODE_SUCCESS) {
1,752!
93
          return code;
×
94
        }
95
      }
96
    } else if (stbFullName) {
1,649✔
97
      int32_t code = buildCtbNameByGroupId(stbFullName, groupId, &name);
1,034✔
98
      if (code) {
1,034!
99
        return code;
×
100
      }
101
    } else {
102
      originName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
615!
103
      if (originName == NULL) {
616!
104
        return terrno;
×
105
      }
106

107
      if (metaGetTableNameByUid(pTq->pVnode, groupId, originName) == 0) {
616!
108
        name = varDataVal(originName);
616✔
109
      }
110
    }
111

112
    if (!name || *name == '\0') {
3,564!
113
      tqWarn("s-task:%s failed to build delete msg groupId:%" PRId64 ", skey:%" PRId64 " ekey:%" PRId64
1!
114
             " since invalid tbname:%s",
115
             pIdStr, groupId, skey, ekey, name ? name : "NULL");
116
    } else {
117
      tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, pIdStr,
3,563✔
118
              groupId, name, skey, ekey);
119

120
      SSingleDeleteReq req = {.startTs = skey, .endTs = ekey};
3,563✔
121
      tstrncpy(req.tbname, name, TSDB_TABLE_NAME_LEN);
3,563✔
122
      void* p = taosArrayPush(deleteReq->deleteReqs, &req);
3,563✔
123
      if (p == NULL) {
3,563!
124
        return terrno;
×
125
      }
126
    }
127

128
    if (originName) {
3,563✔
129
      name = originName;
616✔
130
    }
131

132
    taosMemoryFreeClear(name);
3,563!
133
  }
134

135
  return 0;
2,547✔
136
}
137

138
static int32_t encodeCreateChildTableForRPC(void* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) {
7,301✔
139
  int32_t ret = 0;
7,301✔
140

141
  tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret);
7,301!
142
  if (ret < 0) {
7,298!
143
    ret = -1;
×
144
    goto end;
×
145
  }
146
  *contLen += sizeof(SMsgHead);
7,298✔
147
  *pBuf = rpcMallocCont(*contLen);
7,298✔
148
  if (NULL == *pBuf) {
7,298!
149
    ret = -1;
×
150
    goto end;
×
151
  }
152
  ((SMsgHead*)(*pBuf))->vgId = vgId;
7,298✔
153
  ((SMsgHead*)(*pBuf))->contLen = htonl(*contLen);
7,298✔
154
  SEncoder coder = {0};
7,298✔
155
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
7,298✔
156
  if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) {
7,299!
157
    rpcFreeCont(*pBuf);
×
158
    *pBuf = NULL;
×
159
    *contLen = 0;
×
160
    tEncoderClear(&coder);
×
161
    ret = -1;
×
162
    goto end;
×
163
  }
164
  tEncoderClear(&coder);
7,301✔
165

166
end:
7,302✔
167
  return ret;
7,302✔
168
}
169

170
static int32_t encodeDropChildTableForRPC(void* pReqs, int32_t vgId, void** ppBuf, int32_t *contLen) {
55✔
171
  int32_t  code = 0;
55✔
172
  SEncoder ec = {0};
55✔
173
  tEncodeSize(tEncodeSVDropTbBatchReq, pReqs, *contLen, code);
55!
174
  if (code < 0) {
55!
175
    code = TSDB_CODE_INVALID_MSG;
×
176
    goto end;
×
177
  }
178
  *contLen += sizeof(SMsgHead);
55✔
179
  *ppBuf = rpcMallocCont(*contLen);
55✔
180

181
  if (!*ppBuf) {
55!
182
    code = terrno;
×
183
    goto end;
×
184
  }
185

186
  ((SMsgHead*)(*ppBuf))->vgId = vgId;
55✔
187
  ((SMsgHead*)(*ppBuf))->contLen = htonl(*contLen);
55✔
188

189
  tEncoderInit(&ec, POINTER_SHIFT(*ppBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
55✔
190
  code = tEncodeSVDropTbBatchReq(&ec, pReqs);
55✔
191
  tEncoderClear(&ec);
55✔
192
  if (code < 0) {
55!
193
    rpcFreeCont(*ppBuf);
×
194
    *ppBuf = NULL;
×
195
    *contLen = 0;
×
196
    code = TSDB_CODE_INVALID_MSG;
×
197
    goto end;
×
198
  }
199
end:
55✔
200
  return code;
55✔
201
}
202

203
static int32_t tqPutReqToQueue(SVnode* pVnode, void* pReqs, int32_t(*encoder)(void* pReqs, int32_t vgId, void** ppBuf, int32_t *contLen), tmsg_t msgType) {
7,356✔
204
  void*   buf = NULL;
7,356✔
205
  int32_t tlen = 0;
7,356✔
206

207
  int32_t code = encoder(pReqs, TD_VID(pVnode), &buf, &tlen);
7,356✔
208
  if (code) {
7,357!
209
    tqError("vgId:%d failed to encode create table msg, create table failed, code:%s", TD_VID(pVnode), tstrerror(code));
×
210
    return code;
×
211
  }
212

213
  SRpcMsg msg = {.msgType = msgType, .pCont = buf, .contLen = tlen};
7,357✔
214
  code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
7,357✔
215
  if (code) {
7,355!
216
    tqError("failed to put into write-queue since %s", terrstr());
×
217
  }
218

219
  return code;
7,355✔
220
}
221

222
int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags) {
18,389✔
223
  pCreateTableReq->flags = 0;
18,389✔
224
  pCreateTableReq->type = TSDB_CHILD_TABLE;
18,389✔
225
  pCreateTableReq->ctb.suid = suid;
18,389✔
226

227
  // set super table name
228
  SName name = {0};
18,389✔
229

230
  int32_t code = tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
18,389✔
231
  if (code == 0) {
18,392!
232
    pCreateTableReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name));
18,392!
233
    if (pCreateTableReq->ctb.stbName == NULL) {  // ignore this error code
18,391!
234
      tqError("failed to duplicate the stb name:%s, failed to init create-table msg and create req table", stbFullName);
×
235
      code = terrno;
×
236
    }
237
  }
238

239
  pCreateTableReq->ctb.tagNum = numOfTags;
18,391✔
240
  return code;
18,391✔
241
}
242

243
int32_t createDefaultTagColName(SArray** pColNameList) {
13,926✔
244
  *pColNameList = NULL;
13,926✔
245

246
  SArray* pTagColNameList = taosArrayInit(1, TSDB_COL_NAME_LEN);
13,926✔
247
  if (pTagColNameList == NULL) {
13,927!
248
    return terrno;
×
249
  }
250

251
  char  tagNameStr[TSDB_COL_NAME_LEN] = "group_id";
13,927✔
252
  void* p = taosArrayPush(pTagColNameList, tagNameStr);
13,928✔
253
  if (p == NULL) {
13,928!
254
    taosArrayDestroy(pTagColNameList);
×
255
    return terrno;
×
256
  }
257

258
  *pColNameList = pTagColNameList;
13,928✔
259
  return TSDB_CODE_SUCCESS;
13,928✔
260
}
261

262
int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
18,388✔
263
                                   int64_t gid, bool newSubTableRule, const char* id) {
264
  if (pDataBlock->info.parTbName[0]) {
18,388✔
265
    if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) &&
18,370✔
266
        !alreadyAddGroupId(pDataBlock->info.parTbName, gid) && gid != 0 && stbFullName) {
3,497!
267
      pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
24!
268
      if (pCreateTableReq->name == NULL) {
24!
269
        return terrno;
×
270
      }
271

272
      tstrncpy(pCreateTableReq->name, pDataBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
24✔
273
      int32_t code = buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid, TSDB_TABLE_NAME_LEN);
24✔
274
      if (code != TSDB_CODE_SUCCESS) {
24!
275
        return code;
×
276
      }
277
      tqDebug("s-task:%s gen name from:%s blockdata", id, pDataBlock->info.parTbName);
24!
278
    } else {
279
      pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
18,348!
280
      if (pCreateTableReq->name == NULL) {
18,344!
281
        return terrno;
×
282
      }
283
      tqDebug("s-task:%s copy name:%s from blockdata", id, pDataBlock->info.parTbName);
18,344✔
284
    }
285
  } else {
286
    int32_t code = buildCtbNameByGroupId(stbFullName, gid, &pCreateTableReq->name);
18✔
287
    tqDebug("s-task:%s no name in blockdata, auto-created table name:%s", id, pCreateTableReq->name);
19!
288
    return code;
19✔
289
  }
290

291
  return 0;
18,368✔
292
}
293

294
static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock,
7,297✔
295
                                            SStreamTask* pTask, int64_t suid) {
296
  STSchema*          pTSchema = pTask->outputInfo.tbSink.pTSchema;
7,297✔
297
  int32_t            rows = pDataBlock->info.rows;
7,297✔
298
  SArray*            tagArray = NULL;
7,297✔
299
  const char*        id = pTask->id.idStr;
7,297✔
300
  int32_t            vgId = pTask->pMeta->vgId;
7,297✔
301
  int32_t            code = 0;
7,297✔
302
  STableSinkInfo*    pInfo = NULL;
7,297✔
303
  SVCreateTbBatchReq reqs = {0};
7,297✔
304
  SArray*            crTblArray = NULL;
7,297✔
305

306
  tqDebug("s-task:%s build create %d table(s) msg", id, rows);
7,297✔
307

308
  tagArray = taosArrayInit(4, sizeof(STagVal));
7,297✔
309
  crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
7,299✔
310
  if ((NULL == reqs.pArray) || (tagArray == NULL)) {
7,299!
UNCOV
311
    tqError("s-task:%s failed to init create table msg, code:%s", id, tstrerror(terrno));
×
UNCOV
312
    code = terrno;
×
313
    goto _end;
×
314
  }
315

316
  for (int32_t rowId = 0; rowId < rows; rowId++) {
14,600✔
317
    SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0});
7,298✔
318

319
    int32_t size = taosArrayGetSize(pDataBlock->pDataBlock);
7,298✔
320
    int32_t numOfTags = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
7,299✔
321

322
    code = initCreateTableMsg(pCreateTbReq, suid, stbFullName, numOfTags);
7,299✔
323
    if (code) {
7,302!
324
      tqError("s-task:%s vgId:%d failed to init create table msg", id, vgId);
×
325
      continue;
×
326
    }
327

328
    taosArrayClear(tagArray);
7,302✔
329

330
    if (size == 2) {
7,300✔
331
      STagVal tagVal = {
2,837✔
332
          .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
2,837✔
333

334
      void* p = taosArrayPush(tagArray, &tagVal);
2,837✔
335
      if (p == NULL) {
2,837!
336
        return terrno;
×
337
      }
338

339
      code = createDefaultTagColName(&pCreateTbReq->ctb.tagName);
2,837✔
340
      if (code) {
2,837!
341
        return code;
×
342
      }
343
    } else {
344
      for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
47,563✔
345
        SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
43,095✔
346
        if (pTagData == NULL) {
43,093!
347
          continue;
18,729✔
348
        }
349

350
        STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type};
43,093✔
351
        void*   pData = colDataGetData(pTagData, rowId);
43,093!
352
        if (colDataIsNull_s(pTagData, rowId)) {
86,186!
353
          continue;
18,729✔
354
        } else if (IS_VAR_DATA_TYPE(pTagData->info.type)) {
24,364!
355
          tagVal.nData = varDataLen(pData);
7,781✔
356
          tagVal.pData = (uint8_t*)varDataVal(pData);
7,781✔
357
        } else {
358
          memcpy(&tagVal.i64, pData, pTagData->info.bytes);
16,583✔
359
        }
360
        void* p = taosArrayPush(tagArray, &tagVal);
24,371✔
361
        if (p == NULL) {
24,371!
362
          code = terrno;
×
363
          goto _end;
×
364
        }
365
      }
366
    }
367

368
    code = tTagNew(tagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
7,305✔
369
    taosArrayDestroy(tagArray);
7,300✔
370
    tagArray = NULL;
7,298✔
371

372
    if (pCreateTbReq->ctb.pTag == NULL || (code != 0)) {
7,298!
373
      tdDestroySVCreateTbReq(pCreateTbReq);
374
      code = TSDB_CODE_OUT_OF_MEMORY;
×
375
      goto _end;
×
376
    }
377

378
    uint64_t gid = pDataBlock->info.id.groupId;
7,298✔
379
    if (taosArrayGetSize(pDataBlock->pDataBlock) > UD_GROUPID_COLUMN_INDEX) {
7,298!
380
      SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
7,299✔
381
      if (pGpIdColInfo == NULL) {
7,299!
382
        continue;
×
383
      }
384

385
      // todo remove this
386
      void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
7,299!
387
      if (gid != *(int64_t*)pGpIdData) {
7,299!
388
        tqError("s-task:%s vgId:%d invalid groupId:%" PRId64 " actual:%" PRId64 " in sink task", id, vgId, gid,
×
389
                *(int64_t*)pGpIdData);
390
      }
391
    }
392

393
    code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, IS_NEW_SUBTB_RULE(pTask),
7,298!
394
                                      pTask->id.idStr);
395
    if (code) {
7,296!
396
      goto _end;
×
397
    }
398

399
    void* p = taosArrayPush(reqs.pArray, pCreateTbReq);
7,296✔
400
    if (p == NULL) {
7,296!
401
      code = terrno;
×
402
      goto _end;
×
403
    }
404

405
    bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, gid, &pInfo);
7,296✔
406
    if (!alreadyCached) {
7,299✔
407
      code = doCreateSinkTableInfo(pCreateTbReq->name, &pInfo);
6,123✔
408
      if (code) {
6,125!
409
        tqError("vgId:%d failed to create sink tableInfo for table:%s, s-task:%s", vgId, pCreateTbReq->name, id);
×
410
        continue;
×
411
      }
412

413
      code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pInfo, gid, id);
6,125✔
414
      if (code) {
6,125!
415
        tqError("vgId:%d failed to put sink tableInfo:%s into cache, s-task:%s", vgId, pCreateTbReq->name, id);
×
416
      }
417
    }
418

419
    tqDebug("s-task:%s build create table:%s msg complete", id, pCreateTbReq->name);
7,301✔
420
  }
421

422
  reqs.nReqs = taosArrayGetSize(reqs.pArray);
7,302✔
423
  code = tqPutReqToQueue(pVnode, &reqs, encodeCreateChildTableForRPC, TDMT_VND_CREATE_TABLE);
7,301✔
424
  if (code != TSDB_CODE_SUCCESS) {
7,300!
425
    tqError("s-task:%s failed to send create table msg, code:%s", id, tstrerror(code));
×
426
  }
427

428
_end:
7,300✔
429
  taosArrayDestroy(tagArray);
7,300✔
430
  taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
7,299✔
431
  return code;
7,297✔
432
}
433

434
static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SSDataBlock* pDataBlock,
55✔
435
                                          SStreamTask* pTask, int64_t suid) {
436
  int32_t          lino = 0;
55✔
437
  int32_t          code = 0;
55✔
438
  int32_t          rows = pDataBlock->info.rows;
55✔
439
  const char*      id = pTask->id.idStr;
55✔
440
  SVDropTbBatchReq batchReq = {0};
55✔
441
  SVDropTbReq      req = {0};
55✔
442

443
  if (rows <= 0 || rows > 1 || pTask->subtableWithoutMd5 == 0) return TSDB_CODE_SUCCESS;
55!
444

445
  batchReq.pArray = taosArrayInit(rows, sizeof(SVDropTbReq));
55✔
446
  if (!batchReq.pArray) return terrno;
55!
447
  batchReq.nReqs = rows;
55✔
448
  req.suid = suid;
55✔
449
  req.igNotExists = true;
55✔
450

451
  SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
55✔
452
  char             tbName[TSDB_TABLE_NAME_LEN + 1] = {0};
55✔
453
  int32_t          i = 0;
55✔
454
  void*            pData = colDataGetVarData(pTbNameCol, i);
55✔
455
  memcpy(tbName, varDataVal(pData), varDataLen(pData));
55✔
456
  tbName[varDataLen(pData) + 1] = 0;
55✔
457
  req.name = tbName;
55✔
458
  if (taosArrayPush(batchReq.pArray, &req) == NULL) {
110!
459
    TSDB_CHECK_CODE(terrno, lino, _exit);
×
460
  }
461

462
  SMetaReader mr = {0};
55✔
463
  metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
55✔
464

465
  code = metaGetTableEntryByName(&mr, tbName);
55✔
466
  if (TSDB_CODE_SUCCESS == code && isValidDstChildTable(&mr, TD_VID(pVnode), tbName, pTask->outputInfo.tbSink.stbUid)) {
55!
467
    STableSinkInfo* pTableSinkInfo = NULL;
55✔
468
    bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, &pTableSinkInfo);
55✔
469
    if (alreadyCached) {
55✔
470
      pTableSinkInfo->uid = mr.me.uid;
15✔
471
    }
472
  }
473
  metaReaderClear(&mr);
55✔
474
  tqDebug("s-task:%s build drop %d table(s) msg", id, rows);
55!
475
  code = tqPutReqToQueue(pVnode, &batchReq, encodeDropChildTableForRPC, TDMT_VND_DROP_TABLE);
55✔
476
  TSDB_CHECK_CODE(code, lino, _exit);
55!
477

478

479
  code = doWaitForDstTableDropped(pVnode, pTask, tbName);
55✔
480
  TSDB_CHECK_CODE(code, lino, _exit);
55!
481

482
_exit:
55✔
483
  if (batchReq.pArray) {
55!
484
    taosArrayDestroy(batchReq.pArray);
55✔
485
  }
486
  return code;
55✔
487
}
488

489
int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks) {
17,631✔
490
  const char* id = pTask->id.idStr;
17,631✔
491
  int32_t     vgId = TD_VID(pVnode);
17,631✔
492
  int32_t     len = 0;
17,631✔
493
  void*       pBuf = NULL;
17,631✔
494
  int32_t     numOfFinalBlocks = taosArrayGetSize(pReq->aSubmitTbData);
17,631✔
495

496
  int32_t code = buildSubmitMsgImpl(pReq, vgId, &pBuf, &len);
17,635✔
497
  if (code != TSDB_CODE_SUCCESS) {
17,633!
498
    tqError("s-task:%s build submit msg failed, vgId:%d, code:%s", id, vgId, tstrerror(code));
×
499
    return code;
×
500
  }
501

502
  SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len};
17,633✔
503
  code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
17,633✔
504
  if (code == TSDB_CODE_SUCCESS) {
17,634!
505
    tqDebug("s-task:%s vgId:%d comp %d blocks into %d and send to dstTable(s) completed", id, vgId, numOfBlocks,
17,634✔
506
            numOfFinalBlocks);
507
  } else {
508
    tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
×
509
  }
510

511
  SSinkRecorder* pRec = &pTask->execInfo.sink;
17,634✔
512

513
  pRec->numOfSubmit += 1;
17,634✔
514
  if ((pRec->numOfSubmit % 1000) == 0) {
17,634✔
515
    double el = (taosGetTimestampMs() - pTask->execInfo.readyTs) / 1000.0;
1✔
516
    tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
1!
517
           " submit into dst table, %.2fMiB duration:%.2f Sec.",
518
           id, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->dataSize), el);
519
  }
520

521
  return TSDB_CODE_SUCCESS;
17,632✔
522
}
523

524
// merge the new submit table block with the existed blocks
525
// if ts in the new data block overlap with existed one, replace it
526
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id) {
56,046✔
527
  int32_t oldLen = taosArrayGetSize(pExisted->aRowP);
56,046✔
528
  int32_t newLen = taosArrayGetSize(pNew->aRowP);
56,046✔
529
  int32_t numOfPk = 0;
56,046✔
530

531
  int32_t j = 0, k = 0;
56,046✔
532
  SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES);
56,046✔
533
  if (pFinal == NULL) {
56,047!
534
    tqError("s-task:%s failed to prepare merge result datablock, code:%s", id, tstrerror(terrno));
×
535
    return terrno;
×
536
  }
537

538
  while (j < newLen && k < oldLen) {
2,349,436✔
539
    SRow* pNewRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j);
2,293,407✔
540
    SRow* pOldRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k);
2,293,407✔
541

542
    if (pNewRow->ts < pOldRow->ts) {
2,293,407✔
543
      void* p = taosArrayPush(pFinal, &pNewRow);
21✔
544
      if (p == NULL) {
21!
545
        return terrno;
×
546
      }
547
      j += 1;
21✔
548
    } else if (pNewRow->ts > pOldRow->ts) {
2,293,386✔
549
      void* p = taosArrayPush(pFinal, &pOldRow);
2,236,976✔
550
      if (p == NULL) {
2,236,976!
551
        return terrno;
×
552
      }
553

554
      k += 1;
2,236,976✔
555
    } else {
556
      // check for the existance of primary key
557
      if (pNewRow->numOfPKs == 0) {
56,412!
558
        void* p = taosArrayPush(pFinal, &pNewRow);
56,422✔
559
        if (p == NULL) {
56,422!
560
          return terrno;
×
561
        }
562

563
        k += 1;
56,422✔
564
        j += 1;
56,422✔
565
        tRowDestroy(pOldRow);
56,422✔
566
      } else {
567
        numOfPk = pNewRow->numOfPKs;
×
568

569
        SRowKey kNew, kOld;
570
        tRowGetKey(pNewRow, &kNew);
×
571
        tRowGetKey(pOldRow, &kOld);
×
572

573
        int32_t ret = tRowKeyCompare(&kNew, &kOld);
×
574
        if (ret <= 0) {
×
575
          void* p = taosArrayPush(pFinal, &pNewRow);
×
576
          if (p == NULL) {
×
577
            return terrno;
×
578
          }
579

580
          j += 1;
×
581

582
          if (ret == 0) {
×
583
            k += 1;
×
584
            tRowDestroy(pOldRow);
×
585
          }
586
        } else {
587
          void* p = taosArrayPush(pFinal, &pOldRow);
×
588
          if (p == NULL) {
×
589
            return terrno;
×
590
          }
591

592
          k += 1;
×
593
        }
594
      }
595
    }
596
  }
597

598
  while (j < newLen) {
552,580✔
599
    SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j++);
496,557✔
600
    void* p = taosArrayPush(pFinal, &pRow);
496,551✔
601
    if (p == NULL) {
496,551!
602
      return terrno;
×
603
    }
604
  }
605

606
  while (k < oldLen) {
56,338✔
607
    SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k++);
315✔
608
    void* p = taosArrayPush(pFinal, &pRow);
315✔
609
    if (p == NULL) {
315!
610
      return terrno;
×
611
    }
612
  }
613

614
  taosArrayDestroy(pNew->aRowP);
56,023✔
615
  taosArrayDestroy(pExisted->aRowP);
56,045✔
616
  pExisted->aRowP = pFinal;
56,046✔
617

618
  tqTrace("s-task:%s rows merged, final rows:%d, pk:%d uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
56,046!
619
          (int32_t)taosArrayGetSize(pFinal), numOfPk, pExisted->uid, (pExisted->pCreateTbReq != NULL),
620
          (pNew->pCreateTbReq != NULL));
621

622
  tdDestroySVCreateTbReq(pNew->pCreateTbReq);
56,046!
623
  taosMemoryFree(pNew->pCreateTbReq);
56,046!
624
  return TSDB_CODE_SUCCESS;
56,046✔
625
}
626

627
bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid) {
22,368✔
628
  if (pReader->me.type != TSDB_CHILD_TABLE) {
22,368!
629
    tqError("vgId:%d, failed to write into %s, since table type:%d incorrect", vgId, ctbName, pReader->me.type);
×
630
    terrno = TSDB_CODE_TDB_INVALID_TABLE_TYPE;
×
631
    return false;
×
632
  }
633

634
  if (pReader->me.ctbEntry.suid != suid) {
22,368!
635
    tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64, vgId,
×
636
            ctbName, suid, pReader->me.ctbEntry.suid);
637
    terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
×
638
    return false;
×
639
  }
640

641
  terrno = 0;
22,368✔
642
  return true;
22,367✔
643
}
644

645
int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock,
11,090✔
646
                                SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq, const char* id) {
647
  *pReq = NULL;
11,090✔
648

649
  SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
11,090!
650
  if (pCreateTbReq == NULL) {
11,091!
651
    return terrno;
×
652
  }
653

654
  taosArrayClear(pTagArray);
11,091✔
655
  int32_t code = initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1);
11,091✔
656
  if (code != 0) {
11,090!
657
    return code;
×
658
  }
659

660
  STagVal tagVal = {.cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
11,090✔
661
  void* p = taosArrayPush(pTagArray, &tagVal);
11,091✔
662
  if (p == NULL) {
11,091!
663
    return terrno;
×
664
  }
665

666
  code = tTagNew(pTagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
11,091✔
667
  if (pCreateTbReq->ctb.pTag == NULL || (code != 0)) {
11,089!
668
    tdDestroySVCreateTbReq(pCreateTbReq);
669
    taosMemoryFreeClear(pCreateTbReq);
×
670
    return code;
×
671
  }
672

673
  code = createDefaultTagColName(&pCreateTbReq->ctb.tagName);
11,090✔
674
  if (code) {
11,091!
675
    return code;
×
676
  }
677

678
  // set table name
679
  code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule,
11,091✔
680
                                    id);
681
  if (code) {
11,091!
682
    return code;
×
683
  }
684

685
  *pReq = pCreateTbReq;
11,091✔
686
  return code;
11,091✔
687
}
688

689
int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) {
17,631✔
690
  int32_t code = 0;
17,631✔
691
  void*   pBuf = NULL;
17,631✔
692
  *msgLen = 0;
17,631✔
693

694
  // encode
695
  int32_t len = 0;
17,631✔
696
  tEncodeSize(tEncodeSubmitReq, pSubmitReq, len, code);
17,631!
697

698
  SEncoder encoder = {0};
17,619✔
699
  len += sizeof(SSubmitReq2Msg);
17,619✔
700

701
  pBuf = rpcMallocCont(len);
17,619✔
702
  if (NULL == pBuf) {
17,633!
703
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
×
704
    return terrno;
×
705
  }
706

707
  ((SSubmitReq2Msg*)pBuf)->header.vgId = vgId;
17,633✔
708
  ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
17,633✔
709
  ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
17,633✔
710

711
  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
17,630✔
712
  if (tEncodeSubmitReq(&encoder, pSubmitReq) < 0) {
17,631!
713
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
714
    tqError("failed to encode submit req, code:%s, ignore and continue", terrstr());
×
715
    tEncoderClear(&encoder);
×
716
    rpcFreeCont(pBuf);
×
717
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
×
718
    return code;
×
719
  }
720

721
  tEncoderClear(&encoder);
17,620✔
722
  tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
17,630✔
723

724
  *msgLen = len;
17,635✔
725
  *pMsg = pBuf;
17,635✔
726
  return TSDB_CODE_SUCCESS;
17,635✔
727
}
728

729
int32_t tsAscendingSortFn(const void* p1, const void* p2) {
40,708,410✔
730
  SRow* pRow1 = *(SRow**)p1;
40,708,410✔
731
  SRow* pRow2 = *(SRow**)p2;
40,708,410✔
732

733
  if (pRow1->ts == pRow2->ts) {
40,708,410!
734
    return 0;
×
735
  } else {
736
    return pRow1->ts > pRow2->ts ? 1 : -1;
40,708,410!
737
  }
738
}
739

740
int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, int64_t earlyTs,
107,395✔
741
                      const char* id) {
742
  int32_t numOfRows = pDataBlock->info.rows;
107,395✔
743
  int32_t code = TSDB_CODE_SUCCESS;
107,395✔
744

745
  SArray* pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
107,395✔
746
  pTableData->aRowP = taosArrayInit(numOfRows, sizeof(SRow*));
107,400✔
747

748
  if (pTableData->aRowP == NULL || pVals == NULL) {
107,403!
749
    taosArrayDestroy(pTableData->aRowP);
5✔
750
    pTableData->aRowP = NULL;
×
751
    taosArrayDestroy(pVals);
×
752
    code = terrno;
×
753
    tqError("s-task:%s failed to prepare write stream res blocks, code:%s", id, tstrerror(code));
×
754
    return code;
×
755
  }
756

757
  for (int32_t j = 0; j < numOfRows; j++) {
8,121,537✔
758
    taosArrayClear(pVals);
8,014,078✔
759

760
    int32_t dataIndex = 0;
8,013,825✔
761
    int64_t ts = 0;
8,013,825✔
762

763
    for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
67,059,009✔
764
      const STColumn* pCol = &pTSchema->columns[k];
58,973,207✔
765

766
      // primary timestamp column, for debug purpose
767
      if (k == 0) {
58,973,207✔
768
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
8,012,332✔
769
        if (pColData == NULL) {
8,008,204!
770
          continue;
×
771
        }
772

773
        ts = *(int64_t*)colDataGetData(pColData, j);
8,008,204!
774
        tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, ts);
8,008,204✔
775

776
        if (ts < earlyTs) {
8,024,150!
777
          tqError("s-task:%s ts:%" PRId64 " of generated results out of valid time range %" PRId64 " , discarded", id,
×
778
                  ts, earlyTs);
779
          taosArrayDestroy(pTableData->aRowP);
×
780
          pTableData->aRowP = NULL;
×
781
          taosArrayDestroy(pVals);
×
782
          return TSDB_CODE_SUCCESS;
×
783
        }
784
      }
785

786
      if (IS_SET_NULL(pCol)) {
58,985,025✔
787
        if (pCol->flags & COL_IS_KEY) {
3,891!
788
          qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts,
×
789
                 pCol->colId, pCol->type);
790
          break;
×
791
        }
792
        SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
3,891✔
793
        void* p = taosArrayPush(pVals, &cv);
3,891✔
794
        if (p == NULL) {
3,891!
795
          return terrno;
×
796
        }
797
      } else {
798
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
58,981,134✔
799
        if (pColData == NULL) {
58,953,531!
800
          continue;
×
801
        }
802

803
        if (colDataIsNull_s(pColData, j)) {
117,907,062✔
804
          if (pCol->flags & COL_IS_KEY) {
5,913,135!
805
            qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8,
×
806
                   ts, pCol->colId, pCol->type);
807
            break;
×
808
          }
809

810
          SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
5,913,135✔
811
          void* p = taosArrayPush(pVals, &cv);
5,913,207✔
812
          if (p == NULL) {
5,913,207!
813
            return terrno;
×
814
          }
815

816
          dataIndex++;
5,913,207✔
817
        } else {
818
          void* colData = colDataGetData(pColData, j);
53,040,396!
819
          if (IS_VAR_DATA_TYPE(pCol->type)) {  // address copy, no value
58,874,827!
820
            SValue sv =
5,816,143✔
821
                (SValue){.type = pCol->type, .nData = varDataLen(colData), .pData = (uint8_t*)varDataVal(colData)};
5,816,143✔
822
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
5,816,143✔
823
            void* p = taosArrayPush(pVals, &cv);
5,834,431✔
824
            if (p == NULL) {
5,834,431!
825
              return terrno;
×
826
            }
827
          } else {
828
            SValue sv = {.type = pCol->type};
47,224,253✔
829
            valueSetDatum(&sv, pCol->type, colData, tDataTypes[pCol->type].bytes);
47,224,253✔
830
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
47,232,064✔
831
            void* p = taosArrayPush(pVals, &cv);
47,293,655✔
832
            if (p == NULL) {
47,293,655!
833
              return terrno;
×
834
            }
835
          }
836
          dataIndex++;
53,128,086✔
837
        }
838
      }
839
    }
840

841
    SRow* pRow = NULL;
8,085,802✔
842
    code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
8,085,802✔
843
    if (code != TSDB_CODE_SUCCESS) {
8,018,477!
844
      tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);
×
845
      taosArrayDestroy(pVals);
×
846
      tqError("s-task:%s build rows for submit failed, ts:%"PRId64, id, ts);
×
847
      return code;
×
848
    }
849

850
    void* p = taosArrayPush(pTableData->aRowP, &pRow);
8,018,477✔
851
    if (p == NULL) {
8,014,139!
852
      return terrno;
×
853
    }
854
  }
855

856
  taosArrayDestroy(pVals);
107,459✔
857
  return TSDB_CODE_SUCCESS;
107,402✔
858
}
859

860
int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
7,521✔
861
                                 const char* dstTableName, int64_t* uid) {
862
  int32_t     vgId = TD_VID(pVnode);
7,521✔
863
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
7,521✔
864
  const char* id = pTask->id.idStr;
7,521✔
865
  int32_t     timeout = 60;  // 1min
7,521✔
866
  int64_t     start = taosGetTimestampSec();
7,521✔
867

868
  while (pTableSinkInfo->uid == 0) {
11,261!
869
    if (streamTaskShouldStop(pTask)) {
11,265✔
870
      tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName);
1!
871
      return TSDB_CODE_STREAM_EXEC_CANCELLED;
7,524✔
872
    }
873

874
    int64_t waitingDuration = taosGetTimestampSec() - start;
11,266✔
875
    if (waitingDuration > timeout) {
11,260!
876
      tqError("s-task:%s wait for table-creating:%s more than %dsec, failed", id, dstTableName, timeout);
×
877
      return  TSDB_CODE_PAR_TABLE_NOT_EXIST;
×
878
    }
879

880
    // wait for the table to be created
881
    SMetaReader mr = {0};
11,260✔
882
    metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
11,260✔
883

884
    int32_t code = metaGetTableEntryByName(&mr, dstTableName);
11,266✔
885
    if (code == 0) {  // table already exists, check its type and uid
11,263✔
886
      bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);
7,520✔
887
      if (isValid) {  // not valid table, ignore it
7,518!
888
        tqDebug("s-task:%s set uid:%" PRIu64 " for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data);
7,518✔
889
        // set the destination table uid
890
        (*uid) = mr.me.uid;
7,519✔
891
        pTableSinkInfo->uid = mr.me.uid;
7,519✔
892
      }
893

894
      metaReaderClear(&mr);
7,519✔
895
      return terrno;
7,523✔
896
    } else {  // not exist, wait and retry
897
      metaReaderClear(&mr);
3,743✔
898
      taosMsleep(100);
3,744✔
899
      tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName);
3,740✔
900
    }
901
  }
902

903
  return TSDB_CODE_SUCCESS;
×
904
}
905

906
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName) {
55✔
907
  int32_t vgId = TD_VID(pVnode);
55✔
908
  int64_t suid = pTask->outputInfo.tbSink.stbUid;
55✔
909
  const char* id = pTask->id.idStr;
55✔
910

911
  while (1) {
55✔
912
    if (streamTaskShouldStop(pTask)) {
110!
913
      tqDebug("s-task:%s task will stop, quit from waiting for table:%s drop", id, dstTableName);
×
914
      return TSDB_CODE_STREAM_EXEC_CANCELLED;
×
915
    }
916
    SMetaReader mr = {0};
110✔
917
    metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
110✔
918
    int32_t code = metaGetTableEntryByName(&mr, dstTableName);
110✔
919
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
110✔
920
      metaReaderClear(&mr);
55✔
921
      break;
55✔
922
    } else if (TSDB_CODE_SUCCESS == code) {
55!
923
      if (isValidDstChildTable(&mr, vgId, dstTableName, suid)) {
55!
924
        metaReaderClear(&mr);
55✔
925
        taosMsleep(100);
55✔
926
        tqDebug("s-task:%s wait 100ms for table:%s drop", id, dstTableName);
55!
927
      } else {
928
        metaReaderClear(&mr);
×
929
        break;
×
930
      }
931
    } else {
932
      tqError("s-task:%s failed to wait for table:%s drop", id, dstTableName);
×
933
      metaReaderClear(&mr);
×
934
      return terrno;
×
935
    }
936
  }
937
  return TSDB_CODE_SUCCESS;
55✔
938
}
939

940
int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo) {
31,953✔
941
  int32_t nameLen = strlen(pDstTableName);
31,953✔
942
  (*pInfo) = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1);
31,953!
943
  if (*pInfo == NULL) {
31,957!
944
    return terrno;
×
945
  }
946

947
  (*pInfo)->name.len = nameLen;
31,957✔
948
  memcpy((*pInfo)->name.data, pDstTableName, nameLen);
31,957✔
949
  return TSDB_CODE_SUCCESS;
31,957✔
950
}
951

952
int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
51,355✔
953
                           SSubmitTbData* pTableData) {
954
  uint64_t        groupId = pDataBlock->info.id.groupId;
51,355✔
955
  char*           dstTableName = pDataBlock->info.parTbName;
51,355✔
956
  int32_t         numOfRows = pDataBlock->info.rows;
51,355✔
957
  const char*     id = pTask->id.idStr;
51,355✔
958
  int64_t         suid = pTask->outputInfo.tbSink.stbUid;
51,355✔
959
  STSchema*       pTSchema = pTask->outputInfo.tbSink.pTSchema;
51,355✔
960
  int32_t         vgId = TD_VID(pVnode);
51,355✔
961
  STableSinkInfo* pTableSinkInfo = NULL;
51,355✔
962
  int32_t         code = 0;
51,355✔
963

964
  bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, groupId, &pTableSinkInfo);
51,355✔
965

966
  if (alreadyCached) {
51,357✔
967
    if (dstTableName[0] == 0) {  // data block does not set the destination table name
25,527✔
968
      tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
965✔
969
      tqDebug("s-task:%s vgId:%d, gropuId:%" PRIu64 " datablock table name is null, set name:%s", id, vgId, groupId,
965!
970
              dstTableName);
971
    } else {
972
      tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
24,562✔
973
      if (pTableSinkInfo->uid != 0) {
24,562✔
974
        tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(uid:%" PRIu64 ")", id, numOfRows, groupId,
17,237✔
975
                dstTableName, pTableSinkInfo->uid);
976
      } else {
977
        tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(not set uid yet for the secondary block)",
7,325✔
978
                id, numOfRows, groupId, dstTableName);
979
      }
980
    }
981
  } else {  // this groupId has not been kept in cache yet
982
    if (dstTableName[0] == 0) {
25,830✔
983
      memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
321✔
984
      code = buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
321✔
985
      if (code) {
321!
986
        tqDebug("s-task:%s failed to build auto create table-name:%s, groupId:0x%" PRId64, id, dstTableName, groupId);
×
987
        return code;
×
988
      } else {
989
        tqDebug("s-task:%s no table name given, generated sub-table-name:%s, groupId:0x%" PRId64, id, dstTableName, groupId);
321!
990
      }
991
    } else {
992
      if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
25,509!
993
          !alreadyAddGroupId(dstTableName, groupId) && groupId != 0) {
×
994
        tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
×
995
        if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
×
996
          code = buildCtbNameAddGroupId(NULL, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
×
997
        } else if (pTask->ver >= SSTREAM_TASK_APPEND_STABLE_NAME_VER && stbFullName) {
×
998
          code = buildCtbNameAddGroupId(stbFullName, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
×
999
        }
1000
        if (code != TSDB_CODE_SUCCESS) {
×
1001
          return code;
×
1002
        }
1003
      }
1004
    }
1005

1006
    code = doCreateSinkTableInfo(dstTableName, &pTableSinkInfo);
25,830✔
1007
    if (code == 0) {
25,832✔
1008
      tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName);
25,831✔
1009
    } else {
1010
      tqDebug("s-task:%s failed to build new sinkTableInfo, dstTable:%s", id, dstTableName);
1!
1011
      return code;
×
1012
    }
1013
  }
1014

1015
  if (alreadyCached) {
51,354✔
1016
    pTableData->uid = pTableSinkInfo->uid;
25,522✔
1017

1018
    if (pTableData->uid == 0) {
25,522✔
1019
      tqTrace("s-task:%s cached tableInfo:%s uid is invalid, acquire it from meta", id, pTableSinkInfo->name.data);
7,524✔
1020
      return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid);
7,524✔
1021
    } else {
1022
      tqTrace("s-task:%s set the dstTable uid from cache:%" PRId64, id, pTableData->uid);
17,998✔
1023
    }
1024
  } else {
1025
    // The auto-create option will always set to be open for those submit messages, which arrives during the period
1026
    // the creating of the destination table, due to the absence of the user-specified table in TSDB. When scanning
1027
    // data from WAL, those submit messages, with auto-created table option, will be discarded expect the first, for
1028
    // those mismatched table uids. Only the FIRST table has the correct table uid, and those remain all have
1029
    // randomly generated, but false table uid in the WAL.
1030
    SMetaReader mr = {0};
25,832✔
1031
    metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
25,832✔
1032

1033
    // table not in cache, let's try to extract it from tsdb meta
1034
    if (metaGetTableEntryByName(&mr, dstTableName) < 0) {
25,831✔
1035
      metaReaderClear(&mr);
11,091✔
1036

1037
      if (pTask->outputInfo.tbSink.autoCreateCtb) {
11,090!
1038
        tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName);
11,091✔
1039

1040
        SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
11,091✔
1041
        if (pTagArray == NULL) {
11,091!
1042
          tqError("s-task:%s failed to build auto create submit msg in sink, vgId:%d, due to %s", id, vgId,
×
1043
                  tstrerror(terrno));
1044
          return terrno;
×
1045
        }
1046

1047
        pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
11,091✔
1048
        code = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
22,182✔
1049
                                       IS_NEW_SUBTB_RULE(pTask), &pTableData->pCreateTbReq, id);
11,091!
1050
        taosArrayDestroy(pTagArray);
11,091✔
1051

1052
        if (code) {
11,091!
1053
          tqError("s-task:%s failed to build auto create dst-table req:%s, code:%s", id, dstTableName, tstrerror(code));
×
1054
          taosMemoryFree(pTableSinkInfo);
×
1055
          return code;
×
1056
        }
1057

1058
        pTableSinkInfo->uid = 0;
11,091✔
1059
        code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
11,091✔
1060
      } else {
1061
        tqError("s-task:%s vgId:%d dst-table:%s not auto-created, and not create in tsdb, discard data", id, vgId,
×
1062
                dstTableName);
1063
        return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1064
      }
1065
    } else {
1066
      bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);
14,739✔
1067
      if (!isValid) {
14,739!
1068
        metaReaderClear(&mr);
×
1069
        taosMemoryFree(pTableSinkInfo);
×
1070
        tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId,
×
1071
                dstTableName);
1072
        return terrno;
×
1073
      } else {
1074
        pTableData->uid = mr.me.uid;
14,739✔
1075
        pTableSinkInfo->uid = mr.me.uid;
14,739✔
1076

1077
        metaReaderClear(&mr);
14,739✔
1078
        code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
14,741✔
1079
      }
1080
    }
1081
  }
1082

1083
  return code;
43,829✔
1084
}
1085

1086
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
107,391✔
1087
                                 SSubmitTbData* pTableData, int64_t earlyTs, const char* id) {
1088
  int32_t numOfRows = pDataBlock->info.rows;
107,391✔
1089
  char*   dstTableName = pDataBlock->info.parTbName;
107,391✔
1090

1091
  tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, id,
107,391✔
1092
          blockIndex + 1, numOfRows, suid);
1093

1094
  // convert all rows
1095
  int32_t code = doConvertRows(pTableData, pTSchema, pDataBlock, earlyTs, id);
107,397✔
1096
  if (code != TSDB_CODE_SUCCESS) {
107,401!
1097
    tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno));
×
1098
    return code;
×
1099
  }
1100

1101
  if (pTableData->aRowP != NULL) {
107,401✔
1102
    taosArraySort(pTableData->aRowP, tsAscendingSortFn);
107,398✔
1103
    tqTrace("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows);
107,395✔
1104
  }
1105

1106
  return code;
107,395✔
1107
}
1108

1109
int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode) {
14,557✔
1110
  int32_t          code = TSDB_CODE_SUCCESS;
14,557✔
1111
  const char*      id = pTask->id.idStr;
14,557✔
1112
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
14,557✔
1113
  int32_t          vgId = pTask->pMeta->vgId;
14,557✔
1114

1115
  if (pTask->outputInfo.tbSink.pTagSchema == NULL) {
14,557✔
1116
    SMetaReader mer1 = {0};
3,424✔
1117
    metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK);
3,424✔
1118

1119
    code = metaReaderGetTableEntryByUid(&mer1, pOutputInfo->tbSink.stbUid);
3,425✔
1120
    if (code != TSDB_CODE_SUCCESS) {
3,425!
1121
      tqError("s-task:%s vgId:%d failed to get the dst stable, failed to sink results", id, vgId);
×
1122
      metaReaderClear(&mer1);
×
1123
      return code;
×
1124
    }
1125

1126
    pOutputInfo->tbSink.pTagSchema = tCloneSSchemaWrapper(&mer1.me.stbEntry.schemaTag);
3,426✔
1127
    metaReaderClear(&mer1);
3,426✔
1128

1129
    if (pOutputInfo->tbSink.pTagSchema == NULL) {
3,426!
UNCOV
1130
      tqError("s-task:%s failed to clone tag schema, code:%s, failed to sink results", id, tstrerror(terrno));
×
UNCOV
1131
      return terrno;
×
1132
    }
1133

1134
    SSchemaWrapper* pTagSchema = pOutputInfo->tbSink.pTagSchema;
3,426✔
1135
    SSchema*        pCol1 = &pTagSchema->pSchema[0];
3,426✔
1136
    if (pTagSchema->nCols == 1 && pCol1->type == TSDB_DATA_TYPE_UBIGINT && strcmp(pCol1->name, "group_id") == 0) {
3,426!
1137
      pOutputInfo->tbSink.autoCreateCtb = true;
2,078✔
1138
    } else {
1139
      pOutputInfo->tbSink.autoCreateCtb = false;
1,348✔
1140
    }
1141
  }
1142

1143
  return code;
14,559✔
1144
}
1145

1146
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
14,554✔
1147
  const SArray*    pBlocks = (const SArray*)data;
14,554✔
1148
  SVnode*          pVnode = (SVnode*)vnode;
14,554✔
1149
  int64_t          suid = pTask->outputInfo.tbSink.stbUid;
14,554✔
1150
  char*            stbFullName = pTask->outputInfo.tbSink.stbFullName;
14,554✔
1151
  STSchema*        pTSchema = pTask->outputInfo.tbSink.pTSchema;
14,554✔
1152
  int32_t          vgId = TD_VID(pVnode);
14,554✔
1153
  int32_t          numOfBlocks = taosArrayGetSize(pBlocks);
14,554✔
1154
  int32_t          code = TSDB_CODE_SUCCESS;
14,565✔
1155
  const char*      id = pTask->id.idStr;
14,565✔
1156
  int64_t          earlyTs = tsdbGetEarliestTs(pVnode->pTsdb);
14,565✔
1157
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
14,561✔
1158

1159
  code = checkTagSchema(pTask, pVnode);
14,561✔
1160
  if (code != TSDB_CODE_SUCCESS) {
14,561!
1161
    return;
×
1162
  }
1163

1164
  code = tqSendAllNotifyEvents(pBlocks, pTask, pVnode);
14,561✔
1165
  if (code != TSDB_CODE_SUCCESS) {
14,564!
1166
    tqError("vgId: %d, s-task:%s failed to send all event notifications", vgId, id);
×
1167
    // continue processing even if notification fails
1168
  }
1169

1170
  bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
14,564✔
1171
  if (!onlySubmitData || pTask->subtableWithoutMd5 == 1) {
14,566✔
1172
    tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has other type block, submit one-by-one", vgId,
6,438✔
1173
            id, numOfBlocks);
1174

1175
    for (int32_t i = 0; i < numOfBlocks; ++i) {
25,826✔
1176
      if (streamTaskShouldStop(pTask)) {
19,389!
UNCOV
1177
        return;
×
1178
      }
1179

1180
      SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
19,390✔
1181
      if (pDataBlock == NULL) {
19,389!
1182
        continue;
×
1183
      }
1184

1185
      if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
19,389✔
1186
        code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
2,531✔
1187
      } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
16,858✔
1188
        code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
7,299✔
1189
      } else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
9,559!
1190
        continue;
×
1191
      } else if (pDataBlock->info.type == STREAM_DROP_CHILD_TABLE && pTask->subtableWithoutMd5) {
9,559!
1192
        code = doBuildAndSendDropTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
55✔
1193
      } else if (pDataBlock->info.type == STREAM_NOTIFY_EVENT) {
9,504!
1194
        continue;
×
1195
      } else {
1196
        code = handleResultBlockMsg(pTask, pDataBlock, i, pVnode, earlyTs);
9,504✔
1197
      }
1198
    }
1199
  } else {
1200
    tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks);
8,128✔
1201
    if (streamTaskShouldStop(pTask)) {
8,128!
1202
      return;
×
1203
    }
1204

1205
    rebuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs);
8,128✔
1206
  }
1207
}
1208

1209
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
1210
bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
14,562✔
1211
  for (int32_t i = 0; i < numOfBlocks; ++i) {
113,547✔
1212
    SSDataBlock* p = taosArrayGet(pBlocks, i);
105,068✔
1213
    if (p == NULL) {
105,067!
1214
      continue;
×
1215
    }
1216

1217
    if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
105,067✔
1218
      return false;
6,082✔
1219
    }
1220
  }
1221

1222
  return true;
8,479✔
1223
}
1224

1225
int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) {
31,954✔
1226
  int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES);
31,954✔
1227
  if (code != TSDB_CODE_SUCCESS) {
31,956!
1228
    taosMemoryFreeClear(pTableSinkInfo);
×
1229
  } else {
1230
    tqDebug("s-task:%s new dst table:%s(uid:%" PRIu64 ") added into cache, total:%d", id, pTableSinkInfo->name.data,
31,956✔
1231
            pTableSinkInfo->uid, tSimpleHashGetSize(pSinkTableMap));
1232
  }
1233

1234
  return code;
31,956✔
1235
}
1236

1237
bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo) {
58,705✔
1238
  void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t));
58,705✔
1239
  if (pVal) {
58,712✔
1240
    *pInfo = *(STableSinkInfo**)pVal;
26,718✔
1241
    return true;
26,718✔
1242
  }
1243

1244
  return false;
31,994✔
1245
}
1246

1247
int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id) {
×
1248
  if (tSimpleHashGetSize(pSinkTableMap) == 0) {
×
1249
    return TSDB_CODE_SUCCESS;
×
1250
  }
1251

1252
  int32_t code = tSimpleHashRemove(pSinkTableMap, &groupId, sizeof(groupId));
×
1253
  if (code == 0) {
×
1254
    tqDebug("s-task:%s remove cached table meta for groupId:%" PRId64, id, groupId);
×
1255
  } else {
1256
    tqError("s-task:%s failed to remove table meta from hashmap, groupId:%" PRId64, id, groupId);
×
1257
  }
1258
  return code;
×
1259
}
1260

1261
int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
2,530✔
1262
                                int64_t suid) {
1263
  SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
2,530✔
1264
  if (deleteReq.deleteReqs == NULL) {
2,533!
1265
    return terrno;
×
1266
  }
1267

1268
  int32_t code =
1269
      tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr, IS_NEW_SUBTB_RULE(pTask));
2,533!
1270
  if (code != TSDB_CODE_SUCCESS) {
2,531!
1271
    return code;
×
1272
  }
1273

1274
  if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
2,531!
1275
    taosArrayDestroy(deleteReq.deleteReqs);
×
1276
    return TSDB_CODE_SUCCESS;
×
1277
  }
1278

1279
  int32_t len;
1280
  tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
2,531!
1281
  if (code != TSDB_CODE_SUCCESS) {
2,530!
1282
    qError("s-task:%s failed to encode delete request", pTask->id.idStr);
×
1283
    return code;
×
1284
  }
1285

1286
  SEncoder encoder = {0};
2,530✔
1287
  void*    serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
2,530✔
1288
  void*    abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
2,529✔
1289
  tEncoderInit(&encoder, abuf, len);
2,529✔
1290
  code = tEncodeSBatchDeleteReq(&encoder, &deleteReq);
2,529✔
1291
  tEncoderClear(&encoder);
2,533✔
1292
  taosArrayDestroy(deleteReq.deleteReqs);
2,533✔
1293

1294
  if (code) {
2,531!
1295
    return code;
×
1296
  }
1297

1298
  ((SMsgHead*)serializedDeleteReq)->vgId = TD_VID(pVnode);
2,531✔
1299

1300
  SRpcMsg msg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead)};
2,531✔
1301
  if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
2,531!
1302
    tqDebug("failed to put delete req into write-queue since %s", terrstr());
×
1303
  }
1304

1305
  return TSDB_CODE_SUCCESS;
2,532✔
1306
}
1307

1308
void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) {
8,123✔
1309
  int32_t     code = 0;
8,123✔
1310
  const char* id = pTask->id.idStr;
8,123✔
1311
  int32_t     vgId = pTask->pMeta->vgId;
8,123✔
1312
  int32_t     numOfBlocks = taosArrayGetSize(pBlocks);
8,123✔
1313
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
8,124✔
1314
  STSchema*   pTSchema = pTask->outputInfo.tbSink.pTSchema;
8,124✔
1315
  char*       stbFullName = pTask->outputInfo.tbSink.stbFullName;
8,124✔
1316

1317
  SHashObj* pTableIndexMap =
1318
      taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
8,124✔
1319

1320
  SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
8,125✔
1321
  if (submitReq.aSubmitTbData == NULL) {
8,128!
1322
    code = terrno;
×
1323
    tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
×
1324
    taosHashCleanup(pTableIndexMap);
×
1325
    return;
×
1326
  }
1327

1328
  bool hasSubmit = false;
8,128✔
1329
  for (int32_t i = 0; i < numOfBlocks; i++) {
106,021✔
1330
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
97,894✔
1331
    if (pDataBlock == NULL) {
97,893!
1332
      continue;
×
1333
    }
1334

1335
    if (pDataBlock->info.type == STREAM_CHECKPOINT) {
97,893!
1336
      continue;
×
1337
    }
1338

1339
    if (pDataBlock->info.type == STREAM_NOTIFY_EVENT) {
97,893!
1340
      continue;
×
1341
    }
1342

1343
    hasSubmit = true;
97,893✔
1344
    pTask->execInfo.sink.numOfBlocks += 1;
97,893✔
1345
    uint64_t groupId = pDataBlock->info.id.groupId;
97,893✔
1346

1347
    SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
97,893✔
1348

1349
    int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
97,893✔
1350
    if (index == NULL) {  // no data yet, append it
97,892✔
1351
      code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
41,847✔
1352
      if (code != TSDB_CODE_SUCCESS) {
41,846!
1353
        tqError("vgId:%d dst-table gid:%" PRId64 " not exist, discard stream results", vgId, groupId);
×
1354
        continue;
×
1355
      }
1356

1357
      code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
41,846✔
1358
      if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
41,845!
1359
        if (tbData.pCreateTbReq != NULL) {
×
1360
          tdDestroySVCreateTbReq(tbData.pCreateTbReq);
×
1361
          (void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, groupId, id);
×
1362
          tbData.pCreateTbReq = NULL;
×
1363
        }
1364
        continue;
×
1365
      }
1366

1367
      void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
41,845✔
1368
      if (p == NULL) {
41,841!
1369
        tqError("vgId:%d, s-task:%s failed to build submit msg, data lost", vgId, id);
×
1370
        continue;
×
1371
      }
1372

1373
      int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
41,841✔
1374
      code = taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
41,842✔
1375
      if (code) {
41,848!
1376
        tqError("vgId:%d, s-task:%s failed to put group into index map, code:%s", vgId, id, tstrerror(code));
×
1377
        continue;
×
1378
      }
1379
    } else {
1380
      code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
56,045✔
1381
      if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
56,046!
1382
        if (tbData.pCreateTbReq != NULL) {
×
1383
          tdDestroySVCreateTbReq(tbData.pCreateTbReq);
×
1384
          tbData.pCreateTbReq = NULL;
×
1385
        }
1386
        continue;
×
1387
      }
1388

1389
      SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index);
56,047✔
1390
      if (pExisted == NULL) {
56,046!
1391
        continue;
×
1392
      }
1393

1394
      code = doMergeExistedRows(pExisted, &tbData, id);
56,046✔
1395
      if (code != TSDB_CODE_SUCCESS) {
56,046!
1396
        continue;
×
1397
      }
1398
    }
1399

1400
    pTask->execInfo.sink.numOfRows += pDataBlock->info.rows;
97,894✔
1401
  }
1402

1403
  taosHashCleanup(pTableIndexMap);
8,127✔
1404

1405
  if (hasSubmit) {
8,127!
1406
    code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks);
8,127✔
1407
    if (code) {  // failed and continue
8,126!
1408
      tqError("vgId:%d failed to build and send submit msg", vgId);
×
1409
    }
1410
  } else {
UNCOV
1411
    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
×
1412
    tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
×
1413
  }
1414
}
1415

1416
int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode, int64_t earlyTs) {
9,509✔
1417
  int32_t     code = 0;
9,509✔
1418
  STSchema*   pTSchema = pTask->outputInfo.tbSink.pTSchema;
9,509✔
1419
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
9,509✔
1420
  const char* id = pTask->id.idStr;
9,509✔
1421
  int32_t     vgId = TD_VID(pVnode);
9,509✔
1422
  char*       stbFullName = pTask->outputInfo.tbSink.stbFullName;
9,509✔
1423

1424
  pTask->execInfo.sink.numOfBlocks += 1;
9,509✔
1425

1426
  SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
9,509✔
1427
  if (submitReq.aSubmitTbData == NULL) {
9,510!
1428
    tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(terrno));
×
1429
    return terrno;
×
1430
  }
1431

1432
  SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
9,510✔
1433
  code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
9,510✔
1434
  if (code != TSDB_CODE_SUCCESS) {
9,507✔
1435
    tqError("vgId:%d s-task:%s dst-table not exist, stb:%s discard stream results", vgId, id, stbFullName);
1!
1436
    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
1✔
1437
    return code;
1✔
1438
  }
1439

1440
  code = tqSetDstTableDataPayload(suid, pTSchema, index, pDataBlock, &tbData, earlyTs, id);
9,506✔
1441
  if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
9,504!
1442
    if (tbData.pCreateTbReq != NULL) {
×
1443
      tdDestroySVCreateTbReq(tbData.pCreateTbReq);
×
1444
      (void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, id);
×
1445
      tbData.pCreateTbReq = NULL;
×
1446
    }
1447

1448
    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
×
1449
    return code;
×
1450
  }
1451

1452
  void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
9,505✔
1453
  if (p == NULL) {
9,504!
1454
    tqDebug("vgId:%d, s-task:%s failed to build submit msg, code:%s, data lost", vgId, id, tstrerror(terrno));
×
1455
    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
×
1456
    return terrno;
×
1457
  }
1458

1459
  code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1);
9,504✔
1460
  if (code) {  // failed and continue
9,506!
1461
    tqDebug("vgId:%d, s-task:%s submit msg failed, code:%s data lost", vgId, id, tstrerror(code));
×
1462
  }
1463

1464
  return code;
9,506✔
1465
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc