• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: separate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/tq/tqSink.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tcommon.h"
17
#include "tq.h"
18

19
#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1))
20

21
typedef struct STableSinkInfo {
22
  uint64_t uid;
23
  tstr     name;
24
} STableSinkInfo;
25

26
static bool    hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks);
27
static int32_t tsAscendingSortFn(const void* p1, const void* p2);
28
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
29
                                  SSubmitTbData* pTableData);
30
static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
31
                                       int64_t suid);
32
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks);
33
static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
34
static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock,
35
                             int64_t earlyTs, const char* id);
36
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
37
                                        const char* dstTableName, int64_t* uid);
38

39
static bool    isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
40
static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName,
41
                                  int32_t numOfTags);
42
static int32_t createDefaultTagColName(SArray** pColNameList);
43
static int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock,
44
                                          const char* stbFullName, int64_t gid, bool newSubTableRule);
45
static int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo);
46
static int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId,
47
                                           const char* id);
48
static bool    doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo);
49
static int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id);
50
static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode);
51
static void    rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs);
52
static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode,
53
                                    int64_t earlyTs);
54
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName);
55

UNCOV
56
/*
 * Build one SSingleDeleteReq per row of a delete-result block and append it to deleteReq->deleteReqs.
 *
 * For every row the target table name is resolved in this order:
 *   1. the table-name column of the row, optionally extended with the group id when newSubTableRule applies;
 *   2. a name generated from stbFullName and the group id when stbFullName is given;
 *   3. a meta lookup that treats the group id as a table uid.
 * Rows whose name cannot be resolved (NULL or empty) are logged and skipped.
 *
 * @param pTq             tq handle, its vnode is used for the meta lookup
 * @param stbFullName     full name of the destination super table, may be NULL
 * @param pDataBlock      block holding start-ts / end-ts / group-id / table-name columns
 * @param deleteReq       batch request that receives the generated single delete requests
 * @param pIdStr          task id string, used for logging only
 * @param newSubTableRule whether the group id should be appended to user-provided table names
 * @return 0 on success, otherwise terrno or the code reported by the failing helper
 */
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
                         const char* pIdStr, bool newSubTableRule) {
  int32_t          totalRows = pDataBlock->info.rows;
  SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);

  if (pStartTsCol == NULL || pEndTsCol == NULL || pGidCol == NULL || pTbNameCol == NULL) {
    return terrno;
  }

  tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName);

  for (int32_t row = 0; row < totalRows; row++) {
    int64_t skey = *(int64_t*)colDataGetData(pStartTsCol, row);
    int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row);
    int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);

    char* name = NULL;
    char* originName = NULL;
    void* varTbName = NULL;
    if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) {
      varTbName = colDataGetVarData(pTbNameCol, row);
    }

    if (varTbName != NULL && varTbName != (void*)-1) {
      // the buffer must be able to hold both the raw name and the name extended with the group id
      size_t cap = TMAX(TSDB_TABLE_NAME_LEN, varDataLen(varTbName) + 1);
      name = taosMemoryMalloc(cap);
      if (name == NULL) {
        return terrno;
      }

      memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
      name[varDataLen(varTbName)] = '\0';
      if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name, groupId) && groupId != 0 &&
          stbFullName) {
        int32_t code = buildCtbNameAddGroupId(stbFullName, name, groupId, cap);
        if (code != TSDB_CODE_SUCCESS) {
          taosMemoryFreeClear(name);  // do not leak the name buffer on failure
          return code;
        }
      }
    } else if (stbFullName) {
      int32_t code = buildCtbNameByGroupId(stbFullName, groupId, &name);
      if (code) {
        return code;
      }
    } else {
      originName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
      if (originName == NULL) {
        return terrno;
      }

      // name points INTO originName here; only originName may be passed to free
      if (metaGetTableNameByUid(pTq->pVnode, groupId, originName) == 0) {
        name = varDataVal(originName);
      }
    }

    if (!name || *name == '\0') {
      tqWarn("s-task:%s failed to build delete msg groupId:%" PRId64 ", skey:%" PRId64 " ekey:%" PRId64
             " since invalid tbname:%s",
             pIdStr, groupId, skey, ekey, name ? name : "NULL");
    } else {
      tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, pIdStr,
              groupId, name, skey, ekey);

      SSingleDeleteReq req = {.startTs = skey, .endTs = ekey};
      tstrncpy(req.tbname, name, TSDB_TABLE_NAME_LEN);
      void* p = taosArrayPush(deleteReq->deleteReqs, &req);
      if (p == NULL) {
        int32_t code = terrno;
        // release the per-row buffer before bailing out
        if (originName) {
          name = originName;
        }
        taosMemoryFreeClear(name);
        return code;
      }
    }

    // when the name came from the meta lookup, the allocation to release is originName
    if (originName) {
      name = originName;
    }

    taosMemoryFreeClear(name);
  }

  return 0;
}
139

UNCOV
140
static int32_t encodeCreateChildTableForRPC(void* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) {
×
UNCOV
141
  int32_t ret = 0;
×
142

UNCOV
143
  tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret);
×
UNCOV
144
  if (ret < 0) {
×
145
    ret = -1;
×
146
    goto end;
×
147
  }
UNCOV
148
  *contLen += sizeof(SMsgHead);
×
UNCOV
149
  *pBuf = rpcMallocCont(*contLen);
×
UNCOV
150
  if (NULL == *pBuf) {
×
151
    ret = -1;
×
152
    goto end;
×
153
  }
UNCOV
154
  ((SMsgHead*)(*pBuf))->vgId = vgId;
×
UNCOV
155
  ((SMsgHead*)(*pBuf))->contLen = htonl(*contLen);
×
UNCOV
156
  SEncoder coder = {0};
×
UNCOV
157
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
×
UNCOV
158
  if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) {
×
159
    rpcFreeCont(*pBuf);
×
160
    *pBuf = NULL;
×
161
    *contLen = 0;
×
162
    tEncoderClear(&coder);
×
163
    ret = -1;
×
164
    goto end;
×
165
  }
UNCOV
166
  tEncoderClear(&coder);
×
167

UNCOV
168
end:
×
UNCOV
169
  return ret;
×
170
}
171

UNCOV
172
static int32_t encodeDropChildTableForRPC(void* pReqs, int32_t vgId, void** ppBuf, int32_t *contLen) {
×
UNCOV
173
  int32_t  code = 0;
×
UNCOV
174
  SEncoder ec = {0};
×
UNCOV
175
  tEncodeSize(tEncodeSVDropTbBatchReq, pReqs, *contLen, code);
×
UNCOV
176
  if (code < 0) {
×
177
    code = TSDB_CODE_INVALID_MSG;
×
178
    goto end;
×
179
  }
UNCOV
180
  *contLen += sizeof(SMsgHead);
×
UNCOV
181
  *ppBuf = rpcMallocCont(*contLen);
×
182

UNCOV
183
  if (!*ppBuf) {
×
184
    code = terrno;
×
185
    goto end;
×
186
  }
187

UNCOV
188
  ((SMsgHead*)(*ppBuf))->vgId = vgId;
×
UNCOV
189
  ((SMsgHead*)(*ppBuf))->contLen = htonl(*contLen);
×
190

UNCOV
191
  tEncoderInit(&ec, POINTER_SHIFT(*ppBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
×
UNCOV
192
  code = tEncodeSVDropTbBatchReq(&ec, pReqs);
×
UNCOV
193
  tEncoderClear(&ec);
×
UNCOV
194
  if (code < 0) {
×
195
    rpcFreeCont(*ppBuf);
×
196
    *ppBuf = NULL;
×
197
    *contLen = 0;
×
198
    code = TSDB_CODE_INVALID_MSG;
×
199
    goto end;
×
200
  }
UNCOV
201
end:
×
UNCOV
202
  return code;
×
203
}
204

UNCOV
205
/*
 * Encode a batch request with the supplied encoder and put the resulting message into the write queue of
 * the vnode.
 *
 * The helper is shared by different request kinds (the encoder and msgType are chosen by the caller), so
 * the log messages identify the message type instead of assuming a create-table request.
 *
 * @param pVnode  destination vnode
 * @param pReqs   request object handed to the encoder unchanged
 * @param encoder callback producing the rpc buffer and its length
 * @param msgType message type stored in the rpc message
 * @return 0 on success, otherwise the code reported by the encoder or by tmsgPutToQueue
 */
static int32_t tqPutReqToQueue(SVnode* pVnode, void* pReqs, int32_t(*encoder)(void* pReqs, int32_t vgId, void** ppBuf, int32_t *contLen), tmsg_t msgType) {
  void*   buf = NULL;
  int32_t tlen = 0;
  int32_t vgId = TD_VID(pVnode);

  int32_t code = encoder(pReqs, vgId, &buf, &tlen);
  if (code) {
    tqError("vgId:%d failed to encode request msg, msgType:%d, code:%s", vgId, (int32_t)msgType, tstrerror(code));
    return code;
  }

  SRpcMsg msg = {.msgType = msgType, .pCont = buf, .contLen = tlen};
  code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
  if (code) {
    tqError("vgId:%d failed to put msg into write-queue, msgType:%d, since %s", vgId, (int32_t)msgType, terrstr());
  }

  return code;
}
223

UNCOV
224
/*
 * Fill the fixed part of a child-table create request: flags, table type, super table uid, number of
 * tags and a duplicated copy of the super table name extracted from stbFullName.
 *
 * @return 0 on success, the tNameFromString() error if stbFullName cannot be parsed, or terrno if the
 *         super table name cannot be duplicated
 */
int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags) {
  SName stbName = {0};

  pCreateTableReq->flags = 0;
  pCreateTableReq->type = TSDB_CHILD_TABLE;
  pCreateTableReq->ctb.suid = suid;
  pCreateTableReq->ctb.tagNum = numOfTags;

  // extract the plain super table name from the full name
  int32_t code = tNameFromString(&stbName, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code != 0) {
    return code;
  }

  pCreateTableReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&stbName));
  if (pCreateTableReq->ctb.stbName == NULL) {
    tqError("failed to duplicate the stb name:%s, failed to init create-table msg and create req table", stbFullName);
    return terrno;
  }

  return code;
}
244

UNCOV
245
/*
 * Create the tag-name list used when no explicit tags exist: a single entry named "group_id".
 *
 * @param pColNameList receives the new list on success, is set to NULL otherwise
 * @return TSDB_CODE_SUCCESS, or terrno when the list cannot be created or filled
 */
int32_t createDefaultTagColName(SArray** pColNameList) {
  char defaultTagName[TSDB_COL_NAME_LEN] = "group_id";

  *pColNameList = NULL;

  SArray* pNames = taosArrayInit(1, TSDB_COL_NAME_LEN);
  if (pNames == NULL) {
    return terrno;
  }

  if (taosArrayPush(pNames, defaultTagName) != NULL) {
    *pColNameList = pNames;
    return TSDB_CODE_SUCCESS;
  }

  taosArrayDestroy(pNames);
  return terrno;
}
263

UNCOV
264
/*
 * Decide the name of the child table to create and store it in pCreateTableReq->name.
 *
 *  - no partition table name in the block: the name is generated from stbFullName and gid;
 *  - partition table name present and the new sub-table rule applies (name is not auto generated, does
 *    not already carry the group id, gid != 0 and stbFullName is given): the name is copied and extended
 *    with the group id;
 *  - otherwise the partition table name is duplicated as is.
 *
 * @return 0 on success, terrno on allocation failure, or the code of the failing name builder
 */
int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
                                   int64_t gid, bool newSubTableRule) {
  char* parTbName = pDataBlock->info.parTbName;

  if (!parTbName[0]) {
    return buildCtbNameByGroupId(stbFullName, gid, &pCreateTableReq->name);
  }

  bool appendGroupId = newSubTableRule && !isAutoTableName(parTbName) && !alreadyAddGroupId(parTbName, gid) &&
                       gid != 0 && stbFullName;
  if (!appendGroupId) {
    pCreateTableReq->name = taosStrdup(parTbName);
    if (pCreateTableReq->name == NULL) {
      return terrno;
    }
    return 0;
  }

  // a full-size buffer is required since the group id is appended in place
  pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
  if (pCreateTableReq->name == NULL) {
    return terrno;
  }

  tstrncpy(pCreateTableReq->name, parTbName, TSDB_TABLE_NAME_LEN);

  int32_t code = buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid, TSDB_TABLE_NAME_LEN);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return 0;
}
294

UNCOV
295
static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock,
×
296
                                            SStreamTask* pTask, int64_t suid) {
UNCOV
297
  STSchema*          pTSchema = pTask->outputInfo.tbSink.pTSchema;
×
UNCOV
298
  int32_t            rows = pDataBlock->info.rows;
×
UNCOV
299
  SArray*            tagArray = NULL;
×
UNCOV
300
  const char*        id = pTask->id.idStr;
×
UNCOV
301
  int32_t            vgId = pTask->pMeta->vgId;
×
UNCOV
302
  int32_t            code = 0;
×
UNCOV
303
  STableSinkInfo*    pInfo = NULL;
×
UNCOV
304
  SVCreateTbBatchReq reqs = {0};
×
UNCOV
305
  SArray*            crTblArray = NULL;
×
306

UNCOV
307
  tqDebug("s-task:%s build create %d table(s) msg", id, rows);
×
308

UNCOV
309
  tagArray = taosArrayInit(4, sizeof(STagVal));
×
UNCOV
310
  crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
×
UNCOV
311
  if ((NULL == reqs.pArray) || (tagArray == NULL)) {
×
UNCOV
312
    tqError("s-task:%s failed to init create table msg, code:%s", id, tstrerror(terrno));
×
UNCOV
313
    code = terrno;
×
314
    goto _end;
×
315
  }
316

UNCOV
317
  for (int32_t rowId = 0; rowId < rows; rowId++) {
×
UNCOV
318
    SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0});
×
319

UNCOV
320
    int32_t size = taosArrayGetSize(pDataBlock->pDataBlock);
×
UNCOV
321
    int32_t numOfTags = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
×
322

UNCOV
323
    code = initCreateTableMsg(pCreateTbReq, suid, stbFullName, numOfTags);
×
UNCOV
324
    if (code) {
×
325
      tqError("s-task:%s vgId:%d failed to init create table msg", id, vgId);
×
326
      continue;
×
327
    }
328

UNCOV
329
    taosArrayClear(tagArray);
×
330

UNCOV
331
    if (size == 2) {
×
UNCOV
332
      STagVal tagVal = {
×
UNCOV
333
          .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
×
334

UNCOV
335
      void* p = taosArrayPush(tagArray, &tagVal);
×
UNCOV
336
      if (p == NULL) {
×
337
        return terrno;
×
338
      }
339

UNCOV
340
      code = createDefaultTagColName(&pCreateTbReq->ctb.tagName);
×
UNCOV
341
      if (code) {
×
342
        return code;
×
343
      }
344
    } else {
UNCOV
345
      for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
×
UNCOV
346
        SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
×
UNCOV
347
        if (pTagData == NULL) {
×
UNCOV
348
          continue;
×
349
        }
350

UNCOV
351
        STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type};
×
UNCOV
352
        void*   pData = colDataGetData(pTagData, rowId);
×
UNCOV
353
        if (colDataIsNull_s(pTagData, rowId)) {
×
UNCOV
354
          continue;
×
UNCOV
355
        } else if (IS_VAR_DATA_TYPE(pTagData->info.type)) {
×
UNCOV
356
          tagVal.nData = varDataLen(pData);
×
UNCOV
357
          tagVal.pData = (uint8_t*)varDataVal(pData);
×
358
        } else {
UNCOV
359
          memcpy(&tagVal.i64, pData, pTagData->info.bytes);
×
360
        }
UNCOV
361
        void* p = taosArrayPush(tagArray, &tagVal);
×
UNCOV
362
        if (p == NULL) {
×
363
          code = terrno;
×
364
          goto _end;
×
365
        }
366
      }
367
    }
368

UNCOV
369
    code = tTagNew(tagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
×
UNCOV
370
    taosArrayDestroy(tagArray);
×
UNCOV
371
    tagArray = NULL;
×
372

UNCOV
373
    if (pCreateTbReq->ctb.pTag == NULL || (code != 0)) {
×
374
      tdDestroySVCreateTbReq(pCreateTbReq);
375
      code = TSDB_CODE_OUT_OF_MEMORY;
×
376
      goto _end;
×
377
    }
378

UNCOV
379
    uint64_t gid = pDataBlock->info.id.groupId;
×
UNCOV
380
    if (taosArrayGetSize(pDataBlock->pDataBlock) > UD_GROUPID_COLUMN_INDEX) {
×
UNCOV
381
      SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
×
UNCOV
382
      if (pGpIdColInfo == NULL) {
×
383
        continue;
×
384
      }
385

386
      // todo remove this
UNCOV
387
      void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
×
UNCOV
388
      if (gid != *(int64_t*)pGpIdData) {
×
389
        tqError("s-task:%s vgId:%d invalid groupId:%" PRId64 " actual:%" PRId64 " in sink task", id, vgId, gid,
×
390
                *(int64_t*)pGpIdData);
391
      }
392
    }
393

UNCOV
394
    code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, IS_NEW_SUBTB_RULE(pTask));
×
UNCOV
395
    if (code) {
×
396
      goto _end;
×
397
    }
398

UNCOV
399
    void* p = taosArrayPush(reqs.pArray, pCreateTbReq);
×
UNCOV
400
    if (p == NULL) {
×
401
      code = terrno;
×
402
      goto _end;
×
403
    }
404

UNCOV
405
    bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, gid, &pInfo);
×
UNCOV
406
    if (!alreadyCached) {
×
UNCOV
407
      code = doCreateSinkTableInfo(pCreateTbReq->name, &pInfo);
×
UNCOV
408
      if (code) {
×
409
        tqError("vgId:%d failed to create sink tableInfo for table:%s, s-task:%s", vgId, pCreateTbReq->name, id);
×
410
        continue;
×
411
      }
412

UNCOV
413
      code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pInfo, gid, id);
×
UNCOV
414
      if (code) {
×
415
        tqError("vgId:%d failed to put sink tableInfo:%s into cache, s-task:%s", vgId, pCreateTbReq->name, id);
×
416
      }
417
    }
418

UNCOV
419
    tqDebug("s-task:%s build create table:%s msg complete", id, pCreateTbReq->name);
×
420
  }
421

UNCOV
422
  reqs.nReqs = taosArrayGetSize(reqs.pArray);
×
UNCOV
423
  code = tqPutReqToQueue(pVnode, &reqs, encodeCreateChildTableForRPC, TDMT_VND_CREATE_TABLE);
×
UNCOV
424
  if (code != TSDB_CODE_SUCCESS) {
×
425
    tqError("s-task:%s failed to send create table msg", id);
×
426
  }
427

UNCOV
428
_end:
×
UNCOV
429
  taosArrayDestroy(tagArray);
×
UNCOV
430
  taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
×
UNCOV
431
  return code;
×
432
}
433

UNCOV
434
static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SSDataBlock* pDataBlock,
×
435
                                          SStreamTask* pTask, int64_t suid) {
UNCOV
436
  int32_t          lino = 0;
×
UNCOV
437
  int32_t          code = 0;
×
UNCOV
438
  int32_t          rows = pDataBlock->info.rows;
×
UNCOV
439
  const char*      id = pTask->id.idStr;
×
UNCOV
440
  SVDropTbBatchReq batchReq = {0};
×
UNCOV
441
  SVDropTbReq      req = {0};
×
442

UNCOV
443
  if (rows <= 0 || rows > 1 || pTask->subtableWithoutMd5 == 0) return TSDB_CODE_SUCCESS;
×
444

UNCOV
445
  batchReq.pArray = taosArrayInit(rows, sizeof(SVDropTbReq));
×
UNCOV
446
  if (!batchReq.pArray) return terrno;
×
UNCOV
447
  batchReq.nReqs = rows;
×
UNCOV
448
  req.suid = suid;
×
UNCOV
449
  req.igNotExists = true;
×
450

UNCOV
451
  SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
×
UNCOV
452
  char             tbName[TSDB_TABLE_NAME_LEN + 1] = {0};
×
UNCOV
453
  int32_t          i = 0;
×
UNCOV
454
  void*            pData = colDataGetVarData(pTbNameCol, i);
×
UNCOV
455
  memcpy(tbName, varDataVal(pData), varDataLen(pData));
×
UNCOV
456
  tbName[varDataLen(pData) + 1] = 0;
×
UNCOV
457
  req.name = tbName;
×
UNCOV
458
  if (taosArrayPush(batchReq.pArray, &req) == NULL) {
×
459
    TSDB_CHECK_CODE(terrno, lino, _exit);
×
460
  }
461

UNCOV
462
  SMetaReader mr = {0};
×
UNCOV
463
  metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
×
464

UNCOV
465
  code = metaGetTableEntryByName(&mr, tbName);
×
UNCOV
466
  if (TSDB_CODE_SUCCESS == code && isValidDstChildTable(&mr, TD_VID(pVnode), tbName, pTask->outputInfo.tbSink.stbUid)) {
×
UNCOV
467
    STableSinkInfo* pTableSinkInfo = NULL;
×
UNCOV
468
    bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, &pTableSinkInfo);
×
UNCOV
469
    if (alreadyCached) {
×
UNCOV
470
      pTableSinkInfo->uid = mr.me.uid;
×
471
    }
472
  }
UNCOV
473
  metaReaderClear(&mr);
×
UNCOV
474
  tqDebug("s-task:%s build drop %d table(s) msg", id, rows);
×
UNCOV
475
  code = tqPutReqToQueue(pVnode, &batchReq, encodeDropChildTableForRPC, TDMT_VND_DROP_TABLE);
×
UNCOV
476
  TSDB_CHECK_CODE(code, lino, _exit);
×
477

478

UNCOV
479
  code = doWaitForDstTableDropped(pVnode, pTask, tbName);
×
UNCOV
480
  TSDB_CHECK_CODE(code, lino, _exit);
×
481

UNCOV
482
_exit:
×
UNCOV
483
  if (batchReq.pArray) {
×
UNCOV
484
    taosArrayDestroy(batchReq.pArray);
×
485
  }
UNCOV
486
  return code;
×
487
}
488

UNCOV
489
/*
 * Encode the submit request and put it into the write queue of the vnode, then update the sink
 * statistics of the task (a summary is logged every 1000 submits).
 *
 * @param numOfBlocks number of source blocks merged into pReq, used for logging only
 * @return the encode error if the message cannot be built, TSDB_CODE_SUCCESS otherwise
 *
 * NOTE(review): a failure of tmsgPutToQueue is only logged, the function still returns success and
 * counts the submit - confirm that callers rely on this best-effort behaviour.
 */
int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks) {
  const char* id = pTask->id.idStr;
  int32_t     vgId = TD_VID(pVnode);
  void*       pMsgBuf = NULL;
  int32_t     msgLen = 0;

  // sampled before encoding, the value is only used for the debug log below
  int32_t numOfFinalBlocks = taosArrayGetSize(pReq->aSubmitTbData);

  int32_t code = buildSubmitMsgImpl(pReq, vgId, &pMsgBuf, &msgLen);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("s-task:%s build submit msg failed, vgId:%d, code:%s", id, vgId, tstrerror(code));
    return code;
  }

  SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pMsgBuf, .contLen = msgLen};
  code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
  } else {
    tqDebug("s-task:%s vgId:%d comp %d blocks into %d and send to dstTable(s) completed", id, vgId, numOfBlocks,
            numOfFinalBlocks);
  }

  SSinkRecorder* pRec = &pTask->execInfo.sink;
  pRec->numOfSubmit += 1;

  if ((pRec->numOfSubmit % 1000) != 0) {
    return TSDB_CODE_SUCCESS;
  }

  // periodic summary of the sink progress
  double el = (taosGetTimestampMs() - pTask->execInfo.readyTs) / 1000.0;
  tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
         " submit into dst table, %.2fMiB duration:%.2f Sec.",
         id, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->dataSize), el);

  return TSDB_CODE_SUCCESS;
}
523

524
// merge the new submit table block with the existed blocks
525
// if ts in the new data block overlap with existed one, replace it
UNCOV
526
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id) {
×
UNCOV
527
  int32_t oldLen = taosArrayGetSize(pExisted->aRowP);
×
UNCOV
528
  int32_t newLen = taosArrayGetSize(pNew->aRowP);
×
UNCOV
529
  int32_t numOfPk = 0;
×
530

UNCOV
531
  int32_t j = 0, k = 0;
×
UNCOV
532
  SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES);
×
UNCOV
533
  if (pFinal == NULL) {
×
534
    tqError("s-task:%s failed to prepare merge result datablock, code:%s", id, tstrerror(terrno));
×
535
    return terrno;
×
536
  }
537

UNCOV
538
  while (j < newLen && k < oldLen) {
×
UNCOV
539
    SRow* pNewRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j);
×
UNCOV
540
    SRow* pOldRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k);
×
541

UNCOV
542
    if (pNewRow->ts < pOldRow->ts) {
×
UNCOV
543
      void* p = taosArrayPush(pFinal, &pNewRow);
×
UNCOV
544
      if (p == NULL) {
×
545
        return terrno;
×
546
      }
UNCOV
547
      j += 1;
×
UNCOV
548
    } else if (pNewRow->ts > pOldRow->ts) {
×
UNCOV
549
      void* p = taosArrayPush(pFinal, &pOldRow);
×
UNCOV
550
      if (p == NULL) {
×
551
        return terrno;
×
552
      }
553

UNCOV
554
      k += 1;
×
555
    } else {
556
      // check for the existance of primary key
UNCOV
557
      if (pNewRow->numOfPKs == 0) {
×
UNCOV
558
        void* p = taosArrayPush(pFinal, &pNewRow);
×
UNCOV
559
        if (p == NULL) {
×
560
          return terrno;
×
561
        }
562

UNCOV
563
        k += 1;
×
UNCOV
564
        j += 1;
×
UNCOV
565
        tRowDestroy(pOldRow);
×
566
      } else {
567
        numOfPk = pNewRow->numOfPKs;
×
568

569
        SRowKey kNew, kOld;
570
        tRowGetKey(pNewRow, &kNew);
×
571
        tRowGetKey(pOldRow, &kOld);
×
572

573
        int32_t ret = tRowKeyCompare(&kNew, &kOld);
×
574
        if (ret <= 0) {
×
575
          void* p = taosArrayPush(pFinal, &pNewRow);
×
576
          if (p == NULL) {
×
577
            return terrno;
×
578
          }
579

580
          j += 1;
×
581

582
          if (ret == 0) {
×
583
            k += 1;
×
584
            tRowDestroy(pOldRow);
×
585
          }
586
        } else {
587
          void* p = taosArrayPush(pFinal, &pOldRow);
×
588
          if (p == NULL) {
×
589
            return terrno;
×
590
          }
591

592
          k += 1;
×
593
        }
594
      }
595
    }
596
  }
597

UNCOV
598
  while (j < newLen) {
×
UNCOV
599
    SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j++);
×
UNCOV
600
    void* p = taosArrayPush(pFinal, &pRow);
×
UNCOV
601
    if (p == NULL) {
×
602
      return terrno;
×
603
    }
604
  }
605

UNCOV
606
  while (k < oldLen) {
×
UNCOV
607
    SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k++);
×
UNCOV
608
    void* p = taosArrayPush(pFinal, &pRow);
×
UNCOV
609
    if (p == NULL) {
×
610
      return terrno;
×
611
    }
612
  }
613

UNCOV
614
  taosArrayDestroy(pNew->aRowP);
×
UNCOV
615
  taosArrayDestroy(pExisted->aRowP);
×
UNCOV
616
  pExisted->aRowP = pFinal;
×
617

UNCOV
618
  tqTrace("s-task:%s rows merged, final rows:%d, pk:%d uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
×
619
          (int32_t)taosArrayGetSize(pFinal), numOfPk, pExisted->uid, (pExisted->pCreateTbReq != NULL),
620
          (pNew->pCreateTbReq != NULL));
621

UNCOV
622
  tdDestroySVCreateTbReq(pNew->pCreateTbReq);
×
UNCOV
623
  taosMemoryFree(pNew->pCreateTbReq);
×
UNCOV
624
  return TSDB_CODE_SUCCESS;
×
625
}
626

UNCOV
627
// Check that the table entry currently loaded in pReader can receive stream results:
// it must be a child table and it must belong to the super table identified by suid.
// On success terrno is reset to 0 and true is returned; otherwise the reason is logged,
// terrno is set to the matching error code and false is returned.
bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid) {
  bool valid = false;

  if (pReader->me.type != TSDB_CHILD_TABLE) {
    tqError("vgId:%d, failed to write into %s, since table type:%d incorrect", vgId, ctbName, pReader->me.type);
    terrno = TSDB_CODE_TDB_INVALID_TABLE_TYPE;
  } else if (pReader->me.ctbEntry.suid != suid) {
    tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64, vgId,
            ctbName, suid, pReader->me.ctbEntry.suid);
    terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
  } else {
    terrno = 0;
    valid = true;
  }

  return valid;
}
644

UNCOV
645
int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock,
×
646
                                SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq) {
UNCOV
647
  *pReq = NULL;
×
648

UNCOV
649
  SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
×
UNCOV
650
  if (pCreateTbReq == NULL) {
×
651
    return terrno;
×
652
  }
653

UNCOV
654
  taosArrayClear(pTagArray);
×
UNCOV
655
  int32_t code = initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1);
×
UNCOV
656
  if (code != 0) {
×
657
    return code;
×
658
  }
659

UNCOV
660
  STagVal tagVal = {.cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
×
UNCOV
661
  void* p = taosArrayPush(pTagArray, &tagVal);
×
UNCOV
662
  if (p == NULL) {
×
663
    return terrno;
×
664
  }
665

UNCOV
666
  code = tTagNew(pTagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
×
UNCOV
667
  if (pCreateTbReq->ctb.pTag == NULL || (code != 0)) {
×
668
    tdDestroySVCreateTbReq(pCreateTbReq);
669
    taosMemoryFreeClear(pCreateTbReq);
×
670
    return code;
×
671
  }
672

UNCOV
673
  code = createDefaultTagColName(&pCreateTbReq->ctb.tagName);
×
UNCOV
674
  if (code) {
×
675
    return code;
×
676
  }
677

678
  // set table name
UNCOV
679
  code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule);
×
UNCOV
680
  if (code) {
×
681
    return code;
×
682
  }
683

UNCOV
684
  *pReq = pCreateTbReq;
×
UNCOV
685
  return code;
×
686
}
687

UNCOV
688
// Serialize pSubmitReq into a freshly allocated rpc buffer, prefixed with an SSubmitReq2Msg header.
//
// pSubmitReq is destroyed (tDestroySubmitReq with TSDB_MSG_FLG_ENCODE) on every path, success or
// failure. On success *pMsg / *msgLen receive the buffer and its total length (header included) and
// TSDB_CODE_SUCCESS is returned. On failure *msgLen is 0, *pMsg is left untouched and a non-zero
// error code is returned.
int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) {
  int32_t code = 0;
  void*   pBuf = NULL;
  *msgLen = 0;

  // encode
  int32_t len = 0;
  tEncodeSize(tEncodeSubmitReq, pSubmitReq, len, code);
  if (code != TSDB_CODE_SUCCESS) {
    // the size pass failed, so len is not trustworthy; do not allocate from it
    tqError("failed to encode submit req, code:%s, ignore and continue", tstrerror(code));
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  SEncoder encoder = {0};
  len += sizeof(SSubmitReq2Msg);

  pBuf = rpcMallocCont(len);
  if (NULL == pBuf) {
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
    return terrno;
  }

  ((SSubmitReq2Msg*)pBuf)->header.vgId = vgId;
  ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
  ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);

  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
  if (tEncodeSubmitReq(&encoder, pSubmitReq) < 0) {
    // report the failure to the caller instead of the stale success code of the size pass
    code = TSDB_CODE_OUT_OF_MEMORY;
    terrno = code;
    tqError("failed to encode submit req, code:%s, ignore and continue", terrstr());
    tEncoderClear(&encoder);
    rpcFreeCont(pBuf);
    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
    return code;
  }

  tEncoderClear(&encoder);
  tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);

  *msgLen = len;
  *pMsg = pBuf;
  return TSDB_CODE_SUCCESS;
}
727

UNCOV
728
int32_t tsAscendingSortFn(const void* p1, const void* p2) {
×
UNCOV
729
  SRow* pRow1 = *(SRow**)p1;
×
UNCOV
730
  SRow* pRow2 = *(SRow**)p2;
×
731

UNCOV
732
  if (pRow1->ts == pRow2->ts) {
×
733
    return 0;
×
734
  } else {
UNCOV
735
    return pRow1->ts > pRow2->ts ? 1 : -1;
×
736
  }
737
}
738

UNCOV
739
// Convert every row of pDataBlock into an SRow built against pTSchema and collect the rows in
// pTableData->aRowP (allocated here).
//
// Returns TSDB_CODE_SUCCESS when all rows were converted. If any row carries a primary timestamp
// older than earlyTs the whole block is discarded: the rows converted so far are released,
// pTableData->aRowP is set to NULL and TSDB_CODE_SUCCESS is still returned, so callers must check
// aRowP. On allocation or row-build failure an error code is returned after pTableData has been
// released through tDestroySubmitTbData and the scratch value array has been freed.
int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, int64_t earlyTs,
                      const char* id) {
  int32_t numOfRows = pDataBlock->info.rows;
  int32_t code = TSDB_CODE_SUCCESS;

  SArray* pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
  pTableData->aRowP = taosArrayInit(numOfRows, sizeof(SRow*));

  if (pTableData->aRowP == NULL || pVals == NULL) {
    code = terrno;  // capture before the cleanup calls below get a chance to touch it
    taosArrayDestroy(pTableData->aRowP);
    pTableData->aRowP = NULL;
    taosArrayDestroy(pVals);
    tqError("s-task:%s failed to prepare write stream res blocks, code:%s", id, tstrerror(code));
    return code;
  }

  for (int32_t j = 0; j < numOfRows; j++) {
    taosArrayClear(pVals);

    // dataIndex walks the columns of the data block; it only advances for schema columns that
    // are backed by block data (i.e. not flagged by IS_SET_NULL)
    int32_t dataIndex = 0;
    int64_t ts = 0;

    for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
      const STColumn* pCol = &pTSchema->columns[k];

      // primary timestamp column, for debug purpose
      if (k == 0) {
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
        if (pColData == NULL) {
          continue;
        }

        ts = *(int64_t*)colDataGetData(pColData, j);
        tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, ts);

        if (ts < earlyTs) {
          tqError("s-task:%s ts:%" PRId64 " of generated results out of valid time range %" PRId64 " , discarded", id,
                  ts, earlyTs);

          // rows converted so far are only referenced from aRowP; release them before dropping the
          // array, otherwise they are leaked
          int32_t numOfBuilt = (int32_t)taosArrayGetSize(pTableData->aRowP);
          for (int32_t r = 0; r < numOfBuilt; ++r) {
            tRowDestroy(*(SRow**)TARRAY_GET_ELEM(pTableData->aRowP, r));
          }

          taosArrayDestroy(pTableData->aRowP);
          pTableData->aRowP = NULL;
          taosArrayDestroy(pVals);
          return TSDB_CODE_SUCCESS;
        }
      }

      if (IS_SET_NULL(pCol)) {
        if (pCol->flags & COL_IS_KEY) {
          qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts,
                 pCol->colId, pCol->type);
          // NOTE(review): the row is still handed to tRowBuild with the values gathered so far
          break;
        }
        SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
        void*   p = taosArrayPush(pVals, &cv);
        if (p == NULL) {
          code = terrno;
          goto _error;
        }
      } else {
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
        if (pColData == NULL) {
          continue;
        }

        if (colDataIsNull_s(pColData, j)) {
          if (pCol->flags & COL_IS_KEY) {
            qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8,
                   ts, pCol->colId, pCol->type);
            break;
          }

          SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
          void*   p = taosArrayPush(pVals, &cv);
          if (p == NULL) {
            code = terrno;
            goto _error;
          }

          dataIndex++;
        } else {
          void* colData = colDataGetData(pColData, j);
          if (IS_VAR_DATA_TYPE(pCol->type)) {  // address copy, no value
            SValue sv =
                (SValue){.type = pCol->type, .nData = varDataLen(colData), .pData = (uint8_t*)varDataVal(colData)};
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
            void*   p = taosArrayPush(pVals, &cv);
            if (p == NULL) {
              code = terrno;
              goto _error;
            }
          } else {
            SValue sv = {.type = pCol->type};
            memcpy(&sv.val, colData, tDataTypes[pCol->type].bytes);
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
            void*   p = taosArrayPush(pVals, &cv);
            if (p == NULL) {
              code = terrno;
              goto _error;
            }
          }
          dataIndex++;
        }
      }
    }

    SRow* pRow = NULL;
    code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
    if (code != TSDB_CODE_SUCCESS) {
      tqError("s-task:%s build rows for submit failed, ts:%"PRId64, id, ts);
      goto _error;
    }

    void* p = taosArrayPush(pTableData->aRowP, &pRow);
    if (p == NULL) {
      code = terrno;
      tRowDestroy(pRow);  // not yet owned by aRowP
      goto _error;
    }
  }

  taosArrayDestroy(pVals);
  return TSDB_CODE_SUCCESS;

_error:
  // single failure exit: mirrors the cleanup the row-build failure path has always performed
  tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);
  taosArrayDestroy(pVals);
  return code;
}
858

UNCOV
859
// Poll the vnode meta until the destination table named dstTableName exists, then record its uid.
//
// Loops while pTableSinkInfo->uid is 0, sleeping 100ms between lookups. Once the table is found:
//   - if it is a valid child table of the sink super table, its uid is stored in *uid and in
//     pTableSinkInfo->uid and success (0) is returned;
//   - otherwise the error code set by isValidDstChildTable is returned and no uid is recorded.
// Returns TSDB_CODE_STREAM_EXEC_CANCELLED if the task is asked to stop while waiting.
int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
                                 const char* dstTableName, int64_t* uid) {
  int32_t     vgId = TD_VID(pVnode);
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  const char* id = pTask->id.idStr;

  while (pTableSinkInfo->uid == 0) {
    if (streamTaskShouldStop(pTask)) {
      tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName);
      return TSDB_CODE_STREAM_EXEC_CANCELLED;
    }

    // wait for the table to be created
    SMetaReader mr = {0};
    metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);

    int32_t code = metaGetTableEntryByName(&mr, dstTableName);
    if (code == 0) {  // table already exists, check its type and uid
      bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);

      // take the validation result right away, before logging or releasing the reader
      code = isValid ? TSDB_CODE_SUCCESS : terrno;

      if (isValid) {  // valid child table of the sink super table, adopt its uid
        tqDebug("s-task:%s set uid:%" PRIu64 " for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data);
        // set the destination table uid
        (*uid) = mr.me.uid;
        pTableSinkInfo->uid = mr.me.uid;
      }

      metaReaderClear(&mr);
      return code;
    } else {  // not exist, wait and retry
      metaReaderClear(&mr);
      taosMsleep(100);
      tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName);
    }
  }

  return TSDB_CODE_SUCCESS;
}
896

UNCOV
897
// Poll the vnode meta until the destination table named dstTableName is gone.
//
// Sleeps 100ms between lookups while the name still resolves to a valid child table of the sink
// super table. Returns TSDB_CODE_SUCCESS once the lookup reports TSDB_CODE_PAR_TABLE_NOT_EXIST, or
// once the name resolves to a table that is not such a child table. Returns
// TSDB_CODE_STREAM_EXEC_CANCELLED if the task is asked to stop, or the lookup's own error code for
// any other lookup failure.
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName) {
  int32_t     vgId = TD_VID(pVnode);
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  const char* id = pTask->id.idStr;

  while (1) {
    if (streamTaskShouldStop(pTask)) {
      tqDebug("s-task:%s task will stop, quit from waiting for table:%s drop", id, dstTableName);
      return TSDB_CODE_STREAM_EXEC_CANCELLED;
    }

    SMetaReader mr = {0};
    metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);

    int32_t code = metaGetTableEntryByName(&mr, dstTableName);
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      metaReaderClear(&mr);
      break;
    } else if (TSDB_CODE_SUCCESS == code) {
      if (isValidDstChildTable(&mr, vgId, dstTableName, suid)) {
        metaReaderClear(&mr);
        taosMsleep(100);
        tqDebug("s-task:%s wait 100ms for table:%s drop", id, dstTableName);
      } else {
        metaReaderClear(&mr);
        break;
      }
    } else {
      tqError("s-task:%s failed to wait for table:%s drop", id, dstTableName);
      metaReaderClear(&mr);
      // return the lookup's own error: terrno may be unrelated (or 0) by the time we get here
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
930

UNCOV
931
// Allocate a zero-initialized STableSinkInfo with room for a copy of pDstTableName plus one
// trailing zero byte, and store the name and its length in it. The uid is left at 0.
// *pInfo receives the allocation result (NULL on failure, in which case terrno is returned).
int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo) {
  int32_t         nameLen = strlen(pDstTableName);
  STableSinkInfo* pSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1);

  (*pInfo) = pSinkInfo;
  if (pSinkInfo == NULL) {
    return terrno;
  }

  // the buffer is zero-filled, so the copied name is terminated by the extra byte
  memcpy(pSinkInfo->name.data, pDstTableName, nameLen);
  pSinkInfo->name.len = nameLen;

  return TSDB_CODE_SUCCESS;
}
942

UNCOV
943
int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
×
944
                           SSubmitTbData* pTableData) {
UNCOV
945
  uint64_t        groupId = pDataBlock->info.id.groupId;
×
UNCOV
946
  char*           dstTableName = pDataBlock->info.parTbName;
×
UNCOV
947
  int32_t         numOfRows = pDataBlock->info.rows;
×
UNCOV
948
  const char*     id = pTask->id.idStr;
×
UNCOV
949
  int64_t         suid = pTask->outputInfo.tbSink.stbUid;
×
UNCOV
950
  STSchema*       pTSchema = pTask->outputInfo.tbSink.pTSchema;
×
UNCOV
951
  int32_t         vgId = TD_VID(pVnode);
×
UNCOV
952
  STableSinkInfo* pTableSinkInfo = NULL;
×
UNCOV
953
  int32_t         code = 0;
×
954

UNCOV
955
  bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, groupId, &pTableSinkInfo);
×
956

UNCOV
957
  if (alreadyCached) {
×
UNCOV
958
    if (dstTableName[0] == 0) {  // data block does not set the destination table name
×
UNCOV
959
      tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
×
UNCOV
960
      tqDebug("s-task:%s vgId:%d, gropuId:%" PRIu64 " datablock table name is null, set name:%s", id, vgId, groupId,
×
961
              dstTableName);
962
    } else {
UNCOV
963
      tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1);
×
UNCOV
964
      if (pTableSinkInfo->uid != 0) {
×
UNCOV
965
        tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(uid:%" PRIu64 ")", id, numOfRows, groupId,
×
966
                dstTableName, pTableSinkInfo->uid);
967
      } else {
UNCOV
968
        tqDebug("s-task:%s write %d rows into groupId:%" PRIu64 " dstTable:%s(not set uid yet for the secondary block)",
×
969
                id, numOfRows, groupId, dstTableName);
970
      }
971
    }
972
  } else {  // this groupId has not been kept in cache yet
UNCOV
973
    if (dstTableName[0] == 0) {
×
UNCOV
974
      memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
×
UNCOV
975
      code = buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
×
UNCOV
976
      if (code) {
×
977
        tqDebug("s-task:%s failed to build auto create table-name:%s, groupId:0x%" PRId64, id, dstTableName, groupId);
×
978
        return code;
×
979
      }
980
    } else {
UNCOV
981
      if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
×
982
          !alreadyAddGroupId(dstTableName, groupId) && groupId != 0) {
×
983
        tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
×
984
        if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
×
985
          code = buildCtbNameAddGroupId(NULL, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
×
986
        } else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) {
×
987
          code = buildCtbNameAddGroupId(stbFullName, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
×
988
        }
989
        if (code != TSDB_CODE_SUCCESS) {
×
990
          return code;
×
991
        }
992
      }
993
    }
994

UNCOV
995
    code = doCreateSinkTableInfo(dstTableName, &pTableSinkInfo);
×
UNCOV
996
    if (code == 0) {
×
UNCOV
997
      tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName);
×
998
    } else {
999
      tqDebug("s-task:%s failed to build new sinkTableInfo, dstTable:%s", id, dstTableName);
×
1000
      return code;
×
1001
    }
1002
  }
1003

UNCOV
1004
  if (alreadyCached) {
×
UNCOV
1005
    pTableData->uid = pTableSinkInfo->uid;
×
1006

UNCOV
1007
    if (pTableData->uid == 0) {
×
UNCOV
1008
      tqTrace("s-task:%s cached tableInfo:%s uid is invalid, acquire it from meta", id, pTableSinkInfo->name.data);
×
UNCOV
1009
      return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid);
×
1010
    } else {
UNCOV
1011
      tqTrace("s-task:%s set the dstTable uid from cache:%" PRId64, id, pTableData->uid);
×
1012
    }
1013
  } else {
1014
    // The auto-create option will always set to be open for those submit messages, which arrives during the period
1015
    // the creating of the destination table, due to the absence of the user-specified table in TSDB. When scanning
1016
    // data from WAL, those submit messages, with auto-created table option, will be discarded expect the first, for
1017
    // those mismatched table uids. Only the FIRST table has the correct table uid, and those remain all have
1018
    // randomly generated, but false table uid in the WAL.
UNCOV
1019
    SMetaReader mr = {0};
×
UNCOV
1020
    metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
×
1021

1022
    // table not in cache, let's try to extract it from tsdb meta
UNCOV
1023
    if (metaGetTableEntryByName(&mr, dstTableName) < 0) {
×
UNCOV
1024
      metaReaderClear(&mr);
×
1025

UNCOV
1026
      if (pTask->outputInfo.tbSink.autoCreateCtb) {
×
UNCOV
1027
        tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName);
×
1028

UNCOV
1029
        SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
×
UNCOV
1030
        if (pTagArray == NULL) {
×
1031
          tqError("s-task:%s failed to build auto create submit msg in sink, vgId:%d, due to %s", id, vgId,
×
1032
                  tstrerror(terrno));
1033
          return terrno;
×
1034
        }
1035

UNCOV
1036
        pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
×
UNCOV
1037
        code = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
×
UNCOV
1038
                                       IS_NEW_SUBTB_RULE(pTask), &pTableData->pCreateTbReq);
×
UNCOV
1039
        taosArrayDestroy(pTagArray);
×
1040

UNCOV
1041
        if (code) {
×
1042
          tqError("s-task:%s failed to build auto create dst-table req:%s, code:%s", id, dstTableName, tstrerror(code));
×
1043
          taosMemoryFree(pTableSinkInfo);
×
1044
          return code;
×
1045
        }
1046

UNCOV
1047
        pTableSinkInfo->uid = 0;
×
UNCOV
1048
        code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
×
1049
      } else {
1050
        metaReaderClear(&mr);
×
1051

1052
        tqError("s-task:%s vgId:%d dst-table:%s not auto-created, and not create in tsdb, discard data", id, vgId,
×
1053
                dstTableName);
1054
        return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1055
      }
1056
    } else {
UNCOV
1057
      bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);
×
UNCOV
1058
      if (!isValid) {
×
1059
        metaReaderClear(&mr);
×
1060
        taosMemoryFree(pTableSinkInfo);
×
1061
        tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId,
×
1062
                dstTableName);
1063
        return terrno;
×
1064
      } else {
UNCOV
1065
        pTableData->uid = mr.me.uid;
×
UNCOV
1066
        pTableSinkInfo->uid = mr.me.uid;
×
1067

UNCOV
1068
        metaReaderClear(&mr);
×
UNCOV
1069
        code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
×
1070
      }
1071
    }
1072
  }
1073

UNCOV
1074
  return code;
×
1075
}
1076

UNCOV
1077
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
×
1078
                                 SSubmitTbData* pTableData, int64_t earlyTs, const char* id) {
UNCOV
1079
  int32_t numOfRows = pDataBlock->info.rows;
×
UNCOV
1080
  char*   dstTableName = pDataBlock->info.parTbName;
×
1081

UNCOV
1082
  tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, id,
×
1083
          blockIndex + 1, numOfRows, suid);
1084

1085
  // convert all rows
UNCOV
1086
  int32_t code = doConvertRows(pTableData, pTSchema, pDataBlock, earlyTs, id);
×
UNCOV
1087
  if (code != TSDB_CODE_SUCCESS) {
×
1088
    tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno));
×
1089
    return code;
×
1090
  }
1091

UNCOV
1092
  if (pTableData->aRowP != NULL) {
×
UNCOV
1093
    taosArraySort(pTableData->aRowP, tsAscendingSortFn);
×
UNCOV
1094
    tqTrace("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows);
×
1095
  }
1096

UNCOV
1097
  return code;
×
1098
}
1099

UNCOV
1100
// Lazily load the tag schema of the sink super table into the task's output info.
//
// Does nothing when tbSink.pTagSchema is already set. Otherwise the super table entry is read from
// meta by tbSink.stbUid, its tag schema is cloned, and tbSink.autoCreateCtb is set to true only
// when the schema consists of exactly one UBIGINT tag named "group_id".
// Returns TSDB_CODE_SUCCESS, the meta lookup error, or terrno when cloning fails.
int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode) {
  const char*      id = pTask->id.idStr;
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
  int32_t          vgId = pTask->pMeta->vgId;

  if (pTask->outputInfo.tbSink.pTagSchema != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SMetaReader stbReader = {0};
  metaReaderDoInit(&stbReader, pVnode->pMeta, META_READER_LOCK);

  int32_t code = metaReaderGetTableEntryByUid(&stbReader, pOutputInfo->tbSink.stbUid);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("s-task:%s vgId:%d failed to get the dst stable, failed to sink results", id, vgId);
    metaReaderClear(&stbReader);
    return code;
  }

  // clone while the reader still holds the entry, then release the reader
  SSchemaWrapper* pTagSchema = tCloneSSchemaWrapper(&stbReader.me.stbEntry.schemaTag);
  pOutputInfo->tbSink.pTagSchema = pTagSchema;
  metaReaderClear(&stbReader);

  if (pTagSchema == NULL) {
    tqError("s-task:%s failed to clone tag schema, code:%s, failed to sink results", id, tstrerror(terrno));
    return terrno;
  }

  SSchema* pFirstTag = &pTagSchema->pSchema[0];
  if (pTagSchema->nCols != 1 || pFirstTag->type != TSDB_DATA_TYPE_UBIGINT ||
      strcmp(pFirstTag->name, "group_id") != 0) {
    pOutputInfo->tbSink.autoCreateCtb = false;
  } else {
    pOutputInfo->tbSink.autoCreateCtb = true;
  }

  return code;
}
1136

UNCOV
1137
// Sink entry point: write a batch of stream result blocks into the destination super table.
//
// data is an SArray of SSDataBlock. When the batch contains only plain result blocks and the task
// does not use the subtableWithoutMd5 naming, all blocks are merged into one submit message.
// Otherwise every block is dispatched on its own according to its type (delete, create child
// table, drop child table, result data); checkpoint blocks are skipped.
//
// The function has no return value: a failure on one block is logged and the remaining blocks are
// still processed. Processing stops early when the task is asked to stop.
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
  const SArray* pBlocks = (const SArray*)data;
  SVnode*       pVnode = (SVnode*)vnode;
  int64_t       suid = pTask->outputInfo.tbSink.stbUid;
  char*         stbFullName = pTask->outputInfo.tbSink.stbFullName;
  int32_t       vgId = TD_VID(pVnode);
  int32_t       numOfBlocks = taosArrayGetSize(pBlocks);
  const char*   id = pTask->id.idStr;
  int64_t       earlyTs = tsdbGetEarliestTs(pVnode->pTsdb);

  int32_t code = checkTagSchema(pTask, pVnode);
  if (code != TSDB_CODE_SUCCESS) {
    return;
  }

  bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
  if (!onlySubmitData || pTask->subtableWithoutMd5 == 1) {
    tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
            numOfBlocks);

    for (int32_t i = 0; i < numOfBlocks; ++i) {
      if (streamTaskShouldStop(pTask)) {
        return;
      }

      SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
      if (pDataBlock == NULL) {
        continue;
      }

      if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
        code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
      } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
        code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
      } else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
        continue;
      } else if (pDataBlock->info.type == STREAM_DROP_CHILD_TABLE && pTask->subtableWithoutMd5) {
        code = doBuildAndSendDropTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
      } else {
        code = handleResultBlockMsg(pTask, pDataBlock, i, pVnode, earlyTs);
      }

      // best effort: keep going with the next block, but do not drop the failure silently
      if (code != TSDB_CODE_SUCCESS) {
        tqError("s-task:%s vgId:%d failed to sink %dth resBlock, code:%s", id, vgId, i + 1, tstrerror(code));
      }
    }
  } else {
    tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks);
    if (streamTaskShouldStop(pTask)) {
      return;
    }

    rebuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs);
  }
}
1191

1192
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
UNCOV
1193
bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
×
UNCOV
1194
  for (int32_t i = 0; i < numOfBlocks; ++i) {
×
UNCOV
1195
    SSDataBlock* p = taosArrayGet(pBlocks, i);
×
UNCOV
1196
    if (p == NULL) {
×
1197
      continue;
×
1198
    }
1199

UNCOV
1200
    if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
×
UNCOV
1201
      return false;
×
1202
    }
1203
  }
1204

UNCOV
1205
  return true;
×
1206
}
1207

UNCOV
1208
// Store the pointer pTableSinkInfo in pSinkTableMap under the key groupId.
// When the insertion fails, pTableSinkInfo is freed here, so the caller must not use it after a
// non-zero return. Returns the result of the hash insertion.
int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) {
  int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES);

  if (code == TSDB_CODE_SUCCESS) {
    tqDebug("s-task:%s new dst table:%s(uid:%" PRIu64 ") added into cache, total:%d", id, pTableSinkInfo->name.data,
            pTableSinkInfo->uid, tSimpleHashGetSize(pSinkTableMap));
    return code;
  }

  // the map did not take the entry, release it
  taosMemoryFreeClear(pTableSinkInfo);
  return code;
}
1219

UNCOV
1220
// Look up the sink table info stored for groupId in pTableInfoMap.
// On a hit *pInfo receives the stored pointer and true is returned; on a miss *pInfo is left
// untouched and false is returned.
bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo) {
  STableSinkInfo** ppCached = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t));
  if (ppCached == NULL) {
    return false;
  }

  // the map stores the pointer itself, so one dereference yields the entry
  *pInfo = *ppCached;
  return true;
}
1229

1230
// Remove the entry stored for groupId from pSinkTableMap and log the outcome.
// An empty map is treated as success without attempting the removal. Otherwise the result of the
// hash removal is returned.
int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (tSimpleHashGetSize(pSinkTableMap) != 0) {
    code = tSimpleHashRemove(pSinkTableMap, &groupId, sizeof(groupId));
    if (code != 0) {
      tqError("s-task:%s failed to remove table meta from hashmap, groupId:%" PRId64, id, groupId);
    } else {
      tqDebug("s-task:%s remove cached table meta for groupId:%" PRId64, id, groupId);
    }
  }

  return code;
}
1243

UNCOV
1244
// Turn a delete-result block into a batch delete request and put it on the vnode write queue.
//
// Returns TSDB_CODE_SUCCESS when the message was handed to the queue, when there was nothing to
// delete, and also when the queue rejected the message (that case is only logged). Returns an error
// code when building or encoding the request fails or the message buffer cannot be allocated. The
// temporary request array is released on every path, and the message buffer is released when
// encoding fails.
int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
                                int64_t suid) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         len = 0;
  void*           serializedDeleteReq = NULL;
  SEncoder        encoder = {0};
  SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
  if (deleteReq.deleteReqs == NULL) {
    return terrno;
  }

  code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr, IS_NEW_SUBTB_RULE(pTask));
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
    goto _exit;  // nothing to delete, code is still success
  }

  tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
  if (code != TSDB_CODE_SUCCESS) {
    qError("s-task:%s failed to encode delete request", pTask->id.idStr);
    goto _exit;
  }

  serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
  if (serializedDeleteReq == NULL) {
    code = terrno;
    goto _exit;
  }

  // the payload follows the message head inside the same buffer
  tEncoderInit(&encoder, POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)), len);
  code = tEncodeSBatchDeleteReq(&encoder, &deleteReq);
  tEncoderClear(&encoder);

  if (code) {
    rpcFreeCont(serializedDeleteReq);
    goto _exit;
  }

  ((SMsgHead*)serializedDeleteReq)->vgId = TD_VID(pVnode);

  {
    SRpcMsg msg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead)};
    if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
      // NOTE(review): buffer ownership after a failed enqueue is not visible here; kept as before
      tqDebug("failed to put delete req into write-queue since %s", terrstr());
    }
  }

_exit:
  taosArrayDestroy(deleteReq.deleteReqs);
  return code;
}
1290

UNCOV
1291
/*
 * Convert a list of stream result blocks into one submit request and send it.
 *
 * Blocks that carry the same group id are merged into a single per-table entry
 * of the submit request: the first block of a group resolves the destination
 * table and appends a new entry, later blocks of that group are merged into the
 * existing entry with doMergeExistedRows. Checkpoint blocks and NULL entries are
 * skipped. A block that fails at any step is discarded and the loop goes on
 * with the next block.
 *
 * pTask    sink task; supplies the destination super table info and the sink counters
 * pBlocks  array of SSDataBlock to write
 * pVnode   destination vnode
 * earlyTs  forwarded unchanged to tqSetDstTableDataPayload
 *
 * The function has no return value; failures are reported via the log only.
 */
void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) {
  int32_t     code = 0;
  const char* id = pTask->id.idStr;
  int32_t     vgId = pTask->pMeta->vgId;
  int32_t     numOfBlocks = taosArrayGetSize(pBlocks);
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  STSchema*   pTSchema = pTask->outputInfo.tbSink.pTSchema;
  char*       stbFullName = pTask->outputInfo.tbSink.stbFullName;

  // group id -> index of the per-table entry in submitReq.aSubmitTbData
  SHashObj* pTableIndexMap =
      taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableIndexMap == NULL) {
    // without the index map blocks of the same group can not be merged, so give up here
    code = terrno;
    tqError("s-task:%s vgId:%d failed to prepare group index map in sink task, code:%s", id, vgId, tstrerror(code));
    return;
  }

  SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
  if (submitReq.aSubmitTbData == NULL) {
    code = terrno;
    tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
    taosHashCleanup(pTableIndexMap);
    return;
  }

  bool hasSubmit = false;
  for (int32_t i = 0; i < numOfBlocks; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
    if (pDataBlock == NULL || pDataBlock->info.type == STREAM_CHECKPOINT) {
      continue;
    }

    hasSubmit = true;
    pTask->execInfo.sink.numOfBlocks += 1;
    uint64_t groupId = pDataBlock->info.id.groupId;

    SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};

    int32_t*   index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
    const bool isNewGroup = (index == NULL);  // no data of this group yet

    if (isNewGroup) {
      code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
      if (code != TSDB_CODE_SUCCESS) {
        tqError("vgId:%d dst-table gid:%" PRId64 " not exist, discard stream results", vgId, groupId);
        continue;
      }
    }

    code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
    if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
      if (tbData.pCreateTbReq != NULL) {
        tdDestroySVCreateTbReq(tbData.pCreateTbReq);
        if (isNewGroup) {
          // the create-table request is dropped, so drop the cached table info as well
          (void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, groupId, id);
        }
        tbData.pCreateTbReq = NULL;
      }
      continue;
    }

    if (isNewGroup) {
      void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
      if (p == NULL) {
        tqError("vgId:%d, s-task:%s failed to build submit msg, data lost", vgId, id);
        // tbData is still owned here; release it, and drop the cached table info if its create request goes with it
        if (tbData.pCreateTbReq != NULL) {
          (void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, groupId, id);
        }
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
        continue;
      }

      // remember where the entry of this group lives for the following blocks
      int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
      code = taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
      if (code) {
        tqError("vgId:%d, s-task:%s failed to put group into index map, code:%s", vgId, id, tstrerror(code));
        continue;
      }
    } else {
      SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index);
      if (pExisted == NULL) {
        continue;
      }

      code = doMergeExistedRows(pExisted, &tbData, id);
      if (code != TSDB_CODE_SUCCESS) {
        continue;
      }
    }

    pTask->execInfo.sink.numOfRows += pDataBlock->info.rows;
  }

  taosHashCleanup(pTableIndexMap);

  if (hasSubmit) {
    code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks);
    if (code) {  // failed and continue
      tqError("vgId:%d failed to build and send submit msg", vgId);
    }
  } else {
    // nothing was handed to doBuildAndSendSubmitMsg, release the request here
    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
    tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
  }
}
1394

UNCOV
1395
/*
 * Write a single stream result block into its destination table.
 *
 * The destination table is resolved with setDstTableDataUid, the rows are
 * built with tqSetDstTableDataPayload, and the resulting one-entry submit
 * request is handed to doBuildAndSendSubmitMsg.
 *
 * pTask       sink task; supplies the destination super table info and the sink counters
 * pDataBlock  result block to write
 * index       forwarded unchanged to tqSetDstTableDataPayload
 * pVnode      destination vnode
 * earlyTs     forwarded unchanged to tqSetDstTableDataPayload
 *
 * Returns the code of the first failing step, or the result of
 * doBuildAndSendSubmitMsg. When the payload step succeeds but yields no rows,
 * nothing is sent and its (success) code is returned.
 *
 * The submit request container is released on every path that returns before
 * doBuildAndSendSubmitMsg is called.
 */
int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode, int64_t earlyTs) {
  int32_t     code = 0;
  STSchema*   pTSchema = pTask->outputInfo.tbSink.pTSchema;
  int64_t     suid = pTask->outputInfo.tbSink.stbUid;
  const char* id = pTask->id.idStr;
  int32_t     vgId = TD_VID(pVnode);
  char*       stbFullName = pTask->outputInfo.tbSink.stbFullName;

  pTask->execInfo.sink.numOfBlocks += 1;

  SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
  if (submitReq.aSubmitTbData == NULL) {
    code = terrno;  // capture before logging, so the logged and returned codes always agree
    tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
    return code;
  }

  SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
  code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d s-task:%s dst-table not exist, stb:%s discard stream results", vgId, id, stbFullName);
    taosArrayDestroy(submitReq.aSubmitTbData);  // still empty at this point
    return code;
  }

  code = tqSetDstTableDataPayload(suid, pTSchema, index, pDataBlock, &tbData, earlyTs, id);
  if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
    if (tbData.pCreateTbReq != NULL) {
      tdDestroySVCreateTbReq(tbData.pCreateTbReq);
      // the create-table request is dropped, so drop the cached table info as well
      (void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, id);
      tbData.pCreateTbReq = NULL;
    }

    taosArrayDestroy(submitReq.aSubmitTbData);  // still empty at this point
    return code;
  }

  void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
  if (p == NULL) {
    code = terrno;
    tqDebug("vgId:%d, s-task:%s failed to build submit msg, code:%s, data lost", vgId, id, tstrerror(code));

    // tbData is still owned here; release it, and drop the cached table info if its create request goes with it
    if (tbData.pCreateTbReq != NULL) {
      (void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, id);
    }
    tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
    taosArrayDestroy(submitReq.aSubmitTbData);
    return code;
  }

  code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1);
  if (code) {  // failed and continue
    tqDebug("vgId:%d, s-task:%s submit msg failed, code:%s data lost", vgId, id, tstrerror(code));
  }

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc