• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: seperate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.42
/source/dnode/vnode/src/tq/tqUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
19
                                 const SMqMetaRsp* pRsp, int32_t vgId);
20
static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
21
                                      const SMqBatchMetaRsp* pRsp, int32_t vgId);
22

UNCOV
23
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
×
UNCOV
24
  if (pRsp == NULL) {
×
25
    return TSDB_CODE_INVALID_PARA;
×
26
  }
UNCOV
27
  pRsp->blockData = taosArrayInit(0, sizeof(void*));
×
UNCOV
28
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
×
29

UNCOV
30
  if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL) {
×
31
    return terrno;
×
32
  }
33

UNCOV
34
  tOffsetCopy(&pRsp->reqOffset, &pOffset);
×
UNCOV
35
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
×
UNCOV
36
  pRsp->withTbName = 0;
×
UNCOV
37
  pRsp->withSchema = false;
×
UNCOV
38
  return 0;
×
39
}
40

41
void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
1,003✔
42
  SSyncState state = syncGetState(pTq->pVnode->sync);
1,003✔
43
  streamMetaUpdateStageRole(pTq->pStreamMeta, state.term, isLeader);
1,003✔
44
}
1,003✔
45

UNCOV
46
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
×
UNCOV
47
  if (pRsp == NULL) {
×
48
    return TSDB_CODE_INVALID_PARA;
×
49
  }
UNCOV
50
  tOffsetCopy(&pRsp->reqOffset, &pOffset);
×
UNCOV
51
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
×
52

UNCOV
53
  pRsp->withTbName = 1;
×
UNCOV
54
  pRsp->withSchema = 1;
×
UNCOV
55
  pRsp->blockData = taosArrayInit(0, sizeof(void*));
×
UNCOV
56
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
×
UNCOV
57
  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
×
UNCOV
58
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
×
59

UNCOV
60
  if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL ||
×
UNCOV
61
      pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) {
×
62
    if (pRsp->blockData != NULL) {
×
63
      taosArrayDestroy(pRsp->blockData);
×
64
      pRsp->blockData = NULL;
×
65
    }
66

67
    if (pRsp->blockDataLen != NULL) {
×
68
      taosArrayDestroy(pRsp->blockDataLen);
×
69
      pRsp->blockDataLen = NULL;
×
70
    }
71

72
    if (pRsp->blockTbName != NULL) {
×
73
      taosArrayDestroy(pRsp->blockTbName);
×
74
      pRsp->blockTbName = NULL;
×
75
    }
76

77
    if (pRsp->blockSchema != NULL) {
×
78
      taosArrayDestroy(pRsp->blockSchema);
×
79
      pRsp->blockSchema = NULL;
×
80
    }
81

82
    return terrno;
×
83
  }
84

UNCOV
85
  return 0;
×
86
}
87

UNCOV
88
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
×
89
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
UNCOV
90
  if (pOffsetVal == NULL || pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || pBlockReturned == NULL) {
×
91
    return TSDB_CODE_INVALID_PARA;
×
92
  }
UNCOV
93
  uint64_t   consumerId = pRequest->consumerId;
×
UNCOV
94
  STqOffset* pOffset = NULL;
×
UNCOV
95
  int32_t    code = tqMetaGetOffset(pTq, pRequest->subKey, &pOffset);
×
UNCOV
96
  int32_t    vgId = TD_VID(pTq->pVnode);
×
97

UNCOV
98
  *pBlockReturned = false;
×
99
  // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
UNCOV
100
  if (code == 0) {
×
UNCOV
101
    tOffsetCopy(pOffsetVal, &pOffset->val);
×
102

UNCOV
103
    char formatBuf[TSDB_OFFSET_LEN] = {0};
×
UNCOV
104
    tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
×
UNCOV
105
    tqDebug("tmq poll: consumer:0x%" PRIx64
×
106
            ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.QID:0x%" PRIx64,
107
            consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
UNCOV
108
    return 0;
×
109
  } else {
110
    // no poll occurs in this vnode for this topic, let's seek to the right offset value.
UNCOV
111
    if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
×
UNCOV
112
      if (pRequest->useSnapshot) {
×
UNCOV
113
        tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
×
114
                consumerId, pHandle->subKey, vgId);
UNCOV
115
        if (pHandle->fetchMeta) {
×
116
          tqOffsetResetToMeta(pOffsetVal, 0);
117
        } else {
UNCOV
118
          SValue val = {0};
×
UNCOV
119
          tqOffsetResetToData(pOffsetVal, 0, 0, val);
×
120
        }
121
      } else {
UNCOV
122
        walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
×
UNCOV
123
        tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
×
124
      }
UNCOV
125
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
×
UNCOV
126
      walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
×
UNCOV
127
      SMqDataRsp dataRsp = {0};
×
UNCOV
128
      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
×
129

UNCOV
130
      code = tqInitDataRsp(&dataRsp, *pOffsetVal);
×
UNCOV
131
      if (code != 0) {
×
132
        return code;
×
133
      }
UNCOV
134
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
×
135
              pHandle->subKey, vgId, dataRsp.rspOffset.version);
UNCOV
136
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
×
UNCOV
137
      tDeleteMqDataRsp(&dataRsp);
×
138

UNCOV
139
      *pBlockReturned = true;
×
UNCOV
140
      return code;
×
UNCOV
141
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
×
UNCOV
142
      tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
×
143
              " in vg %d, subkey %s, reset none failed",
144
              pHandle->subKey, consumerId, vgId, pRequest->subKey);
UNCOV
145
      return TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
×
146
    }
147
  }
148

UNCOV
149
  return 0;
×
150
}
151

UNCOV
152
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
×
153
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
UNCOV
154
  if (pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || pOffset == NULL) {
×
155
    return TSDB_CODE_INVALID_PARA;
×
156
  }
UNCOV
157
  uint64_t consumerId = pRequest->consumerId;
×
UNCOV
158
  int32_t  vgId = TD_VID(pTq->pVnode);
×
UNCOV
159
  terrno = 0;
×
160

UNCOV
161
  SMqDataRsp dataRsp = {0};
×
UNCOV
162
  int code = tqInitDataRsp(&dataRsp, *pOffset);
×
UNCOV
163
  if (code != 0) {
×
164
    goto end;
×
165
  }
166

UNCOV
167
  code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
×
UNCOV
168
  if (code != 0) {
×
169
    goto end;
×
170
  }
171

UNCOV
172
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
×
UNCOV
173
  if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
×
UNCOV
174
    goto end;
×
175
  }
176

177
  //   till now, all data has been transferred to consumer, new data needs to push client once arrived.
UNCOV
178
  if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
×
179
    // lock
UNCOV
180
    taosWLockLatch(&pTq->lock);
×
UNCOV
181
    int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
×
UNCOV
182
    if (dataRsp.rspOffset.version > ver) {  // check if there are data again to avoid lost data
×
UNCOV
183
      code = tqRegisterPushHandle(pTq, pHandle, pMsg);
×
UNCOV
184
      taosWUnLockLatch(&pTq->lock);
×
UNCOV
185
      goto end;
×
186
    }
UNCOV
187
    taosWUnLockLatch(&pTq->lock);
×
188
  }
189

190
  // reqOffset represents the current date offset, may be changed if wal not exists
UNCOV
191
  tOffsetCopy(&dataRsp.reqOffset, pOffset);
×
UNCOV
192
  code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
×
193

UNCOV
194
end : {
×
UNCOV
195
  char buf[TSDB_OFFSET_LEN] = {0};
×
UNCOV
196
  tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
×
UNCOV
197
  tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s,QID:0x%" PRIx64
×
198
          " code:%d",
199
          consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
UNCOV
200
  tDeleteMqDataRsp(&dataRsp);
×
UNCOV
201
  return code;
×
202
}
203
}
204

205
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC)                                               \
206
  SDecoder decoder = {0};                                                                                  \
207
  TYPE     req = {0};                                                                                      \
208
  void*    data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead));                                            \
209
  int32_t  len = pHead->bodyLen - sizeof(SMsgHead);                                                        \
210
  tDecoderInit(&decoder, data, len);                                                                       \
211
  if (DECODE_FUNC(&decoder, &req) == 0 && (req.source & TD_REQ_FROM_TAOX) != 0) {                          \
212
    tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 \
213
            " msgType %d",                                                                                 \
214
            pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);                        \
215
    fetchVer++;                                                                                            \
216
    DELETE_FUNC(&req);                                                                                     \
217
    tDecoderClear(&decoder);                                                                               \
218
    continue;                                                                                              \
219
  }                                                                                                        \
220
  DELETE_FUNC(&req);                                                                                       \
221
  tDecoderClear(&decoder);
222

UNCOV
223
static void tDeleteCommon(void* parm) {}
×
224

UNCOV
225
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
×
226
                                                  SRpcMsg* pMsg, STqOffsetVal* offset) {
UNCOV
227
  if (pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || offset == NULL) {
×
228
    return TSDB_CODE_INVALID_PARA;
×
229
  }
UNCOV
230
  int32_t         vgId = TD_VID(pTq->pVnode);
×
UNCOV
231
  SMqDataRsp      taosxRsp = {0};
×
UNCOV
232
  SMqBatchMetaRsp btMetaRsp = {0};
×
UNCOV
233
  int32_t         code = 0;
×
234

UNCOV
235
  TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset));
×
UNCOV
236
  if (offset->type != TMQ_OFFSET__LOG) {
×
UNCOV
237
    TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset));
×
238

UNCOV
239
    if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
×
UNCOV
240
      code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
×
UNCOV
241
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
×
242
              ",ts:%" PRId64,
243
              pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,
244
              btMetaRsp.rspOffset.ts);
UNCOV
245
      goto END;
×
246
    }
247

UNCOV
248
    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
×
249
            ",ts:%" PRId64,
250
            pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type,
251
            taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
UNCOV
252
    if (taosxRsp.blockNum > 0) {
×
UNCOV
253
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
×
UNCOV
254
      goto END;
×
255
    } else {
UNCOV
256
      tOffsetCopy(offset, &taosxRsp.rspOffset);
×
257
    }
258
  }
259

UNCOV
260
  if (offset->type == TMQ_OFFSET__LOG) {
×
UNCOV
261
    walReaderVerifyOffset(pHandle->pWalReader, offset);
×
UNCOV
262
    int64_t fetchVer = offset->version;
×
263

UNCOV
264
    uint64_t st = taosGetTimestampMs();
×
UNCOV
265
    int      totalRows = 0;
×
UNCOV
266
    int32_t  totalMetaRows = 0;
×
UNCOV
267
    while (1) {
×
UNCOV
268
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
×
UNCOV
269
      if (savedEpoch > pRequest->epoch) {
×
270
        tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, savedEpoch error, vgId:%d offset %" PRId64,
×
271
                pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
272
        code = TSDB_CODE_TQ_INTERNAL_ERROR;
×
UNCOV
273
        goto END;
×
274
      }
275

UNCOV
276
      if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
×
UNCOV
277
        if (totalMetaRows > 0) {
×
UNCOV
278
          tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
×
UNCOV
279
          code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
×
UNCOV
280
          if (totalRows != 0) {
×
281
            tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, totalRows error, vgId:%d offset %" PRId64,
×
282
                    pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
283
            code = code == 0 ? TSDB_CODE_TQ_INTERNAL_ERROR : code;
×
284
          }
UNCOV
285
          goto END;
×
286
        }
UNCOV
287
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
×
UNCOV
288
        code = tqSendDataRsp(
×
289
            pHandle, pMsg, pRequest, &taosxRsp,
UNCOV
290
            taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
×
UNCOV
291
        goto END;
×
292
      }
293

UNCOV
294
      SWalCont* pHead = &pHandle->pWalReader->pHead->head;
×
UNCOV
295
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d",
×
296
              pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);
297

298
      // process meta
UNCOV
299
      if (pHead->msgType != TDMT_VND_SUBMIT) {
×
UNCOV
300
        if (totalRows > 0) {
×
UNCOV
301
          tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
×
UNCOV
302
          code = tqSendDataRsp(
×
303
              pHandle, pMsg, pRequest, &taosxRsp,
UNCOV
304
              taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
×
UNCOV
305
          goto END;
×
306
        }
307

UNCOV
308
        if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
×
UNCOV
309
          if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
×
UNCOV
310
            PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq, tDeleteSVCreateTbBatchReq)
×
UNCOV
311
          } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
×
UNCOV
312
            PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq, tDeleteCommon)
×
UNCOV
313
          } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
×
UNCOV
314
            PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq, tDeleteCommon)
×
UNCOV
315
          } else if (pHead->msgType == TDMT_VND_DELETE) {
×
UNCOV
316
            PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes, tDeleteCommon)
×
317
          }
318
        }
319

UNCOV
320
        tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId,
×
321
                TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta);
UNCOV
322
        if (!pRequest->enableBatchMeta && !pRequest->useSnapshot) {
×
UNCOV
323
          SMqMetaRsp metaRsp = {0};
×
UNCOV
324
          tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
×
UNCOV
325
          metaRsp.resMsgType = pHead->msgType;
×
UNCOV
326
          metaRsp.metaRspLen = pHead->bodyLen;
×
UNCOV
327
          metaRsp.metaRsp = pHead->body;
×
UNCOV
328
          code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
×
UNCOV
329
          goto END;
×
330
        }
331

UNCOV
332
        if (!btMetaRsp.batchMetaReq) {
×
UNCOV
333
          btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES);
×
UNCOV
334
          if (btMetaRsp.batchMetaReq == NULL) {
×
335
            code = TAOS_GET_TERRNO(terrno);
×
336
            goto END;
×
337
          }
UNCOV
338
          btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t));
×
UNCOV
339
          if (btMetaRsp.batchMetaLen == NULL) {
×
340
            code = TAOS_GET_TERRNO(terrno);
×
341
            goto END;
×
342
          }
343
        }
UNCOV
344
        fetchVer++;
×
345

UNCOV
346
        SMqMetaRsp tmpMetaRsp = {0};
×
UNCOV
347
        tmpMetaRsp.resMsgType = pHead->msgType;
×
UNCOV
348
        tmpMetaRsp.metaRspLen = pHead->bodyLen;
×
UNCOV
349
        tmpMetaRsp.metaRsp = pHead->body;
×
UNCOV
350
        uint32_t len = 0;
×
UNCOV
351
        tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
×
UNCOV
352
        if (TSDB_CODE_SUCCESS != code) {
×
353
          tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
×
354
          continue;
×
355
        }
UNCOV
356
        int32_t tLen = sizeof(SMqRspHead) + len;
×
UNCOV
357
        void*   tBuf = taosMemoryCalloc(1, tLen);
×
UNCOV
358
        if (tBuf == NULL) {
×
359
          code = TAOS_GET_TERRNO(terrno);
×
360
          goto END;
×
361
        }
UNCOV
362
        void*    metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
×
UNCOV
363
        SEncoder encoder = {0};
×
UNCOV
364
        tEncoderInit(&encoder, metaBuff, len);
×
UNCOV
365
        code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
×
UNCOV
366
        tEncoderClear(&encoder);
×
367

UNCOV
368
        if (code < 0) {
×
369
          tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
×
370
          continue;
×
371
        }
UNCOV
372
        if (taosArrayPush(btMetaRsp.batchMetaReq, &tBuf) == NULL) {
×
373
          code = TAOS_GET_TERRNO(terrno);
×
374
          goto END;
×
375
        }
UNCOV
376
        if (taosArrayPush(btMetaRsp.batchMetaLen, &tLen) == NULL) {
×
377
          code = TAOS_GET_TERRNO(terrno);
×
378
          goto END;
×
379
        }
UNCOV
380
        totalMetaRows++;
×
UNCOV
381
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > 1000)) {
×
382
          tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
×
383
          code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
×
384
          goto END;
×
385
        }
UNCOV
386
        continue;
×
387
      }
388

UNCOV
389
      if (totalMetaRows > 0) {
×
390
        tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
×
391
        code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
×
392
        goto END;
×
393
      }
394

395
      // process data
UNCOV
396
      SPackedData submit = {
×
UNCOV
397
          .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
×
UNCOV
398
          .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
×
UNCOV
399
          .ver = pHead->version,
×
400
      };
401

UNCOV
402
      code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest->sourceExcluded);
×
UNCOV
403
      if (code < 0) {
×
404
        tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
×
405
                pRequest->subKey);
406
        goto END;
×
407
      }
408

UNCOV
409
      if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
×
UNCOV
410
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
×
UNCOV
411
        code = tqSendDataRsp(
×
412
            pHandle, pMsg, pRequest, &taosxRsp,
UNCOV
413
            taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
×
UNCOV
414
        goto END;
×
415
      } else {
UNCOV
416
        fetchVer++;
×
417
      }
418
    }
419
  }
420

421
END:
×
UNCOV
422
  tDeleteMqBatchMetaRsp(&btMetaRsp);
×
UNCOV
423
  tDeleteSTaosxRsp(&taosxRsp);
×
UNCOV
424
  return code;
×
425
}
426

UNCOV
427
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
×
UNCOV
428
  if (pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL) {
×
429
    return TSDB_CODE_TMQ_INVALID_MSG;
×
430
  }
UNCOV
431
  int32_t      code = 0;
×
UNCOV
432
  STqOffsetVal reqOffset = {0};
×
UNCOV
433
  tOffsetCopy(&reqOffset, &pRequest->reqOffset);
×
434

435
  // reset the offset if needed
UNCOV
436
  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
×
UNCOV
437
    bool blockReturned = false;
×
UNCOV
438
    code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
×
UNCOV
439
    if (code != 0) {
×
UNCOV
440
      goto END;
×
441
    }
442

443
    // empty block returned, quit
UNCOV
444
    if (blockReturned) {
×
UNCOV
445
      goto END;
×
446
    }
UNCOV
447
  } else if (reqOffset.type == 0) {  // use the consumer specified offset
×
448
    uError("req offset type is 0");
×
449
    code = TSDB_CODE_TMQ_INVALID_MSG;
×
450
    goto END;
×
451
  }
452

UNCOV
453
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
×
UNCOV
454
    code = extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
×
455
  } else {
UNCOV
456
    code = extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
×
457
  }
458

UNCOV
459
END:
×
UNCOV
460
  tOffsetDestroy(&reqOffset);
×
UNCOV
461
  return code;
×
462
}
463

UNCOV
464
static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver,
×
465
                          int64_t ever) {
UNCOV
466
  if (pMsgHead == NULL) {
×
467
    return;
×
468
  }
UNCOV
469
  pMsgHead->consumerId = consumerId;
×
UNCOV
470
  pMsgHead->epoch = epoch;
×
UNCOV
471
  pMsgHead->mqMsgType = type;
×
UNCOV
472
  pMsgHead->walsver = sver;
×
UNCOV
473
  pMsgHead->walever = ever;
×
474
}
475

UNCOV
476
int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
×
477
                               const SMqBatchMetaRsp* pRsp, int32_t vgId) {
UNCOV
478
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
×
479
    return TSDB_CODE_TMQ_INVALID_MSG;
×
480
  }
UNCOV
481
  int32_t len = 0;
×
UNCOV
482
  int32_t code = 0;
×
UNCOV
483
  tEncodeSize(tEncodeMqBatchMetaRsp, pRsp, len, code);
×
UNCOV
484
  if (code < 0) {
×
485
    return TAOS_GET_TERRNO(code);
×
486
  }
UNCOV
487
  int32_t tlen = sizeof(SMqRspHead) + len;
×
UNCOV
488
  void*   buf = rpcMallocCont(tlen);
×
UNCOV
489
  if (buf == NULL) {
×
490
    return TAOS_GET_TERRNO(terrno);
×
491
  }
492

UNCOV
493
  int64_t sver = 0, ever = 0;
×
UNCOV
494
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
×
UNCOV
495
  initMqRspHead(buf, TMQ_MSG_TYPE__POLL_BATCH_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
×
496

UNCOV
497
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
×
498

UNCOV
499
  SEncoder encoder = {0};
×
UNCOV
500
  tEncoderInit(&encoder, abuf, len);
×
UNCOV
501
  code = tEncodeMqBatchMetaRsp(&encoder, pRsp);
×
UNCOV
502
  tEncoderClear(&encoder);
×
UNCOV
503
  if (code < 0) {
×
504
    rpcFreeCont(buf);
×
505
    return TAOS_GET_TERRNO(code);
×
506
  }
UNCOV
507
  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
×
508

UNCOV
509
  tmsgSendRsp(&resp);
×
UNCOV
510
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%ld offset type:%d",
×
511
          vgId, pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type);
512

UNCOV
513
  return 0;
×
514
}
515

UNCOV
516
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
×
517
                          int32_t vgId) {
UNCOV
518
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
×
519
    return TSDB_CODE_TMQ_INVALID_MSG;
×
520
  }
UNCOV
521
  int32_t len = 0;
×
UNCOV
522
  int32_t code = 0;
×
UNCOV
523
  tEncodeSize(tEncodeMqMetaRsp, pRsp, len, code);
×
UNCOV
524
  if (code < 0) {
×
525
    return TAOS_GET_TERRNO(code);
×
526
  }
UNCOV
527
  int32_t tlen = sizeof(SMqRspHead) + len;
×
UNCOV
528
  void*   buf = rpcMallocCont(tlen);
×
UNCOV
529
  if (buf == NULL) {
×
530
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
531
  }
532

UNCOV
533
  int64_t sver = 0, ever = 0;
×
UNCOV
534
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
×
UNCOV
535
  initMqRspHead(buf, TMQ_MSG_TYPE__POLL_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
×
536

UNCOV
537
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
×
538

UNCOV
539
  SEncoder encoder = {0};
×
UNCOV
540
  tEncoderInit(&encoder, abuf, len);
×
UNCOV
541
  code = tEncodeMqMetaRsp(&encoder, pRsp);
×
UNCOV
542
  tEncoderClear(&encoder);
×
UNCOV
543
  if (code < 0) {
×
544
    rpcFreeCont(buf);
×
545
    return TAOS_GET_TERRNO(code);
×
546
  }
547

UNCOV
548
  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
×
549

UNCOV
550
  tmsgSendRsp(&resp);
×
UNCOV
551
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", vgId,
×
552
          pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);
553

UNCOV
554
  return 0;
×
555
}
556

UNCOV
557
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
×
558
                        int32_t type, int64_t sver, int64_t ever) {
UNCOV
559
  if (pRpcHandleInfo == NULL || pRsp == NULL) {
×
560
    return TSDB_CODE_TMQ_INVALID_MSG;
×
561
  }
UNCOV
562
  int32_t len = 0;
×
UNCOV
563
  int32_t code = 0;
×
564

UNCOV
565
  if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
×
UNCOV
566
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
×
UNCOV
567
  } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
×
UNCOV
568
    tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
×
569
  }
570

UNCOV
571
  if (code < 0) {
×
572
    return TAOS_GET_TERRNO(code);
×
573
  }
574

UNCOV
575
  int32_t tlen = sizeof(SMqRspHead) + len;
×
UNCOV
576
  void*   buf = rpcMallocCont(tlen);
×
UNCOV
577
  if (buf == NULL) {
×
578
    return terrno;
×
579
  }
580

UNCOV
581
  SMqRspHead* pHead = (SMqRspHead*)buf;
×
UNCOV
582
  initMqRspHead(pHead, type, epoch, consumerId, sver, ever);
×
583

UNCOV
584
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
×
585

UNCOV
586
  SEncoder encoder = {0};
×
UNCOV
587
  tEncoderInit(&encoder, abuf, len);
×
588

UNCOV
589
  if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
×
UNCOV
590
    code = tEncodeMqDataRsp(&encoder, pRsp);
×
UNCOV
591
  } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
×
UNCOV
592
    code = tEncodeSTaosxRsp(&encoder, pRsp);
×
593
  }
UNCOV
594
  tEncoderClear(&encoder);
×
UNCOV
595
  if (code < 0) {
×
596
    rpcFreeCont(buf);
×
597
    return TAOS_GET_TERRNO(code);
×
598
  }
UNCOV
599
  SRpcMsg rsp = {.info = *pRpcHandleInfo, .pCont = buf, .contLen = tlen, .code = 0};
×
600

UNCOV
601
  tmsgSendRsp(&rsp);
×
UNCOV
602
  return 0;
×
603
}
604

UNCOV
605
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type, EStreamType blockType) {
×
UNCOV
606
  int32_t     code = 0;
×
UNCOV
607
  int32_t     line = 0;
×
UNCOV
608
  SDecoder*   pCoder = &(SDecoder){0};
×
UNCOV
609
  SDeleteRes* pRes = &(SDeleteRes){0};
×
610

UNCOV
611
  *pRefBlock = NULL;
×
612

UNCOV
613
  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
×
UNCOV
614
  TSDB_CHECK_NULL(pRes->uidList, code, line, END, terrno)
×
615

UNCOV
616
  tDecoderInit(pCoder, (uint8_t*)pData, len);
×
UNCOV
617
  code = tDecodeDeleteRes(pCoder, pRes);
×
UNCOV
618
  TSDB_CHECK_CODE(code, line, END);
×
619

UNCOV
620
  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
×
UNCOV
621
  if (numOfTables == 0 || pRes->affectedRows == 0) {
×
UNCOV
622
    goto END;
×
623
  }
624

UNCOV
625
  SSDataBlock* pDelBlock = NULL;
×
UNCOV
626
  code = createSpecialDataBlock(blockType, &pDelBlock);
×
UNCOV
627
  TSDB_CHECK_CODE(code, line, END);
×
628

UNCOV
629
  code = blockDataEnsureCapacity(pDelBlock, numOfTables);
×
UNCOV
630
  TSDB_CHECK_CODE(code, line, END);
×
631

UNCOV
632
  pDelBlock->info.rows = numOfTables;
×
UNCOV
633
  pDelBlock->info.version = ver;
×
634

UNCOV
635
  for (int32_t i = 0; i < numOfTables; i++) {
×
636
    // start key column
UNCOV
637
    SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
×
UNCOV
638
    TSDB_CHECK_NULL(pStartCol, code, line, END, terrno)
×
UNCOV
639
    code = colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);  // end key column
×
UNCOV
640
    TSDB_CHECK_CODE(code, line, END);
×
UNCOV
641
    SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
×
UNCOV
642
    TSDB_CHECK_NULL(pEndCol, code, line, END, terrno)
×
UNCOV
643
    code = colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
×
UNCOV
644
    TSDB_CHECK_CODE(code, line, END);
×
645
    // uid column
UNCOV
646
    SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
×
UNCOV
647
    TSDB_CHECK_NULL(pUidCol, code, line, END, terrno)
×
648

UNCOV
649
    int64_t* pUid = taosArrayGet(pRes->uidList, i);
×
UNCOV
650
    code = colDataSetVal(pUidCol, i, (const char*)pUid, false);
×
UNCOV
651
    TSDB_CHECK_CODE(code, line, END);
×
UNCOV
652
    void* tmp = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
×
UNCOV
653
    TSDB_CHECK_NULL(tmp, code, line, END, terrno)
×
UNCOV
654
    colDataSetNULL(tmp, i);
×
UNCOV
655
    tmp = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
×
UNCOV
656
    TSDB_CHECK_NULL(tmp, code, line, END, terrno)
×
UNCOV
657
    colDataSetNULL(tmp, i);
×
UNCOV
658
    tmp = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
×
UNCOV
659
    TSDB_CHECK_NULL(tmp, code, line, END, terrno)
×
UNCOV
660
    colDataSetNULL(tmp, i);
×
UNCOV
661
    tmp = taosArrayGet(pDelBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
×
UNCOV
662
    TSDB_CHECK_NULL(tmp, code, line, END, terrno)
×
UNCOV
663
    colDataSetNULL(tmp, i);
×
664
  }
665

UNCOV
666
  if (type == 0) {
×
UNCOV
667
    code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock);
×
UNCOV
668
    if (code) {
×
669
      blockDataCleanup(pDelBlock);
×
670
      taosMemoryFree(pDelBlock);
×
671
      return code;
×
672
    }
673

UNCOV
674
    ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
×
UNCOV
675
    ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pDelBlock;
×
UNCOV
676
  } else if (type == 1) {
×
UNCOV
677
    *pRefBlock = pDelBlock;
×
678
  } else {
679
    tqError("unknown type:%d", type);
×
680
    code = TSDB_CODE_TMQ_CONSUMER_ERROR;
×
681
  }
682

UNCOV
683
END:
×
UNCOV
684
  if (code != 0) {
×
685
    tqError("failed to extract delete data block, line:%d code:%d", line, code);
×
686
  }
UNCOV
687
  tDecoderClear(pCoder);
×
UNCOV
688
  taosArrayDestroy(pRes->uidList);
×
UNCOV
689
  return code;
×
690
}
691

UNCOV
692
int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, bool* fhFinished) {
×
UNCOV
693
  SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
×
UNCOV
694
  int32_t      numOfTasks = taosArrayGetSize(pMeta->pTaskList);
×
UNCOV
695
  int32_t      code = TSDB_CODE_SUCCESS;
×
696

UNCOV
697
  if (pDelay != NULL) {
×
UNCOV
698
    *pDelay = 0;
×
699
  }
700

UNCOV
701
  *fhFinished = false;
×
702

UNCOV
703
  if (numOfTasks <= 0) {
×
704
    return code;
×
705
  }
706

707
  // extract the required source task for a given stream, identified by streamId
UNCOV
708
  streamMetaRLock(pMeta);
×
709

UNCOV
710
  numOfTasks = taosArrayGetSize(pMeta->pTaskList);
×
711

UNCOV
712
  for (int32_t i = 0; i < numOfTasks; ++i) {
×
UNCOV
713
    SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
×
UNCOV
714
    if (pId == NULL) {
×
UNCOV
715
      continue;
×
716
    }
UNCOV
717
    if (pId->streamId != streamId) {
×
UNCOV
718
      continue;
×
719
    }
720

UNCOV
721
    STaskId      id = {.streamId = pId->streamId, .taskId = pId->taskId};
×
UNCOV
722
    SStreamTask* pTask = NULL;
×
723

UNCOV
724
    code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
×
UNCOV
725
    if (code != 0) {
×
726
      tqError("vgId:%d failed to acquire task:0x%x in retrieving progress", pMeta->vgId, pId->taskId);
×
727
      continue;
×
728
    }
729

UNCOV
730
    if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
×
UNCOV
731
      streamMetaReleaseTask(pMeta, pTask);
×
UNCOV
732
      continue;
×
733
    }
734

735
    // here we get the required stream source task
UNCOV
736
    *fhFinished = !HAS_RELATED_FILLHISTORY_TASK(pTask);
×
737

UNCOV
738
    int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
×
UNCOV
739
    if (ver == -1) {
×
UNCOV
740
      ver = pTask->chkInfo.processedVer;
×
741
    } else {
UNCOV
742
      ver--;
×
743
    }
744

UNCOV
745
    SVersionRange verRange = {0};
×
UNCOV
746
    walReaderValidVersionRange(pTask->exec.pWalReader, &verRange.minVer, &verRange.maxVer);
×
747

UNCOV
748
    SWalReader* pReader = walOpenReader(pTask->exec.pWalReader->pWal, NULL, 0);
×
UNCOV
749
    if (pReader == NULL) {
×
750
      tqError("failed to open wal reader to extract exec progress, vgId:%d", pMeta->vgId);
×
751
      streamMetaReleaseTask(pMeta, pTask);
×
752
      continue;
×
753
    }
754

UNCOV
755
    int64_t cur = 0;
×
UNCOV
756
    int64_t latest = 0;
×
757

UNCOV
758
    code = walFetchHead(pReader, ver);
×
UNCOV
759
    if (code == TSDB_CODE_SUCCESS) {
×
UNCOV
760
      cur = pReader->pHead->head.ingestTs;
×
761
    }
762

UNCOV
763
    if (ver == verRange.maxVer) {
×
UNCOV
764
      latest = cur;
×
765
    } else {
UNCOV
766
      code = walFetchHead(pReader, verRange.maxVer);
×
UNCOV
767
      if (code == TSDB_CODE_SUCCESS) {
×
UNCOV
768
        latest = pReader->pHead->head.ingestTs;
×
769
      }
770
    }
771

UNCOV
772
    if (pDelay != NULL) {  // delay in ms
×
UNCOV
773
      *pDelay = (latest - cur) / 1000;
×
774
    }
775

UNCOV
776
    walCloseReader(pReader);
×
UNCOV
777
    streamMetaReleaseTask(pMeta, pTask);
×
778
  }
779

UNCOV
780
  streamMetaRUnLock(pMeta);
×
781

UNCOV
782
  return TSDB_CODE_SUCCESS;
×
783
}
784

UNCOV
785
int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type) {
×
UNCOV
786
  int32_t          code = 0;
×
UNCOV
787
  int32_t          lino = 0;
×
UNCOV
788
  SDecoder         dc = {0};
×
UNCOV
789
  SVDropTbBatchReq batchReq = {0};
×
UNCOV
790
  tDecoderInit(&dc, (uint8_t*)data, len);
×
UNCOV
791
  code = tDecodeSVDropTbBatchReq(&dc, &batchReq);
×
UNCOV
792
  TSDB_CHECK_CODE(code, lino, _exit);
×
UNCOV
793
  if (batchReq.nReqs <= 0) goto _exit;
×
794

UNCOV
795
  SSDataBlock* pBlock = NULL;
×
UNCOV
796
  code = createSpecialDataBlock(STREAM_DROP_CHILD_TABLE, &pBlock);
×
UNCOV
797
  TSDB_CHECK_CODE(code, lino, _exit);
×
798

UNCOV
799
  code = blockDataEnsureCapacity(pBlock, batchReq.nReqs);
×
UNCOV
800
  TSDB_CHECK_CODE(code, lino, _exit);
×
801

UNCOV
802
  pBlock->info.rows = batchReq.nReqs;
×
UNCOV
803
  pBlock->info.version = ver;
×
UNCOV
804
  for (int32_t i = 0; i < batchReq.nReqs; ++i) {
×
UNCOV
805
    SVDropTbReq* pReq = batchReq.pReqs + i;
×
UNCOV
806
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
×
UNCOV
807
    TSDB_CHECK_NULL(pCol, code, lino, _exit, terrno);
×
UNCOV
808
    code = colDataSetVal(pCol, i, (const char* )&pReq->uid, false);
×
UNCOV
809
    TSDB_CHECK_CODE(code, lino, _exit);
×
810
  }
811

UNCOV
812
  code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock);
×
UNCOV
813
  TSDB_CHECK_CODE(code, lino, _exit);
×
UNCOV
814
  ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
×
UNCOV
815
  ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pBlock;
×
816

UNCOV
817
_exit:
×
UNCOV
818
  tDecoderClear(&dc);
×
UNCOV
819
  if (TSDB_CODE_SUCCESS != code) {
×
820
    tqError("faled to extract drop ctb data block, line:%d code:%s", lino, tstrerror(code));
×
821
    blockDataCleanup(pBlock);
×
822
    taosMemoryFree(pBlock);
×
823
  }
UNCOV
824
  return code;
×
825
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc