• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3616

19 Feb 2025 11:22AM UTC coverage: 63.315% (+0.4%) from 62.953%
#3616

push

travis-ci

web-flow
Merge pull request #29823 from taosdata/feat/TS-5928

fix:[TS-5928]add consumer parameters

148228 of 300186 branches covered (49.38%)

Branch coverage included in aggregate %.

232358 of 300909 relevant lines covered (77.22%)

17990742.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.6
/source/dnode/vnode/src/tq/tqUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
19
                                 const SMqMetaRsp* pRsp, int32_t vgId);
20
static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
21
                                      const SMqBatchMetaRsp* pRsp, int32_t vgId);
22

23
/**
 * Prepare a data poll response: allocate its (empty) block arrays and seed both
 * the request offset and the response offset from `pOffset`.
 *
 * @param pRsp    response to initialise; must not be NULL
 * @param pOffset offset copied into pRsp->reqOffset and pRsp->rspOffset
 * @return TDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA when pRsp is NULL,
 *         otherwise the terrno left by the failed array allocation.
 *
 * On failure the array allocated so far is released and its field reset to
 * NULL, so a caller that simply returns the error does not leak it and a caller
 * that still runs its own cleanup does not free it twice.
 */
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  int32_t code = TDB_CODE_SUCCESS;
  int32_t lino = 0;
  tqDebug("%s called", __FUNCTION__ );
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);

  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);

  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
  pRsp->withTbName = 0;
  pRsp->withSchema = false;

END:
  if (code != 0) {
    tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
    if (pRsp != NULL) {
      // Only blockData can be live here: the sole failure after it is the
      // blockDataLen allocation, which leaves blockDataLen NULL.
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
    }
  }
  return code;
}
46

47
/**
 * Forward the vnode's current sync term together with the leader flag to the
 * stream meta.
 *
 * @param pTq      tq instance; its vnode sync handle and stream meta are read
 * @param isLeader passed through unchanged to streamMetaUpdateStageRole
 */
void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
  SSyncState syncState = syncGetState(pTq->pVnode->sync);

  streamMetaUpdateStageRole(pTq->pStreamMeta, syncState.term, isLeader);
}
14,926✔
51

52
/**
 * Prepare a taosx poll response: copy `pOffset` into the request and response
 * offsets, enable table-name and schema output, and allocate the four (empty)
 * per-block arrays.
 *
 * @param pRsp    response to initialise; must not be NULL
 * @param pOffset offset copied into pRsp->reqOffset and pRsp->rspOffset
 * @return TDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA when pRsp is NULL,
 *         otherwise the terrno left by the failed array allocation.
 *
 * On failure every array allocated by this call is destroyed and its field is
 * reset to NULL, so the caller's own cleanup cannot free it a second time.
 */
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  int32_t code = TDB_CODE_SUCCESS;
  int32_t lino = 0;
  tqDebug("%s called", __FUNCTION__ );
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);

  pRsp->withTbName = 1;
  pRsp->withSchema = 1;

  // Start from a known state so the failure path releases only what this call
  // allocated; on success all four fields are overwritten below anyway.
  pRsp->blockData = NULL;
  pRsp->blockDataLen = NULL;
  pRsp->blockTbName = NULL;
  pRsp->blockSchema = NULL;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);

  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);

  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockTbName, code, lino, END, terrno);

  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);

END:
  if (code != 0) {
    tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
    // pRsp == NULL is itself one of the failures that lands here.
    if (pRsp != NULL) {
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
      taosArrayDestroy(pRsp->blockDataLen);
      pRsp->blockDataLen = NULL;
      taosArrayDestroy(pRsp->blockTbName);
      pRsp->blockTbName = NULL;
      taosArrayDestroy(pRsp->blockSchema);
      pRsp->blockSchema = NULL;
    }
  }
  return code;
}
85

86
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
2,594✔
87
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
88
  if (pOffsetVal == NULL || pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || pBlockReturned == NULL) {
2,594!
89
    return TSDB_CODE_INVALID_PARA;
×
90
  }
91
  uint64_t   consumerId = pRequest->consumerId;
2,594✔
92
  STqOffset* pOffset = NULL;
2,594✔
93
  int32_t    code = tqMetaGetOffset(pTq, pRequest->subKey, &pOffset);
2,594✔
94
  int32_t    vgId = TD_VID(pTq->pVnode);
2,593✔
95

96
  *pBlockReturned = false;
2,593✔
97
  // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
98
  if (code == 0) {
2,593✔
99
    tOffsetCopy(pOffsetVal, &pOffset->val);
165✔
100

101
    char formatBuf[TSDB_OFFSET_LEN] = {0};
165✔
102
    tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
165✔
103
    tqDebug("tmq poll: consumer:0x%" PRIx64
165!
104
                ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.QID:0x%" PRIx64,
105
            consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
106
    return 0;
165✔
107
  } else {
108
    // no poll occurs in this vnode for this topic, let's seek to the right offset value.
109
    if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
2,428✔
110
      if (pRequest->useSnapshot) {
1,725✔
111
        tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
849!
112
                consumerId, pHandle->subKey, vgId);
113
        if (pHandle->fetchMeta) {
849✔
114
          tqOffsetResetToMeta(pOffsetVal, 0);
115
        } else {
116
          SValue val = {0};
833!
117
          tqOffsetResetToData(pOffsetVal, 0, 0, val);
833✔
118
        }
119
      } else {
120
        walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
876✔
121
        tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
877✔
122
      }
123
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
703✔
124
      walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
55✔
125
      SMqDataRsp dataRsp = {0};
55✔
126
      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
55✔
127

128
      code = tqInitDataRsp(&dataRsp, *pOffsetVal);
55✔
129
      if (code != 0) {
55!
130
        return code;
×
131
      }
132
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
55!
133
              pHandle->subKey, vgId, dataRsp.rspOffset.version);
134
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, (pRequest->rawData == 1) ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
55!
135
      tDeleteMqDataRsp(&dataRsp);
55✔
136

137
      *pBlockReturned = true;
55✔
138
      return code;
55✔
139
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
648!
140
      tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
648!
141
                  " in vg %d, subkey %s, reset none failed",
142
              pHandle->subKey, consumerId, vgId, pRequest->subKey);
143
      return TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
648✔
144
    }
145
  }
146

147
  return 0;
1,726✔
148
}
149

150
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
55,493✔
151
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
152
  int32_t    code = TDB_CODE_SUCCESS;
55,493✔
153
  int32_t    lino = 0;
55,493✔
154
  tqDebug("%s called", __FUNCTION__ );
55,493!
155
  uint64_t consumerId = pRequest->consumerId;
55,493✔
156
  int32_t  vgId = TD_VID(pTq->pVnode);
55,493✔
157
  terrno = 0;
55,493✔
158

159
  SMqDataRsp dataRsp = {0};
55,493✔
160
  code = tqInitDataRsp(&dataRsp, *pOffset);
55,493✔
161
  TSDB_CHECK_CODE(code, lino, end);
55,493!
162

163
  code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
55,493✔
164
  TSDB_CHECK_CODE(code, lino, end);
55,493!
165

166
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
55,493✔
167
  if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
55,493✔
168
    goto end;
656✔
169
  }
170

171
  //   till now, all data has been transferred to consumer, new data needs to push client once arrived.
172
  if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
54,837✔
173
    // lock
174
    taosWLockLatch(&pTq->lock);
25,642✔
175
    int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
25,642✔
176
    if (dataRsp.rspOffset.version > ver) {  // check if there are data again to avoid lost data
25,642✔
177
      code = tqRegisterPushHandle(pTq, pHandle, pMsg);
24,709✔
178
      taosWUnLockLatch(&pTq->lock);
24,709✔
179
      goto end;
24,709✔
180
    }
181
    taosWUnLockLatch(&pTq->lock);
933✔
182
  }
183

184
  // reqOffset represents the current date offset, may be changed if wal not exists
185
  tOffsetCopy(&dataRsp.reqOffset, pOffset);
30,128✔
186
  code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
30,128✔
187

188
end:
55,493✔
189
  {
190
    char buf[TSDB_OFFSET_LEN] = {0};
55,493✔
191
    tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
55,493✔
192
    if (code != 0){
55,493✔
193
      tqError("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s,QID:0x%" PRIx64 " error msg:%s, line:%d",
656!
194
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, tstrerror(code), lino);
195
    } else {
196
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s,QID:0x%" PRIx64 " success",
54,837!
197
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId);
198
    }
199

200
    tDeleteMqDataRsp(&dataRsp);
55,493✔
201
    return code;
55,493✔
202
  }
203
}
204

205
/*
 * Decode the WAL message body (after its SMsgHead) as TYPE and, when decoding
 * succeeds and the request's source has TD_REQ_FROM_TAOX set, skip the message:
 * log it, advance fetchVer and `continue` the enclosing loop.
 *
 * Must be expanded inside its own braces within a loop. It reads pHead,
 * pRequest and vgId from the surrounding scope, modifies fetchVer, and declares
 * the locals decoder, req, data, len and skipMsg.
 */
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC)                                               \
  SDecoder decoder = {0};                                                                                  \
  TYPE     req = {0};                                                                                      \
  void*    data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead));                                            \
  int32_t  len = pHead->bodyLen - sizeof(SMsgHead);                                                        \
  tDecoderInit(&decoder, data, len);                                                                       \
  bool skipMsg = (DECODE_FUNC(&decoder, &req) == 0) && ((req.source & TD_REQ_FROM_TAOX) != 0);             \
  if (skipMsg) {                                                                                           \
    tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 \
            " msgType %d",                                                                                 \
            pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);                        \
    fetchVer++;                                                                                            \
  }                                                                                                        \
  DELETE_FUNC(&req);                                                                                       \
  tDecoderClear(&decoder);                                                                                 \
  if (skipMsg) {                                                                                           \
    continue;                                                                                              \
  }
222

223
/* No-op destructor: does nothing with its argument. */
static void tDeleteCommon(void* parm) {
  (void)parm;
}
406✔
224

225
/*
 * Pick the poll response message type:
 *  - TMQ_MSG_TYPE__POLL_DATA_META_RSP when the response has createTableNum > 0,
 *  - otherwise TMQ_MSG_TYPE__POLL_RAW_DATA_RSP when the request has rawData == 1,
 *  - otherwise TMQ_MSG_TYPE__POLL_DATA_RSP.
 *
 * pRequest is a pointer expression, taosxRsp a struct lvalue. Arguments and the
 * whole expansion are parenthesized so the macro stays correct inside larger
 * expressions and with non-trivial argument expressions.
 */
#define POLL_RSP_TYPE(pRequest, taosxRsp)                   \
  ((taosxRsp).createTableNum > 0                            \
       ? TMQ_MSG_TYPE__POLL_DATA_META_RSP                   \
       : ((pRequest)->rawData == 1 ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP))
228

229
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
12,576✔
230
                                                  SRpcMsg* pMsg, STqOffsetVal* offset) {
231
  int32_t         vgId = TD_VID(pTq->pVnode);
12,576✔
232
  SMqDataRsp      taosxRsp = {0};
12,576✔
233
  SMqBatchMetaRsp btMetaRsp = {0};
12,576✔
234
  int32_t         code = 0;
12,576✔
235

236
  TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset));
12,576!
237
  if (offset->type != TMQ_OFFSET__LOG) {
12,576✔
238
    TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest));
154!
239

240
    if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
154✔
241
      code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
14✔
242
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
14!
243
              pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,btMetaRsp.rspOffset.ts);
244
      goto END;
14✔
245
    }
246

247
    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64",ts:%" PRId64,
140!
248
            pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
249
    if (taosxRsp.blockNum > 0) {
140✔
250
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
136✔
251
      goto END;
136✔
252
    } else {
253
      tOffsetCopy(offset, &taosxRsp.rspOffset);
4✔
254
    }
255
  }
256

257
  if (offset->type == TMQ_OFFSET__LOG) {
12,426!
258
    walReaderVerifyOffset(pHandle->pWalReader, offset);
12,426✔
259
    int64_t fetchVer = offset->version;
12,426✔
260

261
    uint64_t st = taosGetTimestampMs();
12,426✔
262
    int      totalRows = 0;
12,426✔
263
    int32_t  totalMetaRows = 0;
12,426✔
264
    while (1) {
121,266✔
265
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
133,692✔
266
      if (savedEpoch > pRequest->epoch) {
133,585!
267
        tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, savedEpoch error, vgId:%d offset %" PRId64,
×
268
                pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
269
        code = TSDB_CODE_TQ_INTERNAL_ERROR;
×
270
        goto END;
12,426✔
271
      }
272

273
      if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
133,585✔
274
        if (totalMetaRows > 0) {
11,630✔
275
          tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
2✔
276
          code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
2✔
277
          goto END;
2✔
278
        }
279
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
11,628✔
280
        code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
11,628✔
281
                             POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
11,628✔
282
        goto END;
11,628✔
283
      }
284

285
      SWalCont* pHead = &pHandle->pWalReader->pHead->head;
122,309✔
286
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %s",
122,309!
287
              pRequest->consumerId, pRequest->epoch, vgId, fetchVer, TMSG_INFO(pHead->msgType));
288

289
      // process meta
290
      if (pHead->msgType != TDMT_VND_SUBMIT) {
122,312✔
291
        if (totalRows > 0) {
674✔
292
          tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
35✔
293
          code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
35✔
294
                               POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
35!
295
          goto END;
269✔
296
        }
297

298
        if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
639✔
299
          if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
635✔
300
            PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq, tDeleteSVCreateTbBatchReq)
191!
301
          } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
444✔
302
            PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq, tDeleteCommon)
55!
303
          } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
497✔
304
            PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq, tDeleteCommon)
344!
305
          } else if (pHead->msgType == TDMT_VND_DELETE) {
45✔
306
            PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes, tDeleteCommon)
7!
307
          }
308
        }
309

310
        tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId,
236!
311
                TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta);
312
        if (!pRequest->enableBatchMeta && !pRequest->useSnapshot) {
236!
313
          SMqMetaRsp metaRsp = {0};
234✔
314
          tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
234✔
315
          metaRsp.resMsgType = pHead->msgType;
234✔
316
          metaRsp.metaRspLen = pHead->bodyLen;
234✔
317
          metaRsp.metaRsp = pHead->body;
234✔
318
          code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
234✔
319
          goto END;
234✔
320
        }
321

322
        if (!btMetaRsp.batchMetaReq) {
2!
323
          btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES);
2✔
324
          TQ_NULL_GO_TO_END(btMetaRsp.batchMetaReq);
2!
325
          btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t));
2✔
326
          TQ_NULL_GO_TO_END(btMetaRsp.batchMetaLen);
2!
327
        }
328
        fetchVer++;
2✔
329

330
        SMqMetaRsp tmpMetaRsp = {0};
2✔
331
        tmpMetaRsp.resMsgType = pHead->msgType;
2✔
332
        tmpMetaRsp.metaRspLen = pHead->bodyLen;
2✔
333
        tmpMetaRsp.metaRsp = pHead->body;
2✔
334
        uint32_t len = 0;
2✔
335
        tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
2!
336
        if (TSDB_CODE_SUCCESS != code) {
2!
337
          tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
×
338
          continue;
×
339
        }
340
        int32_t tLen = sizeof(SMqRspHead) + len;
2✔
341
        void*   tBuf = taosMemoryCalloc(1, tLen);
2!
342
        TQ_NULL_GO_TO_END(tBuf);
2!
343
        void*    metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
2✔
344
        SEncoder encoder = {0};
2✔
345
        tEncoderInit(&encoder, metaBuff, len);
2✔
346
        code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
2✔
347
        tEncoderClear(&encoder);
2✔
348

349
        if (code < 0) {
2!
350
          tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
×
351
          continue;
×
352
        }
353
        TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaReq, &tBuf));
4!
354
        TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaLen, &tLen));
4!
355
        totalMetaRows++;
2✔
356
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > pRequest->timeout)) {
4!
357
          tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
×
358
          code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
×
359
          goto END;
×
360
        }
361
        continue;
2✔
362
      }
363

364
      if (totalMetaRows > 0) {
121,638!
365
        tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
×
366
        code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
×
367
        goto END;
×
368
      }
369

370
      // process data
371
      SPackedData submit = {
121,638✔
372
          .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
121,638✔
373
          .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
121,638✔
374
          .ver = pHead->version,
121,638✔
375
      };
376

377
      TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest));
121,638!
378

379
      if ((pRequest->rawData == 0 && totalRows >= pRequest->minPollRows) ||
121,450!
380
          (taosGetTimestampMs() - st > pRequest->timeout) ||
120,962!
381
          (pRequest->rawData != 0 && (taosArrayGetSize(taosxRsp.blockData) > pRequest->minPollRows ||
121,010!
382
                                      terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) {
30!
383
//        tqDebug("start to send rsp, block num:%d, totalRows:%d, createTableNum:%d, terrno:%d",
384
//                (int)taosArrayGetSize(taosxRsp.blockData), totalRows, taosxRsp.createTableNum, terrno);
385
        tqOffsetResetToLog(&taosxRsp.rspOffset, terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT ? fetchVer : fetchVer + 1);
627✔
386
        code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
527✔
387
                             POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
527✔
388
        if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){terrno = 0;}
527✔
389
        goto END;
527✔
390
      } else {
391
        fetchVer++;
120,861✔
392
      }
393
    }
394
  }
395

396
END:
×
397
  if (code != 0){
12,576!
398
    tqError("tmq poll: tqTaosxScanLog error. consumerId:0x%" PRIx64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
×
399
            pRequest->subKey);
400
  }
401
  tDeleteMqBatchMetaRsp(&btMetaRsp);
12,576✔
402
  tDeleteSTaosxRsp(&taosxRsp);
12,574✔
403
  return code;
12,576✔
404
}
405

406
/**
 * Entry point for answering a consumer poll: resolve the starting offset, then
 * hand off to the column or the db/stable extraction routine.
 *
 * @return 0 on success; TSDB_CODE_TMQ_INVALID_MSG for a NULL argument or a
 *         request offset of type 0; otherwise the error of the failing step.
 */
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
  if (!pTq || !pHandle || !pRequest || !pMsg) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  int32_t      code = 0;
  STqOffsetVal startOffset = {0};
  tOffsetCopy(&startOffset, &pRequest->reqOffset);

  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
    // reset the offset if needed
    bool alreadyReplied = false;
    code = extractResetOffsetVal(&startOffset, pTq, pHandle, pRequest, pMsg, &alreadyReplied);

    // stop on failure, or when an empty block has already been returned
    if (code != 0 || alreadyReplied) {
      goto END;
    }
  } else if (startOffset.type == 0) {  // use the consumer specified offset
    uError("req offset type is 0");
    code = TSDB_CODE_TMQ_INVALID_MSG;
    goto END;
  }

  code = (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN)
             ? extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &startOffset)
             : extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &startOffset);

END:
  if (code != 0){
    uError("failed to extract data for mq, msg:%s", tstrerror(code));
  }
  tOffsetDestroy(&startOffset);
  return code;
}
445

446
/**
 * Fill the fixed header placed in front of every poll response.
 *
 * @param pMsgHead header to fill; a NULL pointer is ignored
 * @param type     stored as mqMsgType
 * @param sver     stored as walsver
 * @param ever     stored as walever
 */
static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver,
                          int64_t ever) {
  if (pMsgHead != NULL) {
    pMsgHead->mqMsgType = type;
    pMsgHead->epoch = epoch;
    pMsgHead->consumerId = consumerId;
    pMsgHead->walsver = sver;
    pMsgHead->walever = ever;
  }
}
457

458
int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
16✔
459
                               const SMqBatchMetaRsp* pRsp, int32_t vgId) {
460
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
16!
461
    return TSDB_CODE_TMQ_INVALID_MSG;
×
462
  }
463
  int32_t len = 0;
16✔
464
  int32_t code = 0;
16✔
465
  tEncodeSize(tEncodeMqBatchMetaRsp, pRsp, len, code);
16!
466
  if (code < 0) {
16!
467
    return TAOS_GET_TERRNO(code);
×
468
  }
469
  int32_t tlen = sizeof(SMqRspHead) + len;
16✔
470
  void*   buf = rpcMallocCont(tlen);
16✔
471
  if (buf == NULL) {
16!
472
    return TAOS_GET_TERRNO(terrno);
×
473
  }
474

475
  int64_t sver = 0, ever = 0;
16✔
476
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
16✔
477
  initMqRspHead(buf, TMQ_MSG_TYPE__POLL_BATCH_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
16✔
478

479
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
16✔
480

481
  SEncoder encoder = {0};
16✔
482
  tEncoderInit(&encoder, abuf, len);
16✔
483
  code = tEncodeMqBatchMetaRsp(&encoder, pRsp);
16✔
484
  tEncoderClear(&encoder);
16✔
485
  if (code < 0) {
16!
486
    rpcFreeCont(buf);
×
487
    return TAOS_GET_TERRNO(code);
×
488
  }
489
  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
16✔
490

491
  tmsgSendRsp(&resp);
16✔
492
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%ld offset type:%d",
16!
493
          vgId, pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type);
494

495
  return 0;
16✔
496
}
497

498
/**
 * Encode a single meta response behind an SMqRspHead and send it as the reply
 * to `pMsg`.
 *
 * @param pRsp meta response to encode
 * @param vgId used for logging only
 * @return 0 once the response has been handed to tmsgSendRsp;
 *         TSDB_CODE_TMQ_INVALID_MSG on a NULL argument; otherwise the
 *         sizing/allocation/encoding error.
 */
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
                          int32_t vgId) {
  if (!pHandle || !pMsg || !pReq || !pRsp) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  int32_t bodyLen = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeMqMetaRsp, pRsp, bodyLen, code);
  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (!pCont) {
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
  }

  // header first: it carries the valid WAL version range of the reader
  int64_t walFirst = 0;
  int64_t walLast = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &walFirst, &walLast);
  initMqRspHead(pCont, TMQ_MSG_TYPE__POLL_META_RSP, pReq->epoch, pReq->consumerId, walFirst, walLast);

  // then the encoded payload directly behind it
  SEncoder enc = {0};
  tEncoderInit(&enc, POINTER_SHIFT(pCont, sizeof(SMqRspHead)), bodyLen);
  code = tEncodeMqMetaRsp(&enc, pRsp);
  tEncoderClear(&enc);
  if (code < 0) {
    rpcFreeCont(pCont);
    return TAOS_GET_TERRNO(code);
  }

  SRpcMsg reply = {.info = pMsg->info, .pCont = pCont, .contLen = totalLen, .code = 0};
  tmsgSendRsp(&reply);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", vgId,
          pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);

  return 0;
}
538

539
/**
 * Encode a data response behind an SMqRspHead and send it through the given
 * rpc handle.
 *
 * POLL_DATA / WALINFO / POLL_RAW_DATA responses are encoded with
 * tEncodeMqDataRsp, POLL_DATA_META responses with tEncodeSTaosxRsp; for any
 * other type only the header is sent. For raw-data responses
 * pRsp->withSchema is cleared before encoding.
 *
 * @return 0 once the response has been handed to tmsgSendRsp;
 *         TSDB_CODE_TMQ_INVALID_MSG on a NULL handle or response; otherwise
 *         the sizing/allocation/encoding error.
 */
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
                        int32_t type, int64_t sver, int64_t ever) {
  if (!pRpcHandleInfo || !pRsp) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  // classify the message type once; both the sizing and the encoding pass use it
  bool asDataRsp = (type == TMQ_MSG_TYPE__POLL_DATA_RSP) || (type == TMQ_MSG_TYPE__WALINFO_RSP) ||
                   (type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP);
  bool asTaosxRsp = !asDataRsp && (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP);

  int32_t bodyLen = 0;
  int32_t code = 0;

  if (type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP){
    pRsp->withSchema = 0;
  }

  if (asDataRsp) {
    tEncodeSize(tEncodeMqDataRsp, pRsp, bodyLen, code);
  } else if (asTaosxRsp) {
    tEncodeSize(tEncodeSTaosxRsp, pRsp, bodyLen, code);
  }
  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (!pCont) {
    return terrno;
  }

  initMqRspHead((SMqRspHead*)pCont, type, epoch, consumerId, sver, ever);

  // payload goes directly behind the header
  SEncoder enc = {0};
  tEncoderInit(&enc, POINTER_SHIFT(pCont, sizeof(SMqRspHead)), bodyLen);
  if (asDataRsp) {
    code = tEncodeMqDataRsp(&enc, pRsp);
  } else if (asTaosxRsp) {
    code = tEncodeSTaosxRsp(&enc, pRsp);
  }
  tEncoderClear(&enc);
  if (code < 0) {
    rpcFreeCont(pCont);
    return TAOS_GET_TERRNO(code);
  }

  SRpcMsg reply = {.info = *pRpcHandleInfo, .pCont = pCont, .contLen = totalLen, .code = 0};
  tmsgSendRsp(&reply);
  return 0;
}
593

594
/**
 * Decode a delete result and turn it into a special data block with one row
 * per affected table uid (start ts, end ts and uid set; the group id,
 * calculate-start/end and table-name columns set to NULL).
 *
 * @param pData     encoded SDeleteRes
 * @param len       length of pData
 * @param ver       stored as the block's version
 * @param pRefBlock [out] NULL when there is nothing to emit (no tables or no
 *                  affected rows); for type 0 a queue item of type
 *                  STREAM_INPUT__REF_DATA_BLOCK wrapping the block; for type 1
 *                  the block itself
 * @param type      0 or 1, see pRefBlock; anything else fails with
 *                  TSDB_CODE_TMQ_CONSUMER_ERROR
 * @param blockType passed to createSpecialDataBlock
 * @return 0 on success, TSDB_CODE_INVALID_PARA when pRefBlock is NULL,
 *         otherwise the error of the failing step.
 *
 * Every exit goes through END, so the decoder and the uid list are always
 * released, and a block that was not handed to the caller is released too.
 */
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type, EStreamType blockType) {
  if (pRefBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t      code = 0;
  int32_t      line = 0;
  SDecoder*    pCoder = &(SDecoder){0};
  SDeleteRes*  pRes = &(SDeleteRes){0};
  SSDataBlock* pDelBlock = NULL;  // owned here until handed over through pRefBlock

  *pRefBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  TSDB_CHECK_NULL(pRes->uidList, code, line, END, terrno);

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  code = tDecodeDeleteRes(pCoder, pRes);
  TSDB_CHECK_CODE(code, line, END);

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    goto END;
  }

  code = createSpecialDataBlock(blockType, &pDelBlock);
  TSDB_CHECK_CODE(code, line, END);

  code = blockDataEnsureCapacity(pDelBlock, numOfTables);
  TSDB_CHECK_CODE(code, line, END);

  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  for (int32_t i = 0; i < numOfTables; i++) {
    // start key column
    SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(pStartCol, code, line, END, terrno);
    code = colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    TSDB_CHECK_CODE(code, line, END);

    // end key column
    SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(pEndCol, code, line, END, terrno);
    code = colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    TSDB_CHECK_CODE(code, line, END);

    // uid column
    SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
    TSDB_CHECK_NULL(pUidCol, code, line, END, terrno);

    int64_t* pUid = taosArrayGet(pRes->uidList, i);
    TSDB_CHECK_NULL(pUid, code, line, END, terrno);
    code = colDataSetVal(pUidCol, i, (const char*)pUid, false);
    TSDB_CHECK_CODE(code, line, END);

    // the remaining columns carry no value for a delete row
    void* tmp = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno);
    colDataSetNULL(tmp, i);
    tmp = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno);
    colDataSetNULL(tmp, i);
    tmp = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno);
    colDataSetNULL(tmp, i);
    tmp = taosArrayGet(pDelBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno);
    colDataSetNULL(tmp, i);
  }

  if (type == 0) {
    code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock);
    TSDB_CHECK_CODE(code, line, END);

    ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
    ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pDelBlock;
    pDelBlock = NULL;  // ownership moved into the queue item
  } else if (type == 1) {
    *pRefBlock = pDelBlock;
    pDelBlock = NULL;  // ownership moved to the caller
  } else {
    tqError("unknown type:%d", type);
    code = TSDB_CODE_TMQ_CONSUMER_ERROR;
  }

END:
  if (code != 0) {
    tqError("failed to extract delete data block, line:%d code:%d", line, code);
    if (pDelBlock != NULL) {
      // Same release sequence the queue-item failure path has always used.
      // NOTE(review): confirm this also releases the block's column buffers.
      blockDataCleanup(pDelBlock);
      taosMemoryFree(pDelBlock);
    }
  }
  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
  return code;
}
680

681
/**
 * Report the execution progress of the source task of a stream on this vnode.
 *
 * The task list of the vnode's stream meta is scanned under the meta read
 * lock. For every task that belongs to `streamId` and whose level is
 * TASK_LEVEL__SOURCE:
 *   - *fhFinished is set to !HAS_RELATED_FILLHISTORY_TASK(task);
 *   - *pDelay is set to (ingestTs of the newest valid WAL entry - ingestTs of
 *     the WAL entry the task is currently positioned at) / 1000.
 *
 * Failures on a single task (cannot acquire the task, cannot open a WAL
 * reader, cannot fetch a WAL head) are not propagated: the task is skipped or
 * the delay is left untouched, and the scan continues.
 *
 * @param pVnode      vnode whose pTq->pStreamMeta is inspected
 * @param streamId    id of the stream to look up
 * @param pDelay      [out, optional] delay value; reset to 0 on entry
 * @param fhFinished  [out, optional] fill-history flag; reset to false on entry
 * @return TSDB_CODE_SUCCESS in all cases
 */
int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, bool* fhFinished) {
  SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
  int32_t      numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  int32_t      code = TSDB_CODE_SUCCESS;

  // Both output parameters are optional; give them defined values up front.
  if (pDelay != NULL) {
    *pDelay = 0;
  }

  if (fhFinished != NULL) {
    *fhFinished = false;
  }

  if (numOfTasks <= 0) {
    return code;
  }

  // extract the required source task for a given stream, identified by streamId
  streamMetaRLock(pMeta);

  // The list may have changed between the unlocked check above and taking the lock.
  numOfTasks = taosArrayGetSize(pMeta->pTaskList);

  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
    if (pId == NULL) {
      continue;
    }

    if (pId->streamId != streamId) {
      continue;
    }

    STaskId      id = {.streamId = pId->streamId, .taskId = pId->taskId};
    SStreamTask* pTask = NULL;

    code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
    if (code != 0) {
      tqError("vgId:%d failed to acquire task:0x%x in retrieving progress", pMeta->vgId, pId->taskId);
      continue;
    }

    if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
      streamMetaReleaseTask(pMeta, pTask);
      continue;
    }

    // here we get the required stream source task
    if (fhFinished != NULL) {
      *fhFinished = !HAS_RELATED_FILLHISTORY_TASK(pTask);
    }

    // -1 means the task's reader reports no current version: fall back to the
    // checkpointed processed version. Otherwise step back by one from the
    // reader's current version.
    int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
    if (ver == -1) {
      ver = pTask->chkInfo.processedVer;
    } else {
      ver--;
    }

    SVersionRange verRange = {0};
    walReaderValidVersionRange(pTask->exec.pWalReader, &verRange.minVer, &verRange.maxVer);

    // A separate reader is opened so the task's own reader is not repositioned.
    SWalReader* pReader = walOpenReader(pTask->exec.pWalReader->pWal, NULL, 0);
    if (pReader == NULL) {
      tqError("failed to open wal reader to extract exec progress, vgId:%d", pMeta->vgId);
      streamMetaReleaseTask(pMeta, pTask);
      continue;
    }

    int64_t cur = 0;
    int64_t latest = 0;
    bool    curValid = false;
    bool    latestValid = false;

    code = walFetchHead(pReader, ver);
    if (code == TSDB_CODE_SUCCESS) {
      cur = pReader->pHead->head.ingestTs;
      curValid = true;
    }

    if (ver == verRange.maxVer) {
      latest = cur;
      latestValid = curValid;
    } else {
      code = walFetchHead(pReader, verRange.maxVer);
      if (code == TSDB_CODE_SUCCESS) {
        latest = pReader->pHead->head.ingestTs;
        latestValid = true;
      }
    }

    // Only publish a delay computed from two timestamps that were actually
    // read; with a failed fetch one operand would still be the 0 placeholder
    // and the difference would be a raw timestamp or a negative value.
    if (pDelay != NULL && curValid && latestValid) {  // delay in ms
      *pDelay = (latest - cur) / 1000;
    }

    walCloseReader(pReader);
    streamMetaReleaseTask(pMeta, pTask);
  }

  streamMetaRUnLock(pMeta);

  return TSDB_CODE_SUCCESS;
}
773

774
int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type) {
185✔
775
  int32_t          code = 0;
185✔
776
  int32_t          lino = 0;
185✔
777
  SDecoder         dc = {0};
185✔
778
  SVDropTbBatchReq batchReq = {0};
185✔
779
  tDecoderInit(&dc, (uint8_t*)data, len);
185✔
780
  code = tDecodeSVDropTbBatchReq(&dc, &batchReq);
185✔
781
  TSDB_CHECK_CODE(code, lino, _exit);
185!
782
  if (batchReq.nReqs <= 0) goto _exit;
185!
783

784
  SSDataBlock* pBlock = NULL;
185✔
785
  code = createSpecialDataBlock(STREAM_DROP_CHILD_TABLE, &pBlock);
185✔
786
  TSDB_CHECK_CODE(code, lino, _exit);
185!
787

788
  code = blockDataEnsureCapacity(pBlock, batchReq.nReqs);
185✔
789
  TSDB_CHECK_CODE(code, lino, _exit);
185!
790

791
  pBlock->info.rows = batchReq.nReqs;
185✔
792
  pBlock->info.version = ver;
185✔
793
  for (int32_t i = 0; i < batchReq.nReqs; ++i) {
375✔
794
    SVDropTbReq* pReq = batchReq.pReqs + i;
190✔
795
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
190✔
796
    TSDB_CHECK_NULL(pCol, code, lino, _exit, terrno);
190!
797
    code = colDataSetVal(pCol, i, (const char* )&pReq->uid, false);
190✔
798
    TSDB_CHECK_CODE(code, lino, _exit);
190!
799
  }
800

801
  code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock);
185✔
802
  TSDB_CHECK_CODE(code, lino, _exit);
185!
803
  ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
185✔
804
  ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pBlock;
185✔
805

806
_exit:
185✔
807
  tDecoderClear(&dc);
185✔
808
  if (TSDB_CODE_SUCCESS != code) {
185!
809
    tqError("faled to extract drop ctb data block, line:%d code:%s", lino, tstrerror(code));
×
810
    blockDataCleanup(pBlock);
×
811
    taosMemoryFree(pBlock);
×
812
  }
813
  return code;
185✔
814
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc