• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing. Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.6
/source/dnode/vnode/src/tq/tqUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
19
                                 const SMqMetaRsp* pRsp, int32_t vgId);
20
static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
21
                                      const SMqBatchMetaRsp* pRsp, int32_t vgId);
22

23
/**
 * Initialize a data poll response.
 *
 * Allocates the empty blockData / blockDataLen arrays, copies pOffset into both
 * reqOffset and rspOffset, and clears withTbName / withSchema.
 *
 * On failure the blockData array allocated here is destroyed and its pointer is
 * reset to NULL, so a caller that returns without cleanup does not leak it, and
 * a caller that still runs its usual response cleanup does not free it twice.
 *
 * @param pRsp    response to initialize; NULL yields TSDB_CODE_INVALID_PARA
 * @param pOffset offset value copied into pRsp->reqOffset and pRsp->rspOffset
 * @return 0 on success; TSDB_CODE_INVALID_PARA for a NULL pRsp; terrno when an
 *         array allocation fails
 */
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  int32_t code = TDB_CODE_SUCCESS;
  int32_t lino = 0;
  tqDebug("%s called", __FUNCTION__);
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);

  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);

  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
  pRsp->withTbName = 0;
  pRsp->withSchema = false;

END:
  if (code != 0) {
    tqError("%s failed at:%d, code:%s", __FUNCTION__, lino, tstrerror(code));
    // pRsp is NULL when the parameter check itself failed.
    if (pRsp != NULL) {
      // blockData is either NULL (its own allocation failed) or the array
      // allocated above; blockDataLen can only have failed, so it is NULL.
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
    }
  }
  return code;
}
46

47
/**
 * Push the current sync term and leader flag into the stream meta and, when
 * this node is the leader, start an asynchronous WAL scan.
 *
 * @param pTq      tq instance whose vnode sync state is read
 * @param isLeader whether this node is currently the leader
 */
void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
  SSyncState syncState = syncGetState(pTq->pVnode->sync);

  streamMetaUpdateStageRole(pTq->pStreamMeta, syncState.term, isLeader);
  if (!isLeader) {
    return;
  }

  tqScanWalAsync(pTq);
}
14,577✔
55

56
/**
 * Initialize the poll response used by extractDataAndRspForDbStbSubscribe.
 *
 * Copies pOffset into reqOffset and rspOffset, sets withTbName / withSchema to 1
 * and allocates the four empty arrays blockData, blockDataLen, blockTbName and
 * blockSchema.
 *
 * On failure the arrays are destroyed and their pointers reset to NULL, so the
 * response can still be handed to the caller's cleanup without touching freed
 * memory. A NULL pRsp is reported without being dereferenced.
 *
 * @param pRsp    response to initialize; NULL yields TSDB_CODE_INVALID_PARA
 * @param pOffset offset value copied into pRsp->reqOffset and pRsp->rspOffset
 * @return 0 on success; TSDB_CODE_INVALID_PARA for a NULL pRsp; terrno when an
 *         array allocation fails
 */
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  int32_t code = TDB_CODE_SUCCESS;
  int32_t lino = 0;
  tqDebug("%s called", __FUNCTION__);
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);

  pRsp->withTbName = 1;
  pRsp->withSchema = 1;
  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);

  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);

  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockTbName, code, lino, END, terrno);

  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);

END:
  if (code != 0) {
    tqError("%s failed at:%d, code:%s", __FUNCTION__, lino, tstrerror(code));
    // The parameter check jumps here with pRsp == NULL; nothing to release then.
    if (pRsp != NULL) {
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
      taosArrayDestroy(pRsp->blockDataLen);
      pRsp->blockDataLen = NULL;
      taosArrayDestroy(pRsp->blockTbName);
      pRsp->blockTbName = NULL;
      taosArrayDestroy(pRsp->blockSchema);
      pRsp->blockSchema = NULL;
    }
  }
  return code;
}
88

89
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
305,736✔
90
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
91
  if (pOffsetVal == NULL || pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || pBlockReturned == NULL) {
305,736!
UNCOV
92
    return TSDB_CODE_INVALID_PARA;
×
93
  }
94
  uint64_t   consumerId = pRequest->consumerId;
305,736✔
95
  STqOffset* pOffset = NULL;
305,736✔
96
  int32_t    code = tqMetaGetOffset(pTq, pRequest->subKey, &pOffset);
305,736✔
97
  int32_t    vgId = TD_VID(pTq->pVnode);
305,727✔
98

99
  *pBlockReturned = false;
305,727✔
100
  // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
101
  if (code == 0) {
305,727✔
102
    tOffsetCopy(pOffsetVal, &pOffset->val);
199✔
103

104
    char formatBuf[TSDB_OFFSET_LEN] = {0};
198✔
105
    tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
198✔
106
    tqDebug("tmq poll: consumer:0x%" PRIx64
198!
107
                ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.QID:0x%" PRIx64,
108
            consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
109
    return 0;
198✔
110
  } else {
111
    // no poll occurs in this vnode for this topic, let's seek to the right offset value.
112
    if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
305,528✔
113
      if (pRequest->useSnapshot) {
155,987✔
114
        tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
155,113!
115
                consumerId, pHandle->subKey, vgId);
116
        if (pHandle->fetchMeta) {
155,114✔
117
          tqOffsetResetToMeta(pOffsetVal, 0);
118
        } else {
119
          SValue val = {0};
155,098✔
120
          tqOffsetResetToData(pOffsetVal, 0, 0, val);
155,097✔
121
        }
122
      } else {
123
        walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
874✔
124
        tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
874✔
125
      }
126
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
149,541✔
127
      walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
66✔
128
      SMqDataRsp dataRsp = {0};
66✔
129
      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
66✔
130

131
      code = tqInitDataRsp(&dataRsp, *pOffsetVal);
66✔
132
      if (code != 0) {
66!
UNCOV
133
        return code;
×
134
      }
135
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
66✔
136
              pHandle->subKey, vgId, dataRsp.rspOffset.version);
137
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, (pRequest->rawData == 1) ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
66!
138
      tDeleteMqDataRsp(&dataRsp);
66✔
139

140
      *pBlockReturned = true;
66✔
141
      return code;
66✔
142
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
149,475!
143
      tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
149,476!
144
                  " in vg %d, subkey %s, reset none failed",
145
              pHandle->subKey, consumerId, vgId, pRequest->subKey);
146
      return TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
149,483✔
147
    }
148
  }
149

150
  return 0;
155,986✔
151
}
152

153
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
252,994✔
154
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
155
  int32_t    code = TDB_CODE_SUCCESS;
252,994✔
156
  int32_t    lino = 0;
252,994✔
157
  tqDebug("%s called", __FUNCTION__ );
252,994✔
158
  uint64_t consumerId = pRequest->consumerId;
252,998✔
159
  int32_t  vgId = TD_VID(pTq->pVnode);
252,998✔
160
  terrno = 0;
252,998✔
161

162
  SMqDataRsp dataRsp = {0};
252,998✔
163
  code = tqInitDataRsp(&dataRsp, *pOffset);
252,998✔
164
  TSDB_CHECK_CODE(code, lino, end);
252,997!
165

166
  code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
252,997✔
167
  TSDB_CHECK_CODE(code, lino, end);
252,998!
168

169
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
252,998✔
170
  if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
252,998✔
171
    goto end;
154,921✔
172
  }
173

174
  //   till now, all data has been transferred to consumer, new data needs to push client once arrived.
175
  if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
98,077✔
176
    // lock
177
    taosWLockLatch(&pTq->lock);
31,257✔
178
    int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
31,258✔
179
    if (dataRsp.rspOffset.version > ver) {  // check if there are data again to avoid lost data
31,258✔
180
      code = tqRegisterPushHandle(pTq, pHandle, pMsg);
29,936✔
181
      taosWUnLockLatch(&pTq->lock);
29,936✔
182
      goto end;
29,936✔
183
    }
184
    taosWUnLockLatch(&pTq->lock);
1,322✔
185
  }
186

187
  // reqOffset represents the current date offset, may be changed if wal not exists
188
  tOffsetCopy(&dataRsp.reqOffset, pOffset);
68,141✔
189
  code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
68,141✔
190

191
end:
252,998✔
192
  {
193
    char buf[TSDB_OFFSET_LEN] = {0};
252,998✔
194
    tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
252,998✔
195
    if (code != 0){
252,994✔
196
      tqError("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " error msg:%s, line:%d",
154,918!
197
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, tstrerror(code), lino);
198
    } else {
199
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " success",
98,076✔
200
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId);
201
    }
202

203
    tDeleteMqDataRsp(&dataRsp);
252,998✔
204
    return code;
252,994✔
205
  }
206
}
207

208
// Decode the current WAL entry (pHead) as a TYPE request. If it decodes and its
// `source` has TD_REQ_FROM_TAOX set, skip the entry: bump fetchVer, release the
// request and `continue` the enclosing loop. Otherwise just release the request
// and fall through.
//
// Expands to declarations plus statements, so it must be the content of its own
// braced block inside a loop, with pHead, pRequest, vgId and fetchVer in scope.
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC)                                               \
  SDecoder decoder = {0};                                                                                  \
  TYPE     req = {0};                                                                                      \
  void*    data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead));                                            \
  int32_t  len = pHead->bodyLen - sizeof(SMsgHead);                                                        \
  tDecoderInit(&decoder, data, len);                                                                       \
  if (DECODE_FUNC(&decoder, &req) == 0 && (req.source & TD_REQ_FROM_TAOX) != 0) {                          \
    tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 \
            " msgType %d",                                                                                 \
            pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);                        \
    fetchVer++;                                                                                            \
    DELETE_FUNC(&req);                                                                                     \
    tDecoderClear(&decoder);                                                                               \
    continue;                                                                                              \
  }                                                                                                        \
  DELETE_FUNC(&req);                                                                                       \
  tDecoderClear(&decoder);
225

226
// Intentionally does nothing; passed as DELETE_FUNC to PROCESS_EXCLUDED_MSG
// when no cleanup is performed for the decoded request.
static void tDeleteCommon(void* parm) { (void)parm; }
404✔
227

228
// Select the poll response message type: a data+meta response when the
// response carries create-table requests, otherwise a raw-data or plain data
// response depending on pRequest->rawData.
// The arguments and the whole expansion are parenthesized so the macro keeps
// its meaning when used inside a larger expression.
#define POLL_RSP_TYPE(pRequest, taosxRsp)        \
  ((taosxRsp).createTableNum > 0                 \
       ? TMQ_MSG_TYPE__POLL_DATA_META_RSP        \
       : ((pRequest)->rawData == 1 ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP))
231

232
/**
 * Append one meta message to a batch meta response.
 *
 * Wraps (type, bodyLen, body) in an SMqMetaRsp, encodes it into a freshly
 * allocated buffer that reserves sizeof(SMqRspHead) bytes in front of the
 * payload, and pushes the buffer and its total length onto
 * btMetaRsp->batchMetaReq / batchMetaLen. The two arrays are created on first
 * use.
 *
 * The buffer is owned by batchMetaReq once it has been pushed; on any earlier
 * failure it is released here instead of being leaked.
 *
 * @param btMetaRsp batch response to append to
 * @param type      message type stored as resMsgType
 * @param bodyLen   length of body in bytes
 * @param body      message body; encoded into the new buffer, not retained
 * @return 0 on success, otherwise an error code
 */
static int32_t buildBatchMeta(SMqBatchMetaRsp *btMetaRsp, int16_t type, int32_t bodyLen, void* body){
  int32_t code = 0;
  void*   tBuf = NULL;
  bool    bufHandedOver = false;  // true once batchMetaReq holds tBuf

  if (!btMetaRsp->batchMetaReq) {
    btMetaRsp->batchMetaReq = taosArrayInit(4, POINTER_BYTES);
    TQ_NULL_GO_TO_END(btMetaRsp->batchMetaReq);
    btMetaRsp->batchMetaLen = taosArrayInit(4, sizeof(int32_t));
    TQ_NULL_GO_TO_END(btMetaRsp->batchMetaLen);
  }

  SMqMetaRsp tmpMetaRsp = {0};
  tmpMetaRsp.resMsgType = type;
  tmpMetaRsp.metaRspLen = bodyLen;
  tmpMetaRsp.metaRsp = body;
  uint32_t len = 0;
  tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
  if (TSDB_CODE_SUCCESS != code) {
    tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
    goto END;
  }
  int32_t tLen = sizeof(SMqRspHead) + len;
  tBuf = taosMemoryCalloc(1, tLen);
  TQ_NULL_GO_TO_END(tBuf);
  void*    metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
  SEncoder encoder = {0};
  tEncoderInit(&encoder, metaBuff, len);
  code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
  tEncoderClear(&encoder);

  if (code < 0) {
    tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
    goto END;
  }
  TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaReq, &tBuf));
  bufHandedOver = true;
  TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaLen, &tLen));

END:
  if (!bufHandedOver) {
    // Covers encode failure and a failed push; tBuf is NULL if never allocated.
    taosMemoryFree(tBuf);
  }
  return code;
}
271

272
/**
 * Serialize the create-table requests collected in taosxRsp->createTableReq as
 * one SVCreateTbBatchReq, preceded by room for an SMsgHead.
 *
 * @param taosxRsp response whose createTableReq array is serialized
 * @param pBuf     out: newly allocated buffer (NULL if the allocation failed);
 *                 the visible caller frees it with taosMemoryFree in all cases
 * @param len      out: total buffer length, i.e. sizeof(SMsgHead) plus the
 *                 encoded request size
 * @return 0 on success, otherwise an error code
 */
static int32_t buildCreateTbBatchReqBinary(SMqDataRsp *taosxRsp, void** pBuf, int32_t *len){
  int32_t code = 0;
  int32_t encodedLen = 0;
  SVCreateTbBatchReq pReq = {0};
  pReq.nReqs = taosArrayGetSize(taosxRsp->createTableReq);
  pReq.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
  TQ_NULL_GO_TO_END(pReq.pArray);
  for (int i = 0; i < taosArrayGetSize(taosxRsp->createTableReq); i++){
    void   *createTableReq = taosArrayGetP(taosxRsp->createTableReq, i);
    TQ_NULL_GO_TO_END(taosArrayPush(pReq.pArray, createTableReq));
  }
  tEncodeSize(tEncodeSVCreateTbBatchReq, &pReq, *len, code);
  if (code < 0) {
    goto END;
  }
  encodedLen = *len;
  *len += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*len);
  // Check the allocation itself, not the out-parameter that points at it.
  TQ_NULL_GO_TO_END(*pBuf);
  SEncoder coder = {0};
  // Only encodedLen bytes remain after the reserved SMsgHead.
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), encodedLen);
  code = tEncodeSVCreateTbBatchReq(&coder, &pReq);
  tEncoderClear(&coder);

END:
  taosArrayDestroy(pReq.pArray);
  return code;
}
298

299
// Stamp the batch meta response with the current log position (fetchVer), send
// it, and leave through the END label.
// Expands to several statements ending in `goto END;` - use it only as the
// content of its own braced block, with btMetaRsp, fetchVer, code, pHandle,
// pMsg, pRequest and vgId in scope.
#define SEND_BATCH_META_RSP                                                 \
  tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);                       \
  code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); \
  goto END;
303

304
// Stamp the data response with the current log position (fetchVer), send it
// with the type chosen by POLL_RSP_TYPE, and leave through the END label.
// Expands to several statements ending in `goto END;` - use it only as the
// content of its own braced block, with taosxRsp, fetchVer, code, pHandle,
// pMsg, pRequest and vgId in scope.
#define SEND_DATA_RSP                                                                                \
  tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);                                                 \
  code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, POLL_RSP_TYPE(pRequest, taosxRsp), vgId); \
  goto END;
308
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
2,774,296✔
309
                                                  SRpcMsg* pMsg, STqOffsetVal* offset) {
310
  int32_t         vgId = TD_VID(pTq->pVnode);
2,774,296✔
311
  SMqDataRsp      taosxRsp = {0};
2,774,296✔
312
  SMqBatchMetaRsp btMetaRsp = {0};
2,774,296✔
313
  int32_t         code = 0;
2,774,296✔
314

315
  TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset));
2,774,296!
316
  if (offset->type != TMQ_OFFSET__LOG) {
2,774,295✔
317
    TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest));
151!
318

319
    if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
151✔
320
      code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
14✔
321
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
14!
322
              pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,btMetaRsp.rspOffset.ts);
323
      goto END;
14✔
324
    }
325

326
    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64",ts:%" PRId64,
137!
327
            pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
328
    if (taosxRsp.blockNum > 0) {
137✔
329
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
131✔
330
      goto END;
131✔
331
    } else {
332
      tOffsetCopy(offset, &taosxRsp.rspOffset);
6✔
333
    }
334
  }
335

336
  if (offset->type == TMQ_OFFSET__LOG) {
2,774,147!
337
    walReaderVerifyOffset(pHandle->pWalReader, offset);
2,774,147✔
338
    int64_t fetchVer = offset->version;
2,774,153✔
339

340
    uint64_t st = taosGetTimestampMs();
2,774,150✔
341
    int      totalRows = 0;
2,774,150✔
342
    int32_t  totalMetaRows = 0;
2,774,150✔
343
    while (1) {
119,737✔
344
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
2,893,887✔
345
      if (savedEpoch > pRequest->epoch) {
2,893,858!
UNCOV
346
        tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, savedEpoch error, vgId:%d offset %" PRId64,
×
347
                pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
UNCOV
348
        code = TSDB_CODE_TQ_INTERNAL_ERROR;
×
349
        goto END;
2,774,046✔
350
      }
351

352
      if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
2,893,858✔
353
        if (totalMetaRows > 0) {
2,773,453✔
354
          SEND_BATCH_META_RSP
2✔
355
        }
356
        SEND_DATA_RSP
2,773,451✔
357
      }
358

359
      SWalCont* pHead = &pHandle->pWalReader->pHead->head;
120,473✔
360
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %s",
120,473!
361
              pRequest->consumerId, pRequest->epoch, vgId, fetchVer, TMSG_INFO(pHead->msgType));
362

363
      // process meta
364
      if (pHead->msgType != TDMT_VND_SUBMIT) {
120,479✔
365
        if (totalRows > 0) {
666✔
366
          SEND_DATA_RSP
35!
367
        }
368

369
        if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
631✔
370
          if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
628✔
371
            PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq, tDeleteSVCreateTbBatchReq)
186!
372
          } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
442✔
373
            PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq, tDeleteCommon)
55!
374
          } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
493✔
375
            PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq, tDeleteCommon)
342!
376
          } else if (pHead->msgType == TDMT_VND_DELETE) {
45✔
377
            PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes, tDeleteCommon)
7!
378
          }
379
        }
380

381
        tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId,
229!
382
                TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta);
383
        if (!pRequest->enableBatchMeta && !pRequest->useSnapshot) {
229!
384
          SMqMetaRsp metaRsp = {0};
227✔
385
          tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
227✔
386
          metaRsp.resMsgType = pHead->msgType;
227✔
387
          metaRsp.metaRspLen = pHead->bodyLen;
227✔
388
          metaRsp.metaRsp = pHead->body;
227✔
389
          code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
227✔
390
          goto END;
227✔
391
        }
392
        code = buildBatchMeta(&btMetaRsp, pHead->msgType, pHead->bodyLen, pHead->body);
2✔
393
        fetchVer++;
2✔
394
        if (code != 0){
2!
UNCOV
395
          goto END;
×
396
        }
397
        totalMetaRows++;
2✔
398
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > pRequest->timeout)) {
4!
UNCOV
399
          SEND_BATCH_META_RSP
×
400
        }
401
        continue;
2✔
402
      }
403

404
      if (totalMetaRows > 0 && pHandle->fetchMeta != ONLY_META) {
119,813!
UNCOV
405
        SEND_BATCH_META_RSP
×
406
      }
407

408
      // process data
409
      SPackedData submit = {
119,813✔
410
          .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
119,813✔
411
          .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
119,813✔
412
          .ver = pHead->version,
119,813✔
413
      };
414

415
      TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest));
119,813!
416

417
      if (pHandle->fetchMeta == ONLY_META && taosArrayGetSize(taosxRsp.createTableReq) > 0){
119,778✔
418
        int32_t len = 0;
1✔
419
        void *pBuf = NULL;
1✔
420
        code = buildCreateTbBatchReqBinary(&taosxRsp, &pBuf, &len);
1✔
421
        if (code == 0){
1!
422
          code = buildBatchMeta(&btMetaRsp, TDMT_VND_CREATE_TABLE, len, pBuf);
1✔
423
        }
424
        taosMemoryFree(pBuf);
1!
425
        taosArrayDestroyP(taosxRsp.createTableReq, NULL);
1✔
426
        taosxRsp.createTableReq = NULL;
1✔
427
        fetchVer++;
1✔
428
        if (code != 0){
1!
429
          goto END;
1✔
430
        }
431
        totalMetaRows++;
1✔
432
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) ||
1!
433
            (taosGetTimestampMs() - st > pRequest->timeout) ||
1!
434
            (!pRequest->enableBatchMeta && !pRequest->useSnapshot)) {
1!
435
          SEND_BATCH_META_RSP
1✔
436
        }
UNCOV
437
        continue;
×
438
      }
439

440
      if ((pRequest->rawData == 0 && totalRows >= pRequest->minPollRows) ||
119,777✔
441
          (taosGetTimestampMs() - st > pRequest->timeout) ||
119,340!
442
          (pRequest->rawData != 0 && (taosArrayGetSize(taosxRsp.blockData) > pRequest->minPollRows ||
119,347!
443
                                      terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) {
25✔
444
        if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){
437✔
445
          terrno = 0;
6✔
446
        } else{
447
          fetchVer++;
432✔
448
        }
449
        SEND_DATA_RSP
438✔
450
      } else {
451
        fetchVer++;
119,333✔
452
      }
453
    }
454
  }
455

UNCOV
456
END:
×
457
  if (code != 0){
2,774,191!
UNCOV
458
    tqError("tmq poll: tqTaosxScanLog error. consumerId:0x%" PRIx64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
×
459
            pRequest->subKey);
460
  }
461
  tDeleteMqBatchMetaRsp(&btMetaRsp);
2,774,191✔
462
  tDeleteSTaosxRsp(&taosxRsp);
2,774,201✔
463
  return code;
2,774,189✔
464
}
465

466
/**
 * Entry point for answering a poll request.
 *
 * Resolves a reset-type request offset first (which may already send a
 * response), rejects a request offset of type 0, then dispatches on the
 * subscription type of the handle.
 *
 * @return 0 on success; TSDB_CODE_TMQ_INVALID_MSG for a NULL argument or an
 *         offset of type 0; otherwise the code of the failing step
 */
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
  if (pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  int32_t      code = 0;
  STqOffsetVal reqOffset = {0};
  tOffsetCopy(&reqOffset, &pRequest->reqOffset);

  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
    // Replace the reset marker by a concrete offset.
    bool responded = false;
    code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &responded);
    // Stop on failure, or when an (empty) response has already been sent.
    if (code != 0 || responded) {
      goto END;
    }
  } else if (reqOffset.type == 0) {
    uError("req offset type is 0");
    code = TSDB_CODE_TMQ_INVALID_MSG;
    goto END;
  }

  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
    code = extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
  } else {
    code = extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
  }

END:
  if (code != 0) {
    uError("failed to extract data for mq, msg:%s", tstrerror(code));
  }
  tOffsetDestroy(&reqOffset);
  return code;
}
505

506
// Fill the fixed response header placed at the start of every poll response.
// A NULL header pointer is ignored.
static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver,
                          int64_t ever) {
  if (pMsgHead != NULL) {
    pMsgHead->mqMsgType = type;
    pMsgHead->epoch = epoch;
    pMsgHead->consumerId = consumerId;
    pMsgHead->walsver = sver;  // first WAL version passed in by the caller
    pMsgHead->walever = ever;  // last WAL version passed in by the caller
  }
}
517

518
int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
17✔
519
                               const SMqBatchMetaRsp* pRsp, int32_t vgId) {
520
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
17!
UNCOV
521
    return TSDB_CODE_TMQ_INVALID_MSG;
×
522
  }
523
  int32_t len = 0;
17✔
524
  int32_t code = 0;
17✔
525
  tEncodeSize(tEncodeMqBatchMetaRsp, pRsp, len, code);
17!
526
  if (code < 0) {
17!
UNCOV
527
    return TAOS_GET_TERRNO(code);
×
528
  }
529
  int32_t tlen = sizeof(SMqRspHead) + len;
17✔
530
  void*   buf = rpcMallocCont(tlen);
17✔
531
  if (buf == NULL) {
17!
UNCOV
532
    return TAOS_GET_TERRNO(terrno);
×
533
  }
534

535
  int64_t sver = 0, ever = 0;
17✔
536
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
17✔
537
  initMqRspHead(buf, TMQ_MSG_TYPE__POLL_BATCH_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
17✔
538

539
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
17✔
540

541
  SEncoder encoder = {0};
17✔
542
  tEncoderInit(&encoder, abuf, len);
17✔
543
  code = tEncodeMqBatchMetaRsp(&encoder, pRsp);
17✔
544
  tEncoderClear(&encoder);
17✔
545
  if (code < 0) {
17!
UNCOV
546
    rpcFreeCont(buf);
×
547
    return TAOS_GET_TERRNO(code);
×
548
  }
549
  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
17✔
550

551
  tmsgSendRsp(&resp);
17✔
552
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%ld offset type:%d",
17!
553
          vgId, pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type);
554

555
  return 0;
17✔
556
}
557

558
// Encode a single meta poll response and send it back to the consumer.
//
// Layout of the payload: SMqRspHead followed by the encoded pRsp. The header
// carries the request's epoch/consumer id and the WAL reader's valid version
// range.
//
// Returns 0 once the message has been passed to tmsgSendRsp,
// TSDB_CODE_TMQ_INVALID_MSG for NULL arguments, or an error code when sizing,
// allocation or encoding fails (the buffer is released on encoding failure).
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
                          int32_t vgId) {
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  // Size-only pass.
  int32_t len = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeMqMetaRsp, pRsp, len, code);
  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  int32_t totalLen = sizeof(SMqRspHead) + len;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Header first ...
  int64_t firstVer = 0;
  int64_t lastVer = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &firstVer, &lastVer);
  initMqRspHead(pCont, TMQ_MSG_TYPE__POLL_META_RSP, pReq->epoch, pReq->consumerId, firstVer, lastVer);

  // ... then the encoded body directly behind it.
  SEncoder enc = {0};
  tEncoderInit(&enc, POINTER_SHIFT(pCont, sizeof(SMqRspHead)), len);
  code = tEncodeMqMetaRsp(&enc, pRsp);
  tEncoderClear(&enc);
  if (code < 0) {
    rpcFreeCont(pCont);
    return TAOS_GET_TERRNO(code);
  }

  SRpcMsg rspMsg = {.info = pMsg->info, .pCont = pCont, .contLen = totalLen, .code = 0};
  tmsgSendRsp(&rspMsg);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", vgId,
          pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);

  return 0;
}
598

599
// Encode a data-carrying poll response behind an SMqRspHead and send it through
// the given RPC handle.
//
// The encoder is chosen by `type`: POLL_DATA / WALINFO / POLL_RAW_DATA use
// tEncodeMqDataRsp, POLL_DATA_META uses tEncodeSTaosxRsp. For any other type no
// body is encoded and only the header is sent. For POLL_RAW_DATA the
// response's withSchema flag is cleared before encoding (pRsp is modified).
//
// Returns 0 after the message has been passed to tmsgSendRsp,
// TSDB_CODE_TMQ_INVALID_MSG for NULL pointers, or an error code when sizing,
// allocation or encoding fails.
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
                        int32_t type, int64_t sver, int64_t ever) {
  if (pRpcHandleInfo == NULL || pRsp == NULL) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  // Classify the message type once; both passes below use the same decision.
  const bool asDataRsp = (type == TMQ_MSG_TYPE__POLL_DATA_RSP) || (type == TMQ_MSG_TYPE__WALINFO_RSP) ||
                         (type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP);
  const bool asTaosxRsp = !asDataRsp && (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP);

  if (type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
    pRsp->withSchema = 0;
  }

  // Pass 1: determine the encoded body length.
  int32_t len = 0;
  int32_t code = 0;
  if (asDataRsp) {
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
  } else if (asTaosxRsp) {
    tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
  }
  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  int32_t totalLen = sizeof(SMqRspHead) + len;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return terrno;
  }

  initMqRspHead((SMqRspHead*)pCont, type, epoch, consumerId, sver, ever);

  // Pass 2: encode the body right after the header.
  SEncoder enc = {0};
  tEncoderInit(&enc, POINTER_SHIFT(pCont, sizeof(SMqRspHead)), len);
  if (asDataRsp) {
    code = tEncodeMqDataRsp(&enc, pRsp);
  } else if (asTaosxRsp) {
    code = tEncodeSTaosxRsp(&enc, pRsp);
  }
  tEncoderClear(&enc);

  if (code < 0) {
    rpcFreeCont(pCont);
    return TAOS_GET_TERRNO(code);
  }

  SRpcMsg rspMsg = {.info = *pRpcHandleInfo, .pCont = pCont, .contLen = totalLen, .code = 0};
  tmsgSendRsp(&rspMsg);
  return 0;
}
653

654
// Decode a delete result and turn it into a special data block with one row per
// affected table uid.
//
// Parameters:
//   pData/len  encoded SDeleteRes
//   ver        stored in the block's info.version
//   pRefBlock  output; set to NULL first. With type == 0 it receives a queue
//              item (SStreamRefDataBlock) that references the block, with
//              type == 1 it receives the block itself. It stays NULL when the
//              delete result has no tables or no affected rows.
//   type       0 or 1, see above; any other value is an error
//   blockType  passed to createSpecialDataBlock
//
// Returns 0 on success (including the "nothing to do" case) or an error code.
// On every failure after the block was created, the block is released here and
// the decoder and uid list are always cleaned up before returning.
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type, EStreamType blockType) {
  int32_t      code = 0;
  int32_t      line = 0;
  SDecoder*    pCoder = &(SDecoder){0};
  SDeleteRes*  pRes = &(SDeleteRes){0};
  // Declared (and initialised) ahead of every goto so the END label can tell
  // whether a block still has to be released.
  SSDataBlock* pDelBlock = NULL;

  *pRefBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  TSDB_CHECK_NULL(pRes->uidList, code, line, END, terrno);

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  code = tDecodeDeleteRes(pCoder, pRes);
  TSDB_CHECK_CODE(code, line, END);

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    goto END;
  }

  code = createSpecialDataBlock(blockType, &pDelBlock);
  TSDB_CHECK_CODE(code, line, END);

  code = blockDataEnsureCapacity(pDelBlock, numOfTables);
  TSDB_CHECK_CODE(code, line, END);

  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  for (int32_t i = 0; i < numOfTables; i++) {
    // start key column
    SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(pStartCol, code, line, END, terrno);
    code = colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    TSDB_CHECK_CODE(code, line, END);

    // end key column
    SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(pEndCol, code, line, END, terrno);
    code = colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    TSDB_CHECK_CODE(code, line, END);

    // uid column
    SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
    TSDB_CHECK_NULL(pUidCol, code, line, END, terrno);

    int64_t* pUid = taosArrayGet(pRes->uidList, i);
    code = colDataSetVal(pUidCol, i, (const char*)pUid, false);
    TSDB_CHECK_CODE(code, line, END);

    // the remaining columns carry no value for a delete row
    void* tmp = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno);
    colDataSetNULL(tmp, i);

    tmp = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno);
    colDataSetNULL(tmp, i);

    tmp = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno);
    colDataSetNULL(tmp, i);

    tmp = taosArrayGet(pDelBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno);
    colDataSetNULL(tmp, i);
  }

  if (type == 0) {
    code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock);
    // Go through END on failure so the decoder and uid list are released too
    // (an early return here used to skip that cleanup).
    TSDB_CHECK_CODE(code, line, END);

    ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
    ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pDelBlock;
    pDelBlock = NULL;  // now referenced by the queue item
  } else if (type == 1) {
    *pRefBlock = pDelBlock;
    pDelBlock = NULL;  // handed to the caller
  } else {
    tqError("unknown type:%d", type);
    code = TSDB_CODE_TMQ_CONSUMER_ERROR;
  }

END:
  if (code != 0) {
    tqError("failed to extract delete data block, line:%d code:%d", line, code);
  }
  if (pDelBlock != NULL) {
    // Still owned here, i.e. an error occurred after the block was created.
    // NOTE(review): this mirrors the release sequence already used in this
    // file; confirm it frees the block's column buffers as well.
    blockDataCleanup(pDelBlock);
    taosMemoryFree(pDelBlock);
  }
  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
  return code;
}
740

741
// Report execution progress of the source task(s) of one stream on this vnode.
//
// Outputs:
//   *pDelay      (optional, may be NULL) difference between the ingest
//                timestamp of the newest valid WAL entry and that of the entry
//                the task has reached, divided by 1000; 0 when no matching
//                source task is found. The original comment labels this as
//                "delay in ms" - unit of ingestTs not visible here.
//   *fhFinished  true when the source task has no related fill-history task;
//                false when no matching source task is found.
//
// The task list is walked under the stream-meta read lock. Tasks that cannot be
// acquired, or whose WAL reader cannot be opened, are logged and skipped.
// Always returns TSDB_CODE_SUCCESS.
int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, bool* fhFinished) {
  SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
  int32_t      numOfTasks = taosArrayGetSize(pMeta->pTaskList);

  if (pDelay != NULL) {
    *pDelay = 0;
  }
  *fhFinished = false;

  if (numOfTasks <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  streamMetaRLock(pMeta);

  // Re-read the size now that the list is protected by the lock.
  numOfTasks = taosArrayGetSize(pMeta->pTaskList);

  for (int32_t idx = 0; idx < numOfTasks; ++idx) {
    SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, idx);
    if (pId == NULL || pId->streamId != streamId) {
      continue;
    }

    STaskId      id = {.streamId = pId->streamId, .taskId = pId->taskId};
    SStreamTask* pTask = NULL;
    if (streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask) != 0) {
      tqError("vgId:%d failed to acquire task:0x%x in retrieving progress", pMeta->vgId, pId->taskId);
      continue;
    }

    // Only the source-level task of the stream is of interest.
    if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
      *fhFinished = !HAS_RELATED_FILLHISTORY_TASK(pTask);

      // Version the task has reached: one before the reader's current
      // position, or the checkpointed version when the reader reports -1.
      int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
      ver = (ver == -1) ? pTask->chkInfo.processedVer : (ver - 1);

      SVersionRange range = {0};
      walReaderValidVersionRange(pTask->exec.pWalReader, &range.minVer, &range.maxVer);

      // A separate reader is opened on the same WAL for the lookups below.
      SWalReader* pReader = walOpenReader(pTask->exec.pWalReader->pWal, NULL, 0);
      if (pReader == NULL) {
        tqError("failed to open wal reader to extract exec progress, vgId:%d", pMeta->vgId);
      } else {
        int64_t reachedTs = 0;
        int64_t newestTs = 0;

        if (walFetchHead(pReader, ver) == TSDB_CODE_SUCCESS) {
          reachedTs = pReader->pHead->head.ingestTs;
        }

        if (ver == range.maxVer) {
          newestTs = reachedTs;
        } else if (walFetchHead(pReader, range.maxVer) == TSDB_CODE_SUCCESS) {
          newestTs = pReader->pHead->head.ingestTs;
        }

        if (pDelay != NULL) {
          *pDelay = (newestTs - reachedTs) / 1000;
        }

        walCloseReader(pReader);
      }
    }

    streamMetaReleaseTask(pMeta, pTask);
  }

  streamMetaRUnLock(pMeta);

  return TSDB_CODE_SUCCESS;
}
833

834
int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type) {
185✔
835
  int32_t          code = 0;
185✔
836
  int32_t          lino = 0;
185✔
837
  SDecoder         dc = {0};
185✔
838
  SVDropTbBatchReq batchReq = {0};
185✔
839
  tDecoderInit(&dc, (uint8_t*)data, len);
185✔
840
  code = tDecodeSVDropTbBatchReq(&dc, &batchReq);
185✔
841
  TSDB_CHECK_CODE(code, lino, _exit);
185!
842
  if (batchReq.nReqs <= 0) goto _exit;
185!
843

844
  SSDataBlock* pBlock = NULL;
185✔
845
  code = createSpecialDataBlock(STREAM_DROP_CHILD_TABLE, &pBlock);
185✔
846
  TSDB_CHECK_CODE(code, lino, _exit);
185!
847

848
  code = blockDataEnsureCapacity(pBlock, batchReq.nReqs);
185✔
849
  TSDB_CHECK_CODE(code, lino, _exit);
185!
850

851
  pBlock->info.rows = batchReq.nReqs;
185✔
852
  pBlock->info.version = ver;
185✔
853
  for (int32_t i = 0; i < batchReq.nReqs; ++i) {
375✔
854
    SVDropTbReq* pReq = batchReq.pReqs + i;
190✔
855
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
190✔
856
    TSDB_CHECK_NULL(pCol, code, lino, _exit, terrno);
190!
857
    code = colDataSetVal(pCol, i, (const char* )&pReq->uid, false);
190✔
858
    TSDB_CHECK_CODE(code, lino, _exit);
190!
859
  }
860

861
  code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock);
185✔
862
  TSDB_CHECK_CODE(code, lino, _exit);
185!
863
  ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
185✔
864
  ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pBlock;
185✔
865

866
_exit:
185✔
867
  tDecoderClear(&dc);
185✔
868
  if (TSDB_CODE_SUCCESS != code) {
185!
UNCOV
869
    tqError("faled to extract drop ctb data block, line:%d code:%s", lino, tstrerror(code));
×
870
    blockDataCleanup(pBlock);
×
871
    taosMemoryFree(pBlock);
×
872
  }
873
  return code;
185✔
874
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc