• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4348

21 Jun 2025 07:48AM UTC coverage: 62.366% (+1.8%) from 60.571%
#4348

push

travis-ci

web-flow
docs: add OpenMetrics support and configuration details to taosAdapter documentation (#31427)

* docs: add OpenMetrics support and configuration details to taosAdapter documentation

* docs: enhance OpenMetrics section in taosAdapter documentation

156282 of 319947 branches covered (48.85%)

Branch coverage included in aggregate %.

242147 of 318911 relevant lines covered (75.93%)

6151642.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.09
/source/dnode/vnode/src/tq/tqSink.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tcommon.h"
17
#include "tq.h"
18

19
typedef struct STableSinkInfo {
20
  uint64_t uid;
21
  tstr     name;
22
} STableSinkInfo;
23

24
static bool    hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks);
25
static int32_t tsAscendingSortFn(const void* p1, const void* p2);
26
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
27
                                  SSubmitTbData* pTableData);
28
static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
29
                                       int64_t suid);
30
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks);
31
static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
32
static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock,
33
                             int64_t earlyTs, const char* id);
34
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
35
                                        const char* dstTableName, int64_t* uid);
36

37
static bool    isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
38
static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName,
39
                                  int32_t numOfTags);
40
static int32_t createDefaultTagColName(SArray** pColNameList);
41
static int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock,
42
                                          const char* stbFullName, int64_t gid, bool newSubTableRule, const char* id);
43
static int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo);
44
static int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId,
45
                                           const char* id);
46
static bool    doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo);
47
static int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id);
48
static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode);
49
static void    rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs);
50
static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode,
51
                                    int64_t earlyTs);
52
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName);
53

54
/*
 * Convert every row of a delete-result block into an SSingleDeleteReq and append it to
 * deleteReq->deleteReqs.
 *
 * The destination table name of a row is resolved in this order:
 *   1. the table-name column of the row, optionally extended with the group id when
 *      newSubTableRule is set and stbFullName is given;
 *   2. a name generated from stbFullName and the group id;
 *   3. a meta lookup that treats the group id as the table uid (only when stbFullName is NULL).
 * Rows whose name cannot be resolved are skipped with a warning.
 *
 * @param pTq             tq handle; only pTq->pVnode is used, for the meta lookup
 * @param stbFullName     full name of the destination super table, may be NULL
 * @param pDataBlock      block holding the start-ts, end-ts, group-id and table-name columns
 * @param deleteReq       batch request that receives the generated entries
 * @param pIdStr          task id string, used for logging only
 * @param newSubTableRule whether the group id has to be appended to user supplied names
 * @return 0 on success, otherwise an error code. Entries appended before the failure stay in
 *         deleteReq; every temporary name buffer is released on all paths.
 */
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
                         const char* pIdStr, bool newSubTableRule) {
  int32_t          totalRows = pDataBlock->info.rows;
  SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);

  if (pStartTsCol == NULL || pEndTsCol == NULL || pGidCol == NULL || pTbNameCol == NULL) {
    return terrno;
  }

  // stbFullName is allowed to be NULL (see the branches below), so never hand it to %s directly
  tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName ? stbFullName : "NULL");

  for (int32_t row = 0; row < totalRows; row++) {
    int64_t skey = *(int64_t*)colDataGetData(pStartTsCol, row);
    int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row);
    int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);

    int32_t code = TSDB_CODE_SUCCESS;
    char*   name = NULL;    // resolved table name; may point into the middle of pOwned
    void*   pOwned = NULL;  // heap buffer backing `name`; the only pointer that is freed
    void*   varTbName = NULL;
    if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) {
      varTbName = colDataGetVarData(pTbNameCol, row);
    }

    if (varTbName != NULL && varTbName != (void*)-1) {
      size_t cap = TMAX(TSDB_TABLE_NAME_LEN, varDataLen(varTbName) + 1);
      name = taosMemoryMalloc(cap);
      if (name == NULL) {
        return terrno;
      }
      pOwned = name;

      memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
      name[varDataLen(varTbName)] = '\0';
      if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name, groupId) && groupId != 0 &&
          stbFullName) {
        code = buildCtbNameAddGroupId(stbFullName, name, groupId, cap);
        if (code != TSDB_CODE_SUCCESS) {
          taosMemoryFree(pOwned);  // do not leak the name buffer on failure
          return code;
        }
      }
    } else if (stbFullName) {
      code = buildCtbNameByGroupId(stbFullName, groupId, &name);
      if (code) {
        // NOTE(review): ownership of *name on failure is not visible here; it is left untouched
        return code;
      }
      pOwned = name;
    } else {
      char* originName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
      if (originName == NULL) {
        return terrno;
      }
      pOwned = originName;

      // the buffer is filled in var-data layout, the readable name starts after the header
      if (metaGetTableNameByUid(pTq->pVnode, groupId, originName) == 0) {
        name = varDataVal(originName);
      }
    }

    if (!name || *name == '\0') {
      tqWarn("s-task:%s failed to build delete msg groupId:%" PRId64 ", skey:%" PRId64 " ekey:%" PRId64
             " since invalid tbname:%s",
             pIdStr, groupId, skey, ekey, name ? name : "NULL");
    } else {
      tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, pIdStr,
              groupId, name, skey, ekey);

      SSingleDeleteReq req = {.startTs = skey, .endTs = ekey};
      tstrncpy(req.tbname, name, TSDB_TABLE_NAME_LEN);
      if (taosArrayPush(deleteReq->deleteReqs, &req) == NULL) {
        code = terrno;  // capture before the free below can touch terrno
        taosMemoryFree(pOwned);
        return code;
      }
    }

    taosMemoryFree(pOwned);
  }

  return 0;
}
137

138
static int32_t encodeCreateChildTableForRPC(void* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) {
2,469✔
139
  int32_t ret = 0;
2,469✔
140

141
  tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret);
2,469!
142
  if (ret < 0) {
2,469!
143
    ret = -1;
×
144
    goto end;
×
145
  }
146
  *contLen += sizeof(SMsgHead);
2,469✔
147
  *pBuf = rpcMallocCont(*contLen);
2,469✔
148
  if (NULL == *pBuf) {
2,469!
149
    ret = -1;
×
150
    goto end;
×
151
  }
152
  ((SMsgHead*)(*pBuf))->vgId = vgId;
2,469✔
153
  ((SMsgHead*)(*pBuf))->contLen = htonl(*contLen);
2,469✔
154
  SEncoder coder = {0};
2,469✔
155
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
2,469✔
156
  if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) {
2,469!
157
    rpcFreeCont(*pBuf);
×
158
    *pBuf = NULL;
×
159
    *contLen = 0;
×
160
    tEncoderClear(&coder);
×
161
    ret = -1;
×
162
    goto end;
×
163
  }
164
  tEncoderClear(&coder);
2,469✔
165

166
end:
2,469✔
167
  return ret;
2,469✔
168
}
169

170
static int32_t encodeDropChildTableForRPC(void* pReqs, int32_t vgId, void** ppBuf, int32_t *contLen) {
×
171
  int32_t  code = 0;
×
172
  SEncoder ec = {0};
×
173
  tEncodeSize(tEncodeSVDropTbBatchReq, pReqs, *contLen, code);
×
174
  if (code < 0) {
×
175
    code = TSDB_CODE_INVALID_MSG;
×
176
    goto end;
×
177
  }
178
  *contLen += sizeof(SMsgHead);
×
179
  *ppBuf = rpcMallocCont(*contLen);
×
180

181
  if (!*ppBuf) {
×
182
    code = terrno;
×
183
    goto end;
×
184
  }
185

186
  ((SMsgHead*)(*ppBuf))->vgId = vgId;
×
187
  ((SMsgHead*)(*ppBuf))->contLen = htonl(*contLen);
×
188

189
  tEncoderInit(&ec, POINTER_SHIFT(*ppBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
×
190
  code = tEncodeSVDropTbBatchReq(&ec, pReqs);
×
191
  tEncoderClear(&ec);
×
192
  if (code < 0) {
×
193
    rpcFreeCont(*ppBuf);
×
194
    *ppBuf = NULL;
×
195
    *contLen = 0;
×
196
    code = TSDB_CODE_INVALID_MSG;
×
197
    goto end;
×
198
  }
199
end:
×
200
  return code;
×
201
}
202

203
/*
 * Encode a request with the supplied encoder and put the resulting message into the
 * write queue of the vnode.
 *
 * @param pVnode  vnode whose write queue receives the message
 * @param pReqs   request object, passed through to the encoder untouched
 * @param encoder callback that serializes pReqs into an rpc buffer (head included)
 * @param msgType message type stored in the rpc message
 * @return 0 on success, otherwise the error code of the encoder or of tmsgPutToQueue
 *
 * NOTE(review): this function does not release the encoded buffer when tmsgPutToQueue
 * fails; presumably tmsgPutToQueue takes care of it - confirm against its definition.
 */
static int32_t tqPutReqToQueue(SVnode* pVnode, void* pReqs, int32_t(*encoder)(void* pReqs, int32_t vgId, void** ppBuf, int32_t *contLen), tmsg_t msgType) {
  void*   buf = NULL;
  int32_t tlen = 0;
  int32_t vgId = TD_VID(pVnode);

  int32_t code = encoder(pReqs, vgId, &buf, &tlen);
  if (code) {
    // this helper serves several message types, so the log must not claim "create table"
    tqError("vgId:%d failed to encode msg, type:%d, code:%s", vgId, (int32_t)msgType, tstrerror(code));
    return code;
  }

  SRpcMsg msg = {.msgType = msgType, .pCont = buf, .contLen = tlen};
  code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
  if (code) {
    tqError("vgId:%d failed to put into write-queue since %s", vgId, tstrerror(code));
  }

  return code;
}
222

223
/*
 * Fill the fixed part of a child-table create request: flags, type, super table uid,
 * number of tags and the (duplicated) short name of the super table.
 *
 * @param pCreateTableReq request to initialize
 * @param suid            uid of the super table
 * @param stbFullName     full super table name, parsed as account + db + table
 * @param numOfTags       value stored in ctb.tagNum
 * @return 0 on success, the parse error of tNameFromString, or terrno when the name cannot
 *         be duplicated. ctb.stbName is only assigned when parsing succeeded.
 */
int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags) {
  SName stbName = {0};

  pCreateTableReq->flags = 0;
  pCreateTableReq->type = TSDB_CHILD_TABLE;
  pCreateTableReq->ctb.suid = suid;
  pCreateTableReq->ctb.tagNum = numOfTags;

  // extract the short super table name from the full name
  int32_t code = tNameFromString(&stbName, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code != 0) {
    return code;
  }

  pCreateTableReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&stbName));
  if (pCreateTableReq->ctb.stbName != NULL) {
    return 0;
  }

  tqError("failed to duplicate the stb name:%s, failed to init create-table msg and create req table", stbFullName);
  return terrno;
}
243

244
/*
 * Create a one-element array that holds the default tag column name "group_id".
 *
 * @param pColNameList out: the new array (elements are TSDB_COL_NAME_LEN bytes wide);
 *                     set to NULL on failure
 * @return TSDB_CODE_SUCCESS, or terrno when the array cannot be created or filled
 */
int32_t createDefaultTagColName(SArray** pColNameList) {
  char defaultTagName[TSDB_COL_NAME_LEN] = "group_id";

  *pColNameList = NULL;

  SArray* pNames = taosArrayInit(1, TSDB_COL_NAME_LEN);
  if (pNames == NULL) {
    return terrno;
  }

  if (taosArrayPush(pNames, defaultTagName) != NULL) {
    *pColNameList = pNames;
    return TSDB_CODE_SUCCESS;
  }

  // could not store the name: drop the half-built list
  taosArrayDestroy(pNames);
  return terrno;
}
262

263
/*
 * Decide the name of the child table to create and store it, heap allocated, in
 * pCreateTableReq->name.
 *
 *  - the block carries a partition table name and the new sub-table rule applies:
 *    copy the name and let buildCtbNameAddGroupId() extend it with the group id;
 *  - the block carries a partition table name otherwise: duplicate it unchanged;
 *  - the block carries no name: generate one from stbFullName and the group id.
 *
 * @param pCreateTableReq request that receives the name
 * @param pDataBlock      result block; info.parTbName is the optional user supplied name
 * @param stbFullName     full name of the destination super table
 * @param gid             group id of the block
 * @param newSubTableRule whether the group id has to be appended to user supplied names
 * @param id              task id string, used for logging only
 * @return 0 on success, otherwise an error code. When extending a user supplied name fails,
 *         the name buffer is released and pCreateTableReq->name is reset to NULL.
 */
int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
                                   int64_t gid, bool newSubTableRule, const char* id) {
  if (!pDataBlock->info.parTbName[0]) {
    int32_t code = buildCtbNameByGroupId(stbFullName, gid, &pCreateTableReq->name);
    if (code == TSDB_CODE_SUCCESS) {
      tqDebug("s-task:%s no name in blockdata, auto-created table name:%s", id, pCreateTableReq->name);
    } else {
      // the name may not have been produced, so it must not be printed here
      tqError("s-task:%s no name in blockdata, failed to auto-create table name, code:%s", id, tstrerror(code));
    }
    return code;
  }

  if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) &&
      !alreadyAddGroupId(pDataBlock->info.parTbName, gid) && gid != 0 && stbFullName) {
    pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
    if (pCreateTableReq->name == NULL) {
      return terrno;
    }

    tstrncpy(pCreateTableReq->name, pDataBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
    int32_t code = buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid, TSDB_TABLE_NAME_LEN);
    if (code != TSDB_CODE_SUCCESS) {
      // do not hand a half-built name back to the caller
      taosMemoryFreeClear(pCreateTableReq->name);
      return code;
    }
    tqDebug("s-task:%s gen name from:%s blockdata", id, pDataBlock->info.parTbName);
  } else {
    pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
    if (pCreateTableReq->name == NULL) {
      return terrno;
    }
    tqDebug("s-task:%s copy name:%s from blockdata", id, pDataBlock->info.parTbName);
  }

  return 0;
}
294

295
static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock,
2,468✔
296
                                            SStreamTask* pTask, int64_t suid) {
297
  STSchema*          pTSchema = pTask->outputInfo.tbSink.pTSchema;
2,468✔
298
  int32_t            rows = pDataBlock->info.rows;
2,468✔
299
  SArray*            tagArray = NULL;
2,468✔
300
  const char*        id = pTask->id.idStr;
2,468✔
301
  int32_t            vgId = pTask->pMeta->vgId;
2,468✔
302
  int32_t            code = 0;
2,468✔
303
  STableSinkInfo*    pInfo = NULL;
2,468✔
304
  SVCreateTbBatchReq reqs = {0};
2,468✔
305
  SArray*            crTblArray = NULL;
2,468✔
306

307
  tqDebug("s-task:%s build create %d table(s) msg", id, rows);
2,468!
308

309
  tagArray = taosArrayInit(4, sizeof(STagVal));
2,469✔
310
  crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
2,469✔
311
  if ((NULL == reqs.pArray) || (tagArray == NULL)) {
2,469!
312
    tqError("s-task:%s failed to init create table msg, code:%s", id, tstrerror(terrno));
×
313
    code = terrno;
×
314
    goto _end;
×
315
  }
316

317
  for (int32_t rowId = 0; rowId < rows; rowId++) {
4,938✔
318
    SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0});
2,469✔
319

320
    int32_t size = taosArrayGetSize(pDataBlock->pDataBlock);
2,469✔
321
    int32_t numOfTags = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
2,469✔
322

323
    code = initCreateTableMsg(pCreateTbReq, suid, stbFullName, numOfTags);
2,469✔
324
    if (code) {
2,469!
325
      tqError("s-task:%s vgId:%d failed to init create table msg", id, vgId);
×
326
      continue;
×
327
    }
328

329
    taosArrayClear(tagArray);
2,469✔
330

331
    if (size == 2) {
2,469✔
332
      STagVal tagVal = {
562✔
333
          .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
562✔
334

335
      void* p = taosArrayPush(tagArray, &tagVal);
562✔
336
      if (p == NULL) {
562!
337
        return terrno;
×
338
      }
339

340
      code = createDefaultTagColName(&pCreateTbReq->ctb.tagName);
562✔
341
      if (code) {
562!
342
        return code;
×
343
      }
344
    } else {
345
      for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
18,852✔
346
        SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
16,945✔
347
        if (pTagData == NULL) {
16,945!
348
          continue;
7,885✔
349
        }
350

351
        STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type};
16,945✔
352
        void*   pData = colDataGetData(pTagData, rowId);
16,945!
353
        if (colDataIsNull_s(pTagData, rowId)) {
33,890!
354
          continue;
7,885✔
355
        } else if (IS_VAR_DATA_TYPE(pTagData->info.type)) {
9,060!
356
          tagVal.nData = varDataLen(pData);
3,438✔
357
          tagVal.pData = (uint8_t*)varDataVal(pData);
3,438✔
358
        } else {
359
          memcpy(&tagVal.i64, pData, pTagData->info.bytes);
5,622✔
360
        }
361
        void* p = taosArrayPush(tagArray, &tagVal);
9,060✔
362
        if (p == NULL) {
9,060!
363
          code = terrno;
×
364
          goto _end;
×
365
        }
366
      }
367
    }
368

369
    code = tTagNew(tagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
2,469✔
370
    taosArrayDestroy(tagArray);
2,469✔
371
    tagArray = NULL;
2,469✔
372

373
    if (pCreateTbReq->ctb.pTag == NULL || (code != 0)) {
2,469!
374
      tdDestroySVCreateTbReq(pCreateTbReq);
375
      code = TSDB_CODE_OUT_OF_MEMORY;
×
376
      goto _end;
×
377
    }
378

379
    uint64_t gid = pDataBlock->info.id.groupId;
2,469✔
380
    if (taosArrayGetSize(pDataBlock->pDataBlock) > UD_GROUPID_COLUMN_INDEX) {
2,469!
381
      SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
2,469✔
382
      if (pGpIdColInfo == NULL) {
2,469!
383
        continue;
×
384
      }
385

386
      // todo remove this
387
      void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
2,469!
388
      if (gid != *(int64_t*)pGpIdData) {
2,469!
389
        tqError("s-task:%s vgId:%d invalid groupId:%" PRId64 " actual:%" PRId64 " in sink task", id, vgId, gid,
×
390
                *(int64_t*)pGpIdData);
391
      }
392
    }
393

394
    code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, IS_NEW_SUBTB_RULE(pTask),
2,469!
395
                                      pTask->id.idStr);
396
    if (code) {
2,469!
397
      goto _end;
×
398
    }
399

400
    void* p = taosArrayPush(reqs.pArray, pCreateTbReq);
2,469✔
401
    if (p == NULL) {
2,469!
402
      code = terrno;
×
403
      goto _end;
×
404
    }
405

406
    bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, gid, &pInfo);
2,469✔
407
    if (!alreadyCached) {
2,469✔
408
      code = doCreateSinkTableInfo(pCreateTbReq->name, &pInfo);
1,884✔
409
      if (code) {
1,884!
410
        tqError("vgId:%d failed to create sink tableInfo for table:%s, s-task:%s", vgId, pCreateTbReq->name, id);
×
411
        continue;
×
412
      }
413

414
      code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pInfo, gid, id);
1,884✔
415
      if (code) {
1,884!
416
        tqError("vgId:%d failed to put sink tableInfo:%s into cache, s-task:%s", vgId, pCreateTbReq->name, id);
×
417
      }
418
    }
419

420
    tqDebug("s-task:%s build create table:%s msg complete", id, pCreateTbReq->name);
2,469!
421
  }
422

423
  reqs.nReqs = taosArrayGetSize(reqs.pArray);
2,469✔
424
  code = tqPutReqToQueue(pVnode, &reqs, encodeCreateChildTableForRPC, TDMT_VND_CREATE_TABLE);
2,469✔
425
  if (code != TSDB_CODE_SUCCESS) {
2,469!
426
    tqError("s-task:%s failed to send create table msg, code:%s", id, tstrerror(code));
×
427
  }
428

429
_end:
2,469✔
430
  taosArrayDestroy(tagArray);
2,469✔
431
  taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
2,469✔
432
  return code;
2,469✔
433
}
434

435
static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SSDataBlock* pDataBlock,
×
436
                                          SStreamTask* pTask, int64_t suid) {
437
  int32_t          lino = 0;
×
438
  int32_t          code = 0;
×
439
  int32_t          rows = pDataBlock->info.rows;
×
440
  const char*      id = pTask->id.idStr;
×
441
  SVDropTbBatchReq batchReq = {0};
×
442
  SVDropTbReq      req = {0};
×
443

444
  if (rows <= 0 || rows > 1 || pTask->subtableWithoutMd5 == 0) return TSDB_CODE_SUCCESS;
×
445

446
  batchReq.pArray = taosArrayInit(rows, sizeof(SVDropTbReq));
×
447
  if (!batchReq.pArray) return terrno;
×
448
  batchReq.nReqs = rows;
×
449
  req.suid = suid;
×
450
  req.igNotExists = true;
×
451

452
  SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
×
453
  char             tbName[TSDB_TABLE_NAME_LEN + 1] = {0};
×
454
  int32_t          i = 0;
×
455
  void*            pData = colDataGetVarData(pTbNameCol, i);
×
456
  memcpy(tbName, varDataVal(pData), varDataLen(pData));
×
457
  tbName[varDataLen(pData) + 1] = 0;
×
458
  req.name = tbName;
×
459
  if (taosArrayPush(batchReq.pArray, &req) == NULL) {
×
460
    TSDB_CHECK_CODE(terrno, lino, _exit);
×
461
  }
462

463
  SMetaReader mr = {0};
×
464
  metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
×
465

466
  code = metaGetTableEntryByName(&mr, tbName);
×
467
  if (TSDB_CODE_SUCCESS == code && isValidDstChildTable(&mr, TD_VID(pVnode), tbName, pTask->outputInfo.tbSink.stbUid)) {
×
468
    STableSinkInfo* pTableSinkInfo = NULL;
×
469
    bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, &pTableSinkInfo);
×
470
    if (alreadyCached) {
×
471
      pTableSinkInfo->uid = mr.me.uid;
×
472
    }
473
  }
474
  metaReaderClear(&mr);
×
475
  tqDebug("s-task:%s build drop %d table(s) msg", id, rows);
×
476
  code = tqPutReqToQueue(pVnode, &batchReq, encodeDropChildTableForRPC, TDMT_VND_DROP_TABLE);
×
477
  TSDB_CHECK_CODE(code, lino, _exit);
×
478

479

480
  code = doWaitForDstTableDropped(pVnode, pTask, tbName);
×
481
  TSDB_CHECK_CODE(code, lino, _exit);
×
482

483
_exit:
×
484
  if (batchReq.pArray) {
×
485
    taosArrayDestroy(batchReq.pArray);
×
486
  }
487
  return code;
×
488
}
489

490
/*
 * Serialize a submit request and put it into the write queue of the vnode, then update the
 * submit counter of the task (a summary line is logged on every 1000th submit).
 *
 * @param pVnode      destination vnode
 * @param pTask       sink task; provides the id string and the sink statistics
 * @param pReq        submit request to serialize
 * @param numOfBlocks number of source blocks merged into pReq, used for logging only
 * @return the error of buildSubmitMsgImpl when serialization fails; TSDB_CODE_SUCCESS
 *         otherwise - a failure to enqueue the message is logged but not returned
 */
int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks) {
  const char* id = pTask->id.idStr;
  int32_t     vgId = TD_VID(pVnode);
  int32_t     msgLen = 0;
  void*       pMsgBuf = NULL;
  int32_t     numOfFinalBlocks = taosArrayGetSize(pReq->aSubmitTbData);

  int32_t code = buildSubmitMsgImpl(pReq, vgId, &pMsgBuf, &msgLen);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("s-task:%s build submit msg failed, vgId:%d, code:%s", id, vgId, tstrerror(code));
    return code;
  }

  SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pMsgBuf, .contLen = msgLen};
  code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
  } else {
    tqDebug("s-task:%s vgId:%d comp %d blocks into %d and send to dstTable(s) completed", id, vgId, numOfBlocks,
            numOfFinalBlocks);
  }

  // statistics are updated whether or not the message was queued
  SSinkRecorder* pRec = &pTask->execInfo.sink;
  pRec->numOfSubmit += 1;
  if ((pRec->numOfSubmit % 1000) != 0) {
    return TSDB_CODE_SUCCESS;
  }

  double el = (taosGetTimestampMs() - pTask->execInfo.readyTs) / 1000.0;
  tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
         " submit into dst table, %.2fMiB duration:%.2f Sec.",
         id, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->dataSize), el);

  return TSDB_CODE_SUCCESS;
}
524

525
// merge the new submit table block with the existed blocks
526
// if ts in the new data block overlap with existed one, replace it
527
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id) {
460✔
528
  int32_t oldLen = taosArrayGetSize(pExisted->aRowP);
460✔
529
  int32_t newLen = taosArrayGetSize(pNew->aRowP);
460✔
530
  int32_t numOfPk = 0;
460✔
531

532
  int32_t j = 0, k = 0;
460✔
533
  SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES);
460✔
534
  if (pFinal == NULL) {
460!
535
    tqError("s-task:%s failed to prepare merge result datablock, code:%s", id, tstrerror(terrno));
×
536
    return terrno;
×
537
  }
538

539
  while (j < newLen && k < oldLen) {
1,645,324✔
540
    SRow* pNewRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j);
1,644,864✔
541
    SRow* pOldRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k);
1,644,864✔
542

543
    if (pNewRow->ts < pOldRow->ts) {
1,644,864✔
544
      void* p = taosArrayPush(pFinal, &pNewRow);
28✔
545
      if (p == NULL) {
28!
546
        return terrno;
×
547
      }
548
      j += 1;
28✔
549
    } else if (pNewRow->ts > pOldRow->ts) {
1,644,836✔
550
      void* p = taosArrayPush(pFinal, &pOldRow);
1,644,534✔
551
      if (p == NULL) {
1,644,534!
552
        return terrno;
×
553
      }
554

555
      k += 1;
1,644,534✔
556
    } else {
557
      // check for the existance of primary key
558
      if (pNewRow->numOfPKs == 0) {
302!
559
        void* p = taosArrayPush(pFinal, &pNewRow);
302✔
560
        if (p == NULL) {
302!
561
          return terrno;
×
562
        }
563

564
        k += 1;
302✔
565
        j += 1;
302✔
566
        tRowDestroy(pOldRow);
302✔
567
      } else {
568
        numOfPk = pNewRow->numOfPKs;
×
569

570
        SRowKey kNew, kOld;
571
        tRowGetKey(pNewRow, &kNew);
×
572
        tRowGetKey(pOldRow, &kOld);
×
573

574
        int32_t ret = tRowKeyCompare(&kNew, &kOld);
×
575
        if (ret <= 0) {
×
576
          void* p = taosArrayPush(pFinal, &pNewRow);
×
577
          if (p == NULL) {
×
578
            return terrno;
×
579
          }
580

581
          j += 1;
×
582

583
          if (ret == 0) {
×
584
            k += 1;
×
585
            tRowDestroy(pOldRow);
×
586
          }
587
        } else {
588
          void* p = taosArrayPush(pFinal, &pOldRow);
×
589
          if (p == NULL) {
×
590
            return terrno;
×
591
          }
592

593
          k += 1;
×
594
        }
595
      }
596
    }
597
  }
598

599
  while (j < newLen) {
270,061✔
600
    SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j++);
269,601✔
601
    void* p = taosArrayPush(pFinal, &pRow);
269,601✔
602
    if (p == NULL) {
269,601!
603
      return terrno;
×
604
    }
605
  }
606

607
  while (k < oldLen) {
813✔
608
    SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k++);
353✔
609
    void* p = taosArrayPush(pFinal, &pRow);
353✔
610
    if (p == NULL) {
353!
611
      return terrno;
×
612
    }
613
  }
614

615
  taosArrayDestroy(pNew->aRowP);
460✔
616
  taosArrayDestroy(pExisted->aRowP);
460✔
617
  pExisted->aRowP = pFinal;
460✔
618

619
  tqTrace("s-task:%s rows merged, final rows:%d, pk:%d uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
460!
620
          (int32_t)taosArrayGetSize(pFinal), numOfPk, pExisted->uid, (pExisted->pCreateTbReq != NULL),
621
          (pNew->pCreateTbReq != NULL));
622

623
  tdDestroySVCreateTbReq(pNew->pCreateTbReq);
460!
624
  taosMemoryFree(pNew->pCreateTbReq);
460!
625
  return TSDB_CODE_SUCCESS;
460✔
626
}
627

628
// Verify that the meta entry currently loaded in pReader can be used as a sink destination:
// it has to be a child table, and its super table uid has to equal `suid`.
//
// Returns true and resets terrno to 0 when the entry is acceptable. Otherwise an error is logged,
// terrno is set to TSDB_CODE_TDB_INVALID_TABLE_TYPE or TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE, and
// false is returned.
bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid) {
  int32_t reason = 0;

  if (pReader->me.type != TSDB_CHILD_TABLE) {
    tqError("vgId:%d, failed to write into %s, since table type:%d incorrect", vgId, ctbName, pReader->me.type);
    reason = TSDB_CODE_TDB_INVALID_TABLE_TYPE;
  } else if (pReader->me.ctbEntry.suid != suid) {
    tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64, vgId,
            ctbName, suid, pReader->me.ctbEntry.suid);
    reason = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
  }

  // callers read terrno to learn why the table was rejected, so it is always (re)written here
  terrno = reason;
  return reason == 0;
}
645

646
int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock,
836✔
647
                                SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq, const char* id) {
648
  *pReq = NULL;
836✔
649

650
  SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
836!
651
  if (pCreateTbReq == NULL) {
836!
652
    return terrno;
×
653
  }
654

655
  taosArrayClear(pTagArray);
836✔
656
  int32_t code = initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1);
836✔
657
  if (code != 0) {
836!
658
    return code;
×
659
  }
660

661
  STagVal tagVal = {.cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
836✔
662
  void* p = taosArrayPush(pTagArray, &tagVal);
836✔
663
  if (p == NULL) {
836!
664
    return terrno;
×
665
  }
666

667
  code = tTagNew(pTagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
836✔
668
  if (pCreateTbReq->ctb.pTag == NULL || (code != 0)) {
836!
669
    tdDestroySVCreateTbReq(pCreateTbReq);
670
    taosMemoryFreeClear(pCreateTbReq);
×
671
    return code;
×
672
  }
673

674
  code = createDefaultTagColName(&pCreateTbReq->ctb.tagName);
836✔
675
  if (code) {
836!
676
    return code;
×
677
  }
678

679
  // set table name
680
  code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule,
836✔
681
                                    id);
682
  if (code) {
836!
683
    return code;
×
684
  }
685

686
  *pReq = pCreateTbReq;
836✔
687
  return code;
836✔
688
}
689

690
// Serialize pSubmitReq into a freshly allocated rpc buffer: an SSubmitReq2Msg header followed by the
// encoded request.
//
// On success *pMsg / *msgLen receive the buffer and its total length (header included).
// On failure *msgLen is 0, *pMsg is left untouched and a non-zero code is returned.
// pSubmitReq is destroyed with TSDB_MSG_FLG_ENCODE on every path, so the caller must not reuse it.
int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) {
  int32_t  code = 0;
  int32_t  len = 0;
  void*    pBuf = NULL;
  SEncoder encoder = {0};

  *msgLen = 0;

  // first pass: compute the encoded size
  tEncodeSize(tEncodeSubmitReq, pSubmitReq, len, code);
  if (code != 0) {
    tqError("failed to encode submit req, code:%s, ignore and continue", terrstr());
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  len += sizeof(SSubmitReq2Msg);

  pBuf = rpcMallocCont(len);
  if (NULL == pBuf) {
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
    return terrno;
  }

  ((SSubmitReq2Msg*)pBuf)->header.vgId = vgId;
  ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
  ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);

  // second pass: encode right behind the header
  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
  if (tEncodeSubmitReq(&encoder, pSubmitReq) < 0) {
    // report the failure to the caller; returning the (successful) size-pass code here would make the
    // caller believe *pMsg is valid
    code = TSDB_CODE_OUT_OF_MEMORY;
    terrno = code;
    tqError("failed to encode submit req, code:%s, ignore and continue", terrstr());
    tEncoderClear(&encoder);
    rpcFreeCont(pBuf);
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  tEncoderClear(&encoder);
  tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);

  *msgLen = len;
  *pMsg = pBuf;
  return TSDB_CODE_SUCCESS;
}
729

730
int32_t tsAscendingSortFn(const void* p1, const void* p2) {
35,687,098✔
731
  SRow* pRow1 = *(SRow**)p1;
35,687,098✔
732
  SRow* pRow2 = *(SRow**)p2;
35,687,098✔
733

734
  if (pRow1->ts == pRow2->ts) {
35,687,098!
735
    return 0;
×
736
  } else {
737
    return pRow1->ts > pRow2->ts ? 1 : -1;
35,687,098!
738
  }
739
}
740

741
// Convert every row of pDataBlock into an SRow (laid out according to pTSchema) and collect the rows
// in pTableData->aRowP.
//
// Special case: as soon as one row has a primary timestamp earlier than `earlyTs`, the whole block is
// discarded. All rows built so far are released, pTableData->aRowP is set to NULL and
// TSDB_CODE_SUCCESS is returned; callers detect this by checking aRowP for NULL.
//
// Returns TSDB_CODE_SUCCESS or an error code. The temporary column-value array is released on every
// path. When appending fails midway, rows already stored in pTableData->aRowP are left there for the
// caller to release, as before.
int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, int64_t earlyTs,
                      const char* id) {
  int32_t numOfRows = pDataBlock->info.rows;
  int32_t code = TSDB_CODE_SUCCESS;

  SArray* pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
  pTableData->aRowP = taosArrayInit(numOfRows, sizeof(SRow*));

  if (pTableData->aRowP == NULL || pVals == NULL) {
    taosArrayDestroy(pTableData->aRowP);
    pTableData->aRowP = NULL;
    taosArrayDestroy(pVals);
    code = terrno;
    tqError("s-task:%s failed to prepare write stream res blocks, code:%s", id, tstrerror(code));
    return code;
  }

  for (int32_t j = 0; j < numOfRows; j++) {
    taosArrayClear(pVals);

    int32_t dataIndex = 0;  // index of the next column in the data block; columns set to NULL by schema do not consume one
    int64_t ts = 0;

    for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
      const STColumn* pCol = &pTSchema->columns[k];

      // primary timestamp column, for debug purpose
      if (k == 0) {
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
        if (pColData == NULL) {
          continue;
        }

        ts = *(int64_t*)colDataGetData(pColData, j);
        tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, ts);

        if (ts < earlyTs) {
          tqError("s-task:%s ts:%" PRId64 " of generated results out of valid time range %" PRId64 " , discarded", id,
                  ts, earlyTs);

          // the rows converted so far are owned by the array, release them together with it
          int32_t numOfBuilt = (int32_t)taosArrayGetSize(pTableData->aRowP);
          for (int32_t i = 0; i < numOfBuilt; ++i) {
            tRowDestroy(*(SRow**)TARRAY_GET_ELEM(pTableData->aRowP, i));
          }

          taosArrayDestroy(pTableData->aRowP);
          pTableData->aRowP = NULL;
          code = TSDB_CODE_SUCCESS;
          goto _end;
        }
      }

      if (IS_SET_NULL(pCol)) {
        if (pCol->flags & COL_IS_KEY) {
          // NOTE(review): the row is still handed to tRowBuild() with the values collected so far
          qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts,
                 pCol->colId, pCol->type);
          break;
        }

        SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
        if (taosArrayPush(pVals, &cv) == NULL) {
          code = terrno;
          goto _end;
        }
      } else {
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
        if (pColData == NULL) {
          continue;
        }

        if (colDataIsNull_s(pColData, j)) {
          if (pCol->flags & COL_IS_KEY) {
            qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8,
                   ts, pCol->colId, pCol->type);
            break;
          }

          SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
          if (taosArrayPush(pVals, &cv) == NULL) {
            code = terrno;
            goto _end;
          }

          dataIndex++;
        } else {
          void* colData = colDataGetData(pColData, j);
          if (IS_VAR_DATA_TYPE(pCol->type)) {  // address copy, no value
            SValue sv =
                (SValue){.type = pCol->type, .nData = varDataLen(colData), .pData = (uint8_t*)varDataVal(colData)};
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
            if (taosArrayPush(pVals, &cv) == NULL) {
              code = terrno;
              goto _end;
            }
          } else {
            SValue sv = {.type = pCol->type};
            valueSetDatum(&sv, pCol->type, colData, tDataTypes[pCol->type].bytes);
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
            if (taosArrayPush(pVals, &cv) == NULL) {
              code = terrno;
              goto _end;
            }
          }
          dataIndex++;
        }
      }
    }

    SRow* pRow = NULL;
    code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
    if (code != TSDB_CODE_SUCCESS) {
      tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);
      taosArrayDestroy(pVals);
      tqError("s-task:%s build rows for submit failed, ts:%"PRId64, id, ts);
      return code;
    }

    if (taosArrayPush(pTableData->aRowP, &pRow) == NULL) {
      code = terrno;
      tRowDestroy(pRow);  // not stored in the array, so nobody else would release it
      goto _end;
    }
  }

_end:
  taosArrayDestroy(pVals);
  return code;
}
860

861
// Poll the vnode meta until the destination table `dstTableName` shows up, for as long as the cached
// sink info has no uid yet.
//
// Once the table is found it is validated against the task's super table. If valid, its uid is stored
// in both *uid and pTableSinkInfo->uid. The function then returns terrno as left by the validation
// (0 for a valid table).
// Returns TSDB_CODE_STREAM_EXEC_CANCELLED when the task is asked to stop, TSDB_CODE_PAR_TABLE_NOT_EXIST
// when the table did not appear within 60 seconds, and TSDB_CODE_SUCCESS when the cached uid is
// already set on entry (in which case *uid is not written).
int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
                                 const char* dstTableName, int64_t* uid) {
  const int32_t maxWaitSec = 60;  // 1min
  const int32_t vgId = TD_VID(pVnode);
  const int64_t stbUid = pTask->outputInfo.tbSink.stbUid;
  const char*   idStr = pTask->id.idStr;
  const int64_t startSec = taosGetTimestampSec();

  while (pTableSinkInfo->uid == 0) {
    if (streamTaskShouldStop(pTask)) {
      tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", idStr, dstTableName);
      return TSDB_CODE_STREAM_EXEC_CANCELLED;
    }

    int64_t elapsedSec = taosGetTimestampSec() - startSec;
    if (elapsedSec > maxWaitSec) {
      tqError("s-task:%s wait for table-creating:%s more than %dsec, failed", idStr, dstTableName, maxWaitSec);
      return  TSDB_CODE_PAR_TABLE_NOT_EXIST;
    }

    SMetaReader reader = {0};
    metaReaderDoInit(&reader, pVnode->pMeta, META_READER_LOCK);

    if (metaGetTableEntryByName(&reader, dstTableName) != 0) {
      // the table has not been created yet, back off for a while and look again
      metaReaderClear(&reader);
      taosMsleep(100);
      tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", idStr, dstTableName);
      continue;
    }

    // the table exists: accept its uid only if it is a child table of the expected super table
    if (isValidDstChildTable(&reader, vgId, dstTableName, stbUid)) {
      tqDebug("s-task:%s set uid:%" PRIu64 " for dstTable:%s from meta", idStr, reader.me.uid,
              pTableSinkInfo->name.data);
      (*uid) = reader.me.uid;
      pTableSinkInfo->uid = reader.me.uid;
    }

    metaReaderClear(&reader);
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
906

907
// Poll the vnode meta until `dstTableName` is no longer a valid child table of the task's super table,
// i.e. until the lookup reports TSDB_CODE_PAR_TABLE_NOT_EXIST or the entry found fails validation.
//
// Returns TSDB_CODE_SUCCESS once that is the case, TSDB_CODE_STREAM_EXEC_CANCELLED when the task is
// asked to stop, or terrno when the meta lookup fails for any other reason.
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName) {
  const int32_t vgId = TD_VID(pVnode);
  const int64_t stbUid = pTask->outputInfo.tbSink.stbUid;
  const char*   idStr = pTask->id.idStr;

  for (;;) {
    if (streamTaskShouldStop(pTask)) {
      tqDebug("s-task:%s task will stop, quit from waiting for table:%s drop", idStr, dstTableName);
      return TSDB_CODE_STREAM_EXEC_CANCELLED;
    }

    SMetaReader reader = {0};
    metaReaderDoInit(&reader, pVnode->pMeta, META_READER_LOCK);
    int32_t lookup = metaGetTableEntryByName(&reader, dstTableName);

    if (lookup != TSDB_CODE_PAR_TABLE_NOT_EXIST && lookup != TSDB_CODE_SUCCESS) {
      tqError("s-task:%s failed to wait for table:%s drop", idStr, dstTableName);
      metaReaderClear(&reader);
      return terrno;
    }

    // only an existing, valid child table keeps us waiting
    bool stillExists = (lookup == TSDB_CODE_SUCCESS) && isValidDstChildTable(&reader, vgId, dstTableName, stbUid);
    metaReaderClear(&reader);

    if (!stillExists) {
      break;
    }

    taosMsleep(100);
    tqDebug("s-task:%s wait 100ms for table:%s drop", idStr, dstTableName);
  }

  return TSDB_CODE_SUCCESS;
}
940

941
// Allocate a zero-initialized STableSinkInfo with room for a copy of pDstTableName plus one extra
// (zeroed) byte, and store the name and its length in it. The uid is left at 0.
//
// *pInfo receives the new object (NULL on allocation failure, in which case terrno is returned).
int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo) {
  int32_t nameLen = strlen(pDstTableName);

  STableSinkInfo* pSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1);
  (*pInfo) = pSinkInfo;
  if (pSinkInfo == NULL) {
    return terrno;
  }

  // the trailing byte stays zero thanks to calloc, so name.data is also a valid C string
  memcpy(pSinkInfo->name.data, pDstTableName, nameLen);
  pSinkInfo->name.len = nameLen;
  return TSDB_CODE_SUCCESS;
}
952

953
int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
7,590✔
954
                           SSubmitTbData* pTableData) {
955
  uint64_t        groupId = pDataBlock->info.id.groupId;
7,590✔
956
  char*           dstTableName = pDataBlock->info.parTbName;
7,590✔
957
  int32_t         numOfRows = pDataBlock->info.rows;
7,590✔
958
  const char*     id = pTask->id.idStr;
7,590✔
959
  int64_t         suid = pTask->outputInfo.tbSink.stbUid;
7,590✔
960
  STSchema*       pTSchema = pTask->outputInfo.tbSink.pTSchema;
7,590✔
961
  int32_t         vgId = TD_VID(pVnode);
7,590✔
962
  STableSinkInfo* pTableSinkInfo = NULL;
7,590✔
963
  int32_t         code = 0;
7,590✔
964

965
  bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, groupId, &pTableSinkInfo);
7,590✔
966

967
  if (alreadyCached) {
7,590✔
968
    if (dstTableName[0] == 0) {  // data block does not set the destination table name
6,713✔
969
      tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
936✔
970
      tqDebug("s-task:%s vgId:%d, gropuId:%" PRIu64 " datablock table name is null, set name:%s", id, vgId, groupId,
936!
971
              dstTableName);
972
    } else {
973
      tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
5,777✔
974
      if (pTableSinkInfo->uid != 0) {
5,777✔
975
        tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(uid:%" PRIu64 ")", id, numOfRows, groupId,
3,658!
976
                dstTableName, pTableSinkInfo->uid);
977
      } else {
978
        tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(not set uid yet for the secondary block)",
2,119!
979
                id, numOfRows, groupId, dstTableName);
980
      }
981
    }
982
  } else {  // this groupId has not been kept in cache yet
983
    if (dstTableName[0] == 0) {
877✔
984
      memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
311✔
985
      code = buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
311✔
986
      if (code) {
311!
987
        tqDebug("s-task:%s failed to build auto create table-name:%s, groupId:0x%" PRId64, id, dstTableName, groupId);
×
988
        return code;
×
989
      } else {
990
        tqDebug("s-task:%s no table name given, generated sub-table-name:%s, groupId:0x%" PRId64, id, dstTableName, groupId);
311!
991
      }
992
    } else {
993
      if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
566!
994
          !alreadyAddGroupId(dstTableName, groupId) && groupId != 0) {
×
995
        tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
×
996
        if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
×
997
          code = buildCtbNameAddGroupId(NULL, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
×
998
        } else if (pTask->ver >= SSTREAM_TASK_APPEND_STABLE_NAME_VER && stbFullName) {
×
999
          code = buildCtbNameAddGroupId(stbFullName, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
×
1000
        }
1001
        if (code != TSDB_CODE_SUCCESS) {
×
1002
          return code;
×
1003
        }
1004
      }
1005
    }
1006

1007
    code = doCreateSinkTableInfo(dstTableName, &pTableSinkInfo);
877✔
1008
    if (code == 0) {
877!
1009
      tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s, groupId:%" PRId64, id, dstTableName, groupId);
877!
1010
    } else {
1011
      tqDebug("s-task:%s failed to build new sinkTableInfo, dstTable:%s, groupId:%" PRId64, id, dstTableName, groupId);
×
1012
      return code;
×
1013
    }
1014
  }
1015

1016
  if (alreadyCached) {
7,590✔
1017
    pTableData->uid = pTableSinkInfo->uid;
6,713✔
1018

1019
    if (pTableData->uid == 0) {
6,713✔
1020
      tqTrace("s-task:%s cached tableInfo:%s uid is invalid, acquire it from meta", id, pTableSinkInfo->name.data);
2,318✔
1021
      return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid);
2,318✔
1022
    } else {
1023
      tqTrace("s-task:%s set the dstTable uid from cache:%" PRId64, id, pTableData->uid);
4,395✔
1024
    }
1025
  } else {
1026
    // The auto-create option will always set to be open for those submit messages, which arrives during the period
1027
    // the creating of the destination table, due to the absence of the user-specified table in TSDB. When scanning
1028
    // data from WAL, those submit messages, with auto-created table option, will be discarded expect the first, for
1029
    // those mismatched table uids. Only the FIRST table has the correct table uid, and those remain all have
1030
    // randomly generated, but false table uid in the WAL.
1031
    SMetaReader mr = {0};
877✔
1032
    metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
877✔
1033

1034
    // table not in cache, let's try to extract it from tsdb meta
1035
    if (metaGetTableEntryByName(&mr, dstTableName) < 0) {
877✔
1036
      metaReaderClear(&mr);
836✔
1037

1038
      if (pTask->outputInfo.tbSink.autoCreateCtb) {
836!
1039
        tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName);
836!
1040

1041
        SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
836✔
1042
        if (pTagArray == NULL) {
836!
1043
          tqError("s-task:%s failed to build auto create submit msg in sink, vgId:%d, due to %s", id, vgId,
×
1044
                  tstrerror(terrno));
1045
          return terrno;
×
1046
        }
1047

1048
        pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
836✔
1049
        code = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
1,672✔
1050
                                       IS_NEW_SUBTB_RULE(pTask), &pTableData->pCreateTbReq, id);
836!
1051
        taosArrayDestroy(pTagArray);
836✔
1052

1053
        if (code) {
836!
1054
          tqError("s-task:%s failed to build auto create dst-table req:%s, code:%s", id, dstTableName, tstrerror(code));
×
1055
          taosMemoryFree(pTableSinkInfo);
×
1056
          return code;
×
1057
        }
1058

1059
        pTableSinkInfo->uid = 0;
836✔
1060
        code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
836✔
1061
      } else {
1062
        tqError("s-task:%s vgId:%d dst-table:%s not auto-created, and not create in tsdb, discard data", id, vgId,
×
1063
                dstTableName);
1064
        return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1065
      }
1066
    } else {
1067
      bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);
41✔
1068
      if (!isValid) {
41!
1069
        metaReaderClear(&mr);
×
1070
        taosMemoryFree(pTableSinkInfo);
×
1071
        tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId,
×
1072
                dstTableName);
1073
        return terrno;
×
1074
      } else {
1075
        pTableData->uid = mr.me.uid;
41✔
1076
        pTableSinkInfo->uid = mr.me.uid;
41✔
1077

1078
        metaReaderClear(&mr);
41✔
1079
        code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
41✔
1080
      }
1081
    }
1082
  }
1083

1084
  return code;
5,272✔
1085
}
1086

1087
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
8,049✔
1088
                                 SSubmitTbData* pTableData, int64_t earlyTs, const char* id) {
1089
  int32_t numOfRows = pDataBlock->info.rows;
8,049✔
1090
  char*   dstTableName = pDataBlock->info.parTbName;
8,049✔
1091

1092
  tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, id,
8,049!
1093
          blockIndex + 1, numOfRows, suid);
1094

1095
  // convert all rows
1096
  int32_t code = doConvertRows(pTableData, pTSchema, pDataBlock, earlyTs, id);
8,049✔
1097
  if (code != TSDB_CODE_SUCCESS) {
8,049!
1098
    tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno));
×
1099
    return code;
×
1100
  }
1101

1102
  if (pTableData->aRowP != NULL) {
8,049!
1103
    taosArraySort(pTableData->aRowP, tsAscendingSortFn);
8,049✔
1104
    tqTrace("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows);
8,047✔
1105
  }
1106

1107
  return code;
8,047✔
1108
}
1109

1110
// Make sure the tag schema of the destination super table is loaded into the task's output info.
//
// On the first call the super table entry is read from meta by uid, its tag schema is cloned, and
// autoCreateCtb is derived from it: auto creation is enabled only when the super table has exactly one
// tag, of type UBIGINT, named "group_id". Later calls return immediately.
//
// Returns TSDB_CODE_SUCCESS, the meta lookup error, or terrno when cloning the schema fails.
int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode) {
  const char*      idStr = pTask->id.idStr;
  STaskOutputInfo* pOutput = &pTask->outputInfo;
  int32_t          vgId = pTask->pMeta->vgId;

  if (pOutput->tbSink.pTagSchema != NULL) {
    // already resolved by an earlier call
    return TSDB_CODE_SUCCESS;
  }

  SMetaReader reader = {0};
  metaReaderDoInit(&reader, pVnode->pMeta, META_READER_LOCK);

  int32_t code = metaReaderGetTableEntryByUid(&reader, pOutput->tbSink.stbUid);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("s-task:%s vgId:%d failed to get the dst stable, failed to sink results", idStr, vgId);
    metaReaderClear(&reader);
    return code;
  }

  // the clone must be taken before the reader is cleared
  pOutput->tbSink.pTagSchema = tCloneSSchemaWrapper(&reader.me.stbEntry.schemaTag);
  metaReaderClear(&reader);

  SSchemaWrapper* pTags = pOutput->tbSink.pTagSchema;
  if (pTags == NULL) {
    tqError("s-task:%s failed to clone tag schema, code:%s, failed to sink results", idStr, tstrerror(terrno));
    return terrno;
  }

  pOutput->tbSink.autoCreateCtb = (pTags->nCols == 1) && (pTags->pSchema[0].type == TSDB_DATA_TYPE_UBIGINT) &&
                                  (strcmp(pTags->pSchema[0].name, "group_id") == 0);
  return code;
}
1146

1147
// Sink entry point: write a batch of stream result blocks (`data`, an SArray of SSDataBlock) into the
// destination super table of pTask on vnode `vnode`.
//
// When the batch contains only data blocks and the task does not use plain sub-table names
// (subtableWithoutMd5 != 1), all blocks are merged into one submit message. Otherwise every block is
// handled on its own according to its type. Processing stops silently as soon as the task should stop.
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
  SVnode*       pVnode = (SVnode*)vnode;
  const SArray* pBlocks = (const SArray*)data;
  const char*   id = pTask->id.idStr;
  char*         stbFullName = pTask->outputInfo.tbSink.stbFullName;
  int64_t       suid = pTask->outputInfo.tbSink.stbUid;
  int32_t       vgId = TD_VID(pVnode);
  int32_t       numOfBlocks = taosArrayGetSize(pBlocks);
  int64_t       earlyTs = tsdbGetEarliestTs(pVnode->pTsdb);
  int32_t       code = TSDB_CODE_SUCCESS;

  code = checkTagSchema(pTask, pVnode);
  if (code != TSDB_CODE_SUCCESS) {
    return;
  }

  code = tqSendAllNotifyEvents(pBlocks, pTask, pVnode);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId: %d, s-task:%s failed to send all event notifications", vgId, id);
    // continue processing even if notification fails
  }

  bool mergeSubmit = hasOnlySubmitData(pBlocks, numOfBlocks) && (pTask->subtableWithoutMd5 != 1);
  if (mergeSubmit) {
    tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks);
    if (streamTaskShouldStop(pTask)) {
      return;
    }

    rebuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs);
    return;
  }

  tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has other type block, submit one-by-one", vgId,
          id, numOfBlocks);

  for (int32_t i = 0; i < numOfBlocks; ++i) {
    if (streamTaskShouldStop(pTask)) {
      return;
    }

    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
    if (pDataBlock == NULL) {
      continue;
    }

    // the per-block result is recorded in `code` but not acted upon: a failing block does not stop the
    // remaining ones
    switch (pDataBlock->info.type) {
      case STREAM_DELETE_RESULT:
        code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
        break;

      case STREAM_CREATE_CHILD_TABLE:
        code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
        break;

      case STREAM_CHECKPOINT:
      case STREAM_NOTIFY_EVENT:
        // nothing to write for these block types
        break;

      case STREAM_DROP_CHILD_TABLE:
        if (pTask->subtableWithoutMd5) {
          code = doBuildAndSendDropTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
        } else {
          code = handleResultBlockMsg(pTask, pDataBlock, i, pVnode, earlyTs);
        }
        break;

      default:
        code = handleResultBlockMsg(pTask, pDataBlock, i, pVnode, earlyTs);
        break;
    }
  }
}
1209

1210
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
1211
bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
5,661✔
1212
  for (int32_t i = 0; i < numOfBlocks; ++i) {
10,880✔
1213
    SSDataBlock* p = taosArrayGet(pBlocks, i);
7,377✔
1214
    if (p == NULL) {
7,377!
1215
      continue;
×
1216
    }
1217

1218
    if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
7,377✔
1219
      return false;
2,158✔
1220
    }
1221
  }
1222

1223
  return true;
3,503✔
1224
}
1225

1226
// Store the pointer pTableSinkInfo in the sink table cache under the key groupId.
// If the insertion fails, pTableSinkInfo is freed here, so the caller must not touch it afterwards.
// Returns the result of the hash insertion.
int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) {
  int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES);

  if (code == TSDB_CODE_SUCCESS) {
    tqDebug("s-task:%s new dst table:%s(uid:%" PRIu64 ") added into cache, total:%d", id, pTableSinkInfo->name.data,
            pTableSinkInfo->uid, tSimpleHashGetSize(pSinkTableMap));
    return code;
  }

  // the cache did not take the pointer, release the object on behalf of the caller
  taosMemoryFreeClear(pTableSinkInfo);
  return code;
}
1237

1238
// Look up the sink table info cached for groupId.
// Returns true and stores the cached pointer in *pInfo on a hit; returns false and leaves *pInfo
// untouched on a miss.
bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo) {
  // the map stores STableSinkInfo pointers by value, so a hit yields a pointer to that pointer
  STableSinkInfo** ppCached = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t));
  if (ppCached == NULL) {
    return false;
  }

  *pInfo = *ppCached;
  return true;
}
1247

1248
// Remove the cache entry stored under groupId.
// An empty cache is treated as success; otherwise the result of the hash removal is logged and returned.
int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id) {
  if (tSimpleHashGetSize(pSinkTableMap) == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = tSimpleHashRemove(pSinkTableMap, &groupId, sizeof(groupId));
  if (code != 0) {
    tqError("s-task:%s failed to remove table meta from hashmap, groupId:%" PRId64, id, groupId);
    return code;
  }

  tqDebug("s-task:%s remove cached table meta for groupId:%" PRId64, id, groupId);
  return code;
}
1261

1262
// Translate a delete-result block into a batch delete request, serialize it behind an SMsgHead and put
// it into the vnode write queue as TDMT_VND_BATCH_DEL.
//
// Returns TSDB_CODE_SUCCESS when there was nothing to delete or the message was built; a failure to
// enqueue the message is only logged. Any failure while building or encoding the request returns its
// error code. The delete request array is released on every path, and the rpc buffer is released when
// encoding fails.
int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
                                int64_t suid) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  len = 0;
  void*    serializedDeleteReq = NULL;
  SEncoder encoder = {0};

  SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
  if (deleteReq.deleteReqs == NULL) {
    return terrno;
  }

  code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr, IS_NEW_SUBTB_RULE(pTask));
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
    // nothing to delete, code is still TSDB_CODE_SUCCESS
    goto _end;
  }

  tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
  if (code != TSDB_CODE_SUCCESS) {
    qError("s-task:%s failed to encode delete request", pTask->id.idStr);
    goto _end;
  }

  serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
  if (serializedDeleteReq == NULL) {
    code = terrno;
    goto _end;
  }

  // the payload is encoded right behind the message head
  tEncoderInit(&encoder, POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)), len);
  code = tEncodeSBatchDeleteReq(&encoder, &deleteReq);
  tEncoderClear(&encoder);

  if (code) {
    rpcFreeCont(serializedDeleteReq);
    goto _end;
  }

  ((SMsgHead*)serializedDeleteReq)->vgId = TD_VID(pVnode);

  {
    SRpcMsg msg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead)};
    if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
      tqDebug("failed to put delete req into write-queue since %s", terrstr());
    }
  }

_end:
  taosArrayDestroy(deleteReq.deleteReqs);
  return code;
}
1308

1309
/*
 * Merge several stream result blocks into one submit request and send it.
 *
 * Blocks of type STREAM_CHECKPOINT and STREAM_NOTIFY_EVENT are skipped. For every other block the rows are
 * converted by tqSetDstTableDataPayload(); the first block of a group id is appended to the submit request
 * as a new SSubmitTbData entry, later blocks with the same group id are merged into that entry via
 * doMergeExistedRows(). A block that fails at any step is skipped and the remaining blocks are still
 * processed.
 *
 * @param pTask    sink task; supplies the target super table info and receives the sink counters
 * @param pBlocks  array of SSDataBlock to write
 * @param pVnode   vnode the submit request is sent to
 * @param earlyTs  forwarded unchanged to tqSetDstTableDataPayload()
 */
void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) {
  int32_t     code = 0;
  const char* id = pTask->id.idStr;
  int32_t     vgId = pTask->pMeta->vgId;
  int32_t     numOfBlocks = taosArrayGetSize(pBlocks);
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  STSchema*   pTSchema = pTask->outputInfo.tbSink.pTSchema;
  char*       stbFullName = pTask->outputInfo.tbSink.stbFullName;

  // group id -> index of the group's entry in submitReq.aSubmitTbData
  SHashObj* pTableIndexMap =
      taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableIndexMap == NULL) {
    // without the index map blocks of the same group cannot be merged, so give up on this batch
    code = terrno;
    tqError("s-task:%s vgId:%d failed to init table index map in sink task, code:%s", id, vgId, tstrerror(code));
    return;
  }

  SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
  if (submitReq.aSubmitTbData == NULL) {
    code = terrno;
    tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
    taosHashCleanup(pTableIndexMap);
    return;
  }

  bool hasSubmit = false;
  for (int32_t i = 0; i < numOfBlocks; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
    if (pDataBlock == NULL) {
      continue;
    }

    if (pDataBlock->info.type == STREAM_CHECKPOINT) {
      continue;
    }

    if (pDataBlock->info.type == STREAM_NOTIFY_EVENT) {
      continue;
    }

    // set as soon as one data block is seen, even if that block is discarded below
    hasSubmit = true;
    pTask->execInfo.sink.numOfBlocks += 1;
    uint64_t groupId = pDataBlock->info.id.groupId;

    SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};

    int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
    if (index == NULL) {  // no data yet, append it
      code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
      if (code != TSDB_CODE_SUCCESS) {
        tqError("vgId:%d dst-table gid:%" PRId64 " not exist, discard stream results", vgId, groupId);
        continue;
      }

      code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
      if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
        if (tbData.pCreateTbReq != NULL) {
          tdDestroySVCreateTbReq(tbData.pCreateTbReq);
          // the create-table request is dropped, so the cached sink table info is removed as well
          (void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, groupId, id);
          tbData.pCreateTbReq = NULL;
        }
        continue;
      }

      void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
      if (p == NULL) {
        // NOTE(review): tbData was not appended, so submitReq will not release its rows/create-table
        // request - confirm whether they have to be freed here
        tqError("vgId:%d, s-task:%s failed to build submit msg, data lost", vgId, id);
        continue;
      }

      // remember where this group's entry lives so later blocks of the same group are merged into it
      int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
      code = taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
      if (code) {
        tqError("vgId:%d, s-task:%s failed to put group into index map, code:%s", vgId, id, tstrerror(code));
        continue;
      }
    } else {
      code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
      if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
        if (tbData.pCreateTbReq != NULL) {
          tdDestroySVCreateTbReq(tbData.pCreateTbReq);
          tbData.pCreateTbReq = NULL;
        }
        continue;
      }

      SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index);
      if (pExisted == NULL) {
        // NOTE(review): the freshly built rows in tbData are not released on this path - confirm ownership
        continue;
      }

      code = doMergeExistedRows(pExisted, &tbData, id);
      if (code != TSDB_CODE_SUCCESS) {
        continue;
      }
    }

    // only blocks that made it into the submit request are counted
    pTask->execInfo.sink.numOfRows += pDataBlock->info.rows;
  }

  taosHashCleanup(pTableIndexMap);

  if (hasSubmit) {
    code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks);
    if (code) {  // failed and continue
      tqError("vgId:%d failed to build and send submit msg", vgId);
    }
  } else {
    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
    tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
  }
}
1416

1417
/*
 * Write a single stream result block to its destination table.
 *
 * The destination table is resolved with setDstTableDataUid(), the rows are converted with
 * tqSetDstTableDataPayload(), and the resulting one-entry submit request is passed to
 * doBuildAndSendSubmitMsg().
 *
 * @param pTask       sink task; supplies the target super table info and receives the sink counters
 * @param pDataBlock  result block to write
 * @param index       forwarded unchanged to tqSetDstTableDataPayload()
 * @param pVnode      vnode the submit request is sent to
 * @param earlyTs     forwarded unchanged to tqSetDstTableDataPayload()
 *
 * @return TSDB_CODE_SUCCESS, or the error code of the step that failed. When the payload step succeeds
 *         but yields no rows, nothing is submitted and its (success) code is returned.
 */
int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode, int64_t earlyTs) {
  int32_t     code = 0;
  STSchema*   pTSchema = pTask->outputInfo.tbSink.pTSchema;
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  const char* id = pTask->id.idStr;
  int32_t     vgId = TD_VID(pVnode);
  char*       stbFullName = pTask->outputInfo.tbSink.stbFullName;

  pTask->execInfo.sink.numOfBlocks += 1;

  SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
  if (submitReq.aSubmitTbData == NULL) {
    // capture terrno first: the calls below may overwrite it before it is returned
    code = terrno;
    tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
    return code;
  }

  SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
  code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d s-task:%s dst-table not exist, stb:%s discard stream results", vgId, id, stbFullName);
    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  code = tqSetDstTableDataPayload(suid, pTSchema, index, pDataBlock, &tbData, earlyTs, id);
  if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
    if (tbData.pCreateTbReq != NULL) {
      tdDestroySVCreateTbReq(tbData.pCreateTbReq);
      // the create-table request is dropped, so the cached sink table info is removed as well
      (void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, id);
      tbData.pCreateTbReq = NULL;
    }

    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
  if (p == NULL) {
    // capture terrno before logging and cleanup so the real failure reason is what gets returned
    code = terrno;
    // NOTE(review): tbData was not appended, so tDestroySubmitReq() will not release its rows/create-table
    // request - confirm whether they have to be freed here
    tqDebug("vgId:%d, s-task:%s failed to build submit msg, code:%s, data lost", vgId, id, tstrerror(code));
    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1);
  if (code) {  // failed and continue
    tqDebug("vgId:%d, s-task:%s submit msg failed, code:%s data lost", vgId, id, tstrerror(code));
  }

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc