• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3561

19 Dec 2024 03:15AM UTC coverage: 58.812% (-1.3%) from 60.124%
#3561

push

travis-ci

web-flow
Merge pull request #29213 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

130770 of 287658 branches covered (45.46%)

Branch coverage included in aggregate %.

32 of 78 new or added lines in 4 files covered. (41.03%)

7347 existing lines in 166 files now uncovered.

205356 of 283866 relevant lines covered (72.34%)

7187865.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.68
/source/dnode/vnode/src/tq/tqUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
19
                                 const SMqMetaRsp* pRsp, int32_t vgId);
20
static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
21
                                      const SMqBatchMetaRsp* pRsp, int32_t vgId);
22

23
/**
 * Prepare a data poll response so that blocks can be appended to it.
 *
 * Allocates the (initially empty) block-data and block-length arrays, copies
 * pOffset into both the request offset and the response offset, and clears
 * the table-name / schema flags.
 *
 * On allocation failure nothing is left allocated inside pRsp: whichever of
 * the two arrays was created is destroyed and both pointers are reset to NULL,
 * so the caller neither leaks nor risks a double destroy.
 *
 * @param pRsp    response to initialize; must not be NULL
 * @param pOffset offset copied into pRsp->reqOffset and pRsp->rspOffset
 * @return 0 on success, TSDB_CODE_INVALID_PARA when pRsp is NULL, otherwise
 *         the terrno value observed right after the failed allocation
 */
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  if (pRsp == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));

  if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL) {
    // read the error code before any cleanup call gets a chance to touch terrno
    int32_t code = terrno;

    if (pRsp->blockData != NULL) {
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
    }

    if (pRsp->blockDataLen != NULL) {
      taosArrayDestroy(pRsp->blockDataLen);
      pRsp->blockDataLen = NULL;
    }

    return code;
  }

  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
  pRsp->withTbName = 0;
  pRsp->withSchema = false;
  return 0;
}
40

41
/**
 * Forward the vnode's current sync term together with the given leadership
 * flag to the stream meta.
 *
 * @param pTq      tq instance whose vnode sync state is queried
 * @param isLeader role flag passed through to streamMetaUpdateStageRole
 */
void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
  SSyncState syncState = syncGetState(pTq->pVnode->sync);

  streamMetaUpdateStageRole(pTq->pStreamMeta, syncState.term, isLeader);
}
11,422✔
45

46
/**
 * Prepare a taosx-style response: same as a data response but with table
 * names and schemas enabled, so four arrays are allocated instead of two.
 *
 * @param pRsp    response to initialize; must not be NULL
 * @param pOffset offset copied into pRsp->reqOffset and pRsp->rspOffset
 * @return 0 on success, TSDB_CODE_INVALID_PARA when pRsp is NULL, or terrno
 *         when any of the arrays could not be allocated (in which case every
 *         array that was allocated is destroyed and its pointer reset)
 */
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  if (NULL == pRsp) {
    return TSDB_CODE_INVALID_PARA;
  }

  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);

  pRsp->withTbName = 1;
  pRsp->withSchema = 1;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));

  // fast path: every array is available
  if (pRsp->blockData != NULL && pRsp->blockDataLen != NULL && pRsp->blockTbName != NULL &&
      pRsp->blockSchema != NULL) {
    return 0;
  }

  // at least one allocation failed: roll back whatever did succeed
  if (pRsp->blockData != NULL) {
    taosArrayDestroy(pRsp->blockData);
    pRsp->blockData = NULL;
  }
  if (pRsp->blockDataLen != NULL) {
    taosArrayDestroy(pRsp->blockDataLen);
    pRsp->blockDataLen = NULL;
  }
  if (pRsp->blockTbName != NULL) {
    taosArrayDestroy(pRsp->blockTbName);
    pRsp->blockTbName = NULL;
  }
  if (pRsp->blockSchema != NULL) {
    taosArrayDestroy(pRsp->blockSchema);
    pRsp->blockSchema = NULL;
  }

  return terrno;
}
87

88
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
2,440✔
89
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
90
  if (pOffsetVal == NULL || pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || pBlockReturned == NULL) {
2,440!
UNCOV
91
    return TSDB_CODE_INVALID_PARA;
×
92
  }
93
  uint64_t   consumerId = pRequest->consumerId;
2,440✔
94
  STqOffset* pOffset = NULL;
2,440✔
95
  int32_t    code = tqMetaGetOffset(pTq, pRequest->subKey, &pOffset);
2,440✔
96
  int32_t    vgId = TD_VID(pTq->pVnode);
2,440✔
97

98
  *pBlockReturned = false;
2,440✔
99
  // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
100
  if (code == 0) {
2,440✔
101
    tOffsetCopy(pOffsetVal, &pOffset->val);
173✔
102

103
    char formatBuf[TSDB_OFFSET_LEN] = {0};
173✔
104
    tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
173✔
105
    tqDebug("tmq poll: consumer:0x%" PRIx64
173!
106
            ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.QID:0x%" PRIx64,
107
            consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
108
    return 0;
173✔
109
  } else {
110
    // no poll occurs in this vnode for this topic, let's seek to the right offset value.
111
    if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
2,267✔
112
      if (pRequest->useSnapshot) {
1,581✔
113
        tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
849!
114
                consumerId, pHandle->subKey, vgId);
115
        if (pHandle->fetchMeta) {
849✔
116
          tqOffsetResetToMeta(pOffsetVal, 0);
117
        } else {
118
          SValue val = {0};
831!
119
          tqOffsetResetToData(pOffsetVal, 0, 0, val);
831✔
120
        }
121
      } else {
122
        walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
732✔
123
        tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
732✔
124
      }
125
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
686✔
126
      walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
54✔
127
      SMqDataRsp dataRsp = {0};
54✔
128
      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
54✔
129

130
      code = tqInitDataRsp(&dataRsp, *pOffsetVal);
54✔
131
      if (code != 0) {
54!
UNCOV
132
        return code;
×
133
      }
134
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
54!
135
              pHandle->subKey, vgId, dataRsp.rspOffset.version);
136
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
54✔
137
      tDeleteMqDataRsp(&dataRsp);
54✔
138

139
      *pBlockReturned = true;
54✔
140
      return code;
54✔
141
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
632!
142
      tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
632!
143
              " in vg %d, subkey %s, reset none failed",
144
              pHandle->subKey, consumerId, vgId, pRequest->subKey);
145
      return TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
632✔
146
    }
147
  }
148

149
  return 0;
1,581✔
150
}
151

152
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
57,476✔
153
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
154
  if (pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || pOffset == NULL) {
57,476!
UNCOV
155
    return TSDB_CODE_INVALID_PARA;
×
156
  }
157
  uint64_t consumerId = pRequest->consumerId;
57,476✔
158
  int32_t  vgId = TD_VID(pTq->pVnode);
57,476✔
159
  terrno = 0;
57,476✔
160

161
  SMqDataRsp dataRsp = {0};
57,476✔
162
  int code = tqInitDataRsp(&dataRsp, *pOffset);
57,476✔
163
  if (code != 0) {
57,476!
UNCOV
164
    goto end;
×
165
  }
166

167
  code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
57,476✔
168
  if (code != 0) {
57,476!
UNCOV
169
    goto end;
×
170
  }
171

172
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
57,476✔
173
  if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
57,476✔
174
    goto end;
659✔
175
  }
176

177
  //   till now, all data has been transferred to consumer, new data needs to push client once arrived.
178
  if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
56,817✔
179
    // lock
180
    taosWLockLatch(&pTq->lock);
27,082✔
181
    int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
27,082✔
182
    if (dataRsp.rspOffset.version > ver) {  // check if there are data again to avoid lost data
27,082✔
183
      code = tqRegisterPushHandle(pTq, pHandle, pMsg);
26,105✔
184
      taosWUnLockLatch(&pTq->lock);
26,105✔
185
      goto end;
26,105✔
186
    }
187
    taosWUnLockLatch(&pTq->lock);
977✔
188
  }
189

190
  // reqOffset represents the current date offset, may be changed if wal not exists
191
  tOffsetCopy(&dataRsp.reqOffset, pOffset);
30,712✔
192
  code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
30,712✔
193

194
end : {
57,476✔
195
  char buf[TSDB_OFFSET_LEN] = {0};
57,476✔
196
  tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
57,476✔
197
  tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s,QID:0x%" PRIx64
57,475!
198
          " code:%d",
199
          consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
200
  tDeleteMqDataRsp(&dataRsp);
57,476✔
201
  return code;
57,476✔
202
}
203
}
204

205
/*
 * Decode the WAL entry behind pHead as a request of type TYPE and skip it when
 * it carries the TD_REQ_FROM_TAOX source flag.
 *
 * Must be expanded inside a loop body, in its own block scope, where pHead,
 * pRequest, vgId and fetchVer are visible: on a match it logs, advances
 * fetchVer and executes `continue`. The decoded request and the decoder are
 * always released, whether or not the message is skipped.
 */
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC)                                               \
  SDecoder decoder = {0};                                                                                  \
  TYPE     req = {0};                                                                                      \
  void*    data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead));                                            \
  int32_t  len = pHead->bodyLen - sizeof(SMsgHead);                                                        \
  tDecoderInit(&decoder, data, len);                                                                       \
  bool skipExcludedMsg = (DECODE_FUNC(&decoder, &req) == 0) && ((req.source & TD_REQ_FROM_TAOX) != 0);     \
  DELETE_FUNC(&req);                                                                                       \
  tDecoderClear(&decoder);                                                                                 \
  if (skipExcludedMsg) {                                                                                   \
    tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 \
            " msgType %d",                                                                                 \
            pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);                        \
    fetchVer++;                                                                                            \
    continue;                                                                                              \
  }
222

223
/* No-op deleter: lets callers pass a uniform "delete" callback for request types that own nothing. */
static void tDeleteCommon(void* parm) {
  (void)parm;  // intentionally unused
}
321✔
224

225
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
11,104✔
226
                                                  SRpcMsg* pMsg, STqOffsetVal* offset) {
227
  if (pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || offset == NULL) {
11,104!
UNCOV
228
    return TSDB_CODE_INVALID_PARA;
×
229
  }
230
  int32_t         vgId = TD_VID(pTq->pVnode);
11,104✔
231
  SMqDataRsp      taosxRsp = {0};
11,104✔
232
  SMqBatchMetaRsp btMetaRsp = {0};
11,104✔
233
  int32_t         code = 0;
11,104✔
234

235
  TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset));
11,104!
236
  if (offset->type != TMQ_OFFSET__LOG) {
11,104✔
237
    TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset));
80!
238

239
    if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
80✔
240
      code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
16✔
241
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
16!
242
              ",ts:%" PRId64,
243
              pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,
244
              btMetaRsp.rspOffset.ts);
245
      goto END;
16✔
246
    }
247

248
    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
64!
249
            ",ts:%" PRId64,
250
            pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type,
251
            taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
252
    if (taosxRsp.blockNum > 0) {
64✔
253
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
58✔
254
      goto END;
58✔
255
    } else {
256
      tOffsetCopy(offset, &taosxRsp.rspOffset);
6✔
257
    }
258
  }
259

260
  if (offset->type == TMQ_OFFSET__LOG) {
11,030!
261
    walReaderVerifyOffset(pHandle->pWalReader, offset);
11,030✔
262
    int64_t fetchVer = offset->version;
11,030✔
263

264
    uint64_t st = taosGetTimestampMs();
11,030✔
265
    int      totalRows = 0;
11,030✔
266
    int32_t  totalMetaRows = 0;
11,030✔
267
    while (1) {
121,776✔
268
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
132,806✔
269
      if (savedEpoch > pRequest->epoch) {
132,757!
UNCOV
270
        tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, savedEpoch error, vgId:%d offset %" PRId64,
×
271
                pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
UNCOV
272
        code = TSDB_CODE_TQ_INTERNAL_ERROR;
×
273
        goto END;
11,030✔
274
      }
275

276
      if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
132,757✔
277
        if (totalMetaRows > 0) {
10,356✔
278
          tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
2✔
279
          code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
2✔
280
          if (totalRows != 0) {
2!
UNCOV
281
            tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, totalRows error, vgId:%d offset %" PRId64,
×
282
                    pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
UNCOV
283
            code = code == 0 ? TSDB_CODE_TQ_INTERNAL_ERROR : code;
×
284
          }
285
          goto END;
2✔
286
        }
287
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
10,354✔
288
        code = tqSendDataRsp(
10,354✔
289
            pHandle, pMsg, pRequest, &taosxRsp,
290
            taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
10,354✔
291
        goto END;
10,354✔
292
      }
293

294
      SWalCont* pHead = &pHandle->pWalReader->pHead->head;
122,577✔
295
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d",
122,577!
296
              pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);
297

298
      // process meta
299
      if (pHead->msgType != TDMT_VND_SUBMIT) {
122,590✔
300
        if (totalRows > 0) {
558✔
301
          tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
26✔
302
          code = tqSendDataRsp(
26✔
303
              pHandle, pMsg, pRequest, &taosxRsp,
304
              taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
26✔
305
          goto END;
215✔
306
        }
307

308
        if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
532✔
309
          if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
529✔
310
            PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq, tDeleteSVCreateTbBatchReq)
170!
311
          } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
359✔
312
            PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq, tDeleteCommon)
44!
313
          } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
396✔
314
            PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq, tDeleteCommon)
272!
315
          } else if (pHead->msgType == TDMT_VND_DELETE) {
43✔
316
            PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes, tDeleteCommon)
5!
317
          }
318
        }
319

320
        tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId,
191!
321
                TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta);
322
        if (!pRequest->enableBatchMeta && !pRequest->useSnapshot) {
191!
323
          SMqMetaRsp metaRsp = {0};
189✔
324
          tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
189✔
325
          metaRsp.resMsgType = pHead->msgType;
189✔
326
          metaRsp.metaRspLen = pHead->bodyLen;
189✔
327
          metaRsp.metaRsp = pHead->body;
189✔
328
          code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
189✔
329
          goto END;
189✔
330
        }
331

332
        if (!btMetaRsp.batchMetaReq) {
2!
333
          btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES);
2✔
334
          if (btMetaRsp.batchMetaReq == NULL) {
2!
UNCOV
335
            code = TAOS_GET_TERRNO(terrno);
×
UNCOV
336
            goto END;
×
337
          }
338
          btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t));
2✔
339
          if (btMetaRsp.batchMetaLen == NULL) {
2!
UNCOV
340
            code = TAOS_GET_TERRNO(terrno);
×
UNCOV
341
            goto END;
×
342
          }
343
        }
344
        fetchVer++;
2✔
345

346
        SMqMetaRsp tmpMetaRsp = {0};
2✔
347
        tmpMetaRsp.resMsgType = pHead->msgType;
2✔
348
        tmpMetaRsp.metaRspLen = pHead->bodyLen;
2✔
349
        tmpMetaRsp.metaRsp = pHead->body;
2✔
350
        uint32_t len = 0;
2✔
351
        tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
2!
352
        if (TSDB_CODE_SUCCESS != code) {
2!
UNCOV
353
          tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
×
354
          continue;
×
355
        }
356
        int32_t tLen = sizeof(SMqRspHead) + len;
2✔
357
        void*   tBuf = taosMemoryCalloc(1, tLen);
2!
358
        if (tBuf == NULL) {
2!
359
          code = TAOS_GET_TERRNO(terrno);
×
UNCOV
360
          goto END;
×
361
        }
362
        void*    metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
2✔
363
        SEncoder encoder = {0};
2✔
364
        tEncoderInit(&encoder, metaBuff, len);
2✔
365
        code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
2✔
366
        tEncoderClear(&encoder);
2✔
367

368
        if (code < 0) {
2!
369
          tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
×
UNCOV
370
          continue;
×
371
        }
372
        if (taosArrayPush(btMetaRsp.batchMetaReq, &tBuf) == NULL) {
4!
UNCOV
373
          code = TAOS_GET_TERRNO(terrno);
×
UNCOV
374
          goto END;
×
375
        }
376
        if (taosArrayPush(btMetaRsp.batchMetaLen, &tLen) == NULL) {
4!
377
          code = TAOS_GET_TERRNO(terrno);
×
UNCOV
378
          goto END;
×
379
        }
380
        totalMetaRows++;
2✔
381
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > 1000)) {
4!
UNCOV
382
          tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
×
UNCOV
383
          code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
×
UNCOV
384
          goto END;
×
385
        }
386
        continue;
2✔
387
      }
388

389
      if (totalMetaRows > 0) {
122,032!
UNCOV
390
        tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
×
391
        code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
×
UNCOV
392
        goto END;
×
393
      }
394

395
      // process data
396
      SPackedData submit = {
122,032✔
397
          .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
122,032✔
398
          .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
122,032✔
399
          .ver = pHead->version,
122,032✔
400
      };
401

402
      code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest->sourceExcluded);
122,032✔
403
      if (code < 0) {
121,916!
UNCOV
404
        tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
×
405
                pRequest->subKey);
406
        goto END;
×
407
      }
408

409
      if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
243,349!
410
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
459✔
411
        code = tqSendDataRsp(
459✔
412
            pHandle, pMsg, pRequest, &taosxRsp,
413
            taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
459!
414
        goto END;
459✔
415
      } else {
416
        fetchVer++;
121,433✔
417
      }
418
    }
419
  }
420

UNCOV
421
END:
×
422
  tDeleteMqBatchMetaRsp(&btMetaRsp);
11,104✔
423
  tDeleteSTaosxRsp(&taosxRsp);
11,104✔
424
  return code;
11,104✔
425
}
426

427
/**
 * Entry point for answering a consumer poll: resolve the offset to read from
 * and dispatch to the column or non-column extraction routine.
 *
 * @param pTq      tq instance
 * @param pHandle  subscription handle
 * @param pRequest poll request
 * @param pMsg     rpc message to answer
 * @return 0 on success, TSDB_CODE_TMQ_INVALID_MSG for NULL arguments or a
 *         request offset of type 0, otherwise the error from offset
 *         resolution or data extraction
 */
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
  if (NULL == pTq || NULL == pHandle || NULL == pRequest || NULL == pMsg) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  int32_t      code = 0;
  bool         needExtract = true;
  STqOffsetVal reqOffset = {0};
  tOffsetCopy(&reqOffset, &pRequest->reqOffset);

  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
    // reset policy: turn it into a concrete offset; an empty block may already have been returned
    bool blockReturned = false;
    code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
    needExtract = (code == 0) && !blockReturned;
  } else if (reqOffset.type == 0) {
    // a consumer-specified offset must carry a valid type
    uError("req offset type is 0");
    code = TSDB_CODE_TMQ_INVALID_MSG;
    needExtract = false;
  }

  if (needExtract) {
    code = (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN)
               ? extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset)
               : extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
  }

  tOffsetDestroy(&reqOffset);
  return code;
}
463

464
/**
 * Fill the fixed header that precedes every mq response payload.
 * Does nothing when pMsgHead is NULL.
 *
 * @param pMsgHead   header to fill
 * @param type       stored as mqMsgType
 * @param epoch      stored as epoch
 * @param consumerId stored as consumerId
 * @param sver       stored as walsver
 * @param ever       stored as walever
 */
static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver,
                          int64_t ever) {
  if (pMsgHead != NULL) {
    pMsgHead->mqMsgType = type;
    pMsgHead->epoch = epoch;
    pMsgHead->consumerId = consumerId;
    pMsgHead->walsver = sver;
    pMsgHead->walever = ever;
  }
}
475

476
int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
18✔
477
                               const SMqBatchMetaRsp* pRsp, int32_t vgId) {
478
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
18!
UNCOV
479
    return TSDB_CODE_TMQ_INVALID_MSG;
×
480
  }
481
  int32_t len = 0;
18✔
482
  int32_t code = 0;
18✔
483
  tEncodeSize(tEncodeMqBatchMetaRsp, pRsp, len, code);
18!
484
  if (code < 0) {
18!
UNCOV
485
    return TAOS_GET_TERRNO(code);
×
486
  }
487
  int32_t tlen = sizeof(SMqRspHead) + len;
18✔
488
  void*   buf = rpcMallocCont(tlen);
18✔
489
  if (buf == NULL) {
18!
UNCOV
490
    return TAOS_GET_TERRNO(terrno);
×
491
  }
492

493
  int64_t sver = 0, ever = 0;
18✔
494
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
18✔
495
  initMqRspHead(buf, TMQ_MSG_TYPE__POLL_BATCH_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
18✔
496

497
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
18✔
498

499
  SEncoder encoder = {0};
18✔
500
  tEncoderInit(&encoder, abuf, len);
18✔
501
  code = tEncodeMqBatchMetaRsp(&encoder, pRsp);
18✔
502
  tEncoderClear(&encoder);
18✔
503
  if (code < 0) {
18!
UNCOV
504
    rpcFreeCont(buf);
×
UNCOV
505
    return TAOS_GET_TERRNO(code);
×
506
  }
507
  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
18✔
508

509
  tmsgSendRsp(&resp);
18✔
510
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%ld offset type:%d",
18!
511
          vgId, pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type);
512

513
  return 0;
18✔
514
}
515

516
/**
 * Encode a single meta response and send it back to the consumer.
 *
 * The rpc buffer holds an SMqRspHead immediately followed by the encoded
 * SMqMetaRsp.
 *
 * @param pHandle subscription handle (its wal reader supplies the valid version range)
 * @param pMsg    request message whose rpc info is reused for the response
 * @param pReq    poll request (epoch and consumer id are echoed in the header)
 * @param pRsp    response to encode
 * @param vgId    vgroup id, used for logging only
 * @return 0 on success, TSDB_CODE_TMQ_INVALID_MSG on NULL arguments, or an
 *         error code on encode / allocation failure
 */
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
                          int32_t vgId) {
  if (NULL == pHandle || NULL == pMsg || NULL == pReq || NULL == pRsp) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  int32_t code = 0;
  int32_t bodyLen = 0;

  // dry run to learn how many bytes the payload needs
  tEncodeSize(tEncodeMqMetaRsp, pRsp, bodyLen, code);
  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (NULL == pCont) {
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
  }

  // header first ...
  int64_t walStartVer = 0;
  int64_t walEndVer = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &walStartVer, &walEndVer);
  initMqRspHead(pCont, TMQ_MSG_TYPE__POLL_META_RSP, pReq->epoch, pReq->consumerId, walStartVer, walEndVer);

  // ... then the payload directly behind it
  SEncoder enc = {0};
  tEncoderInit(&enc, POINTER_SHIFT(pCont, sizeof(SMqRspHead)), bodyLen);
  code = tEncodeMqMetaRsp(&enc, pRsp);
  tEncoderClear(&enc);
  if (code < 0) {
    rpcFreeCont(pCont);
    return TAOS_GET_TERRNO(code);
  }

  SRpcMsg rspMsg = {.info = pMsg->info, .pCont = pCont, .contLen = totalLen, .code = 0};
  tmsgSendRsp(&rspMsg);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", vgId,
          pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);

  return 0;
}
556

557
/**
 * Encode a data response and send it over the given rpc handle.
 *
 * The encoder is chosen by message type: POLL_DATA_RSP and WALINFO_RSP use
 * tEncodeMqDataRsp, POLL_DATA_META_RSP uses tEncodeSTaosxRsp. For any other
 * type no payload is encoded and only the header is sent.
 *
 * @param pRpcHandleInfo rpc handle info copied into the outgoing message
 * @param pRsp           response to encode
 * @param epoch          epoch written to the header
 * @param consumerId     consumer id written to the header
 * @param type           mq message type, also written to the header
 * @param sver           wal start version written to the header
 * @param ever           wal end version written to the header
 * @return 0 on success, TSDB_CODE_TMQ_INVALID_MSG on NULL arguments, or an
 *         error code on encode / allocation failure
 */
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
                        int32_t type, int64_t sver, int64_t ever) {
  if (NULL == pRpcHandleInfo || NULL == pRsp) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  const bool asDataRsp = (type == TMQ_MSG_TYPE__POLL_DATA_RSP) || (type == TMQ_MSG_TYPE__WALINFO_RSP);
  const bool asTaosxRsp = !asDataRsp && (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP);

  int32_t code = 0;
  int32_t bodyLen = 0;

  // sizing pass
  if (asDataRsp) {
    tEncodeSize(tEncodeMqDataRsp, pRsp, bodyLen, code);
  } else if (asTaosxRsp) {
    tEncodeSize(tEncodeSTaosxRsp, pRsp, bodyLen, code);
  }
  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (NULL == pCont) {
    return terrno;
  }

  initMqRspHead((SMqRspHead*)pCont, type, epoch, consumerId, sver, ever);

  // encoding pass, payload sits right after the header
  SEncoder enc = {0};
  tEncoderInit(&enc, POINTER_SHIFT(pCont, sizeof(SMqRspHead)), bodyLen);
  if (asDataRsp) {
    code = tEncodeMqDataRsp(&enc, pRsp);
  } else if (asTaosxRsp) {
    code = tEncodeSTaosxRsp(&enc, pRsp);
  }
  tEncoderClear(&enc);

  if (code < 0) {
    rpcFreeCont(pCont);
    return TAOS_GET_TERRNO(code);
  }

  SRpcMsg out = {.info = *pRpcHandleInfo, .pCont = pCont, .contLen = totalLen, .code = 0};
  tmsgSendRsp(&out);
  return 0;
}
604

605
/**
 * Decode a delete result and turn it into a special data block with one row
 * per affected table (start ts, end ts, uid; the remaining columns are NULL).
 *
 * When the delete touched no table or no row, *pRefBlock stays NULL and 0 is
 * returned.
 *
 * @param pData     encoded SDeleteRes
 * @param len       length of pData in bytes
 * @param ver       version stored in the produced block
 * @param pRefBlock [out] type 0: a queue item (SStreamRefDataBlock) wrapping
 *                  the block; type 1: the block itself; NULL when nothing
 *                  was produced
 * @param type      0 or 1, selects the output form; anything else fails with
 *                  TSDB_CODE_TMQ_CONSUMER_ERROR
 * @param blockType block type handed to createSpecialDataBlock
 * @return 0 on success, otherwise the first error encountered. On every error
 *         path the decoder, the uid list and a not-yet-handed-over block are
 *         released.
 */
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type, EStreamType blockType) {
  int32_t      code = 0;
  int32_t      line = 0;
  SDecoder*    pCoder = &(SDecoder){0};
  SDeleteRes*  pRes = &(SDeleteRes){0};
  SSDataBlock* pDelBlock = NULL;  // declared up front so the END label can release it on failure

  *pRefBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  TSDB_CHECK_NULL(pRes->uidList, code, line, END, terrno)

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  code = tDecodeDeleteRes(pCoder, pRes);
  TSDB_CHECK_CODE(code, line, END);

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    goto END;
  }

  code = createSpecialDataBlock(blockType, &pDelBlock);
  TSDB_CHECK_CODE(code, line, END);

  code = blockDataEnsureCapacity(pDelBlock, numOfTables);
  TSDB_CHECK_CODE(code, line, END);

  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  for (int32_t i = 0; i < numOfTables; i++) {
    // start key column
    SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(pStartCol, code, line, END, terrno)
    code = colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    TSDB_CHECK_CODE(code, line, END);
    // end key column
    SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(pEndCol, code, line, END, terrno)
    code = colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    TSDB_CHECK_CODE(code, line, END);
    // uid column
    SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
    TSDB_CHECK_NULL(pUidCol, code, line, END, terrno)

    int64_t* pUid = taosArrayGet(pRes->uidList, i);
    code = colDataSetVal(pUidCol, i, (const char*)pUid, false);
    TSDB_CHECK_CODE(code, line, END);
    // group id, calculated start/end ts and table name are not known here: set them to NULL
    void* tmp = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno)
    colDataSetNULL(tmp, i);
    tmp = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno)
    colDataSetNULL(tmp, i);
    tmp = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno)
    colDataSetNULL(tmp, i);
    tmp = taosArrayGet(pDelBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno)
    colDataSetNULL(tmp, i);
  }

  if (type == 0) {
    code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock);
    if (code) {
      // leave through END so the decoder, the uid list and the block are all released
      line = __LINE__;
      goto END;
    }

    // ownership of pDelBlock moves to the queue item
    ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
    ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pDelBlock;
  } else if (type == 1) {
    // ownership of pDelBlock moves to the caller
    *pRefBlock = pDelBlock;
  } else {
    tqError("unknown type:%d", type);
    code = TSDB_CODE_TMQ_CONSUMER_ERROR;
  }

END:
  if (code != 0) {
    tqError("failed to extract delete data block, line:%d code:%d", line, code);
    // Every failure happens before the block is handed over, so it is still owned here.
    // NOTE(review): the release sequence mirrors the one this function already used on
    // allocation failure; confirm whether blockDataDestroy is the intended full release.
    if (pDelBlock != NULL) {
      blockDataCleanup(pDelBlock);
      taosMemoryFree(pDelBlock);
    }
  }
  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
  return code;
}
691

692
/*
 * Report execution progress of the stream identified by streamId on this vnode.
 *
 * The task list of the stream meta is scanned under the meta read lock. For
 * every task that belongs to streamId and whose level is TASK_LEVEL__SOURCE:
 *   - *fhFinished is set to true when the task has no related fill-history task;
 *   - *pDelay is set to (ingestTs of the newest valid WAL entry - ingestTs of
 *     the entry the task has reached) / 1000.
 * If several source tasks match, the values of the last one visited win.
 *
 * @param pVnode      vnode whose stream meta is inspected
 * @param streamId    id of the stream to look up
 * @param pDelay      optional out-parameter, reset to 0 on entry; may be NULL
 * @param fhFinished  optional out-parameter, reset to false on entry; may be NULL
 * @return always TSDB_CODE_SUCCESS. Per-task failures (task acquire, WAL reader
 *         open) are logged and the task is skipped.
 */
int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, bool* fhFinished) {
  SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
  int32_t      numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  int32_t      code = TSDB_CODE_SUCCESS;

  if (pDelay != NULL) {
    *pDelay = 0;
  }

  // guarded the same way as pDelay so a caller may pass NULL for either output
  if (fhFinished != NULL) {
    *fhFinished = false;
  }

  if (numOfTasks <= 0) {
    return code;
  }

  // extract the required source task for a given stream, identified by streamId
  streamMetaRLock(pMeta);

  // re-read the size now that the read lock is held; the value above was read without it
  numOfTasks = taosArrayGetSize(pMeta->pTaskList);

  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
    if (pId == NULL) {
      continue;
    }
    if (pId->streamId != streamId) {
      continue;
    }

    STaskId      id = {.streamId = pId->streamId, .taskId = pId->taskId};
    SStreamTask* pTask = NULL;

    code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
    if (code != 0) {
      tqError("vgId:%d failed to acquire task:0x%x in retrieving progress", pMeta->vgId, pId->taskId);
      continue;
    }

    if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
      streamMetaReleaseTask(pMeta, pTask);
      continue;
    }

    // here we get the required stream source task
    if (fhFinished != NULL) {
      *fhFinished = !HAS_RELATED_FILLHISTORY_TASK(pTask);
    }

    // Version the task has reached: the entry before the reader's current
    // position, or the checkpointed processedVer when the reader reports -1.
    int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
    if (ver == -1) {
      ver = pTask->chkInfo.processedVer;
    } else {
      ver--;
    }

    SVersionRange verRange = {0};
    walReaderValidVersionRange(pTask->exec.pWalReader, &verRange.minVer, &verRange.maxVer);

    // a separate reader is opened for the lookups below instead of using the task's own reader
    SWalReader* pReader = walOpenReader(pTask->exec.pWalReader->pWal, NULL, 0);
    if (pReader == NULL) {
      tqError("failed to open wal reader to extract exec progress, vgId:%d", pMeta->vgId);
      streamMetaReleaseTask(pMeta, pTask);
      continue;
    }

    int64_t cur = 0;
    int64_t latest = 0;

    // NOTE(review): a failed walFetchHead leaves the matching timestamp at 0 and is
    // not reported, so the delay below may be skewed in that case - confirm intent.
    code = walFetchHead(pReader, ver);
    if (code == TSDB_CODE_SUCCESS) {
      cur = pReader->pHead->head.ingestTs;
    }

    if (ver == verRange.maxVer) {
      // the task already sits on the newest entry; no second fetch needed
      latest = cur;
    } else {
      code = walFetchHead(pReader, verRange.maxVer);
      if (code == TSDB_CODE_SUCCESS) {
        latest = pReader->pHead->head.ingestTs;
      }
    }

    if (pDelay != NULL) {  // delay in ms
      *pDelay = (latest - cur) / 1000;
    }

    walCloseReader(pReader);
    streamMetaReleaseTask(pMeta, pTask);
  }

  streamMetaRUnLock(pMeta);

  return TSDB_CODE_SUCCESS;
}
784

785
int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type) {
185✔
786
  int32_t          code = 0;
185✔
787
  int32_t          lino = 0;
185✔
788
  SDecoder         dc = {0};
185✔
789
  SVDropTbBatchReq batchReq = {0};
185✔
790
  tDecoderInit(&dc, (uint8_t*)data, len);
185✔
791
  code = tDecodeSVDropTbBatchReq(&dc, &batchReq);
185✔
792
  TSDB_CHECK_CODE(code, lino, _exit);
185!
793
  if (batchReq.nReqs <= 0) goto _exit;
185!
794

795
  SSDataBlock* pBlock = NULL;
185✔
796
  code = createSpecialDataBlock(STREAM_DROP_CHILD_TABLE, &pBlock);
185✔
797
  TSDB_CHECK_CODE(code, lino, _exit);
185!
798

799
  code = blockDataEnsureCapacity(pBlock, batchReq.nReqs);
185✔
800
  TSDB_CHECK_CODE(code, lino, _exit);
185!
801

802
  pBlock->info.rows = batchReq.nReqs;
185✔
803
  pBlock->info.version = ver;
185✔
804
  for (int32_t i = 0; i < batchReq.nReqs; ++i) {
375✔
805
    SVDropTbReq* pReq = batchReq.pReqs + i;
190✔
806
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
190✔
807
    TSDB_CHECK_NULL(pCol, code, lino, _exit, terrno);
190!
808
    code = colDataSetVal(pCol, i, (const char* )&pReq->uid, false);
190✔
809
    TSDB_CHECK_CODE(code, lino, _exit);
190!
810
  }
811

812
  code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock);
185✔
813
  TSDB_CHECK_CODE(code, lino, _exit);
185!
814
  ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
185✔
815
  ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pBlock;
185✔
816

817
_exit:
185✔
818
  tDecoderClear(&dc);
185✔
819
  if (TSDB_CODE_SUCCESS != code) {
185!
UNCOV
820
    tqError("faled to extract drop ctb data block, line:%d code:%s", lino, tstrerror(code));
×
UNCOV
821
    blockDataCleanup(pBlock);
×
UNCOV
822
    taosMemoryFree(pBlock);
×
823
  }
824
  return code;
185✔
825
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc