• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3558

17 Dec 2024 06:05AM UTC coverage: 59.778% (+1.6%) from 58.204%
#3558

push

travis-ci

web-flow
Merge pull request #29179 from taosdata/merge/mainto3.0

merge: form main to 3.0 branch

132787 of 287595 branches covered (46.17%)

Branch coverage included in aggregate %.

104 of 191 new or added lines in 5 files covered. (54.45%)

6085 existing lines in 168 files now uncovered.

209348 of 284746 relevant lines covered (73.52%)

8164844.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.98
/source/dnode/vnode/src/tq/tqUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
19
                                 const SMqMetaRsp* pRsp, int32_t vgId);
20
static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
21
                                      const SMqBatchMetaRsp* pRsp, int32_t vgId);
22

23
/**
 * Initialise a data poll response: allocate the (empty) block-data and block-length arrays, seed
 * both the request offset and the response offset with pOffset, and clear the table-name/schema flags.
 *
 * @param pRsp    response to initialise; its blockData/blockDataLen fields are overwritten.
 * @param pOffset offset copied into pRsp->reqOffset and pRsp->rspOffset.
 * @return 0 on success, otherwise the terrno observed after the failed array allocation. On failure
 *         neither array is left allocated in pRsp (both fields are NULL).
 */
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));

  if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL) {
    // read the error code before cleaning up so the cleanup calls cannot disturb it
    int32_t code = terrno;

    // only one of the two allocations may have failed; release the survivor instead of leaking it
    if (pRsp->blockData != NULL) {
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
    }

    if (pRsp->blockDataLen != NULL) {
      taosArrayDestroy(pRsp->blockDataLen);
      pRsp->blockDataLen = NULL;
    }

    return code;
  }

  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
  pRsp->withTbName = 0;
  pRsp->withSchema = false;
  return 0;
}
37

38
/**
 * Forward the vnode's current sync term, together with the leader flag, to the stream meta.
 *
 * @param pTq      tq instance; its vnode's sync handle is queried for the current state.
 * @param isLeader role flag handed through unchanged to streamMetaUpdateStageRole().
 */
void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
  SSyncState syncState = syncGetState(pTq->pVnode->sync);

  streamMetaUpdateStageRole(pTq->pStreamMeta, syncState.term, isLeader);
}
12,301✔
42

43
/**
 * Initialise a taosx-style poll response: seed both offsets with pOffset, enable the table-name
 * and schema flags, and allocate the four (empty) per-block arrays.
 *
 * @param pRsp    response to initialise.
 * @param pOffset offset copied into pRsp->reqOffset and pRsp->rspOffset.
 * @return 0 when all four arrays were allocated; otherwise terrno, with every array that did get
 *         allocated destroyed and its field reset to NULL.
 */
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
  pRsp->withTbName = 1;
  pRsp->withSchema = 1;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));

  // fast path: everything was allocated
  if (pRsp->blockData != NULL && pRsp->blockDataLen != NULL && pRsp->blockTbName != NULL &&
      pRsp->blockSchema != NULL) {
    return 0;
  }

  // at least one allocation failed: undo whichever ones succeeded
  if (pRsp->blockData != NULL) {
    taosArrayDestroy(pRsp->blockData);
    pRsp->blockData = NULL;
  }
  if (pRsp->blockDataLen != NULL) {
    taosArrayDestroy(pRsp->blockDataLen);
    pRsp->blockDataLen = NULL;
  }
  if (pRsp->blockTbName != NULL) {
    taosArrayDestroy(pRsp->blockTbName);
    pRsp->blockTbName = NULL;
  }
  if (pRsp->blockSchema != NULL) {
    taosArrayDestroy(pRsp->blockSchema);
    pRsp->blockSchema = NULL;
  }

  return terrno;
}
81

82
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
2,560✔
83
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
84
  uint64_t   consumerId = pRequest->consumerId;
2,560✔
85
  STqOffset* pOffset = NULL;
2,560✔
86
  int32_t    code = tqMetaGetOffset(pTq, pRequest->subKey, &pOffset);
2,560✔
87
  int32_t    vgId = TD_VID(pTq->pVnode);
2,558✔
88

89
  *pBlockReturned = false;
2,558✔
90
  // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
91
  if (code == 0) {
2,558✔
92
    tOffsetCopy(pOffsetVal, &pOffset->val);
172✔
93

94
    char formatBuf[TSDB_OFFSET_LEN] = {0};
174✔
95
    tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
174✔
96
    tqDebug("tmq poll: consumer:0x%" PRIx64
174!
97
            ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.QID:0x%" PRIx64,
98
            consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
99
    return 0;
174✔
100
  } else {
101
    // no poll occurs in this vnode for this topic, let's seek to the right offset value.
102
    if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
2,386✔
103
      if (pRequest->useSnapshot) {
1,684✔
104
        tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
842!
105
                consumerId, pHandle->subKey, vgId);
106
        if (pHandle->fetchMeta) {
842✔
107
          tqOffsetResetToMeta(pOffsetVal, 0);
108
        } else {
109
          SValue val = {0};
824!
110
          tqOffsetResetToData(pOffsetVal, 0, 0, val);
824✔
111
        }
112
      } else {
113
        walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
842✔
114
        tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
843✔
115
      }
116
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
702✔
117
      walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
54✔
118
      SMqDataRsp dataRsp = {0};
54✔
119
      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
54✔
120

121
      code = tqInitDataRsp(&dataRsp, *pOffsetVal);
54✔
122
      if (code != 0) {
54!
123
        return code;
×
124
      }
125
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
54!
126
              pHandle->subKey, vgId, dataRsp.rspOffset.version);
127
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
54✔
128
      tDeleteMqDataRsp(&dataRsp);
54✔
129

130
      *pBlockReturned = true;
54✔
131
      return code;
54✔
132
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
648!
133
      tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
648!
134
              " in vg %d, subkey %s, reset none failed",
135
              pHandle->subKey, consumerId, vgId, pRequest->subKey);
136
      return TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
648✔
137
    }
138
  }
139

140
  return 0;
1,685✔
141
}
142

143
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
61,491✔
144
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
145
  uint64_t consumerId = pRequest->consumerId;
61,491✔
146
  int32_t  vgId = TD_VID(pTq->pVnode);
61,491✔
147
  terrno = 0;
61,491✔
148

149
  SMqDataRsp dataRsp = {0};
61,492✔
150
  int code = tqInitDataRsp(&dataRsp, *pOffset);
61,492✔
151
  if (code != 0) {
61,491!
152
    goto end;
×
153
  }
154

155
  code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
61,491✔
156
  if (code != 0) {
61,490!
157
    goto end;
×
158
  }
159

160
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
61,490✔
161
  if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
61,492✔
162
    goto end;
649✔
163
  }
164

165
  //   till now, all data has been transferred to consumer, new data needs to push client once arrived.
166
  if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
60,843✔
167
    // lock
168
    taosWLockLatch(&pTq->lock);
28,987✔
169
    int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
28,987✔
170
    if (dataRsp.rspOffset.version > ver) {  // check if there are data again to avoid lost data
28,987✔
171
      code = tqRegisterPushHandle(pTq, pHandle, pMsg);
28,044✔
172
      taosWUnLockLatch(&pTq->lock);
28,044✔
173
      goto end;
28,044✔
174
    }
175
    taosWUnLockLatch(&pTq->lock);
943✔
176
  }
177

178
  // reqOffset represents the current date offset, may be changed if wal not exists
179
  tOffsetCopy(&dataRsp.reqOffset, pOffset);
32,799✔
180
  code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
32,799✔
181

182
end : {
61,492✔
183
  char buf[TSDB_OFFSET_LEN] = {0};
61,492✔
184
  tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
61,492✔
185
  tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s,QID:0x%" PRIx64
61,492!
186
          " code:%d",
187
          consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
188
  tDeleteMqDataRsp(&dataRsp);
61,492✔
189
  return code;
61,492✔
190
}
191
}
192

193
/*
 * Decode the WAL entry behind `pHead` as a TYPE request and, when that request is flagged as
 * coming from taosx (TD_REQ_FROM_TAOX), skip it: bump `fetchVer` and `continue` the enclosing loop.
 * In every case the decoded request is released with DELETE_FUNC and the decoder is cleared.
 *
 * Usage constraints:
 *   - `pHead`, `pRequest`, `vgId` and `fetchVer` must be in scope at the expansion site;
 *   - expand it inside its own brace-delimited block, since it declares local variables;
 *   - it is intentionally NOT wrapped in do { } while (0): the `continue` has to bind to the
 *     caller's loop, not to a wrapper loop.
 */
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC)                                               \
  SDecoder exDecoder = {0};                                                                                \
  TYPE     exReq = {0};                                                                                    \
  void*    exBody = POINTER_SHIFT(pHead->body, sizeof(SMsgHead));                                          \
  int32_t  exBodyLen = pHead->bodyLen - sizeof(SMsgHead);                                                  \
  tDecoderInit(&exDecoder, exBody, exBodyLen);                                                             \
  if (DECODE_FUNC(&exDecoder, &exReq) == 0 && (exReq.source & TD_REQ_FROM_TAOX) != 0) {                    \
    tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 \
            " msgType %d",                                                                                 \
            pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);                        \
    fetchVer++;                                                                                            \
    DELETE_FUNC(&exReq);                                                                                   \
    tDecoderClear(&exDecoder);                                                                             \
    continue;                                                                                              \
  }                                                                                                        \
  DELETE_FUNC(&exReq);                                                                                     \
  tDecoderClear(&exDecoder);
210

211
/* No-op deleter: passed as DELETE_FUNC to PROCESS_EXCLUDED_MSG for request types without a dedicated deleter. */
static void tDeleteCommon(void* parm) { (void)parm; }
320✔
212

213
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
9,799✔
214
                                                  SRpcMsg* pMsg, STqOffsetVal* offset) {
215
  int32_t         vgId = TD_VID(pTq->pVnode);
9,799✔
216
  SMqDataRsp      taosxRsp = {0};
9,799✔
217
  SMqBatchMetaRsp btMetaRsp = {0};
9,799✔
218
  int32_t         code = 0;
9,799✔
219

220
  TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset));
9,799!
221
  if (offset->type != TMQ_OFFSET__LOG) {
9,799✔
222
    TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset));
166!
223

224
    if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
166✔
225
      code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
16✔
226
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
16!
227
              ",ts:%" PRId64,
228
              pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,
229
              btMetaRsp.rspOffset.ts);
230
      goto END;
16✔
231
    }
232

233
    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
150!
234
            ",ts:%" PRId64,
235
            pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type,
236
            taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
237
    if (taosxRsp.blockNum > 0) {
150✔
238
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
141✔
239
      goto END;
141✔
240
    } else {
241
      tOffsetCopy(offset, &taosxRsp.rspOffset);
9✔
242
    }
243
  }
244

245
  if (offset->type == TMQ_OFFSET__LOG) {
9,642!
246
    walReaderVerifyOffset(pHandle->pWalReader, offset);
9,642✔
247
    int64_t fetchVer = offset->version;
9,642✔
248

249
    uint64_t st = taosGetTimestampMs();
9,642✔
250
    int      totalRows = 0;
9,642✔
251
    int32_t  totalMetaRows = 0;
9,642✔
252
    while (1) {
121,221✔
253
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
130,863✔
254
      if (savedEpoch > pRequest->epoch) {
130,807!
255
        tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, savedEpoch error, vgId:%d offset %" PRId64,
×
256
                pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
257
        code = TSDB_CODE_TQ_INTERNAL_ERROR;
×
258
        goto END;
9,642✔
259
      }
260

261
      if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
130,807✔
262
        if (totalMetaRows > 0) {
8,901✔
263
          tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
2✔
264
          code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
2✔
265
          if (totalRows != 0) {
2!
266
            tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, totalRows error, vgId:%d offset %" PRId64,
×
267
                    pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
268
            code = code == 0 ? TSDB_CODE_TQ_INTERNAL_ERROR : code;
×
269
          }
270
          goto END;
2✔
271
        }
272
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
8,899✔
273
        code = tqSendDataRsp(
8,899✔
274
            pHandle, pMsg, pRequest, &taosxRsp,
275
            taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
8,899✔
276
        goto END;
8,899✔
277
      }
278

279
      SWalCont* pHead = &pHandle->pWalReader->pHead->head;
122,092✔
280
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d",
122,092!
281
              pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);
282

283
      // process meta
284
      if (pHead->msgType != TDMT_VND_SUBMIT) {
122,101✔
285
        if (totalRows > 0) {
556✔
286
          tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
26✔
287
          code = tqSendDataRsp(
26✔
288
              pHandle, pMsg, pRequest, &taosxRsp,
289
              taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
26✔
290
          goto END;
214✔
291
        }
292

293
        if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
530✔
294
          if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
527✔
295
            PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq, tDeleteSVCreateTbBatchReq)
169!
296
          } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
358✔
297
            PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq, tDeleteCommon)
44!
298
          } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
394✔
299
            PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq, tDeleteCommon)
271!
300
          } else if (pHead->msgType == TDMT_VND_DELETE) {
43✔
301
            PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes, tDeleteCommon)
5!
302
          }
303
        }
304

305
        tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId,
190!
306
                TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta);
307
        if (!pRequest->enableBatchMeta && !pRequest->useSnapshot) {
190!
308
          SMqMetaRsp metaRsp = {0};
188✔
309
          tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
188✔
310
          metaRsp.resMsgType = pHead->msgType;
188✔
311
          metaRsp.metaRspLen = pHead->bodyLen;
188✔
312
          metaRsp.metaRsp = pHead->body;
188✔
313
          code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
188✔
314
          goto END;
188✔
315
        }
316

317
        if (!btMetaRsp.batchMetaReq) {
2!
318
          btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES);
2✔
319
          if (btMetaRsp.batchMetaReq == NULL) {
2!
320
            code = TAOS_GET_TERRNO(terrno);
×
321
            goto END;
×
322
          }
323
          btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t));
2✔
324
          if (btMetaRsp.batchMetaLen == NULL) {
2!
325
            code = TAOS_GET_TERRNO(terrno);
×
326
            goto END;
×
327
          }
328
        }
329
        fetchVer++;
2✔
330

331
        SMqMetaRsp tmpMetaRsp = {0};
2✔
332
        tmpMetaRsp.resMsgType = pHead->msgType;
2✔
333
        tmpMetaRsp.metaRspLen = pHead->bodyLen;
2✔
334
        tmpMetaRsp.metaRsp = pHead->body;
2✔
335
        uint32_t len = 0;
2✔
336
        tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
2!
337
        if (TSDB_CODE_SUCCESS != code) {
2!
338
          tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
×
339
          continue;
×
340
        }
341
        int32_t tLen = sizeof(SMqRspHead) + len;
2✔
342
        void*   tBuf = taosMemoryCalloc(1, tLen);
2!
343
        if (tBuf == NULL) {
2!
344
          code = TAOS_GET_TERRNO(terrno);
×
345
          goto END;
×
346
        }
347
        void*    metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
2✔
348
        SEncoder encoder = {0};
2✔
349
        tEncoderInit(&encoder, metaBuff, len);
2✔
350
        code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
2✔
351
        tEncoderClear(&encoder);
2✔
352

353
        if (code < 0) {
2!
354
          tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
×
355
          continue;
×
356
        }
357
        if (taosArrayPush(btMetaRsp.batchMetaReq, &tBuf) == NULL) {
4!
358
          code = TAOS_GET_TERRNO(terrno);
×
359
          goto END;
×
360
        }
361
        if (taosArrayPush(btMetaRsp.batchMetaLen, &tLen) == NULL) {
4!
362
          code = TAOS_GET_TERRNO(terrno);
×
363
          goto END;
×
364
        }
365
        totalMetaRows++;
2✔
366
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > 1000)) {
4!
367
          tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
×
368
          code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
×
369
          goto END;
×
370
        }
371
        continue;
2✔
372
      }
373

374
      if (totalMetaRows > 0) {
121,545!
375
        tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
×
376
        code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
×
377
        goto END;
×
378
      }
379

380
      // process data
381
      SPackedData submit = {
121,545✔
382
          .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
121,545✔
383
          .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
121,545✔
384
          .ver = pHead->version,
121,545✔
385
      };
386

387
      code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest->sourceExcluded);
121,545✔
388
      if (code < 0) {
121,421!
UNCOV
389
        tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
×
390
                pRequest->subKey);
391
        goto END;
×
392
      }
393

394
      if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
242,295!
395
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
527✔
396
        code = tqSendDataRsp(
527✔
397
            pHandle, pMsg, pRequest, &taosxRsp,
398
            taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
527!
399
        goto END;
527✔
400
      } else {
401
        fetchVer++;
120,879✔
402
      }
403
    }
404
  }
405

406
END:
×
407
  tDeleteMqBatchMetaRsp(&btMetaRsp);
9,799✔
408
  tDeleteSTaosxRsp(&taosxRsp);
9,799✔
409
  return code;
9,796✔
410
}
411

412
/**
 * Entry point for serving a consumer poll: resolve the starting offset, then dispatch to the
 * column-subscription or db/stable-subscription extractor.
 *
 * @param pTq      tq instance.
 * @param pHandle  subscription handle; its execHandle.subType selects the extractor.
 * @param pRequest poll request being served.
 * @param pMsg     rpc message to answer.
 * @return 0 on success, TSDB_CODE_TMQ_INVALID_MSG when the request offset type is 0, otherwise the
 *         code returned by the offset reset or by the extractor.
 */
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
  STqOffsetVal reqOffset = {0};
  int32_t      code = 0;

  tOffsetCopy(&reqOffset, &pRequest->reqOffset);

  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
    // the request asks for a reset (earliest/latest/none): turn it into a concrete offset
    bool rspAlreadySent = false;

    code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &rspAlreadySent);
    // stop on failure, and also when the reset itself already returned an empty block
    if (code != 0 || rspAlreadySent) {
      goto END;
    }
  } else if (reqOffset.type == 0) {
    // a consumer-specified offset must carry a valid type
    uError("req offset type is 0");
    code = TSDB_CODE_TMQ_INVALID_MSG;
    goto END;
  }

  code = (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN)
             ? extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset)
             : extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);

END:
  tOffsetDestroy(&reqOffset);
  return code;
}
445

446
/**
 * Fill in the fields of a poll response header. Only the five fields listed below are written;
 * anything else in *pMsgHead is left as it is.
 *
 * @param pMsgHead   header to fill.
 * @param type       stored as mqMsgType.
 * @param epoch      stored as epoch.
 * @param consumerId stored as consumerId.
 * @param sver       stored as walsver.
 * @param ever       stored as walever.
 */
static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver,
                          int64_t ever) {
  pMsgHead->mqMsgType = type;
  pMsgHead->epoch = epoch;
  pMsgHead->consumerId = consumerId;
  pMsgHead->walsver = sver;
  pMsgHead->walever = ever;
}
43,346✔
454

455
int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
18✔
456
                               const SMqBatchMetaRsp* pRsp, int32_t vgId) {
457
  int32_t len = 0;
18✔
458
  int32_t code = 0;
18✔
459
  tEncodeSize(tEncodeMqBatchMetaRsp, pRsp, len, code);
18!
460
  if (code < 0) {
18!
461
    return TAOS_GET_TERRNO(code);
×
462
  }
463
  int32_t tlen = sizeof(SMqRspHead) + len;
18✔
464
  void*   buf = rpcMallocCont(tlen);
18✔
465
  if (buf == NULL) {
18!
466
    return TAOS_GET_TERRNO(terrno);
×
467
  }
468

469
  int64_t sver = 0, ever = 0;
18✔
470
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
18✔
471
  initMqRspHead(buf, TMQ_MSG_TYPE__POLL_BATCH_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
18✔
472

473
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
18✔
474

475
  SEncoder encoder = {0};
18✔
476
  tEncoderInit(&encoder, abuf, len);
18✔
477
  code = tEncodeMqBatchMetaRsp(&encoder, pRsp);
18✔
478
  tEncoderClear(&encoder);
18✔
479
  if (code < 0) {
18!
480
    rpcFreeCont(buf);
×
481
    return TAOS_GET_TERRNO(code);
×
482
  }
483
  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
18✔
484

485
  tmsgSendRsp(&resp);
18✔
486
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%ld offset type:%d",
18!
487
          vgId, pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type);
488

489
  return 0;
18✔
490
}
491

492
/**
 * Encode a single meta poll response behind an SMqRspHead and send it as the rpc reply to pMsg.
 *
 * @param pHandle subscription handle; its reader supplies the valid WAL version range for the header.
 * @param pMsg    request message whose rpc info addresses the reply.
 * @param pReq    poll request (epoch and consumer id go into the header).
 * @param pRsp    meta response to encode.
 * @param vgId    vgroup id, used for logging only.
 * @return 0 once the reply was handed to tmsgSendRsp(), otherwise an error code from sizing,
 *         allocation or encoding.
 */
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
                          int32_t vgId) {
  int32_t bodyLen = 0;
  int32_t code = 0;

  // first pass: compute the encoded size
  tEncodeSize(tEncodeMqMetaRsp, pRsp, bodyLen, code);
  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
  }

  int64_t walStartVer = 0;
  int64_t walEndVer = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &walStartVer, &walEndVer);
  initMqRspHead(pCont, TMQ_MSG_TYPE__POLL_META_RSP, pReq->epoch, pReq->consumerId, walStartVer, walEndVer);

  // second pass: encode the payload right after the header
  void*    pBody = POINTER_SHIFT(pCont, sizeof(SMqRspHead));
  SEncoder encoder = {0};
  tEncoderInit(&encoder, pBody, bodyLen);
  code = tEncodeMqMetaRsp(&encoder, pRsp);
  tEncoderClear(&encoder);
  if (code < 0) {
    rpcFreeCont(pCont);
    return TAOS_GET_TERRNO(code);
  }

  SRpcMsg resp = {.info = pMsg->info, .pCont = pCont, .contLen = totalLen, .code = 0};
  tmsgSendRsp(&resp);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", vgId,
          pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);

  return 0;
}
529

530
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
43,141✔
531
                        int32_t type, int64_t sver, int64_t ever) {
532
  int32_t len = 0;
43,141✔
533
  int32_t code = 0;
43,141✔
534

535
  if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
43,141✔
536
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
43,128!
537
  } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
13!
538
    tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
13!
539
  }
540

541
  if (code < 0) {
43,141!
542
    return TAOS_GET_TERRNO(code);
×
543
  }
544

545
  int32_t tlen = sizeof(SMqRspHead) + len;
43,141✔
546
  void*   buf = rpcMallocCont(tlen);
43,141✔
547
  if (buf == NULL) {
43,141!
548
    return terrno;
×
549
  }
550

551
  SMqRspHead* pHead = (SMqRspHead*)buf;
43,141✔
552
  initMqRspHead(pHead, type, epoch, consumerId, sver, ever);
43,141✔
553

554
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
43,140✔
555

556
  SEncoder encoder = {0};
43,140✔
557
  tEncoderInit(&encoder, abuf, len);
43,140✔
558

559
  if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
43,140✔
560
    code = tEncodeMqDataRsp(&encoder, pRsp);
43,127✔
561
  } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
13!
562
    code = tEncodeSTaosxRsp(&encoder, pRsp);
13✔
563
  }
564
  tEncoderClear(&encoder);
43,139✔
565
  if (code < 0) {
43,139!
566
    rpcFreeCont(buf);
×
567
    return TAOS_GET_TERRNO(code);
×
568
  }
569
  SRpcMsg rsp = {.info = *pRpcHandleInfo, .pCont = buf, .contLen = tlen, .code = 0};
43,139✔
570

571
  tmsgSendRsp(&rsp);
43,139✔
572
  return 0;
43,141✔
573
}
574

575
/**
 * Decode a delete result and turn it into a "special" data block with one row per affected table
 * uid (start ts, end ts, uid; the group id, calculate start/end ts and table name columns are set
 * to NULL).
 *
 * @param pData     encoded SDeleteRes.
 * @param len       length of pData in bytes.
 * @param ver       version stored in the produced block's info.
 * @param pRefBlock [out] NULL when nothing was produced (no tables / no affected rows / error);
 *                  for type 0 a queue item of type STREAM_INPUT__REF_DATA_BLOCK that references
 *                  the block; for type 1 the SSDataBlock itself.
 * @param type      0 or 1, selects the output form described above; anything else is an error.
 * @param blockType stream block type passed to createSpecialDataBlock().
 * @return 0 on success (including the "nothing to produce" case), TSDB_CODE_TMQ_CONSUMER_ERROR for
 *         an unknown type, otherwise the code of the step that failed. On any failure the partially
 *         built block is released here and *pRefBlock is NULL.
 */
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type,
                              EStreamType blockType) {
  int32_t      code = 0;
  int32_t      line = 0;
  SDecoder*    pCoder = &(SDecoder){0};
  SDeleteRes*  pRes = &(SDeleteRes){0};
  // declared up front (not at first use) so that every `goto END` sees a well-defined value
  SSDataBlock* pDelBlock = NULL;

  *pRefBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  TSDB_CHECK_NULL(pRes->uidList, code, line, END, terrno)

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  code = tDecodeDeleteRes(pCoder, pRes);
  TSDB_CHECK_CODE(code, line, END);

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    // nothing was deleted: succeed without producing a block
    goto END;
  }

  code = createSpecialDataBlock(blockType, &pDelBlock);
  TSDB_CHECK_CODE(code, line, END);

  code = blockDataEnsureCapacity(pDelBlock, numOfTables);
  TSDB_CHECK_CODE(code, line, END);

  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  for (int32_t i = 0; i < numOfTables; i++) {
    // start key column
    SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(pStartCol, code, line, END, terrno)
    code = colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    TSDB_CHECK_CODE(code, line, END);

    // end key column
    SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(pEndCol, code, line, END, terrno)
    code = colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    TSDB_CHECK_CODE(code, line, END);

    // uid column
    SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
    TSDB_CHECK_NULL(pUidCol, code, line, END, terrno)

    int64_t* pUid = taosArrayGet(pRes->uidList, i);
    // checked like every other taosArrayGet() result in this loop
    TSDB_CHECK_NULL(pUid, code, line, END, terrno)
    code = colDataSetVal(pUidCol, i, (const char*)pUid, false);
    TSDB_CHECK_CODE(code, line, END);

    // the remaining columns carry no value for a delete row
    void* tmp = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno)
    colDataSetNULL(tmp, i);
    tmp = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno)
    colDataSetNULL(tmp, i);
    tmp = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno)
    colDataSetNULL(tmp, i);
    tmp = taosArrayGet(pDelBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno)
    colDataSetNULL(tmp, i);
  }

  if (type == 0) {
    code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock);
    // go through END on failure so the decoder, the uid list and the block are all released
    TSDB_CHECK_CODE(code, line, END);

    ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
    ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pDelBlock;
  } else if (type == 1) {
    *pRefBlock = pDelBlock;
  } else {
    tqError("unknown type:%d", type);
    code = TSDB_CODE_TMQ_CONSUMER_ERROR;
    line = __LINE__;
  }

END:
  if (code != 0) {
    tqError("failed to extract delete data block, line:%d code:%d", line, code);

    // ownership of the block is only handed to *pRefBlock on success, so on every failure it is
    // still ours; release it with the same call pair the allocation-failure path has always used
    if (pDelBlock != NULL) {
      blockDataCleanup(pDelBlock);
      taosMemoryFree(pDelBlock);
    }
    *pRefBlock = NULL;
  }
  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
  return code;
}
661

662
/*
 * Collect execution progress for the source-level task of stream `streamId`
 * registered in this vnode's stream meta.
 *
 * pVnode      vnode whose pTq->pStreamMeta task list is scanned
 * streamId    id of the stream to look up
 * pDelay      optional output. Reset to 0 on entry. For a matching source task it is
 *             set to (latest - cur) / 1000, where `cur` is the WAL ingest timestamp at
 *             the task's current version and `latest` the ingest timestamp at the
 *             newest valid WAL version. It is set to 0 when either timestamp cannot
 *             be fetched.
 * fhFinished  optional output. Reset to false on entry; for a matching source task it
 *             is set to !HAS_RELATED_FILLHISTORY_TASK(pTask).
 *
 * Returns TSDB_CODE_SUCCESS in every case; per-task failures are logged and the task
 * is skipped.
 */
int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, bool* fhFinished) {
  SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
  int32_t      numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  int32_t      code = TSDB_CODE_SUCCESS;

  if (pDelay != NULL) {
    *pDelay = 0;
  }

  // guard the second out-parameter the same way as pDelay
  if (fhFinished != NULL) {
    *fhFinished = false;
  }

  if (numOfTasks <= 0) {
    return code;
  }

  // extract the required source task for a given stream, identified by streamId
  streamMetaRLock(pMeta);

  // re-read the size while holding the read lock; the value above was read without it
  numOfTasks = taosArrayGetSize(pMeta->pTaskList);

  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
    if (pId == NULL) {
      continue;
    }
    if (pId->streamId != streamId) {
      continue;
    }

    STaskId      id = {.streamId = pId->streamId, .taskId = pId->taskId};
    SStreamTask* pTask = NULL;

    code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
    if (code != 0) {
      tqError("vgId:%d failed to acquire task:0x%x in retrieving progress", pMeta->vgId, pId->taskId);
      continue;
    }

    // only the source-level task carries the WAL reader used below
    if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
      streamMetaReleaseTask(pMeta, pTask);
      continue;
    }

    // here we get the required stream source task
    if (fhFinished != NULL) {
      *fhFinished = !HAS_RELATED_FILLHISTORY_TASK(pTask);
    }

    // -1 from the reader falls back to the checkpointed processed version;
    // otherwise step back one version from the reader's current position
    int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
    if (ver == -1) {
      ver = pTask->chkInfo.processedVer;
    } else {
      ver--;
    }

    SVersionRange verRange = {0};
    walReaderValidVersionRange(pTask->exec.pWalReader, &verRange.minVer, &verRange.maxVer);

    // a separate reader is opened for the lookups so the task's own reader is not repositioned
    SWalReader* pReader = walOpenReader(pTask->exec.pWalReader->pWal, NULL, 0);
    if (pReader == NULL) {
      tqError("failed to open wal reader to extract exec progress, vgId:%d", pMeta->vgId);
      streamMetaReleaseTask(pMeta, pTask);
      continue;
    }

    int64_t cur = 0;
    int64_t latest = 0;
    bool    hasCur = false;
    bool    hasLatest = false;

    code = walFetchHead(pReader, ver);
    if (code == TSDB_CODE_SUCCESS) {
      cur = pReader->pHead->head.ingestTs;
      hasCur = true;
    }

    if (ver == verRange.maxVer) {
      latest = cur;
      hasLatest = hasCur;
    } else {
      code = walFetchHead(pReader, verRange.maxVer);
      if (code == TSDB_CODE_SUCCESS) {
        latest = pReader->pHead->head.ingestTs;
        hasLatest = true;
      }
    }

    if (pDelay != NULL) {  // delay in ms
      // Only publish a difference when both timestamps were actually read; a failed
      // fetch would otherwise leave one side at 0 and yield a meaningless value.
      *pDelay = (hasCur && hasLatest) ? (latest - cur) / 1000 : 0;
    }

    walCloseReader(pReader);
    streamMetaReleaseTask(pMeta, pTask);
  }

  streamMetaRUnLock(pMeta);

  return TSDB_CODE_SUCCESS;
}
754

755
int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type) {
185✔
756
  int32_t          code = 0;
185✔
757
  int32_t          lino = 0;
185✔
758
  SDecoder         dc = {0};
185✔
759
  SVDropTbBatchReq batchReq = {0};
185✔
760
  tDecoderInit(&dc, (uint8_t*)data, len);
185✔
761
  code = tDecodeSVDropTbBatchReq(&dc, &batchReq);
185✔
762
  TSDB_CHECK_CODE(code, lino, _exit);
185!
763
  if (batchReq.nReqs <= 0) goto _exit;
185!
764

765
  SSDataBlock* pBlock = NULL;
185✔
766
  code = createSpecialDataBlock(STREAM_DROP_CHILD_TABLE, &pBlock);
185✔
767
  TSDB_CHECK_CODE(code, lino, _exit);
185!
768

769
  code = blockDataEnsureCapacity(pBlock, batchReq.nReqs);
185✔
770
  TSDB_CHECK_CODE(code, lino, _exit);
185!
771

772
  pBlock->info.rows = batchReq.nReqs;
185✔
773
  pBlock->info.version = ver;
185✔
774
  for (int32_t i = 0; i < batchReq.nReqs; ++i) {
375✔
775
    SVDropTbReq* pReq = batchReq.pReqs + i;
190✔
776
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
190✔
777
    TSDB_CHECK_NULL(pCol, code, lino, _exit, terrno);
190!
778
    code = colDataSetVal(pCol, i, (const char* )&pReq->uid, false);
190✔
779
    TSDB_CHECK_CODE(code, lino, _exit);
190!
780
  }
781

782
  code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock);
185✔
783
  TSDB_CHECK_CODE(code, lino, _exit);
185!
784
  ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
185✔
785
  ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pBlock;
185✔
786

787
_exit:
185✔
788
  tDecoderClear(&dc);
185✔
789
  if (TSDB_CODE_SUCCESS != code) {
185!
790
    tqError("faled to extract drop ctb data block, line:%d code:%s", lino, tstrerror(code));
×
791
    blockDataCleanup(pBlock);
×
792
    taosMemoryFree(pBlock);
×
793
  }
794
  return code;
185✔
795
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc