• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4868

26 Nov 2025 05:46AM UTC coverage: 64.629% (+0.2%) from 64.473%
#4868

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

770 of 945 new or added lines in 33 files covered. (81.48%)

3010 existing lines in 120 files now uncovered.

158425 of 245129 relevant lines covered (64.63%)

113048242.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.53
/source/dnode/vnode/src/tq/tqUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
19
                                 const SMqMetaRsp* pRsp, int32_t vgId);
20
static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
21
                                      const SMqBatchMetaRsp* pRsp, int32_t vgId);
22

23
/**
 * Initialise a poll data response that carries neither table names nor schemas.
 *
 * Allocates the blockData and blockDataLen arrays, copies pOffset into both
 * reqOffset and rspOffset, and clears withTbName / withSchema.
 *
 * @param pRsp    response to initialise; must not be NULL
 * @param pOffset offset stored as both the request and the response offset
 * @return TDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA when pRsp is NULL,
 *         otherwise the terrno observed after a failed array allocation.
 *         On failure no array stays attached to pRsp, so a caller may either
 *         simply return or still run its normal response cleanup.
 */
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  int32_t    code = TDB_CODE_SUCCESS;
  int32_t    lino = 0;
  tqDebug("%s called", __FUNCTION__ );
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);

  // Start from a known state so the failure path only ever releases arrays
  // that were allocated by this call.
  pRsp->blockData = NULL;
  pRsp->blockDataLen = NULL;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);

  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);

  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
  pRsp->withTbName = 0;
  pRsp->withSchema = false;

END:
  if (code != 0){
    tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
    if (pRsp != NULL) {
      // Release the partially built response and clear the pointers so that a
      // later cleanup by the caller cannot free them a second time.
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
      taosArrayDestroy(pRsp->blockDataLen);
      pRsp->blockDataLen = NULL;
    }
  }
  return code;
}
46

47
/**
 * Initialise a poll response that also carries table names and schemas.
 *
 * Copies pOffset into reqOffset / rspOffset, sets withTbName and withSchema to
 * 1, and allocates the blockData, blockDataLen, blockTbName and blockSchema
 * arrays.
 *
 * @param pRsp    response to initialise; must not be NULL
 * @param pOffset offset stored as both the request and the response offset
 * @return TDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA when pRsp is NULL,
 *         otherwise the terrno observed after a failed array allocation.
 *         On failure every array allocated here is destroyed and its pointer
 *         reset to NULL.
 */
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  int32_t    code = TDB_CODE_SUCCESS;
  int32_t    lino = 0;
  tqDebug("%s called", __FUNCTION__ );
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);

  pRsp->withTbName = 1;
  pRsp->withSchema = 1;

  // Reset all four array pointers first: the failure path below destroys each
  // of them, so none may hold a stale value from before this call.
  pRsp->blockData = NULL;
  pRsp->blockDataLen = NULL;
  pRsp->blockTbName = NULL;
  pRsp->blockSchema = NULL;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);

  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);

  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockTbName, code, lino, END, terrno);

  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);

END:
  if (code != 0){
    tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
    // pRsp is NULL when the parameter check itself jumped here.
    if (pRsp != NULL) {
      // Clear each pointer after destroying it; the caller still runs its own
      // response cleanup after a failure and must not free these again.
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
      taosArrayDestroy(pRsp->blockDataLen);
      pRsp->blockDataLen = NULL;
      taosArrayDestroy(pRsp->blockTbName);
      pRsp->blockTbName = NULL;
      taosArrayDestroy(pRsp->blockSchema);
      pRsp->blockSchema = NULL;
    }
  }
  return code;
}
79

80
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
223,352✔
81
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
82
  if (pOffsetVal == NULL || pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || pBlockReturned == NULL) {
223,352✔
83
    return TSDB_CODE_INVALID_PARA;
×
84
  }
85
  uint64_t   consumerId = pRequest->consumerId;
223,592✔
86
  STqOffset* pOffset = NULL;
223,592✔
87
  int32_t    code = tqMetaGetOffset(pTq, pRequest->subKey, &pOffset);
223,592✔
88
  int32_t    vgId = TD_VID(pTq->pVnode);
223,282✔
89

90
  *pBlockReturned = false;
223,326✔
91
  // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
92
  if (code == 0) {
222,539✔
93
    tOffsetCopy(pOffsetVal, &pOffset->val);
44,710✔
94

95
    char formatBuf[TSDB_OFFSET_LEN] = {0};
44,710✔
96
    tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
44,710✔
97
    tqDebug("tmq poll: consumer:0x%" PRIx64
44,710✔
98
                ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.QID:0x%" PRIx64,
99
            consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
100
    return 0;
44,710✔
101
  } else {
102
    // no poll occurs in this vnode for this topic, let's seek to the right offset value.
103
    if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
177,829✔
104
      if (pRequest->useSnapshot) {
138,965✔
105
        tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
54,892✔
106
                consumerId, pHandle->subKey, vgId);
107
        if (pHandle->fetchMeta) {
55,018✔
108
          tqOffsetResetToMeta(pOffsetVal, 0);
109
        } else {
110
          SValue val = {0};
54,148✔
111
          tqOffsetResetToData(pOffsetVal, 0, 0, val);
×
112
        }
113
      } else {
114
        walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
83,613✔
115
        tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
84,363✔
116
      }
117
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
38,911✔
118
      walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
8,407✔
119
      SMqDataRsp dataRsp = {0};
8,407✔
120
      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
8,407✔
121

122
      code = tqInitDataRsp(&dataRsp, *pOffsetVal);
8,407✔
123
      if (code != 0) {
8,407✔
124
        return code;
×
125
      }
126
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
8,407✔
127
              pHandle->subKey, vgId, dataRsp.rspOffset.version);
128
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, (pRequest->rawData == 1) ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
8,407✔
129
      tDeleteMqDataRsp(&dataRsp);
8,407✔
130

131
      *pBlockReturned = true;
8,407✔
132
      return code;
8,407✔
133
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
30,517✔
134
      tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
30,937✔
135
                  " in vg %d, subkey %s, reset none failed",
136
              pHandle->subKey, consumerId, vgId, pRequest->subKey);
137
      return TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
31,032✔
138
    }
139
  }
140

141
  return 0;
139,288✔
142
}
143

144
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
3,608,777✔
145
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
146
  int32_t    code = TDB_CODE_SUCCESS;
3,608,777✔
147
  int32_t    lino = 0;
3,608,777✔
148
  tqDebug("%s called", __FUNCTION__ );
3,608,777✔
149
  uint64_t consumerId = pRequest->consumerId;
3,609,266✔
150
  int32_t  vgId = TD_VID(pTq->pVnode);
3,608,877✔
151
  terrno = 0;
3,608,877✔
152

153
  SMqDataRsp dataRsp = {0};
3,608,877✔
154
  code = tqInitDataRsp(&dataRsp, *pOffset);
3,608,877✔
155
  TSDB_CHECK_CODE(code, lino, end);
3,608,822✔
156

157
  code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
3,608,822✔
158
  TSDB_CHECK_CODE(code, lino, end);
3,608,877✔
159

160
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
3,608,877✔
161
  if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
3,608,877✔
162
    goto end;
44,205✔
163
  }
164

165
  //   till now, all data has been transferred to consumer, new data needs to push client once arrived.
166
  if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
3,564,672✔
167
    // lock
168
    taosWLockLatch(&pTq->lock);
1,405,624✔
169
    int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
1,405,624✔
170
    if (dataRsp.rspOffset.version > ver) {  // check if there are data again to avoid lost data
1,405,624✔
171
      code = tqRegisterPushHandle(pTq, pHandle, pMsg);
1,354,094✔
172
      taosWUnLockLatch(&pTq->lock);
1,354,094✔
173
      goto end;
1,354,094✔
174
    }
175
    taosWUnLockLatch(&pTq->lock);
51,530✔
176
  }
177

178
  // reqOffset represents the current date offset, may be changed if wal not exists
179
  tOffsetCopy(&dataRsp.reqOffset, pOffset);
2,210,578✔
180
  code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
2,210,493✔
181

182
end:
3,608,690✔
183
  {
184
    char buf[TSDB_OFFSET_LEN] = {0};
3,608,690✔
185
    tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
3,608,792✔
186
    if (code != 0){
3,608,707✔
187
      tqError("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " error msg:%s, line:%d",
44,205✔
188
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, tstrerror(code), lino);
189
    } else {
190
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " success",
3,564,502✔
191
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId);
192
    }
193

194
    tDeleteMqDataRsp(&dataRsp);
3,608,894✔
195
    return code;
3,608,877✔
196
  }
197
}
198

199
/*
 * Decode the WAL entry body behind pHead as a TYPE request and, if it decodes
 * and its `source` has TD_REQ_FROM_TAOX set, skip the entry: log it, advance
 * fetchVer and `continue` the enclosing loop. The decoded request and the
 * decoder are released on both paths.
 *
 * Expands to declarations plus statements, so it must be the content of its
 * own braced block inside a loop. It relies on pHead, pRequest, vgId and
 * fetchVer being visible at the expansion site.
 */
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC)                                               \
  SDecoder msgDecoder = {0};                                                                               \
  TYPE     decodedReq = {0};                                                                               \
  void*    payload = POINTER_SHIFT(pHead->body, sizeof(SMsgHead));                                         \
  int32_t  payloadLen = pHead->bodyLen - sizeof(SMsgHead);                                                 \
  tDecoderInit(&msgDecoder, payload, payloadLen);                                                          \
  bool skipEntry = (DECODE_FUNC(&msgDecoder, &decodedReq) == 0 && (decodedReq.source & TD_REQ_FROM_TAOX) != 0); \
  if (skipEntry) {                                                                                         \
    tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 \
            " msgType %d",                                                                                 \
            pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);                        \
    fetchVer++;                                                                                            \
  }                                                                                                        \
  DELETE_FUNC(&decodedReq);                                                                                \
  tDecoderClear(&msgDecoder);                                                                              \
  if (skipEntry) {                                                                                         \
    continue;                                                                                              \
  }
216

217
/* No-op release hook for decoded requests that own nothing needing a free. */
static void tDeleteCommon(void* parm) {
  (void)parm;
}
4,660✔
218
/* Release hook for a decoded alter-table request: destroys its pMultiTag array. */
static void tDeleteAlterTable(SVAlterTbReq* req) {
  taosArrayDestroy(req->pMultiTag);
  return;
}
748✔
221

222
/*
 * Pick the poll response message type for a taosx response:
 *   - DATA_META when the response carries create-table requests,
 *   - RAW_DATA  when the request asked for raw data (rawData == 1),
 *   - DATA      otherwise.
 * Arguments and the whole expansion are parenthesised so the result does not
 * depend on the operators surrounding the macro call.
 */
#define POLL_RSP_TYPE(pRequest, taosxRsp)                                \
  ((taosxRsp).createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP      \
                                 : ((pRequest)->rawData == 1 ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP \
                                                             : TMQ_MSG_TYPE__POLL_DATA_RSP))
225

226
/**
 * Append one encoded meta message to a batch meta response.
 *
 * Wraps (type, bodyLen, body) in a temporary SMqMetaRsp, encodes it into a
 * fresh buffer that leaves room for an SMqRspHead in front, and pushes the
 * buffer and its total length onto batchMetaReq / batchMetaLen. The two
 * arrays are created on demand.
 *
 * @param btMetaRsp batch response to append to
 * @param type      message type stored as resMsgType
 * @param bodyLen   length of body in bytes
 * @param body      message body; only read while encoding, not retained
 * @return 0 on success, otherwise an error code. The encode buffer is freed
 *         here on every failure that happens before batchMetaReq took it over.
 *
 * NOTE(review): TQ_NULL_GO_TO_END is defined outside this file; it is assumed
 * to set `code` and jump to END when its argument is NULL - confirm.
 */
static int32_t buildBatchMeta(SMqBatchMetaRsp *btMetaRsp, int16_t type, int32_t bodyLen, void* body){
  int32_t  code = 0;
  uint32_t len = 0;
  int32_t  tLen = 0;
  void*    tBuf = NULL;  // owned by this function until pushed into batchMetaReq

  // Check the arrays separately so that a batchMetaLen allocation that failed
  // on an earlier call is retried instead of being pushed to as NULL.
  if (!btMetaRsp->batchMetaReq) {
    btMetaRsp->batchMetaReq = taosArrayInit(4, POINTER_BYTES);
    TQ_NULL_GO_TO_END(btMetaRsp->batchMetaReq);
  }
  if (!btMetaRsp->batchMetaLen) {
    btMetaRsp->batchMetaLen = taosArrayInit(4, sizeof(int32_t));
    TQ_NULL_GO_TO_END(btMetaRsp->batchMetaLen);
  }

  SMqMetaRsp tmpMetaRsp = {0};
  tmpMetaRsp.resMsgType = type;
  tmpMetaRsp.metaRspLen = bodyLen;
  tmpMetaRsp.metaRsp = body;

  tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
  if (TSDB_CODE_SUCCESS != code) {
    tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
    goto END;
  }

  tLen = sizeof(SMqRspHead) + len;
  tBuf = taosMemoryCalloc(1, tLen);
  TQ_NULL_GO_TO_END(tBuf);

  // The encoded meta starts after the space reserved for the response head.
  void*    metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
  SEncoder encoder = {0};
  tEncoderInit(&encoder, metaBuff, len);
  code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
  tEncoderClear(&encoder);

  if (code < 0) {
    tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
    goto END;
  }

  TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaReq, &tBuf));
  // batchMetaReq now holds the buffer pointer; it must not be freed below.
  tBuf = NULL;
  TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaLen, &tLen));

END:
  taosMemoryFree(tBuf);
  return code;
}
265

266
/**
 * Serialise the create-table requests collected in a taosx response into one
 * batch create-table message.
 *
 * The output buffer is laid out as [SMsgHead][encoded SVCreateTbBatchReq]; the
 * head is only reserved (zero filled), never populated here.
 *
 * @param taosxRsp response whose createTableReq array is serialised
 * @param pBuf     [out] newly allocated buffer; the caller frees it, also when
 *                 this function fails after the allocation
 * @param len      [out] total buffer length including the SMsgHead
 * @return 0 on success, otherwise an error code
 *
 * NOTE(review): TQ_NULL_GO_TO_END is defined outside this file; it is assumed
 * to set `code` and jump to END when its argument is NULL - confirm.
 */
static int32_t buildCreateTbBatchReqBinary(SMqDataRsp *taosxRsp, void** pBuf, int32_t *len){
  int32_t code = 0;
  int32_t encodedLen = 0;
  SVCreateTbBatchReq pReq = {0};
  pReq.nReqs = taosArrayGetSize(taosxRsp->createTableReq);
  pReq.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
  TQ_NULL_GO_TO_END(pReq.pArray);
  for (int i = 0; i < taosArrayGetSize(taosxRsp->createTableReq); i++){
    void   *createTableReq = taosArrayGetP(taosxRsp->createTableReq, i);
    if (createTableReq == NULL) {
      // Pushing would copy from a NULL source; report instead.
      code = TSDB_CODE_INVALID_PARA;
      goto END;
    }
    TQ_NULL_GO_TO_END(taosArrayPush(pReq.pArray, createTableReq));
  }
  tEncodeSize(tEncodeSVCreateTbBatchReq, &pReq, encodedLen, code);
  if (code < 0) {
    goto END;
  }
  *len = encodedLen + (int32_t)sizeof(SMsgHead);
  // Zero-filled allocation so the reserved head holds no stale heap bytes.
  *pBuf = taosMemoryCalloc(1, *len);
  // Check the allocation result, not the out-parameter itself.
  TQ_NULL_GO_TO_END((*pBuf));
  SEncoder coder = {0};
  // The encoder gets only the room after the head, not the full buffer size.
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), encodedLen);
  code = tEncodeSVCreateTbBatchReq(&coder, &pReq);
  tEncoderClear(&coder);

END:
  taosArrayDestroy(pReq.pArray);
  return code;
}
292

293
/*
 * Stamp the batch meta response with the current log position, send it and
 * leave the function through END.
 *
 * The body is one braced block so all three statements stay together even
 * under an unbraced `if`. Use it without a trailing semicolon. It relies on
 * btMetaRsp, fetchVer, pHandle, pMsg, pRequest, vgId, code and the END label
 * being visible at the expansion site.
 */
#define SEND_BATCH_META_RSP                                                   \
  {                                                                           \
    tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);                       \
    code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); \
    goto END;                                                                 \
  }
297

298
/*
 * Stamp the data response with the current log position, send it with the
 * message type chosen by POLL_RSP_TYPE and leave the function through END.
 *
 * The body is one braced block so all three statements stay together even
 * under an unbraced `if`. Use it without a trailing semicolon. It relies on
 * taosxRsp, fetchVer, pHandle, pMsg, pRequest, vgId, code and the END label
 * being visible at the expansion site.
 */
#define SEND_DATA_RSP                                                                                    \
  {                                                                                                      \
    tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);                                                   \
    code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, POLL_RSP_TYPE(pRequest, taosxRsp), vgId);   \
    goto END;                                                                                            \
  }
302
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
1,067,168✔
303
                                                  SRpcMsg* pMsg, STqOffsetVal* offset) {
304
  int32_t         vgId = TD_VID(pTq->pVnode);
1,067,168✔
305
  SMqDataRsp      taosxRsp = {0};
1,067,237✔
306
  SMqBatchMetaRsp btMetaRsp = {0};
1,067,168✔
307
  int32_t         code = 0;
1,067,168✔
308

309
  TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset));
1,067,168✔
310
  if (offset->type != TMQ_OFFSET__LOG) {
1,067,061✔
311
    TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest));
8,567✔
312

313
    if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
8,567✔
314
      code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
836✔
315
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
836✔
316
              pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,btMetaRsp.rspOffset.ts);
317
      goto END;
836✔
318
    }
319

320
    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64",ts:%" PRId64,
7,731✔
321
            pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
322
    if (taosxRsp.blockNum > 0) {
7,731✔
323
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
7,595✔
324
      goto END;
7,595✔
325
    } else {
326
      tOffsetCopy(offset, &taosxRsp.rspOffset);
136✔
327
    }
328
  }
329

330
  if (offset->type == TMQ_OFFSET__LOG) {
1,058,639✔
331
    walReaderVerifyOffset(pHandle->pWalReader, offset);
1,058,718✔
332
    int64_t fetchVer = offset->version;
1,058,447✔
333

334
    uint64_t st = taosGetTimestampMs();
1,058,377✔
335
    int      totalRows = 0;
1,058,377✔
336
    int32_t  totalMetaRows = 0;
1,058,464✔
337
    while (1) {
14,070,107✔
338
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
15,128,571✔
339
      if (savedEpoch > pRequest->epoch) {
15,128,395✔
UNCOV
340
        tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, savedEpoch error, vgId:%d offset %" PRId64,
×
341
                pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
UNCOV
342
        code = TSDB_CODE_TQ_INTERNAL_ERROR;
×
343
        goto END;
×
344
      }
345

346
      if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
15,127,194✔
347
        if (totalMetaRows > 0) {
1,024,621✔
348
          SEND_BATCH_META_RSP
68✔
349
        }
350
        SEND_DATA_RSP
2,049,174✔
351
      }
352

353
      SWalCont* pHead = &pHandle->pWalReader->pHead->head;
14,113,825✔
354
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %s",
14,113,996✔
355
              pRequest->consumerId, pRequest->epoch, vgId, fetchVer, TMSG_INFO(pHead->msgType));
356

357
      // process meta
358
      if (pHead->msgType != TDMT_VND_SUBMIT) {
14,113,954✔
359
        if (totalRows > 0) {
9,630✔
360
          SEND_DATA_RSP
904✔
361
        }
362

363
        if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
9,178✔
364
          if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
8,859✔
365
            PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq, tDeleteSVCreateTbBatchReq)
2,805✔
366
          } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
6,054✔
367
            PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq, tDeleteAlterTable)
748✔
368
          } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
6,685✔
369
            PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq, tDeleteCommon)
4,575✔
370
          } else if (pHead->msgType == TDMT_VND_DELETE) {
731✔
371
            PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes, tDeleteCommon)
85✔
372
          }
373
        }
374

375
        tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId,
3,449✔
376
                TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta);
377
        if (!pRequest->enableBatchMeta && !pRequest->useSnapshot) {
3,449✔
378
          SMqMetaRsp metaRsp = {0};
3,415✔
379
          tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
3,415✔
380
          metaRsp.resMsgType = pHead->msgType;
3,415✔
381
          metaRsp.metaRspLen = pHead->bodyLen;
3,415✔
382
          metaRsp.metaRsp = pHead->body;
3,415✔
383
          code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
3,415✔
384
          goto END;
3,415✔
385
        }
386
        code = buildBatchMeta(&btMetaRsp, pHead->msgType, pHead->bodyLen, pHead->body);
34✔
387
        fetchVer++;
34✔
388
        if (code != 0){
34✔
UNCOV
389
          goto END;
×
390
        }
391
        totalMetaRows++;
34✔
392
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > pRequest->timeout)) {
68✔
UNCOV
393
          SEND_BATCH_META_RSP
×
394
        }
395
        continue;
34✔
396
      }
397

398
      if (totalMetaRows > 0 && pHandle->fetchMeta != ONLY_META) {
14,104,366✔
UNCOV
399
        SEND_BATCH_META_RSP
×
400
      }
401

402
      // process data
403
      SPackedData submit = {
42,312,840✔
404
          .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
14,104,366✔
405
          .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
14,104,280✔
406
          .ver = pHead->version,
14,104,280✔
407
      };
408

409
      TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest));
14,104,280✔
410

411
      if (pHandle->fetchMeta == ONLY_META && taosArrayGetSize(taosxRsp.createTableReq) > 0){
14,093,070✔
412
        int32_t len = 0;
17✔
413
        void *pBuf = NULL;
17✔
414
        code = buildCreateTbBatchReqBinary(&taosxRsp, &pBuf, &len);
17✔
415
        if (code == 0){
17✔
416
          code = buildBatchMeta(&btMetaRsp, TDMT_VND_CREATE_TABLE, len, pBuf);
17✔
417
        }
418
        taosMemoryFree(pBuf);
17✔
419
        for (int i = 0; i < taosArrayGetSize(taosxRsp.createTableReq); i++) {
85✔
420
          void* pCreateTbReq = taosArrayGetP(taosxRsp.createTableReq, i);
68✔
421
          if (pCreateTbReq != NULL) {
68✔
422
            tDestroySVSubmitCreateTbReq(pCreateTbReq, TSDB_MSG_FLG_DECODE);
68✔
423
          }
424
          taosMemoryFree(pCreateTbReq);
68✔
425
        }
426
        taosArrayDestroy(taosxRsp.createTableReq);
17✔
427
        taosxRsp.createTableReq = NULL;
17✔
428
        fetchVer++;
17✔
429
        if (code != 0){
17✔
UNCOV
430
          goto END;
×
431
        }
432
        totalMetaRows++;
17✔
433
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) ||
17✔
434
            (taosGetTimestampMs() - st > pRequest->timeout) ||
17✔
435
            (!pRequest->enableBatchMeta && !pRequest->useSnapshot)) {
17✔
436
          SEND_BATCH_META_RSP
34✔
437
        }
UNCOV
438
        continue;
×
439
      }
440

441
      if ((pRequest->rawData == 0 && totalRows >= pRequest->minPollRows) ||
14,099,381✔
442
          (taosGetTimestampMs() - st > pRequest->timeout) ||
14,066,676✔
443
          (pRequest->rawData != 0 && (taosArrayGetSize(taosxRsp.blockData) > pRequest->minPollRows ||
14,071,412✔
UNCOV
444
                                      terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) {
×
445
        if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){
38,198✔
UNCOV
446
          terrno = 0;
×
447
        } else{
448
          fetchVer++;
30,301✔
449
        }
450
        SEND_DATA_RSP
60,602✔
451
      } else {
452
        fetchVer++;
14,061,546✔
453
      }
454
    }
455
  }
456

457
END:
1,067,029✔
458
  if (code != 0){
1,066,698✔
UNCOV
459
    tqError("tmq poll: tqTaosxScanLog error. consumerId:0x%" PRIx64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
×
460
            pRequest->subKey);
461
  }
462
  tDeleteMqBatchMetaRsp(&btMetaRsp);
1,066,698✔
463
  tDeleteSTaosxRsp(&taosxRsp);
1,066,611✔
464
  return code;
1,065,657✔
465
}
466

467
/**
 * Entry point for answering a consumer poll request.
 *
 * Takes a private copy of the request offset, resolves it first if it is one
 * of the reset types, then dispatches on the subscription type of the handle
 * (column subscription vs. everything else). The offset copy is destroyed
 * before returning.
 *
 * @param pTq      tq instance of the vnode
 * @param pHandle  subscription handle
 * @param pRequest poll request
 * @param pMsg     rpc message the response is sent for
 * @return 0 on success; TSDB_CODE_TMQ_INVALID_MSG on a NULL argument or an
 *         offset type of 0; otherwise the error of the failing step
 */
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
  if (!pTq || !pHandle || !pRequest || !pMsg) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  int32_t      code = 0;
  STqOffsetVal reqOffset = {0};
  tOffsetCopy(&reqOffset, &pRequest->reqOffset);

  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
    // Reset-type offsets are resolved to a concrete position first.
    bool blockReturned = false;
    code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
    // Stop on error, and also when an (empty) response has already been sent.
    if (code != 0 || blockReturned) {
      goto END;
    }
  } else if (reqOffset.type == 0) {
    // A consumer-specified offset must carry a valid type.
    uError("req offset type is 0");
    code = TSDB_CODE_TMQ_INVALID_MSG;
    goto END;
  }

  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
    code = extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
  } else {
    code = extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
  }

END:
  if (code != 0){
    uError("failed to extract data for mq, msg:%s", tstrerror(code));
  }
  tOffsetDestroy(&reqOffset);
  return code;
}
506

507
/**
 * Fill the fixed header that precedes every poll response body.
 *
 * @param pMsgHead   header to fill; the call is a no-op when NULL
 * @param type       stored as mqMsgType
 * @param epoch      stored as epoch
 * @param consumerId stored as consumerId
 * @param sver       stored as walsver
 * @param ever       stored as walever
 */
static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver,
                          int64_t ever) {
  if (pMsgHead != NULL) {
    pMsgHead->consumerId = consumerId;
    pMsgHead->epoch = epoch;
    pMsgHead->mqMsgType = type;
    pMsgHead->walsver = sver;
    pMsgHead->walever = ever;
  }
}
518

519
int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
887✔
520
                               const SMqBatchMetaRsp* pRsp, int32_t vgId) {
521
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
887✔
UNCOV
522
    return TSDB_CODE_TMQ_INVALID_MSG;
×
523
  }
524
  int32_t len = 0;
887✔
525
  int32_t code = 0;
887✔
526
  tEncodeSize(tEncodeMqBatchMetaRsp, pRsp, len, code);
887✔
527
  if (code < 0) {
887✔
UNCOV
528
    return TAOS_GET_TERRNO(code);
×
529
  }
530
  int32_t tlen = sizeof(SMqRspHead) + len;
887✔
531
  void*   buf = rpcMallocCont(tlen);
887✔
532
  if (buf == NULL) {
887✔
UNCOV
533
    return TAOS_GET_TERRNO(terrno);
×
534
  }
535

536
  int64_t sver = 0, ever = 0;
887✔
537
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
887✔
538
  initMqRspHead(buf, TMQ_MSG_TYPE__POLL_BATCH_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
887✔
539

540
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
887✔
541

542
  SEncoder encoder = {0};
887✔
543
  tEncoderInit(&encoder, abuf, len);
887✔
544
  code = tEncodeMqBatchMetaRsp(&encoder, pRsp);
887✔
545
  tEncoderClear(&encoder);
887✔
546
  if (code < 0) {
887✔
UNCOV
547
    rpcFreeCont(buf);
×
UNCOV
548
    return TAOS_GET_TERRNO(code);
×
549
  }
550
  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
887✔
551

552
  tmsgSendRsp(&resp);
887✔
553
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%ld offset type:%d",
887✔
554
          vgId, pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type);
555

556
  return 0;
887✔
557
}
558

559
/**
 * Encode a single meta response and send it as the reply to a poll request.
 *
 * The rpc payload is [SMqRspHead][encoded SMqMetaRsp]; the head carries the
 * version range reported by walReaderValidVersionRange.
 *
 * @param pHandle subscription handle (provides the WAL reader)
 * @param pMsg    request message whose rpc info addresses the reply
 * @param pReq    poll request (epoch and consumer id go into the head)
 * @param pRsp    meta response to encode
 * @param vgId    vnode group id, used for logging only
 * @return 0 once the reply was handed to tmsgSendRsp; TSDB_CODE_TMQ_INVALID_MSG
 *         on a NULL argument; otherwise an encode or allocation error
 */
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
                          int32_t vgId) {
  if (!pHandle || !pMsg || !pReq || !pRsp) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  // Pass 1: determine the encoded size.
  int32_t bodyLen = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeMqMetaRsp, pRsp, bodyLen, code);
  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
  }

  int64_t walStart = 0;
  int64_t walEnd = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &walStart, &walEnd);
  initMqRspHead(pCont, TMQ_MSG_TYPE__POLL_META_RSP, pReq->epoch, pReq->consumerId, walStart, walEnd);

  // Pass 2: encode the body right behind the head.
  void*    pBody = POINTER_SHIFT(pCont, sizeof(SMqRspHead));
  SEncoder bodyEncoder = {0};
  tEncoderInit(&bodyEncoder, pBody, bodyLen);
  code = tEncodeMqMetaRsp(&bodyEncoder, pRsp);
  tEncoderClear(&bodyEncoder);
  if (code < 0) {
    rpcFreeCont(pCont);
    return TAOS_GET_TERRNO(code);
  }

  SRpcMsg reply = {0};
  reply.info = pMsg->info;
  reply.pCont = pCont;
  reply.contLen = totalLen;
  reply.code = 0;

  tmsgSendRsp(&reply);
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", vgId,
          pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);

  return 0;
}
599

600
/**
 * Encode a data response and send it over rpc.
 *
 * POLL_DATA, WALINFO and POLL_RAW_DATA responses are encoded with
 * tEncodeMqDataRsp, POLL_DATA_META responses with tEncodeSTaosxRsp. For any
 * other type nothing is encoded and only the header is sent. Raw-data
 * responses have withSchema forced to 0 before encoding.
 *
 * @param pRpcHandleInfo rpc info addressing the reply
 * @param pRsp           response to encode (withSchema may be modified)
 * @param epoch          stored in the response head
 * @param consumerId     stored in the response head
 * @param type           response message type, see above
 * @param sver           stored in the response head as walsver
 * @param ever           stored in the response head as walever
 * @return 0 once the reply was handed to tmsgSendRsp; TSDB_CODE_TMQ_INVALID_MSG
 *         on a NULL argument; otherwise an encode or allocation error
 */
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
                        int32_t type, int64_t sver, int64_t ever) {
  if (!pRpcHandleInfo || !pRsp) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  int32_t bodyLen = 0;
  int32_t code = 0;

  if (type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP){
    pRsp->withSchema = 0;
  }

  // Decide once which encoder applies; both passes below use the same choice.
  const bool asDataRsp = (type == TMQ_MSG_TYPE__POLL_DATA_RSP ||
                          type == TMQ_MSG_TYPE__WALINFO_RSP ||
                          type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP);
  const bool asTaosxRsp = !asDataRsp && (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP);

  // Pass 1: determine the encoded size.
  if (asDataRsp) {
    tEncodeSize(tEncodeMqDataRsp, pRsp, bodyLen, code);
  } else if (asTaosxRsp) {
    tEncodeSize(tEncodeSTaosxRsp, pRsp, bodyLen, code);
  }

  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return terrno;
  }

  initMqRspHead((SMqRspHead*)pCont, type, epoch, consumerId, sver, ever);

  // Pass 2: encode the body right behind the head.
  void*    pBody = POINTER_SHIFT(pCont, sizeof(SMqRspHead));
  SEncoder bodyEncoder = {0};
  tEncoderInit(&bodyEncoder, pBody, bodyLen);

  if (asDataRsp) {
    code = tEncodeMqDataRsp(&bodyEncoder, pRsp);
  } else if (asTaosxRsp) {
    code = tEncodeSTaosxRsp(&bodyEncoder, pRsp);
  }
  tEncoderClear(&bodyEncoder);
  if (code < 0) {
    rpcFreeCont(pCont);
    return TAOS_GET_TERRNO(code);
  }

  SRpcMsg reply = {0};
  reply.info = *pRpcHandleInfo;
  reply.pCont = pCont;
  reply.contLen = totalLen;
  reply.code = 0;

  tmsgSendRsp(&reply);
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc