• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3661

17 Mar 2025 05:39AM UTC coverage: 62.007% (-0.03%) from 62.039%
#3661

push

travis-ci

web-flow
tests: add tdb ut (#30093)

* fix: compile warnings

* tests: add tdb ut

* test(tdb): fix return code

* test: recover ut

* fix: minor changes

* fix: enable test

* fix: ut errors

---------

Co-authored-by: Minglei Jin <mljin@taosdata.com>

153829 of 317582 branches covered (48.44%)

Branch coverage included in aggregate %.

240310 of 318051 relevant lines covered (75.56%)

19602636.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.22
/source/dnode/vnode/src/tq/tqUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
// Forward declarations: both senders are defined further down in this file but are
// already used by the poll-extraction code above their definitions.
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
19
                                 const SMqMetaRsp* pRsp, int32_t vgId);
20
static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
21
                                      const SMqBatchMetaRsp* pRsp, int32_t vgId);
22

23
/**
 * Initialize a data poll response.
 *
 * Creates the empty blockData / blockDataLen arrays, copies pOffset into both the request
 * offset and the response offset, and switches table-name and schema output off.
 *
 * If the second allocation fails, the array that was already created here is released and
 * its pointer cleared, so a caller that returns immediately does not leak it and a caller
 * that still runs its normal response cleanup does not free it twice.
 *
 * @param pRsp    response to initialize; must not be NULL
 * @param pOffset offset copied into pRsp->reqOffset and pRsp->rspOffset
 * @return TDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA when pRsp is NULL, otherwise
 *         the terrno value seen after a failed array allocation
 */
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  int32_t code = TDB_CODE_SUCCESS;
  int32_t lino = 0;
  tqDebug("%s called", __FUNCTION__ );
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);

  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);

  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
  pRsp->withTbName = 0;
  pRsp->withSchema = false;

END:
  if (code != 0) {
    tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
    // pRsp is NULL when the parameter check itself failed, so guard the cleanup.
    if (pRsp != NULL) {
      // Only blockData can be live here: blockDataLen is either untouched or was just
      // assigned NULL by the failed allocation.
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
    }
  }
  return code;
}
46

47
/**
 * Forward the current sync term and the leadership flag to the stream meta; when this node
 * is the leader, additionally call tqScanWalAsync.
 *
 * @param pTq      tq instance whose vnode sync state is read
 * @param isLeader whether this node currently acts as leader
 */
void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
  SSyncState syncState = syncGetState(pTq->pVnode->sync);

  streamMetaUpdateStageRole(pTq->pStreamMeta, syncState.term, isLeader);
  if (!isLeader) {
    return;
  }

  tqScanWalAsync(pTq);
}
55

56
/**
 * Initialize a poll response for a subscription that returns table names and schemas.
 *
 * Copies pOffset into the request and response offsets, enables withTbName / withSchema and
 * creates the four empty arrays (blockData, blockDataLen, blockTbName, blockSchema).
 *
 * On failure the arrays are destroyed AND their pointers are cleared. The caller in this
 * file still passes the same struct to tDeleteSTaosxRsp after a failure, so leaving the
 * destroyed pointers in place would hand it dangling pointers.
 *
 * @param pRsp    response to initialize; must not be NULL
 * @param pOffset offset copied into pRsp->reqOffset and pRsp->rspOffset
 * @return TDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA when pRsp is NULL, otherwise
 *         the terrno value seen after a failed array allocation
 */
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  int32_t code = TDB_CODE_SUCCESS;
  int32_t lino = 0;
  tqDebug("%s called", __FUNCTION__ );
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);

  pRsp->withTbName = 1;
  pRsp->withSchema = 1;
  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);

  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);

  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockTbName, code, lino, END, terrno);

  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);

END:
  if (code != 0) {
    tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
    // The NULL-parameter check also lands here, so pRsp must not be dereferenced blindly.
    if (pRsp != NULL) {
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
      taosArrayDestroy(pRsp->blockDataLen);
      pRsp->blockDataLen = NULL;
      taosArrayDestroy(pRsp->blockTbName);
      pRsp->blockTbName = NULL;
      taosArrayDestroy(pRsp->blockSchema);
      pRsp->blockSchema = NULL;
    }
  }
  return code;
}
88

89
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
342,066✔
90
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
91
  if (pOffsetVal == NULL || pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || pBlockReturned == NULL) {
342,066!
92
    return TSDB_CODE_INVALID_PARA;
×
93
  }
94
  uint64_t   consumerId = pRequest->consumerId;
342,067✔
95
  STqOffset* pOffset = NULL;
342,067✔
96
  int32_t    code = tqMetaGetOffset(pTq, pRequest->subKey, &pOffset);
342,067✔
97
  int32_t    vgId = TD_VID(pTq->pVnode);
342,066✔
98

99
  *pBlockReturned = false;
342,066✔
100
  // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
101
  if (code == 0) {
342,066✔
102
    tOffsetCopy(pOffsetVal, &pOffset->val);
276✔
103

104
    char formatBuf[TSDB_OFFSET_LEN] = {0};
275✔
105
    tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
275✔
106
    tqDebug("tmq poll: consumer:0x%" PRIx64
275!
107
                ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.QID:0x%" PRIx64,
108
            consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
109
    return 0;
275✔
110
  } else {
111
    // no poll occurs in this vnode for this topic, let's seek to the right offset value.
112
    if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
341,790✔
113
      if (pRequest->useSnapshot) {
195,163✔
114
        tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
194,282!
115
                consumerId, pHandle->subKey, vgId);
116
        if (pHandle->fetchMeta) {
194,281✔
117
          tqOffsetResetToMeta(pOffsetVal, 0);
118
        } else {
119
          SValue val = {0};
194,265!
120
          tqOffsetResetToData(pOffsetVal, 0, 0, val);
194,264✔
121
        }
122
      } else {
123
        walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
881✔
124
        tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
881✔
125
      }
126
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
146,627✔
127
      walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
66✔
128
      SMqDataRsp dataRsp = {0};
66✔
129
      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
66✔
130

131
      code = tqInitDataRsp(&dataRsp, *pOffsetVal);
66✔
132
      if (code != 0) {
66!
133
        return code;
×
134
      }
135
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
66✔
136
              pHandle->subKey, vgId, dataRsp.rspOffset.version);
137
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, (pRequest->rawData == 1) ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
66!
138
      tDeleteMqDataRsp(&dataRsp);
66✔
139

140
      *pBlockReturned = true;
66✔
141
      return code;
66✔
142
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
146,561✔
143
      tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
146,560!
144
                  " in vg %d, subkey %s, reset none failed",
145
              pHandle->subKey, consumerId, vgId, pRequest->subKey);
146
      return TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
146,564✔
147
    }
148
  }
149

150
  return 0;
195,162✔
151
}
152

153
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
288,378✔
154
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
155
  int32_t    code = TDB_CODE_SUCCESS;
288,378✔
156
  int32_t    lino = 0;
288,378✔
157
  tqDebug("%s called", __FUNCTION__ );
288,378✔
158
  uint64_t consumerId = pRequest->consumerId;
288,382✔
159
  int32_t  vgId = TD_VID(pTq->pVnode);
288,382✔
160
  terrno = 0;
288,382✔
161

162
  SMqDataRsp dataRsp = {0};
288,382✔
163
  code = tqInitDataRsp(&dataRsp, *pOffset);
288,382✔
164
  TSDB_CHECK_CODE(code, lino, end);
288,382!
165

166
  code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
288,382✔
167
  TSDB_CHECK_CODE(code, lino, end);
288,382!
168

169
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
288,382✔
170
  if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
288,379✔
171
    goto end;
194,085✔
172
  }
173

174
  //   till now, all data has been transferred to consumer, new data needs to push client once arrived.
175
  if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
94,292✔
176
    // lock
177
    taosWLockLatch(&pTq->lock);
29,150✔
178
    int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
29,150✔
179
    if (dataRsp.rspOffset.version > ver) {  // check if there are data again to avoid lost data
29,150✔
180
      code = tqRegisterPushHandle(pTq, pHandle, pMsg);
27,943✔
181
      taosWUnLockLatch(&pTq->lock);
27,943✔
182
      goto end;
27,943✔
183
    }
184
    taosWUnLockLatch(&pTq->lock);
1,207✔
185
  }
186

187
  // reqOffset represents the current date offset, may be changed if wal not exists
188
  tOffsetCopy(&dataRsp.reqOffset, pOffset);
66,350✔
189
  code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
66,350✔
190

191
end:
288,378✔
192
  {
193
    char buf[TSDB_OFFSET_LEN] = {0};
288,378✔
194
    tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
288,378✔
195
    if (code != 0){
288,381✔
196
      tqError("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " error msg:%s, line:%d",
194,085!
197
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, tstrerror(code), lino);
198
    } else {
199
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " success",
94,296✔
200
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId);
201
    }
202

203
    tDeleteMqDataRsp(&dataRsp);
288,381✔
204
    return code;
288,375✔
205
  }
206
}
207

208
// PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC)
//
// Statement macro for the WAL loop of extractDataAndRspForDbStbSubscribe. It relies on that
// function's locals (pHead, pRequest, vgId, fetchVer) and on being expanded inside a loop,
// because it executes `continue`.
//
// It decodes the message body that follows the SMsgHead into a TYPE value. If decoding
// succeeds and req.source carries TD_REQ_FROM_TAOX, the entry is skipped: fetchVer is
// advanced, the request and decoder are released, and the loop continues with the next
// entry. Otherwise the request and decoder are released and execution falls through.
//
// The expansion declares variables, so it must be the only expansion in its scope.
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC)                                               \
209
  SDecoder decoder = {0};                                                                                  \
210
  TYPE     req = {0};                                                                                      \
211
  void*    data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead));                                            \
212
  int32_t  len = pHead->bodyLen - sizeof(SMsgHead);                                                        \
213
  tDecoderInit(&decoder, data, len);                                                                       \
214
  if (DECODE_FUNC(&decoder, &req) == 0 && (req.source & TD_REQ_FROM_TAOX) != 0) {                          \
215
    tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 \
216
            " msgType %d",                                                                                 \
217
            pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);                        \
218
    fetchVer++;                                                                                            \
219
    DELETE_FUNC(&req);                                                                                     \
220
    tDecoderClear(&decoder);                                                                               \
221
    continue;                                                                                              \
222
  }                                                                                                        \
223
  DELETE_FUNC(&req);                                                                                       \
224
  tDecoderClear(&decoder);
225

226
/* No-op release hook: passed as DELETE_FUNC to PROCESS_EXCLUDED_MSG for the request types
 * that are used there without a dedicated delete function. */
static void tDeleteCommon(void* parm) {
  (void)parm;
}
227

228
// Select the poll response message type for a data response:
//   - TMQ_MSG_TYPE__POLL_DATA_META_RSP when the response carries create-table requests,
//   - TMQ_MSG_TYPE__POLL_RAW_DATA_RSP  when the request asked for raw data (rawData == 1),
//   - TMQ_MSG_TYPE__POLL_DATA_RSP      otherwise.
// pRequest is a pointer to the poll request, taosxRsp is the response object itself.
// Arguments and the whole expansion are parenthesized so the macro can be used safely
// inside larger expressions.
#define POLL_RSP_TYPE(pRequest, taosxRsp)      \
  ((taosxRsp).createTableNum > 0               \
       ? TMQ_MSG_TYPE__POLL_DATA_META_RSP      \
       : ((pRequest)->rawData == 1 ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP))
231

232
/**
 * Encode one meta message and append it to a batch meta response.
 *
 * The batch arrays are created on first use. The message (type, bodyLen, body) is wrapped in
 * an SMqMetaRsp, encoded into a freshly allocated buffer that leaves room for an SMqRspHead
 * in front, and the buffer pointer plus its total length are pushed onto batchMetaReq /
 * batchMetaLen.
 *
 * Ownership: once the buffer has been pushed onto batchMetaReq it belongs to btMetaRsp.
 * Until then it is owned locally and is released on every error path.
 *
 * @param btMetaRsp batch response to append to
 * @param type      message type stored as resMsgType
 * @param bodyLen   length of body in bytes
 * @param body      message body; only referenced during encoding
 * @return 0 on success, otherwise an error code
 */
static int32_t buildBatchMeta(SMqBatchMetaRsp *btMetaRsp, int16_t type, int32_t bodyLen, void* body){
  int32_t code = 0;
  void*   tBuf = NULL;  // locally owned until it is handed to batchMetaReq

  if (!btMetaRsp->batchMetaReq) {
    btMetaRsp->batchMetaReq = taosArrayInit(4, POINTER_BYTES);
    TQ_NULL_GO_TO_END(btMetaRsp->batchMetaReq);
    btMetaRsp->batchMetaLen = taosArrayInit(4, sizeof(int32_t));
    TQ_NULL_GO_TO_END(btMetaRsp->batchMetaLen);
  }

  SMqMetaRsp tmpMetaRsp = {0};
  tmpMetaRsp.resMsgType = type;
  tmpMetaRsp.metaRspLen = bodyLen;
  tmpMetaRsp.metaRsp = body;
  uint32_t len = 0;
  tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
  if (TSDB_CODE_SUCCESS != code) {
    tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
    goto END;
  }
  int32_t tLen = sizeof(SMqRspHead) + len;
  tBuf = taosMemoryCalloc(1, tLen);
  TQ_NULL_GO_TO_END(tBuf);
  void*    metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
  SEncoder encoder = {0};
  tEncoderInit(&encoder, metaBuff, len);
  code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
  tEncoderClear(&encoder);

  if (code < 0) {
    tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
    goto END;
  }
  TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaReq, &tBuf));
  tBuf = NULL;  // the array owns the buffer from here on
  TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaLen, &tLen));

END:
  // Non-NULL only when an error occurred before ownership was transferred.
  taosMemoryFree(tBuf);
  return code;
}
271

272
/**
 * Serialize the create-table requests collected in taosxRsp into one binary
 * SVCreateTbBatchReq message that is preceded by room for an SMsgHead.
 *
 * @param taosxRsp response whose createTableReq array is serialized
 * @param pBuf     [out] newly allocated buffer of *len bytes; the first sizeof(SMsgHead)
 *                 bytes are left unwritten by this function. The caller owns the buffer and
 *                 must free it, also when a non-zero code is returned after allocation.
 * @param len      [out] total buffer length, including the SMsgHead prefix
 * @return 0 on success, otherwise an error code
 */
static int32_t buildCreateTbBatchReqBinary(SMqDataRsp *taosxRsp, void** pBuf, int32_t *len){
  int32_t code = 0;
  int32_t bodyLen = 0;
  SVCreateTbBatchReq pReq = {0};
  pReq.nReqs = taosArrayGetSize(taosxRsp->createTableReq);
  pReq.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
  TQ_NULL_GO_TO_END(pReq.pArray);
  for (int i = 0; i < taosArrayGetSize(taosxRsp->createTableReq); i++){
    void   *createTableReq = taosArrayGetP(taosxRsp->createTableReq, i);
    TQ_NULL_GO_TO_END(taosArrayPush(pReq.pArray, createTableReq));
  }
  tEncodeSize(tEncodeSVCreateTbBatchReq, &pReq, *len, code);
  if (code < 0) {
    goto END;
  }
  bodyLen = *len;  // encoded size without the message head
  *len += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*len);
  // Check the allocation result itself, not the out-parameter that receives it.
  TQ_NULL_GO_TO_END(*pBuf);
  SEncoder coder = {0};
  // Only bodyLen bytes remain behind the head, so that is the encoder's capacity.
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), bodyLen);
  code = tEncodeSVCreateTbBatchReq(&coder, &pReq);
  tEncoderClear(&coder);

END:
  taosArrayDestroy(pReq.pArray);
  return code;
}
298

299
// Reply helpers for the WAL loop of extractDataAndRspForDbStbSubscribe. Both expand to bare
// statements that use that function's locals (pHandle, pMsg, pRequest, vgId, fetchVer, code,
// btMetaRsp / taosxRsp) and end with `goto END`, so they can only be used where an END
// label is in scope, and nothing placed after them in the same block is executed.

// Set the batch-meta response offset to log position fetchVer, send it, and jump to END.
#define SEND_BATCH_META_RSP \
300
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);\
301
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);\
302
goto END;
303

304
// Set the data response offset to log position fetchVer, send it with the message type
// chosen by POLL_RSP_TYPE, and jump to END.
#define SEND_DATA_RSP \
305
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);\
306
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, POLL_RSP_TYPE(pRequest, taosxRsp), vgId);\
307
goto END;
308
/**
 * Serve one poll request for a subscription that is not a column subscription (the caller
 * routes every subType other than TOPIC_SUB_TYPE__COLUMN here).
 *
 * Step 1 - offset is not a log offset: run tqScanTaosx. If it produced batched meta, that is
 * sent; else if it produced data blocks, those are sent; else the scan's response offset is
 * adopted and processing falls through to step 2.
 *
 * Step 2 - offset is a log offset: iterate WAL entries starting at offset->version. Submit
 * entries are scanned into taosxRsp, other entries are treated as meta. A response is sent
 * through SEND_DATA_RSP / SEND_BATCH_META_RSP / tqSendMetaPollRsp as soon as one of the send
 * conditions below is met; all of them leave through END.
 *
 * @param pTq      tq instance
 * @param pHandle  subscription handle
 * @param pRequest poll request
 * @param pMsg     rpc message the response is sent for
 * @param offset   start offset; may be updated with the scan's response offset
 * @return 0 on success, otherwise the error code that ended processing
 */
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
3,015,734✔
309
                                                  SRpcMsg* pMsg, STqOffsetVal* offset) {
310
  int32_t         vgId = TD_VID(pTq->pVnode);
3,015,734✔
311
  SMqDataRsp      taosxRsp = {0};
3,015,734✔
312
  SMqBatchMetaRsp btMetaRsp = {0};
3,015,734✔
313
  int32_t         code = 0;
3,015,734✔
314

315
  TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset));
3,015,734!
316
  // Step 1: the offset does not point into the log yet.
  if (offset->type != TMQ_OFFSET__LOG) {
3,015,733✔
317
    TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest));
151!
318

319
    // Meta produced by the scan takes precedence over data blocks.
    if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
151✔
320
      code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
14✔
321
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
14!
322
              pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,btMetaRsp.rspOffset.ts);
323
      goto END;
14✔
324
    }
325

326
    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64",ts:%" PRId64,
137!
327
            pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
328
    if (taosxRsp.blockNum > 0) {
137✔
329
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
131✔
330
      goto END;
131✔
331
    } else {
332
      // Nothing to send: continue from wherever the scan ended.
      tOffsetCopy(offset, &taosxRsp.rspOffset);
6✔
333
    }
334
  }
335

336
  // Step 2: iterate the log starting at offset->version.
  if (offset->type == TMQ_OFFSET__LOG) {
3,015,589!
337
    walReaderVerifyOffset(pHandle->pWalReader, offset);
3,015,592✔
338
    int64_t fetchVer = offset->version;
3,015,592✔
339

340
    // st is the loop start time used for the request timeout checks below.
    uint64_t st = taosGetTimestampMs();
3,015,583✔
341
    int      totalRows = 0;
3,015,583✔
342
    int32_t  totalMetaRows = 0;
3,015,583✔
343
    while (1) {
121,638✔
344
      // Stop if the handle has moved on to a newer epoch than the one this request carries.
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
3,137,221✔
345
      if (savedEpoch > pRequest->epoch) {
3,137,138!
346
        tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, savedEpoch error, vgId:%d offset %" PRId64,
×
347
                pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
348
        code = TSDB_CODE_TQ_INTERNAL_ERROR;
×
349
        goto END;
3,015,496✔
350
      }
351

352
      // Fetch failed: reply with what has been collected so far, pending meta first.
      if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
3,137,138✔
353
        if (totalMetaRows > 0) {
3,014,877✔
354
          SEND_BATCH_META_RSP
2✔
355
        }
356
        SEND_DATA_RSP
3,014,875✔
357
      }
358

359
      SWalCont* pHead = &pHandle->pWalReader->pHead->head;
122,407✔
360
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %s",
122,407!
361
              pRequest->consumerId, pRequest->epoch, vgId, fetchVer, TMSG_INFO(pHead->msgType));
362

363
      // process meta
364
      if (pHead->msgType != TDMT_VND_SUBMIT) {
122,408✔
365
        // Rows collected before this meta entry are sent first.
        if (totalRows > 0) {
667✔
366
          SEND_DATA_RSP
35!
367
        }
368

369
        // The request excludes messages that originate from taosX: each macro below skips
        // such an entry via `continue` and otherwise falls through.
        if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
632✔
370
          if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
629✔
371
            PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq, tDeleteSVCreateTbBatchReq)
186!
372
          } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
443✔
373
            PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq, tDeleteCommon)
55!
374
          } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
495✔
375
            PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq, tDeleteCommon)
343!
376
          } else if (pHead->msgType == TDMT_VND_DELETE) {
45✔
377
            PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes, tDeleteCommon)
7!
378
          }
379
        }
380

381
        tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId,
230!
382
                TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta);
383
        // Neither batching nor snapshot requested: send this single meta entry right away,
        // with the response offset pointing at the next log version.
        if (!pRequest->enableBatchMeta && !pRequest->useSnapshot) {
230!
384
          SMqMetaRsp metaRsp = {0};
228✔
385
          tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
228✔
386
          metaRsp.resMsgType = pHead->msgType;
228✔
387
          metaRsp.metaRspLen = pHead->bodyLen;
228✔
388
          metaRsp.metaRsp = pHead->body;
228✔
389
          code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
228✔
390
          goto END;
228✔
391
        }
392
        // Otherwise add the entry to the batch and keep iterating until the batch is
        // large enough or the request timeout has elapsed.
        code = buildBatchMeta(&btMetaRsp, pHead->msgType, pHead->bodyLen, pHead->body);
2✔
393
        fetchVer++;
2✔
394
        if (code != 0){
2!
395
          goto END;
×
396
        }
397
        totalMetaRows++;
2✔
398
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > pRequest->timeout)) {
4!
399
          SEND_BATCH_META_RSP
×
400
        }
401
        continue;
2✔
402
      }
403

404
      // A submit entry follows batched meta: flush the meta first unless only meta is wanted.
      if (totalMetaRows > 0 && pHandle->fetchMeta != ONLY_META) {
121,741!
405
        SEND_BATCH_META_RSP
×
406
      }
407

408
      // process data
409
      SPackedData submit = {
121,741✔
410
          .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
121,741✔
411
          .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
121,741✔
412
          .ver = pHead->version,
121,741✔
413
      };
414

415
      TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest));
121,741!
416

417
      // Meta-only subscription: turn the create-table requests gathered from the submit
      // into a TDMT_VND_CREATE_TABLE batch-meta entry instead of returning the data.
      if (pHandle->fetchMeta == ONLY_META && taosArrayGetSize(taosxRsp.createTableReq) > 0){
121,667✔
418
        int32_t len = 0;
1✔
419
        void *pBuf = NULL;
1✔
420
        code = buildCreateTbBatchReqBinary(&taosxRsp, &pBuf, &len);
1✔
421
        if (code == 0){
1!
422
          code = buildBatchMeta(&btMetaRsp, TDMT_VND_CREATE_TABLE, len, pBuf);
1✔
423
        }
424
        // pBuf is released here on both the success and the failure path.
        taosMemoryFree(pBuf);
1!
425
        taosArrayDestroyP(taosxRsp.createTableReq, NULL);
1✔
426
        taosxRsp.createTableReq = NULL;
1✔
427
        fetchVer++;
1✔
428
        if (code != 0){
1!
429
          goto END;
1✔
430
        }
431
        totalMetaRows++;
1✔
432
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) ||
1!
433
            (taosGetTimestampMs() - st > pRequest->timeout) ||
1!
434
            (!pRequest->enableBatchMeta && !pRequest->useSnapshot)) {
1!
435
          SEND_BATCH_META_RSP
1✔
436
        }
437
        continue;
×
438
      }
439

440
      // Send conditions for data: enough rows (non-raw), timeout elapsed, or - for raw data -
      // more blocks than minPollRows or a raw-data split signalled through terrno.
      if ((pRequest->rawData == 0 && totalRows >= pRequest->minPollRows) ||
121,666✔
441
          (taosGetTimestampMs() - st > pRequest->timeout) ||
121,228!
442
          (pRequest->rawData != 0 && (taosArrayGetSize(taosxRsp.blockData) > pRequest->minPollRows ||
121,249!
443
                                      terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) {
25✔
444
        // On a raw-data split fetchVer is deliberately not advanced, so the response offset
        // stays on the current entry; terrno is cleared so the flag does not leak further.
        if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){
418✔
445
          terrno = 0;
5✔
446
        } else{
447
          fetchVer++;
443✔
448
        }
449
        SEND_DATA_RSP
449✔
450
      } else {
451
        fetchVer++;
121,234✔
452
      }
453
    }
454
  }
455

456
END:
×
457
  if (code != 0){
3,015,641!
458
    tqError("tmq poll: tqTaosxScanLog error. consumerId:0x%" PRIx64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
×
459
            pRequest->subKey);
460
  }
461
  tDeleteMqBatchMetaRsp(&btMetaRsp);
3,015,641✔
462
  tDeleteSTaosxRsp(&taosxRsp);
3,015,659✔
463
  return code;
3,015,600✔
464
}
465

466
/**
 * Entry point for answering a consumer poll request.
 *
 * Works on a private copy of the request offset. A reset-type offset is first resolved by
 * extractResetOffsetVal (which may already have sent a response); an offset of type 0 is
 * rejected. The request is then dispatched by subscription type.
 *
 * @return 0 on success, TSDB_CODE_TMQ_INVALID_MSG for NULL arguments or offset type 0,
 *         otherwise the error code of the failing step
 */
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
  if (pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  int32_t      code = 0;
  STqOffsetVal startOffset = {0};
  tOffsetCopy(&startOffset, &pRequest->reqOffset);

  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
    // Resolve the reset request; quit on error or when a response was already sent.
    bool alreadyAnswered = false;
    code = extractResetOffsetVal(&startOffset, pTq, pHandle, pRequest, pMsg, &alreadyAnswered);
    if (code != 0 || alreadyAnswered) {
      goto END;
    }
  } else if (startOffset.type == 0) {
    uError("req offset type is 0");
    code = TSDB_CODE_TMQ_INVALID_MSG;
    goto END;
  }

  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
    code = extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &startOffset);
  } else {
    code = extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &startOffset);
  }

END:
  if (code != 0) {
    uError("failed to extract data for mq, msg:%s", tstrerror(code));
  }
  tOffsetDestroy(&startOffset);
  return code;
}
505

506
/**
 * Fill the fields of a poll response head. Does nothing when pMsgHead is NULL.
 *
 * @param pMsgHead   head to fill
 * @param type       stored as mqMsgType
 * @param epoch      stored as epoch
 * @param consumerId stored as consumerId
 * @param sver       stored as walsver
 * @param ever       stored as walever
 */
static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver,
                          int64_t ever) {
  if (pMsgHead != NULL) {
    pMsgHead->mqMsgType = type;
    pMsgHead->epoch = epoch;
    pMsgHead->consumerId = consumerId;
    pMsgHead->walsver = sver;
    pMsgHead->walever = ever;
  }
}
517

518
int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
17✔
519
                               const SMqBatchMetaRsp* pRsp, int32_t vgId) {
520
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
17!
521
    return TSDB_CODE_TMQ_INVALID_MSG;
×
522
  }
523
  int32_t len = 0;
17✔
524
  int32_t code = 0;
17✔
525
  tEncodeSize(tEncodeMqBatchMetaRsp, pRsp, len, code);
17!
526
  if (code < 0) {
17!
527
    return TAOS_GET_TERRNO(code);
×
528
  }
529
  int32_t tlen = sizeof(SMqRspHead) + len;
17✔
530
  void*   buf = rpcMallocCont(tlen);
17✔
531
  if (buf == NULL) {
17!
532
    return TAOS_GET_TERRNO(terrno);
×
533
  }
534

535
  int64_t sver = 0, ever = 0;
17✔
536
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
17✔
537
  initMqRspHead(buf, TMQ_MSG_TYPE__POLL_BATCH_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
17✔
538

539
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
17✔
540

541
  SEncoder encoder = {0};
17✔
542
  tEncoderInit(&encoder, abuf, len);
17✔
543
  code = tEncodeMqBatchMetaRsp(&encoder, pRsp);
17✔
544
  tEncoderClear(&encoder);
17✔
545
  if (code < 0) {
17!
546
    rpcFreeCont(buf);
×
547
    return TAOS_GET_TERRNO(code);
×
548
  }
549
  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
17✔
550

551
  tmsgSendRsp(&resp);
17✔
552
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%ld offset type:%d",
17!
553
          vgId, pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type);
554

555
  return 0;
17✔
556
}
557

558
/**
 * Encode a single meta response behind an SMqRspHead and send it as the rpc response to pMsg.
 *
 * @param pHandle subscription handle; its WAL reader supplies the valid version range
 *                written into the response head
 * @param pMsg    rpc message being answered
 * @param pReq    poll request (epoch and consumer id are copied into the head)
 * @param pRsp    meta response to encode
 * @param vgId    vgroup id, used for logging only
 * @return 0 on success, TSDB_CODE_TMQ_INVALID_MSG for NULL arguments, otherwise an
 *         encoding or allocation error
 */
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
                          int32_t vgId) {
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  // First pass: determine the encoded payload size.
  int32_t payloadLen = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeMqMetaRsp, pRsp, payloadLen, code);
  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  int32_t totalLen = sizeof(SMqRspHead) + payloadLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
  }

  int64_t sver = 0;
  int64_t ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  initMqRspHead(pCont, TMQ_MSG_TYPE__POLL_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);

  // Second pass: encode the payload right behind the head.
  SEncoder encoder = {0};
  tEncoderInit(&encoder, POINTER_SHIFT(pCont, sizeof(SMqRspHead)), payloadLen);
  code = tEncodeMqMetaRsp(&encoder, pRsp);
  tEncoderClear(&encoder);
  if (code < 0) {
    rpcFreeCont(pCont);
    return TAOS_GET_TERRNO(code);
  }

  SRpcMsg rsp = {.info = pMsg->info, .pCont = pCont, .contLen = totalLen, .code = 0};
  tmsgSendRsp(&rsp);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", vgId,
          pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);

  return 0;
}
598

599
/**
 * Encode a data response behind an SMqRspHead and send it over rpc.
 *
 * POLL_DATA_RSP, WALINFO_RSP and POLL_RAW_DATA_RSP are encoded with tEncodeMqDataRsp;
 * POLL_DATA_META_RSP is encoded with tEncodeSTaosxRsp. For any other type no payload is
 * encoded and only the head is sent. A raw-data response always has withSchema cleared
 * before encoding.
 *
 * @param pRpcHandleInfo rpc handle info of the request being answered
 * @param pRsp           response to encode (withSchema may be modified)
 * @param epoch          written into the response head
 * @param consumerId     written into the response head
 * @param type           response message type
 * @param sver           written into the response head as walsver
 * @param ever           written into the response head as walever
 * @return 0 on success, TSDB_CODE_TMQ_INVALID_MSG for NULL arguments, otherwise an
 *         encoding or allocation error
 */
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
                        int32_t type, int64_t sver, int64_t ever) {
  if (pRpcHandleInfo == NULL || pRsp == NULL) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  const bool asDataRsp = (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP ||
                          type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP);
  const bool asDataMetaRsp = !asDataRsp && (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP);

  int32_t payloadLen = 0;
  int32_t code = 0;

  if (type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
    pRsp->withSchema = 0;
  }

  // First pass: compute the encoded size with the encoder that matches the type.
  if (asDataRsp) {
    tEncodeSize(tEncodeMqDataRsp, pRsp, payloadLen, code);
  } else if (asDataMetaRsp) {
    tEncodeSize(tEncodeSTaosxRsp, pRsp, payloadLen, code);
  }
  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  int32_t totalLen = sizeof(SMqRspHead) + payloadLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return terrno;
  }

  initMqRspHead((SMqRspHead*)pCont, type, epoch, consumerId, sver, ever);

  // Second pass: encode the payload right behind the head.
  SEncoder encoder = {0};
  tEncoderInit(&encoder, POINTER_SHIFT(pCont, sizeof(SMqRspHead)), payloadLen);
  if (asDataRsp) {
    code = tEncodeMqDataRsp(&encoder, pRsp);
  } else if (asDataMetaRsp) {
    code = tEncodeSTaosxRsp(&encoder, pRsp);
  }
  tEncoderClear(&encoder);
  if (code < 0) {
    rpcFreeCont(pCont);
    return TAOS_GET_TERRNO(code);
  }

  SRpcMsg rsp = {.info = *pRpcHandleInfo, .pCont = pCont, .contLen = totalLen, .code = 0};
  tmsgSendRsp(&rsp);
  return 0;
}
653

654
/*
 * Decode a serialized delete result and turn it into a special stream data block
 * with one row per affected table uid.
 *
 * pData/len  encoded SDeleteRes payload
 * ver        version stamped into the produced block (info.version)
 * pRefBlock  output; reset to NULL on entry. On success with something to report:
 *              type == 0 -> a queue item (SStreamRefDataBlock) that references the block
 *              type == 1 -> the SSDataBlock itself
 *            It stays NULL when the delete touched no table or affected no rows.
 * type       selects the output wrapping (see above); any other value is an error
 * blockType  block type passed to createSpecialDataBlock()
 *
 * Returns 0 on success, otherwise the error code of the failing step. On every
 * failure path the decoder, the uid list and the partially built block are
 * released here; ownership of the block moves to the caller only on success.
 */
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type, EStreamType blockType) {
  int32_t      code = 0;
  int32_t      line = 0;
  SDecoder*    pCoder = &(SDecoder){0};
  SDeleteRes*  pRes = &(SDeleteRes){0};
  // Declared before the first "goto END" so the cleanup code never reads an
  // indeterminate pointer.
  SSDataBlock* pDelBlock = NULL;

  *pRefBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  TSDB_CHECK_NULL(pRes->uidList, code, line, END, terrno);

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  code = tDecodeDeleteRes(pCoder, pRes);
  TSDB_CHECK_CODE(code, line, END);

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    goto END;  // nothing was deleted: succeed with *pRefBlock == NULL
  }

  code = createSpecialDataBlock(blockType, &pDelBlock);
  TSDB_CHECK_CODE(code, line, END);

  code = blockDataEnsureCapacity(pDelBlock, numOfTables);
  TSDB_CHECK_CODE(code, line, END);

  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  // The column objects do not change while rows are filled, so look them up once.
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSDB_CHECK_NULL(pStartCol, code, line, END, terrno);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSDB_CHECK_NULL(pEndCol, code, line, END, terrno);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  TSDB_CHECK_NULL(pUidCol, code, line, END, terrno);
  SColumnInfoData* pGroupIdCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  TSDB_CHECK_NULL(pGroupIdCol, code, line, END, terrno);
  SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  TSDB_CHECK_NULL(pCalStartCol, code, line, END, terrno);
  SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  TSDB_CHECK_NULL(pCalEndCol, code, line, END, terrno);
  SColumnInfoData* pTbNameCol = taosArrayGet(pDelBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
  TSDB_CHECK_NULL(pTbNameCol, code, line, END, terrno);

  for (int32_t i = 0; i < numOfTables; i++) {
    // start key column
    code = colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    TSDB_CHECK_CODE(code, line, END);

    // end key column
    code = colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    TSDB_CHECK_CODE(code, line, END);

    // uid column
    int64_t* pUid = taosArrayGet(pRes->uidList, i);
    TSDB_CHECK_NULL(pUid, code, line, END, terrno);
    code = colDataSetVal(pUidCol, i, (const char*)pUid, false);
    TSDB_CHECK_CODE(code, line, END);

    // the remaining columns carry no value for a delete row
    colDataSetNULL(pGroupIdCol, i);
    colDataSetNULL(pCalStartCol, i);
    colDataSetNULL(pCalEndCol, i);
    colDataSetNULL(pTbNameCol, i);
  }

  if (type == 0) {
    code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock);
    if (code != 0) {
      // go through the common exit so the decoder and the uid list are released too
      line = __LINE__;
      goto END;
    }

    ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
    ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pDelBlock;
  } else if (type == 1) {
    *pRefBlock = pDelBlock;
  } else {
    tqError("unknown type:%d", type);
    code = TSDB_CODE_TMQ_CONSUMER_ERROR;
  }

END:
  if (code != 0) {
    tqError("failed to extract delete data block, line:%d code:%d", line, code);
    // On failure the block was never handed to the caller; release it here.
    // NOTE(review): mirrors the cleanup pair this function already used; confirm it
    // also frees the column buffers, otherwise switch to the block destroy routine.
    if (pDelBlock != NULL) {
      blockDataCleanup(pDelBlock);
      taosMemoryFree(pDelBlock);
    }
  }
  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
  return code;
}
740

741
/*
 * Collect execution progress for the source-level task(s) of stream `streamId`
 * registered in this vnode's stream meta.
 *
 * pVnode      vnode whose pTq->pStreamMeta task list is scanned
 * streamId    stream to look for
 * pDelay      optional output; reset to 0, then set to (latest - cur) / 1000 where
 *             both values are WAL head ingest timestamps (0 when a fetch fails)
 * fhFinished  output (must not be NULL); reset to false, then set to
 *             !HAS_RELATED_FILLHISTORY_TASK(task) for a matching source task
 *
 * Always returns TSDB_CODE_SUCCESS; per-task failures are logged and skipped.
 */
int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, bool* fhFinished) {
  SStreamMeta* pStreamMeta = pVnode->pTq->pStreamMeta;
  int32_t      taskCount = taosArrayGetSize(pStreamMeta->pTaskList);

  // give the outputs defined values before any early exit
  if (pDelay != NULL) {
    *pDelay = 0;
  }
  *fhFinished = false;

  if (taskCount <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  // extract the required source task for a given stream, identified by streamId
  streamMetaRLock(pStreamMeta);

  // re-read the size now that the read lock is held
  taskCount = taosArrayGetSize(pStreamMeta->pTaskList);

  for (int32_t idx = 0; idx < taskCount; ++idx) {
    SStreamTaskId* pTaskId = taosArrayGet(pStreamMeta->pTaskList, idx);
    if (pTaskId == NULL || pTaskId->streamId != streamId) {
      continue;
    }

    STaskId      key = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
    SStreamTask* pTask = NULL;

    if (streamMetaAcquireTaskUnsafe(pStreamMeta, &key, &pTask) != 0) {
      tqError("vgId:%d failed to acquire task:0x%x in retrieving progress", pStreamMeta->vgId, pTaskId->taskId);
      continue;
    }

    if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
      streamMetaReleaseTask(pStreamMeta, pTask);
      continue;
    }

    // here we get the required stream source task
    *fhFinished = !HAS_RELATED_FILLHISTORY_TASK(pTask);

    // -1 from the reader means "no current version": fall back to the checkpoint's
    // processed version; otherwise use the version just before the reader's current one
    int64_t readerVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
    int64_t progressVer = (readerVer == -1) ? pTask->chkInfo.processedVer : readerVer - 1;

    SVersionRange validRange = {0};
    walReaderValidVersionRange(pTask->exec.pWalReader, &validRange.minVer, &validRange.maxVer);

    // a separate reader on the same WAL is used for the head lookups below
    SWalReader* pProbe = walOpenReader(pTask->exec.pWalReader->pWal, NULL, 0);
    if (pProbe == NULL) {
      tqError("failed to open wal reader to extract exec progress, vgId:%d", pStreamMeta->vgId);
      streamMetaReleaseTask(pStreamMeta, pTask);
      continue;
    }

    int64_t curTs = 0;
    if (walFetchHead(pProbe, progressVer) == TSDB_CODE_SUCCESS) {
      curTs = pProbe->pHead->head.ingestTs;
    }

    int64_t latestTs = 0;
    if (progressVer == validRange.maxVer) {
      latestTs = curTs;
    } else if (walFetchHead(pProbe, validRange.maxVer) == TSDB_CODE_SUCCESS) {
      latestTs = pProbe->pHead->head.ingestTs;
    }

    if (pDelay != NULL) {  // delay in ms
      *pDelay = (latestTs - curTs) / 1000;
    }

    walCloseReader(pProbe);
    streamMetaReleaseTask(pStreamMeta, pTask);
  }

  streamMetaRUnLock(pStreamMeta);

  return TSDB_CODE_SUCCESS;
}
833

834
int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type) {
185✔
835
  int32_t          code = 0;
185✔
836
  int32_t          lino = 0;
185✔
837
  SDecoder         dc = {0};
185✔
838
  SVDropTbBatchReq batchReq = {0};
185✔
839
  tDecoderInit(&dc, (uint8_t*)data, len);
185✔
840
  code = tDecodeSVDropTbBatchReq(&dc, &batchReq);
185✔
841
  TSDB_CHECK_CODE(code, lino, _exit);
185!
842
  if (batchReq.nReqs <= 0) goto _exit;
185!
843

844
  SSDataBlock* pBlock = NULL;
185✔
845
  code = createSpecialDataBlock(STREAM_DROP_CHILD_TABLE, &pBlock);
185✔
846
  TSDB_CHECK_CODE(code, lino, _exit);
185!
847

848
  code = blockDataEnsureCapacity(pBlock, batchReq.nReqs);
185✔
849
  TSDB_CHECK_CODE(code, lino, _exit);
185!
850

851
  pBlock->info.rows = batchReq.nReqs;
185✔
852
  pBlock->info.version = ver;
185✔
853
  for (int32_t i = 0; i < batchReq.nReqs; ++i) {
375✔
854
    SVDropTbReq* pReq = batchReq.pReqs + i;
190✔
855
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
190✔
856
    TSDB_CHECK_NULL(pCol, code, lino, _exit, terrno);
190!
857
    code = colDataSetVal(pCol, i, (const char* )&pReq->uid, false);
190✔
858
    TSDB_CHECK_CODE(code, lino, _exit);
190!
859
  }
860

861
  code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock);
185✔
862
  TSDB_CHECK_CODE(code, lino, _exit);
185!
863
  ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
185✔
864
  ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pBlock;
185✔
865

866
_exit:
185✔
867
  tDecoderClear(&dc);
185✔
868
  if (TSDB_CODE_SUCCESS != code) {
185!
869
    tqError("faled to extract drop ctb data block, line:%d code:%s", lino, tstrerror(code));
×
870
    blockDataCleanup(pBlock);
×
871
    taosMemoryFree(pBlock);
×
872
  }
873
  return code;
185✔
874
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc