• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3632

08 Mar 2025 06:17AM UTC coverage: 60.719% (+0.05%) from 60.671%
#3632

push

travis-ci

web-flow
Merge pull request #29999 from taosdata/enh/TS-5089

feat: taosBenchmark supports exporting to CSV files

141890 of 300701 branches covered (47.19%)

Branch coverage included in aggregate %.

599 of 766 new or added lines in 3 files covered. (78.2%)

1025 existing lines in 124 files now uncovered.

223757 of 301490 relevant lines covered (74.22%)

17284906.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.66
/source/dnode/vnode/src/tq/tqUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
19
                                 const SMqMetaRsp* pRsp, int32_t vgId);
20
static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
21
                                      const SMqBatchMetaRsp* pRsp, int32_t vgId);
22

23
/**
 * Initialise a poll data response so that blocks can be appended to it.
 *
 * Allocates the (initially empty) blockData / blockDataLen arrays, copies
 * pOffset into both reqOffset and rspOffset and clears the withTbName /
 * withSchema flags.
 *
 * @param pRsp    response to initialise; must not be NULL
 * @param pOffset offset stored as both the request and the response offset
 * @return TDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA when pRsp is
 *         NULL, otherwise terrno as left by the failing taosArrayInit().
 *         On failure no array allocated here stays attached to pRsp.
 */
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  int32_t code = TDB_CODE_SUCCESS;
  int32_t lino = 0;
  tqDebug("%s called", __FUNCTION__ );
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);

  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);

  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
  pRsp->withTbName = 0;
  pRsp->withSchema = false;

END:
  if (code != 0) {
    tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
    // Only blockData can be live on this path (blockDataLen is either untouched or NULL).
    // Release it and clear the pointer so a caller that returns right away does not leak it
    // and a caller that runs its own cleanup does not free it twice.
    if (pRsp != NULL && pRsp->blockData != NULL) {
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
    }
  }
  return code;
}
46

47
/*
 * Push the vnode's current sync term together with the given leadership flag
 * into the stream meta; when this node is the leader, also start an
 * asynchronous WAL scan.
 */
void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
  SSyncState syncState = syncGetState(pTq->pVnode->sync);

  streamMetaUpdateStageRole(pTq->pStreamMeta, syncState.term, isLeader);
  if (!isLeader) {
    return;
  }

  tqScanWalAsync(pTq);
}
14,005✔
55

56
/**
 * Initialise a taosx-style data response: table names and schemas are enabled
 * and the four per-block arrays (data, data length, table name, schema) are
 * allocated empty. pOffset is copied into both reqOffset and rspOffset.
 *
 * @param pRsp    response to initialise; must not be NULL
 * @param pOffset offset stored as both the request and the response offset
 * @return TDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA when pRsp is
 *         NULL, otherwise terrno as left by the failing taosArrayInit().
 *         On failure every array allocated here is released and its pointer
 *         reset to NULL.
 */
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  int32_t code = TDB_CODE_SUCCESS;
  int32_t lino = 0;
  tqDebug("%s called", __FUNCTION__ );
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);

  // Start from a known state so the failure path only ever sees NULL or an array allocated below.
  pRsp->blockData = NULL;
  pRsp->blockDataLen = NULL;
  pRsp->blockTbName = NULL;
  pRsp->blockSchema = NULL;

  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);

  pRsp->withTbName = 1;
  pRsp->withSchema = 1;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);

  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);

  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockTbName, code, lino, END, terrno);

  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);

END:
  if (code != 0) {
    tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
    // pRsp is NULL when the very first check failed; there is nothing to release then.
    if (pRsp != NULL) {
      // Clear each pointer after releasing it: the caller still runs its own
      // response cleanup on failure and must not see dangling arrays.
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
      taosArrayDestroy(pRsp->blockDataLen);
      pRsp->blockDataLen = NULL;
      taosArrayDestroy(pRsp->blockTbName);
      pRsp->blockTbName = NULL;
      taosArrayDestroy(pRsp->blockSchema);
      pRsp->blockSchema = NULL;
    }
  }
  return code;
}
89

90
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
343,135✔
91
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
92
  if (pOffsetVal == NULL || pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || pBlockReturned == NULL) {
343,135!
93
    return TSDB_CODE_INVALID_PARA;
×
94
  }
95
  uint64_t   consumerId = pRequest->consumerId;
343,135✔
96
  STqOffset* pOffset = NULL;
343,135✔
97
  int32_t    code = tqMetaGetOffset(pTq, pRequest->subKey, &pOffset);
343,135✔
98
  int32_t    vgId = TD_VID(pTq->pVnode);
343,126✔
99

100
  *pBlockReturned = false;
343,126✔
101
  // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
102
  if (code == 0) {
343,126✔
103
    tOffsetCopy(pOffsetVal, &pOffset->val);
204✔
104

105
    char formatBuf[TSDB_OFFSET_LEN] = {0};
204✔
106
    tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
204✔
107
    tqDebug("tmq poll: consumer:0x%" PRIx64
204!
108
                ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.QID:0x%" PRIx64,
109
            consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
110
    return 0;
204✔
111
  } else {
112
    // no poll occurs in this vnode for this topic, let's seek to the right offset value.
113
    if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
342,922✔
114
      if (pRequest->useSnapshot) {
194,186✔
115
        tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
193,314!
116
                consumerId, pHandle->subKey, vgId);
117
        if (pHandle->fetchMeta) {
193,314✔
118
          tqOffsetResetToMeta(pOffsetVal, 0);
119
        } else {
120
          SValue val = {0};
193,298!
121
          tqOffsetResetToData(pOffsetVal, 0, 0, val);
193,298✔
122
        }
123
      } else {
124
        walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
872✔
125
        tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
873✔
126
      }
127
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
148,736✔
128
      walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
54✔
129
      SMqDataRsp dataRsp = {0};
54✔
130
      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
54✔
131

132
      code = tqInitDataRsp(&dataRsp, *pOffsetVal);
54✔
133
      if (code != 0) {
54!
134
        return code;
×
135
      }
136
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
54!
137
              pHandle->subKey, vgId, dataRsp.rspOffset.version);
138
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, (pRequest->rawData == 1) ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
54!
139
      tDeleteMqDataRsp(&dataRsp);
54✔
140

141
      *pBlockReturned = true;
54✔
142
      return code;
54✔
143
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
148,682✔
144
      tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
148,680!
145
                  " in vg %d, subkey %s, reset none failed",
146
              pHandle->subKey, consumerId, vgId, pRequest->subKey);
147
      return TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
148,690✔
148
    }
149
  }
150

151
  return 0;
194,189✔
152
}
153

154
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
292,532✔
155
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
156
  int32_t    code = TDB_CODE_SUCCESS;
292,532✔
157
  int32_t    lino = 0;
292,532✔
158
  tqDebug("%s called", __FUNCTION__ );
292,532!
159
  uint64_t consumerId = pRequest->consumerId;
292,531✔
160
  int32_t  vgId = TD_VID(pTq->pVnode);
292,531✔
161
  terrno = 0;
292,531✔
162

163
  SMqDataRsp dataRsp = {0};
292,531✔
164
  code = tqInitDataRsp(&dataRsp, *pOffset);
292,531✔
165
  TSDB_CHECK_CODE(code, lino, end);
292,531!
166

167
  code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
292,531✔
168
  TSDB_CHECK_CODE(code, lino, end);
292,531!
169

170
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
292,531✔
171
  if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
292,531✔
172
    goto end;
193,121✔
173
  }
174

175
  //   till now, all data has been transferred to consumer, new data needs to push client once arrived.
176
  if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
99,410✔
177
    // lock
178
    taosWLockLatch(&pTq->lock);
31,708✔
179
    int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
31,708✔
180
    if (dataRsp.rspOffset.version > ver) {  // check if there are data again to avoid lost data
31,708✔
181
      code = tqRegisterPushHandle(pTq, pHandle, pMsg);
28,957✔
182
      taosWUnLockLatch(&pTq->lock);
28,957✔
183
      goto end;
28,957✔
184
    }
185
    taosWUnLockLatch(&pTq->lock);
2,751✔
186
  }
187

188
  // reqOffset represents the current date offset, may be changed if wal not exists
189
  tOffsetCopy(&dataRsp.reqOffset, pOffset);
70,453✔
190
  code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
70,453✔
191

192
end:
292,530✔
193
  {
194
    char buf[TSDB_OFFSET_LEN] = {0};
292,530✔
195
    tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
292,530✔
196
    if (code != 0){
292,529✔
197
      tqError("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " error msg:%s, line:%d",
193,119!
198
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, tstrerror(code), lino);
199
    } else {
200
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " success",
99,410!
201
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId);
202
    }
203

204
    tDeleteMqDataRsp(&dataRsp);
292,532✔
205
    return code;
292,527✔
206
  }
207
}
208

209
/*
 * Decode the body of the WAL entry at pHead (past its SMsgHead) as TYPE. When
 * decoding succeeds and req.source carries TD_REQ_FROM_TAOX, the entry is
 * skipped: it is logged, fetchVer is advanced and the enclosing loop is
 * continued. The decoded request and the decoder are released either way.
 *
 * Expands to declarations plus a `continue`, so it must be the content of its
 * own block inside a loop, with pHead, pRequest, vgId and fetchVer in scope.
 */
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC)                                                   \
  SDecoder decoder = {0};                                                                                      \
  TYPE     req = {0};                                                                                          \
  void*    data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead));                                                \
  int32_t  len = pHead->bodyLen - sizeof(SMsgHead);                                                            \
  tDecoderInit(&decoder, data, len);                                                                           \
  const bool skipExcludedMsg = (DECODE_FUNC(&decoder, &req) == 0) && ((req.source & TD_REQ_FROM_TAOX) != 0);   \
  if (skipExcludedMsg) {                                                                                       \
    tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64     \
            " msgType %d",                                                                                     \
            pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);                            \
    fetchVer++;                                                                                                \
  }                                                                                                            \
  DELETE_FUNC(&req);                                                                                           \
  tDecoderClear(&decoder);                                                                                     \
  if (skipExcludedMsg) {                                                                                       \
    continue;                                                                                                  \
  }
226

227
/* No-op release function, passed to PROCESS_EXCLUDED_MSG for request types that need no cleanup call. */
static void tDeleteCommon(void* parm) {
  (void)parm;
}
404✔
228

229
/*
 * Pick the message type for a data response: DATA_META when the response
 * carries create-table requests, otherwise RAW_DATA or DATA depending on
 * pRequest->rawData. Arguments and the whole expansion are parenthesised so
 * the macro can be used inside larger expressions.
 */
#define POLL_RSP_TYPE(pRequest, taosxRsp)                                  \
  (((taosxRsp).createTableNum > 0)                                         \
       ? TMQ_MSG_TYPE__POLL_DATA_META_RSP                                  \
       : (((pRequest)->rawData == 1) ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP))
232

233
/**
 * Append one meta message to a batch meta response.
 *
 * The message (type/bodyLen/body) is wrapped in an SMqMetaRsp, encoded into a
 * freshly allocated buffer that leaves room for an SMqRspHead in front, and
 * the buffer pointer and its total length are pushed onto
 * btMetaRsp->batchMetaReq / batchMetaLen (both created on first use).
 *
 * @param btMetaRsp batch response to extend
 * @param type      message type stored as resMsgType
 * @param bodyLen   length of body in bytes
 * @param body      message body; referenced only while encoding
 * @return 0 on success, otherwise an error code. The encode buffer is freed
 *         here unless it has already been handed over to batchMetaReq.
 */
static int32_t buildBatchMeta(SMqBatchMetaRsp *btMetaRsp, int16_t type, int32_t bodyLen, void* body){
  int32_t code = 0;
  void*   tBuf = NULL;  // owned by this function until it is pushed into batchMetaReq

  if (!btMetaRsp->batchMetaReq) {
    btMetaRsp->batchMetaReq = taosArrayInit(4, POINTER_BYTES);
    TQ_NULL_GO_TO_END(btMetaRsp->batchMetaReq);
    btMetaRsp->batchMetaLen = taosArrayInit(4, sizeof(int32_t));
    TQ_NULL_GO_TO_END(btMetaRsp->batchMetaLen);
  }

  SMqMetaRsp tmpMetaRsp = {0};
  tmpMetaRsp.resMsgType = type;
  tmpMetaRsp.metaRspLen = bodyLen;
  tmpMetaRsp.metaRsp = body;
  uint32_t len = 0;
  tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
  if (TSDB_CODE_SUCCESS != code) {
    tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
    goto END;
  }
  int32_t tLen = sizeof(SMqRspHead) + len;
  tBuf = taosMemoryCalloc(1, tLen);
  TQ_NULL_GO_TO_END(tBuf);
  void*    metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
  SEncoder encoder = {0};
  tEncoderInit(&encoder, metaBuff, len);
  code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
  tEncoderClear(&encoder);

  if (code < 0) {
    tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
    goto END;
  }
  TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaReq, &tBuf));
  // The array holds the buffer from here on; do not free it on a later failure.
  tBuf = NULL;
  // NOTE(review): if this second push fails, batchMetaReq is one entry longer than batchMetaLen.
  TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaLen, &tLen));

END:
  // Non-NULL only when encoding or the first push failed after the allocation.
  taosMemoryFree(tBuf);
  return code;
}
272

273
/**
 * Serialise the create-table requests collected in taosxRsp->createTableReq
 * as one SVCreateTbBatchReq, preceded by room for an SMsgHead.
 *
 * @param taosxRsp response holding the create-table request pointers
 * @param pBuf     [out] newly allocated buffer (zeroed header + encoded
 *                 batch); it may be set even when an error is returned, so
 *                 the caller frees it in all cases
 * @param len      [out] total buffer length including sizeof(SMsgHead)
 * @return 0 on success, otherwise an error code
 */
static int32_t buildCreateTbBatchReqBinary(SMqDataRsp *taosxRsp, void** pBuf, int32_t *len){
  int32_t code = 0;
  SVCreateTbBatchReq pReq = {0};
  int32_t numOfReqs = (int32_t)taosArrayGetSize(taosxRsp->createTableReq);
  pReq.nReqs = numOfReqs;
  pReq.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
  TQ_NULL_GO_TO_END(pReq.pArray);
  for (int32_t i = 0; i < numOfReqs; i++){
    // Shallow copy of each request struct; pReq.pArray does not own what the requests point to.
    void   *createTableReq = taosArrayGetP(taosxRsp->createTableReq, i);
    TQ_NULL_GO_TO_END(taosArrayPush(pReq.pArray, createTableReq));
  }
  tEncodeSize(tEncodeSVCreateTbBatchReq, &pReq, *len, code);
  if (code < 0) {
    goto END;
  }
  int32_t payloadLen = *len;
  *len += sizeof(SMsgHead);
  // Zeroed allocation: the SMsgHead area is never written here but travels with the payload.
  *pBuf = taosMemoryCalloc(1, *len);
  // Check the allocation itself, not the out-parameter.
  TQ_NULL_GO_TO_END(*pBuf);
  SEncoder coder = {0};
  // The encoder starts after the header, so it may only use the payload part of the buffer.
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), payloadLen);
  code = tEncodeSVCreateTbBatchReq(&coder, &pReq);
  tEncoderClear(&coder);

END:
  taosArrayDestroy(pReq.pArray);
  return code;
}
299

300
/*
 * Send the accumulated batch meta response with its offset set to fetchVer,
 * then leave via the END label. Wrapped in a block so the three statements
 * stay together even under an unbraced conditional. Use without a trailing
 * semicolon; expects btMetaRsp, fetchVer, code, pHandle, pMsg, pRequest,
 * vgId and an END label in scope.
 */
#define SEND_BATCH_META_RSP                                                   \
  {                                                                           \
    tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);                       \
    code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); \
    goto END;                                                                 \
  }
304

305
/*
 * Send the accumulated data response with its offset set to fetchVer, then
 * leave via the END label. Wrapped in a block so the three statements stay
 * together even under an unbraced conditional. Use without a trailing
 * semicolon; expects taosxRsp, fetchVer, code, pHandle, pMsg, pRequest,
 * vgId and an END label in scope.
 */
#define SEND_DATA_RSP                                                                                    \
  {                                                                                                      \
    tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);                                                   \
    code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, POLL_RSP_TYPE(pRequest, taosxRsp), vgId);   \
    goto END;                                                                                            \
  }
309
// Serve one poll request for a subscription whose subType is not TOPIC_SUB_TYPE__COLUMN
// (tqExtractDataForMq dispatches here for every other subType).
//
// 1. A non-LOG offset is first handled by tqScanTaosx: collected batch meta or data blocks are
//    sent at once; with nothing collected, the scan's rspOffset becomes the offset to continue from.
// 2. A LOG offset is served by walking WAL entries from offset->version, accumulating either meta
//    messages (btMetaRsp) or data blocks (taosxRsp) until a row-count or timeout threshold is hit.
//
// SEND_DATA_RSP / SEND_BATCH_META_RSP send the response and jump to END; PROCESS_EXCLUDED_MSG may
// `continue` the loop. Both responses are released at END on every path. Returns 0 or an error code.
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
2,721,668✔
310
                                                  SRpcMsg* pMsg, STqOffsetVal* offset) {
311
  int32_t         vgId = TD_VID(pTq->pVnode);
2,721,668✔
312
  SMqDataRsp      taosxRsp = {0};
2,721,668✔
313
  SMqBatchMetaRsp btMetaRsp = {0};
2,721,668✔
314
  int32_t         code = 0;
2,721,668✔
315

316
  TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset));
2,721,668!
317
  // Offset types other than LOG are scanned through tqScanTaosx first.
  if (offset->type != TMQ_OFFSET__LOG) {
2,721,664✔
318
    TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest));
151!
319

320
    if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
151✔
321
      code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
14✔
322
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
14!
323
              pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,btMetaRsp.rspOffset.ts);
324
      goto END;
14✔
325
    }
326

327
    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64",ts:%" PRId64,
137!
328
            pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
329
    if (taosxRsp.blockNum > 0) {
137✔
330
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
131✔
331
      goto END;
131✔
332
    } else {
333
      // Nothing collected: continue from wherever the scan left the response offset.
      tOffsetCopy(offset, &taosxRsp.rspOffset);
6✔
334
    }
335
  }
336

337
  if (offset->type == TMQ_OFFSET__LOG) {
2,721,517!
338
    walReaderVerifyOffset(pHandle->pWalReader, offset);
2,721,517✔
339
    int64_t fetchVer = offset->version;
2,721,515✔
340

341
    // st is the start time used for the pRequest->timeout checks below.
    uint64_t st = taosGetTimestampMs();
2,721,509✔
342
    int      totalRows = 0;
2,721,509✔
343
    int32_t  totalMetaRows = 0;
2,721,509✔
344
    while (1) {
121,778✔
345
      // Abort when the handle's epoch is already newer than the epoch of this request.
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
2,843,287✔
346
      if (savedEpoch > pRequest->epoch) {
2,843,238!
347
        tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, savedEpoch error, vgId:%d offset %" PRId64,
×
348
                pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
349
        code = TSDB_CODE_TQ_INTERNAL_ERROR;
×
350
        goto END;
2,721,447✔
351
      }
352

353
      // When tqFetchLog reports failure, send what has been accumulated (meta first) and leave.
      if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
2,843,238✔
354
        if (totalMetaRows > 0) {
2,720,815✔
355
          SEND_BATCH_META_RSP
2✔
356
        }
357
        SEND_DATA_RSP
2,720,813✔
358
      }
359

360
      SWalCont* pHead = &pHandle->pWalReader->pHead->head;
122,522✔
361
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %s",
122,522!
362
              pRequest->consumerId, pRequest->epoch, vgId, fetchVer, TMSG_INFO(pHead->msgType));
363

364
      // process meta
365
      if (pHead->msgType != TDMT_VND_SUBMIT) {
122,531✔
366
        // Data rows already collected are sent before any meta message is handled.
        if (totalRows > 0) {
666✔
367
          SEND_DATA_RSP
35!
368
        }
369

370
        // When the consumer excludes taosX-originated changes, each macro below decodes the
        // entry and `continue`s past it if its source flag has TD_REQ_FROM_TAOX set.
        if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
631✔
371
          if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
628✔
372
            PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq, tDeleteSVCreateTbBatchReq)
186!
373
          } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
442✔
374
            PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq, tDeleteCommon)
55!
375
          } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
493✔
376
            PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq, tDeleteCommon)
342!
377
          } else if (pHead->msgType == TDMT_VND_DELETE) {
45✔
378
            PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes, tDeleteCommon)
7!
379
          }
380
        }
381

382
        tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId,
229!
383
                TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta);
384
        // Without batch meta and without snapshot, each meta message is sent on its own,
        // with the response offset pointing at the next version.
        if (!pRequest->enableBatchMeta && !pRequest->useSnapshot) {
229!
385
          SMqMetaRsp metaRsp = {0};
227✔
386
          tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
227✔
387
          metaRsp.resMsgType = pHead->msgType;
227✔
388
          metaRsp.metaRspLen = pHead->bodyLen;
227✔
389
          metaRsp.metaRsp = pHead->body;
227✔
390
          code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
227✔
391
          goto END;
227✔
392
        }
393
        // Otherwise the message is appended to the batch, which is flushed on size or timeout.
        code = buildBatchMeta(&btMetaRsp, pHead->msgType, pHead->bodyLen, pHead->body);
2✔
394
        fetchVer++;
2✔
395
        if (code != 0){
2!
396
          goto END;
×
397
        }
398
        totalMetaRows++;
2✔
399
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > pRequest->timeout)) {
4!
400
          SEND_BATCH_META_RSP
×
401
        }
402
        continue;
2✔
403
      }
404

405
      // A submit entry ends a pending meta batch unless the handle fetches meta only.
      if (totalMetaRows > 0 && pHandle->fetchMeta != ONLY_META) {
121,865!
406
        SEND_BATCH_META_RSP
×
407
      }
408

409
      // process data
410
      SPackedData submit = {
121,865✔
411
          .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
121,865✔
412
          .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
121,865✔
413
          .ver = pHead->version,
121,865✔
414
      };
415

416
      TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest));
121,865!
417

418
      // ONLY_META handles: create-table requests found in the submit are re-encoded as one
      // TDMT_VND_CREATE_TABLE message and added to the meta batch instead of being sent as data.
      if (pHandle->fetchMeta == ONLY_META && taosArrayGetSize(taosxRsp.createTableReq) > 0){
121,816✔
419
        int32_t len = 0;
1✔
420
        void *pBuf = NULL;
1✔
421
        code = buildCreateTbBatchReqBinary(&taosxRsp, &pBuf, &len);
1✔
422
        if (code == 0){
1!
423
          code = buildBatchMeta(&btMetaRsp, TDMT_VND_CREATE_TABLE, len, pBuf);
1✔
424
        }
425
        taosMemoryFree(pBuf);
1!
426
        taosArrayDestroyP(taosxRsp.createTableReq, NULL);
1✔
427
        taosxRsp.createTableReq = NULL;
1✔
428
        fetchVer++;
1✔
429
        if (code != 0){
1!
430
          goto END;
1✔
431
        }
432
        totalMetaRows++;
1✔
433
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) ||
1!
434
            (taosGetTimestampMs() - st > pRequest->timeout) ||
1!
435
            (!pRequest->enableBatchMeta && !pRequest->useSnapshot)) {
1!
436
          SEND_BATCH_META_RSP
1✔
437
        }
438
        continue;
×
439
      }
440

441
      // Send once enough rows (or raw blocks) are collected, the timeout has elapsed, or terrno
      // is TSDB_CODE_TMQ_RAW_DATA_SPLIT; in the split case fetchVer is deliberately not advanced.
      if ((pRequest->rawData == 0 && totalRows >= pRequest->minPollRows) ||
121,815✔
442
          (taosGetTimestampMs() - st > pRequest->timeout) ||
121,368!
443
          (pRequest->rawData != 0 && (taosArrayGetSize(taosxRsp.blockData) > pRequest->minPollRows ||
121,377!
444
                                      terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) {
25✔
445
        if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){
424✔
446
          terrno = 0;
6✔
447
        } else{
448
          fetchVer++;
439✔
449
        }
450
        SEND_DATA_RSP
445✔
451
      } else {
452
        fetchVer++;
121,374✔
453
      }
454
    }
455
  }
456

457
END:
×
458
  // NOTE(review): this message names tqTaosxScanLog although any failing step above lands here.
  if (code != 0){
2,721,592!
459
    tqError("tmq poll: tqTaosxScanLog error. consumerId:0x%" PRIx64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
×
460
            pRequest->subKey);
461
  }
462
  tDeleteMqBatchMetaRsp(&btMetaRsp);
2,721,592✔
463
  tDeleteSTaosxRsp(&taosxRsp);
2,721,531✔
464
  return code;
2,721,599✔
465
}
466

467
/*
 * Entry point for serving a consumer poll: resolve the request offset (which
 * may already send a response), then hand over to the column or the db/stb
 * extraction routine depending on the handle's subscription type.
 *
 * Returns 0 on success, TSDB_CODE_TMQ_INVALID_MSG for NULL arguments or a
 * request offset of type 0, otherwise the error of the failing step.
 */
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
  if (!pTq || !pHandle || !pRequest || !pMsg) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  int32_t      code = 0;
  STqOffsetVal reqOffset = {0};
  tOffsetCopy(&reqOffset, &pRequest->reqOffset);

  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
    // Reset-type offsets are turned into a concrete offset first.
    bool blockReturned = false;
    code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
    // Stop on failure, or when an empty block has already been returned to the consumer.
    if (code != 0 || blockReturned) {
      goto cleanup;
    }
  } else if (reqOffset.type == 0) {
    // A consumer-specified offset must carry a non-zero type.
    uError("req offset type is 0");
    code = TSDB_CODE_TMQ_INVALID_MSG;
    goto cleanup;
  }

  code = (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN)
             ? extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset)
             : extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);

cleanup:
  if (code != 0) {
    uError("failed to extract data for mq, msg:%s", tstrerror(code));
  }
  tOffsetDestroy(&reqOffset);
  return code;
}
506

507
/* Fill in a poll response header; does nothing when pMsgHead is NULL. */
static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver,
                          int64_t ever) {
  if (pMsgHead != NULL) {
    pMsgHead->mqMsgType = type;
    pMsgHead->epoch = epoch;
    pMsgHead->consumerId = consumerId;
    pMsgHead->walsver = sver;
    pMsgHead->walever = ever;
  }
}
518

519
int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
17✔
520
                               const SMqBatchMetaRsp* pRsp, int32_t vgId) {
521
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
17!
522
    return TSDB_CODE_TMQ_INVALID_MSG;
×
523
  }
524
  int32_t len = 0;
17✔
525
  int32_t code = 0;
17✔
526
  tEncodeSize(tEncodeMqBatchMetaRsp, pRsp, len, code);
17!
527
  if (code < 0) {
17!
528
    return TAOS_GET_TERRNO(code);
×
529
  }
530
  int32_t tlen = sizeof(SMqRspHead) + len;
17✔
531
  void*   buf = rpcMallocCont(tlen);
17✔
532
  if (buf == NULL) {
17!
533
    return TAOS_GET_TERRNO(terrno);
×
534
  }
535

536
  int64_t sver = 0, ever = 0;
17✔
537
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
17✔
538
  initMqRspHead(buf, TMQ_MSG_TYPE__POLL_BATCH_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
17✔
539

540
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
17✔
541

542
  SEncoder encoder = {0};
17✔
543
  tEncoderInit(&encoder, abuf, len);
17✔
544
  code = tEncodeMqBatchMetaRsp(&encoder, pRsp);
17✔
545
  tEncoderClear(&encoder);
17✔
546
  if (code < 0) {
17!
547
    rpcFreeCont(buf);
×
548
    return TAOS_GET_TERRNO(code);
×
549
  }
550
  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
17✔
551

552
  tmsgSendRsp(&resp);
17✔
553
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%ld offset type:%d",
17!
554
          vgId, pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type);
555

556
  return 0;
17✔
557
}
558

559
/*
 * Encode a single meta response behind an SMqRspHead and send it as the reply
 * to the poll request carried by pMsg. vgId is used for logging only.
 *
 * Returns 0 on success, TSDB_CODE_TMQ_INVALID_MSG for a NULL argument, or an
 * error code when sizing, allocation or encoding fails.
 */
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
                          int32_t vgId) {
  if (!pHandle || !pMsg || !pReq || !pRsp) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  int32_t bodyLen = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeMqMetaRsp, pRsp, bodyLen, code);
  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Header: message type, epoch, consumer id and the WAL reader's valid version range.
  int64_t sver = 0;
  int64_t ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  initMqRspHead(pCont, TMQ_MSG_TYPE__POLL_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);

  // Body: the encoded meta response, directly after the header.
  SEncoder encoder = {0};
  tEncoderInit(&encoder, POINTER_SHIFT(pCont, sizeof(SMqRspHead)), bodyLen);
  code = tEncodeMqMetaRsp(&encoder, pRsp);
  tEncoderClear(&encoder);
  if (code < 0) {
    rpcFreeCont(pCont);
    return TAOS_GET_TERRNO(code);
  }

  SRpcMsg resp = {.info = pMsg->info, .pCont = pCont, .contLen = totalLen, .code = 0};
  tmsgSendRsp(&resp);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", vgId,
          pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);

  return 0;
}
599

600
/*
 * Encode a data response behind an SMqRspHead and send it through the given
 * rpc handle. POLL_DATA, WALINFO and POLL_RAW_DATA responses use the plain
 * data encoder, POLL_DATA_META uses the taosx encoder; for any other type
 * nothing is encoded and only the header is sent. Raw data responses have
 * withSchema cleared on pRsp before encoding.
 *
 * Returns 0 on success, TSDB_CODE_TMQ_INVALID_MSG for a NULL argument, or an
 * error code when sizing, allocation or encoding fails.
 */
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
                        int32_t type, int64_t sver, int64_t ever) {
  if (!pRpcHandleInfo || !pRsp) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  // Decide once which encoder applies; the same choice drives sizing and encoding.
  const bool usesDataEncoder = (type == TMQ_MSG_TYPE__POLL_DATA_RSP) || (type == TMQ_MSG_TYPE__WALINFO_RSP) ||
                               (type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP);
  const bool usesTaosxEncoder = !usesDataEncoder && (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP);

  int32_t bodyLen = 0;
  int32_t code = 0;

  if (type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
    pRsp->withSchema = 0;
  }

  if (usesDataEncoder) {
    tEncodeSize(tEncodeMqDataRsp, pRsp, bodyLen, code);
  } else if (usesTaosxEncoder) {
    tEncodeSize(tEncodeSTaosxRsp, pRsp, bodyLen, code);
  }
  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return terrno;
  }

  initMqRspHead((SMqRspHead*)pCont, type, epoch, consumerId, sver, ever);

  // The encoded payload goes directly after the header.
  SEncoder encoder = {0};
  tEncoderInit(&encoder, POINTER_SHIFT(pCont, sizeof(SMqRspHead)), bodyLen);
  if (usesDataEncoder) {
    code = tEncodeMqDataRsp(&encoder, pRsp);
  } else if (usesTaosxEncoder) {
    code = tEncodeSTaosxRsp(&encoder, pRsp);
  }
  tEncoderClear(&encoder);
  if (code < 0) {
    rpcFreeCont(pCont);
    return TAOS_GET_TERRNO(code);
  }

  SRpcMsg rsp = {.info = *pRpcHandleInfo, .pCont = pCont, .contLen = totalLen, .code = 0};
  tmsgSendRsp(&rsp);
  return 0;
}
654

655
/*
 * Decode a serialized delete result and, when it touches at least one table and
 * at least one row, build a data block of type `blockType` with one row per
 * table uid.
 *
 * pData/len : encoded SDeleteRes payload handed to the decoder
 * ver       : stored in the block's info.version
 * pRefBlock : output, reset to NULL on entry. With type == 0 it receives a queue
 *             item (SStreamRefDataBlock) that wraps the block; with type == 1 it
 *             receives the block itself. It is left NULL when there is nothing
 *             to emit.
 * type      : 0 or 1 as described above; any other value yields
 *             TSDB_CODE_TMQ_CONSUMER_ERROR
 * blockType : type passed to createSpecialDataBlock()
 *
 * Returns 0 on success (including the "nothing to emit" case), otherwise an
 * error code. Every exit goes through END, so the decoder, the uid list and a
 * block that was not handed over to the caller are always released.
 */
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type, EStreamType blockType) {
  int32_t      code = 0;
  int32_t      line = 0;
  int32_t      numOfTables = 0;
  SDecoder*    pCoder = &(SDecoder){0};
  SDeleteRes*  pRes = &(SDeleteRes){0};
  SSDataBlock* pDelBlock = NULL;  // owned here until it is handed over through pRefBlock

  *pRefBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  TSDB_CHECK_NULL(pRes->uidList, code, line, END, terrno);

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  code = tDecodeDeleteRes(pCoder, pRes);
  TSDB_CHECK_CODE(code, line, END);

  numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    goto END;  // nothing was deleted: succeed without producing a block
  }

  code = createSpecialDataBlock(blockType, &pDelBlock);
  TSDB_CHECK_CODE(code, line, END);

  code = blockDataEnsureCapacity(pDelBlock, numOfTables);
  TSDB_CHECK_CODE(code, line, END);

  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  // The column handles do not depend on the row, so look them up once.
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSDB_CHECK_NULL(pStartCol, code, line, END, terrno);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSDB_CHECK_NULL(pEndCol, code, line, END, terrno);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  TSDB_CHECK_NULL(pUidCol, code, line, END, terrno);
  SColumnInfoData* pGroupIdCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  TSDB_CHECK_NULL(pGroupIdCol, code, line, END, terrno);
  SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  TSDB_CHECK_NULL(pCalStartCol, code, line, END, terrno);
  SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  TSDB_CHECK_NULL(pCalEndCol, code, line, END, terrno);
  SColumnInfoData* pTbNameCol = taosArrayGet(pDelBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
  TSDB_CHECK_NULL(pTbNameCol, code, line, END, terrno);

  for (int32_t i = 0; i < numOfTables; i++) {
    // start key column
    code = colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    TSDB_CHECK_CODE(code, line, END);

    // end key column
    code = colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    TSDB_CHECK_CODE(code, line, END);

    // uid column
    int64_t* pUid = taosArrayGet(pRes->uidList, i);
    TSDB_CHECK_NULL(pUid, code, line, END, terrno);
    code = colDataSetVal(pUidCol, i, (const char*)pUid, false);
    TSDB_CHECK_CODE(code, line, END);

    // the remaining columns carry no value for a delete row
    colDataSetNULL(pGroupIdCol, i);
    colDataSetNULL(pCalStartCol, i);
    colDataSetNULL(pCalEndCol, i);
    colDataSetNULL(pTbNameCol, i);
  }

  if (type == 0) {
    code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock);
    TSDB_CHECK_CODE(code, line, END);

    ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
    ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pDelBlock;
    pDelBlock = NULL;  // ownership moved to the queue item
  } else if (type == 1) {
    *pRefBlock = pDelBlock;
    pDelBlock = NULL;  // ownership moved to the caller
  } else {
    tqError("unknown type:%d", type);
    code = TSDB_CODE_TMQ_CONSUMER_ERROR;
  }

END:
  if (code != 0) {
    tqError("failed to extract delete data block, line:%d code:%d", line, code);
  }

  // Still non-NULL only when the block was created but never handed over.
  // NOTE(review): cleanup mirrors the original error path (blockDataCleanup +
  // taosMemoryFree); confirm whether a dedicated block destructor should be used.
  if (pDelBlock != NULL) {
    blockDataCleanup(pDelBlock);
    taosMemoryFree(pDelBlock);
  }

  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
  return code;
}
741

742
/*
 * Report execution progress of the source task(s) of stream `streamId` on this
 * vnode.
 *
 * pVnode     : vnode whose stream meta (pVnode->pTq->pStreamMeta) is scanned
 * streamId   : stream to look for in the meta's task list
 * pDelay     : optional output; (latest ingestTs - current ingestTs) / 1000 for
 *              the matching source task, or 0 when either WAL head cannot be
 *              read or no source task is found
 * fhFinished : optional output; set to !HAS_RELATED_FILLHISTORY_TASK(task) for
 *              the matching source task, false when none is found
 *
 * Always returns TSDB_CODE_SUCCESS; per-task failures are logged and skipped.
 */
int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, bool* fhFinished) {
  SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
  int32_t      numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  int32_t      code = TSDB_CODE_SUCCESS;

  if (pDelay != NULL) {
    *pDelay = 0;
  }

  if (fhFinished != NULL) {
    *fhFinished = false;
  }

  if (numOfTasks <= 0) {
    return code;
  }

  // extract the required source task for a given stream, identified by streamId
  streamMetaRLock(pMeta);

  // re-read the size now that the read lock is held
  numOfTasks = taosArrayGetSize(pMeta->pTaskList);

  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
    if (pId == NULL) {
      continue;
    }
    if (pId->streamId != streamId) {
      continue;
    }

    STaskId      id = {.streamId = pId->streamId, .taskId = pId->taskId};
    SStreamTask* pTask = NULL;

    code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
    if (code != 0) {
      tqError("vgId:%d failed to acquire task:0x%x in retrieving progress", pMeta->vgId, pId->taskId);
      continue;
    }

    if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
      streamMetaReleaseTask(pMeta, pTask);
      continue;
    }

    // here we get the required stream source task
    if (fhFinished != NULL) {
      *fhFinished = !HAS_RELATED_FILLHISTORY_TASK(pTask);
    }

    // Version whose ingest time represents the task's progress: the entry just
    // before the reader's current position, or the checkpointed processedVer
    // when the reader reports -1.
    int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
    if (ver == -1) {
      ver = pTask->chkInfo.processedVer;
    } else {
      ver--;
    }

    SVersionRange verRange = {0};
    walReaderValidVersionRange(pTask->exec.pWalReader, &verRange.minVer, &verRange.maxVer);

    // use a separate reader so the task's own reader is not repositioned
    SWalReader* pReader = walOpenReader(pTask->exec.pWalReader->pWal, NULL, 0);
    if (pReader == NULL) {
      tqError("failed to open wal reader to extract exec progress, vgId:%d", pMeta->vgId);
      streamMetaReleaseTask(pMeta, pTask);
      continue;
    }

    int64_t cur = 0;
    int64_t latest = 0;
    bool    curValid = false;
    bool    latestValid = false;

    code = walFetchHead(pReader, ver);
    if (code == TSDB_CODE_SUCCESS) {
      cur = pReader->pHead->head.ingestTs;
      curValid = true;
    }

    if (ver == verRange.maxVer) {
      latest = cur;
      latestValid = curValid;
    } else {
      code = walFetchHead(pReader, verRange.maxVer);
      if (code == TSDB_CODE_SUCCESS) {
        latest = pReader->pHead->head.ingestTs;
        latestValid = true;
      }
    }

    if (pDelay != NULL) {  // delay in ms
      // Only a difference of two successfully read timestamps is meaningful;
      // otherwise one side would still be 0 and the result would be a raw
      // timestamp rather than a delay.
      *pDelay = (curValid && latestValid) ? (latest - cur) / 1000 : 0;
    }

    walCloseReader(pReader);
    streamMetaReleaseTask(pMeta, pTask);
  }

  streamMetaRUnLock(pMeta);

  return TSDB_CODE_SUCCESS;
}
834

835
int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type) {
185✔
836
  int32_t          code = 0;
185✔
837
  int32_t          lino = 0;
185✔
838
  SDecoder         dc = {0};
185✔
839
  SVDropTbBatchReq batchReq = {0};
185✔
840
  tDecoderInit(&dc, (uint8_t*)data, len);
185✔
841
  code = tDecodeSVDropTbBatchReq(&dc, &batchReq);
185✔
842
  TSDB_CHECK_CODE(code, lino, _exit);
185!
843
  if (batchReq.nReqs <= 0) goto _exit;
185!
844

845
  SSDataBlock* pBlock = NULL;
185✔
846
  code = createSpecialDataBlock(STREAM_DROP_CHILD_TABLE, &pBlock);
185✔
847
  TSDB_CHECK_CODE(code, lino, _exit);
185!
848

849
  code = blockDataEnsureCapacity(pBlock, batchReq.nReqs);
185✔
850
  TSDB_CHECK_CODE(code, lino, _exit);
185!
851

852
  pBlock->info.rows = batchReq.nReqs;
185✔
853
  pBlock->info.version = ver;
185✔
854
  for (int32_t i = 0; i < batchReq.nReqs; ++i) {
375✔
855
    SVDropTbReq* pReq = batchReq.pReqs + i;
190✔
856
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
190✔
857
    TSDB_CHECK_NULL(pCol, code, lino, _exit, terrno);
190!
858
    code = colDataSetVal(pCol, i, (const char* )&pReq->uid, false);
190✔
859
    TSDB_CHECK_CODE(code, lino, _exit);
190!
860
  }
861

862
  code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock);
185✔
863
  TSDB_CHECK_CODE(code, lino, _exit);
185!
864
  ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
185✔
865
  ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pBlock;
185✔
866

867
_exit:
185✔
868
  tDecoderClear(&dc);
185✔
869
  if (TSDB_CODE_SUCCESS != code) {
185!
870
    tqError("faled to extract drop ctb data block, line:%d code:%s", lino, tstrerror(code));
×
871
    blockDataCleanup(pBlock);
×
872
    taosMemoryFree(pBlock);
×
873
  }
874
  return code;
185✔
875
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc