• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4943

30 Jan 2026 06:19AM UTC coverage: 66.718% (-0.07%) from 66.788%
#4943

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1122 of 2018 new or added lines in 72 files covered. (55.6%)

823 existing lines in 156 files now uncovered.

204811 of 306978 relevant lines covered (66.72%)

123993567.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.93
/source/dnode/vnode/src/tq/tqUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
19
                                 const SMqMetaRsp* pRsp, int32_t vgId);
20
static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
21
                                      const SMqBatchMetaRsp* pRsp, int32_t vgId);
22

23
/**
 * Initialize an SMqDataRsp for a poll response.
 *
 * Allocates empty blockData / blockDataLen / blockSchema arrays, copies
 * pOffset into both reqOffset and rspOffset, and sets withTbName = 0 and
 * withSchema = 1.
 *
 * On failure every array that was (or may have been) created is destroyed and
 * its pointer reset to NULL, so a later tDeleteMqDataRsp() on the same struct
 * (as callers in this file do) does not see dangling pointers.
 *
 * @param pRsp    response to initialize; must not be NULL
 * @param pOffset offset copied into the request and response offsets
 * @return 0 on success, TSDB_CODE_INVALID_PARA when pRsp is NULL, otherwise
 *         the terrno reported for the failed array allocation
 */
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  int32_t    code = TDB_CODE_SUCCESS;
  int32_t    lino = 0;
  tqDebug("%s called", __FUNCTION__ );
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);

  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);

  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);

  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
  pRsp->withTbName = 0;
  pRsp->withSchema = 1;

END:
  if (code != 0){
    tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
    // pRsp itself may be the reason we got here; never touch it when it is NULL.
    if (pRsp != NULL) {
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
      taosArrayDestroy(pRsp->blockDataLen);
      pRsp->blockDataLen = NULL;
      taosArrayDestroy(pRsp->blockSchema);
      pRsp->blockSchema = NULL;
    }
  }
  return code;
}
52

53
/**
 * Initialize an SMqDataRsp for a poll response that carries table names.
 *
 * Copies pOffset into reqOffset and rspOffset, sets withTbName = 1 and
 * withSchema = 1, and allocates empty blockData / blockDataLen / blockTbName /
 * blockSchema arrays.
 *
 * On failure the arrays are destroyed and their pointers reset to NULL so the
 * struct can still be handed to the regular delete routine afterwards.
 *
 * @param pRsp    response to initialize; must not be NULL
 * @param pOffset offset copied into the request and response offsets
 * @return 0 on success, TSDB_CODE_INVALID_PARA when pRsp is NULL, otherwise
 *         the terrno reported for the failed array allocation
 */
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  int32_t    code = TDB_CODE_SUCCESS;
  int32_t    lino = 0;
  tqDebug("%s called", __FUNCTION__ );
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);

  pRsp->withTbName = 1;
  pRsp->withSchema = 1;
  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);

  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);

  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockTbName, code, lino, END, terrno);

  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);

END:
  if (code != 0){
    tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
    // pRsp itself may be the reason we got here; never touch it when it is NULL.
    if (pRsp != NULL) {
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
      taosArrayDestroy(pRsp->blockDataLen);
      pRsp->blockDataLen = NULL;
      taosArrayDestroy(pRsp->blockTbName);
      pRsp->blockTbName = NULL;
      taosArrayDestroy(pRsp->blockSchema);
      pRsp->blockSchema = NULL;
    }
  }
  return code;
}
85

86
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
670,484✔
87
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
88
  if (pOffsetVal == NULL || pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || pBlockReturned == NULL) {
670,484✔
89
    return TSDB_CODE_INVALID_PARA;
×
90
  }
91
  uint64_t   consumerId = pRequest->consumerId;
671,401✔
92
  STqOffset* pOffset = NULL;
671,401✔
93
  int32_t    code = tqMetaGetOffset(pTq, pRequest->subKey, &pOffset);
671,401✔
94
  int32_t    vgId = TD_VID(pTq->pVnode);
669,817✔
95

96
  *pBlockReturned = false;
671,401✔
97
  // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
98
  if (code == 0) {
670,799✔
99
    tOffsetCopy(pOffsetVal, &pOffset->val);
68,497✔
100

101
    char formatBuf[TSDB_OFFSET_LEN] = {0};
68,497✔
102
    tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
68,497✔
103
    tqDebug("tmq poll: consumer:0x%" PRIx64
68,497✔
104
                ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.QID:0x%" PRIx64,
105
            consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
106
    return 0;
68,497✔
107
  } else {
108
    // no poll occurs in this vnode for this topic, let's seek to the right offset value.
109
    if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
602,302✔
110
      if (pRequest->useSnapshot) {
469,309✔
111
        tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
230,667✔
112
                consumerId, pHandle->subKey, vgId);
113
        if (pHandle->fetchMeta) {
231,261✔
114
          tqOffsetResetToMeta(pOffsetVal, 0);
115
        } else {
116
          SValue val = {0};
226,500✔
117
          tqOffsetResetToData(pOffsetVal, 0, 0, val);
×
118
        }
119
      } else {
120
        walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
238,385✔
121
        tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
238,936✔
122
      }
123
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
132,373✔
124
      walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
15,874✔
125
      SMqDataRsp dataRsp = {0};
15,874✔
126
      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
15,874✔
127

128
      code = tqInitDataRsp(&dataRsp, *pOffsetVal);
15,874✔
129
      if (code != 0) {
15,874✔
130
        return code;
×
131
      }
132
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
15,874✔
133
              pHandle->subKey, vgId, dataRsp.rspOffset.version);
134
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, (pRequest->rawData == 1) ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
15,874✔
135
      tDeleteMqDataRsp(&dataRsp);
15,874✔
136

137
      *pBlockReturned = true;
15,874✔
138
      return code;
15,874✔
139
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
116,499✔
140
      tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
116,499✔
141
                  " in vg %d, subkey %s, reset none failed",
142
              pHandle->subKey, consumerId, vgId, pRequest->subKey);
143
      return TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
116,832✔
144
    }
145
  }
146

147
  return 0;
469,590✔
148
}
149

150
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
27,944,958✔
151
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
152
  int32_t    code = TDB_CODE_SUCCESS;
27,944,958✔
153
  int32_t    lino = 0;
27,944,958✔
154
  tqDebug("%s called", __FUNCTION__ );
27,944,958✔
155
  uint64_t consumerId = pRequest->consumerId;
27,944,958✔
156
  int32_t  vgId = TD_VID(pTq->pVnode);
27,944,958✔
157
  terrno = 0;
27,944,958✔
158

159
  SMqDataRsp dataRsp = {0};
27,944,958✔
160
  code = tqInitDataRsp(&dataRsp, *pOffset);
27,944,958✔
161
  TSDB_CHECK_CODE(code, lino, end);
27,943,962✔
162

163
  code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
27,943,962✔
164
  TSDB_CHECK_CODE(code, lino, end);
27,944,635✔
165

166
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
27,944,635✔
167
  if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
27,944,958✔
168
    goto end;
182,044✔
169
  }
170

171
  //   till now, all data has been transferred to consumer, new data needs to push client once arrived.
172
  if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
27,762,914✔
173
    // lock
174
    taosWLockLatch(&pTq->lock);
12,647,935✔
175
    int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
12,647,935✔
176
    if (dataRsp.rspOffset.version > ver) {  // check if there are data again to avoid lost data
12,647,935✔
177
      code = tqRegisterPushHandle(pTq, pHandle, pMsg);
12,381,482✔
178
      taosWUnLockLatch(&pTq->lock);
12,381,482✔
179
      goto end;
12,381,482✔
180
    }
181
    taosWUnLockLatch(&pTq->lock);
266,453✔
182
  }
183

184
  // reqOffset represents the current date offset, may be changed if wal not exists
185
  tOffsetCopy(&dataRsp.reqOffset, pOffset);
15,381,432✔
186
  code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
15,381,109✔
187

188
end:
27,944,294✔
189
  {
190
    char buf[TSDB_OFFSET_LEN] = {0};
27,944,294✔
191
    tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
27,944,294✔
192
    if (code != 0){
27,944,294✔
193
      tqError("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " error msg:%s, line:%d",
182,044✔
194
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, tstrerror(code), lino);
195
    } else {
196
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " success",
27,762,250✔
197
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId);
198
    }
199

200
    tDeleteMqDataRsp(&dataRsp);
27,944,958✔
201
    return code;
27,944,614✔
202
  }
203
}
204

205
// Decode the WAL entry body (after its SMsgHead) as TYPE using DECODE_FUNC and
// check whether the request's `source` carries the TD_REQ_FROM_TAOX flag. If it
// does, the entry is skipped: fetchVer is advanced and the enclosing loop
// continues. The decoded request and the decoder are released in either case.
//
// Expects pHead, pRequest, vgId and fetchVer in the expanding scope, and must be
// expanded directly inside the loop body: the `continue` targets that loop, so
// the expansion cannot be wrapped in do { } while (0).
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC)                                               \
  SDecoder decoder = {0};                                                                                  \
  TYPE     req = {0};                                                                                      \
  void*    data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead));                                            \
  int32_t  len = pHead->bodyLen - sizeof(SMsgHead);                                                        \
  tDecoderInit(&decoder, data, len);                                                                       \
  bool fromTaosx = (DECODE_FUNC(&decoder, &req) == 0) && ((req.source & TD_REQ_FROM_TAOX) != 0);           \
  DELETE_FUNC(&req);                                                                                       \
  tDecoderClear(&decoder);                                                                                 \
  if (fromTaosx) {                                                                                         \
    tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 \
            " msgType %d",                                                                                 \
            pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);                        \
    fetchVer++;                                                                                            \
    continue;                                                                                              \
  }
222

223
// No-op deleter, used as DELETE_FUNC for request types that need no cleanup here.
static void tDeleteCommon(void* parm) {
  (void)parm;
}
71,909✔
224
// Release the pMultiTag array held by an alter-table request. Tolerates a NULL
// request and clears the pointer so the request cannot be used or released
// again through a stale array pointer.
static void tDeleteAlterTable(SVAlterTbReq* req) {
  if (req == NULL) {
    return;
  }
  taosArrayDestroy(req->pMultiTag);
  req->pMultiTag = NULL;
}
11,660✔
227

228
// Pick the poll response message type: DATA_META when the response carries
// create-table requests, otherwise RAW_DATA or DATA depending on the request's
// rawData flag. Arguments and the whole expansion are parenthesized so the
// macro is safe inside larger expressions.
#define POLL_RSP_TYPE(pRequest,taosxRsp) \
((taosxRsp).createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : \
((pRequest)->rawData == 1 ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP))
231

232
/**
 * Encode one meta message as an SMqMetaRsp and append it to a batch response.
 *
 * The encoded bytes are written after a zeroed SMqRspHead-sized prefix in a
 * freshly allocated buffer. The buffer pointer is appended to
 * btMetaRsp->batchMetaReq and its total length to btMetaRsp->batchMetaLen;
 * both arrays are created on first use.
 *
 * The buffer stays owned by this function until it has been pushed into
 * batchMetaReq; on every failure before that point it is freed here.
 *
 * @param btMetaRsp batch response to append to
 * @param type      message type stored in the encoded meta response
 * @param bodyLen   length of body in bytes
 * @param body      raw meta message body (not copied until encoding)
 * @return 0 on success, otherwise an error code
 */
static int32_t buildBatchMeta(SMqBatchMetaRsp *btMetaRsp, int16_t type, int32_t bodyLen, void* body){
  int32_t code = 0;
  void*   tBuf = NULL;

  // Check the two arrays independently so a half-initialized pair gets repaired.
  if (!btMetaRsp->batchMetaReq) {
    btMetaRsp->batchMetaReq = taosArrayInit(4, POINTER_BYTES);
    TQ_NULL_GO_TO_END(btMetaRsp->batchMetaReq);
  }
  if (!btMetaRsp->batchMetaLen) {
    btMetaRsp->batchMetaLen = taosArrayInit(4, sizeof(int32_t));
    TQ_NULL_GO_TO_END(btMetaRsp->batchMetaLen);
  }

  SMqMetaRsp tmpMetaRsp = {0};
  tmpMetaRsp.resMsgType = type;
  tmpMetaRsp.metaRspLen = bodyLen;
  tmpMetaRsp.metaRsp = body;
  uint32_t len = 0;
  tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
  if (TSDB_CODE_SUCCESS != code) {
    tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
    goto END;
  }
  int32_t tLen = sizeof(SMqRspHead) + len;
  tBuf = taosMemoryCalloc(1, tLen);
  TQ_NULL_GO_TO_END(tBuf);
  void*    metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
  SEncoder encoder = {0};
  tEncoderInit(&encoder, metaBuff, len);
  code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
  tEncoderClear(&encoder);

  if (code < 0) {
    tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
    goto END;
  }

  // Push the length first: if the buffer push then fails, the buffer is still
  // ours to free and the request array never holds an entry we do not own.
  TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaLen, &tLen));
  TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaReq, &tBuf));
  tBuf = NULL;  // ownership moved into batchMetaReq

END:
  taosMemoryFree(tBuf);  // non-NULL only when the buffer was never handed over
  return code;
}
271

272
/**
 * Serialize the create-table requests collected in taosxRsp->createTableReq
 * into one SVCreateTbBatchReq message.
 *
 * The output buffer starts with a zeroed SMsgHead-sized prefix followed by the
 * encoded batch request. The buffer is returned through *pBuf even when the
 * final encode step fails, so the caller must always free *pBuf.
 *
 * @param taosxRsp response holding the create-table request pointers
 * @param pBuf     [out] allocated message buffer
 * @param len      [out] total buffer length, including the SMsgHead prefix
 * @return 0 on success, otherwise an error code
 */
static int32_t buildCreateTbBatchReqBinary(SMqDataRsp *taosxRsp, void** pBuf, int32_t *len){
  int32_t code = 0;
  SVCreateTbBatchReq pReq = {0};
  pReq.nReqs = taosArrayGetSize(taosxRsp->createTableReq);
  pReq.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
  TQ_NULL_GO_TO_END(pReq.pArray);
  for (int i = 0; i < pReq.nReqs; i++){
    void   *createTableReq = taosArrayGetP(taosxRsp->createTableReq, i);
    TQ_NULL_GO_TO_END(taosArrayPush(pReq.pArray, createTableReq));
  }
  tEncodeSize(tEncodeSVCreateTbBatchReq, &pReq, *len, code);
  if (code < 0) {
    goto END;
  }
  int32_t encodedLen = *len;
  *len += sizeof(SMsgHead);
  // Zero-fill so the header prefix, which nothing here writes, is not left as
  // uninitialized heap bytes in a buffer the caller forwards as a message body.
  *pBuf = taosMemoryCalloc(1, *len);
  // Check the allocation itself; pBuf (the out-parameter) is never NULL here.
  TQ_NULL_GO_TO_END(*pBuf);
  SEncoder coder = {0};
  // Only encodedLen bytes follow the header; do not advertise the full length.
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), encodedLen);
  code = tEncodeSVCreateTbBatchReq(&coder, &pReq);
  tEncoderClear(&coder);

END:
  // Destroys only the temporary array; the element contents stay owned by taosxRsp.
  taosArrayDestroy(pReq.pArray);
  return code;
}
298

299
// Point the batch meta response at fetchVer, send it, and leave through END.
// Expects btMetaRsp, fetchVer, code, pHandle, pMsg, pRequest, vgId and an END
// label in the expanding function. Braced so that all three statements stay
// together even under an unbraced `if`; used without a trailing semicolon.
#define SEND_BATCH_META_RSP \
{ \
  tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);\
  code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);\
  goto END;\
}
303

304
// Point the data response at fetchVer, send it with the type chosen by
// POLL_RSP_TYPE, and leave through END. Expects taosxRsp, fetchVer, code,
// pHandle, pMsg, pRequest, vgId and an END label in the expanding function.
// Braced so that all three statements stay together even under an unbraced
// `if`; used without a trailing semicolon.
#define SEND_DATA_RSP \
{ \
  tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);\
  code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, POLL_RSP_TYPE(pRequest, taosxRsp), vgId);\
  goto END;\
}
308

309
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
7,103,946✔
310
                                                  SRpcMsg* pMsg, STqOffsetVal* offset) {
311
  int32_t         vgId = TD_VID(pTq->pVnode);
7,103,946✔
312
  SMqDataRsp      taosxRsp = {0};
7,104,303✔
313
  SMqBatchMetaRsp btMetaRsp = {0};
7,104,303✔
314
  int32_t         code = 0;
7,104,568✔
315

316
  TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset));
7,104,568✔
317
  if (offset->type != TMQ_OFFSET__LOG) {
7,104,567✔
318
    TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest));
44,891✔
319

320
    if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
44,891✔
321
      code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
4,232✔
322
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
4,232✔
323
              pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,btMetaRsp.rspOffset.ts);
324
      goto END;
4,232✔
325
    }
326

327
    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64",ts:%" PRId64,
40,659✔
328
            pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
329
    if (taosxRsp.blockNum > 0) {
40,659✔
330
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
38,595✔
331
      goto END;
38,595✔
332
    } else {
333
      tOffsetCopy(offset, &taosxRsp.rspOffset);
2,064✔
334
    }
335
  }
336

337
  if (offset->type == TMQ_OFFSET__LOG) {
7,061,475✔
338
    walReaderVerifyOffset(pHandle->pWalReader, offset);
7,062,072✔
339
    int64_t fetchVer = offset->version;
7,061,766✔
340

341
    uint64_t st = taosGetTimestampMs();
7,061,807✔
342
    int      totalRows = 0;
7,061,807✔
343
    int32_t  totalMetaRows = 0;
7,061,169✔
344
    while (1) {
40,339,910✔
345
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
47,401,079✔
346
      if (savedEpoch > pRequest->epoch) {
47,410,921✔
347
        tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, savedEpoch error, vgId:%d offset %" PRId64,
×
348
                pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
349
        code = TSDB_CODE_TQ_INTERNAL_ERROR;
×
350
        goto END;
×
351
      }
352

353
      if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
47,427,937✔
354
        if (totalMetaRows > 0) {
6,856,228✔
355
          SEND_BATCH_META_RSP
1,060✔
356
        }
357
        SEND_DATA_RSP
13,711,396✔
358
      }
359

360
      SWalCont* pHead = &pHandle->pWalReader->pHead->head;
40,645,297✔
361
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %s",
40,645,564✔
362
              pRequest->consumerId, pRequest->epoch, vgId, fetchVer, TMSG_INFO(pHead->msgType));
363

364
      // process meta
365
      if (pHead->msgType != TDMT_VND_SUBMIT) {
40,645,035✔
366
        if (totalRows > 0) {
149,030✔
367
          SEND_DATA_RSP
13,344✔
368
        }
369

370
        if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
142,358✔
371
          if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
137,364✔
372
            PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq, tDeleteSVCreateTbBatchReq)
43,725✔
373
          } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
93,639✔
374
            PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq, tDeleteAlterTable)
11,660✔
375
          } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
102,743✔
376
            PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq, tDeleteCommon)
70,584✔
377
          } else if (pHead->msgType == TDMT_VND_DELETE) {
11,395✔
378
            PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes, tDeleteCommon)
1,325✔
379
          }
380
        }
381

382
        tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId,
53,053✔
383
                TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta);
384
        if (!pRequest->enableBatchMeta && !pRequest->useSnapshot) {
53,053✔
385
          SMqMetaRsp metaRsp = {0};
52,523✔
386
          tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
52,523✔
387
          metaRsp.resMsgType = pHead->msgType;
52,523✔
388
          metaRsp.metaRspLen = pHead->bodyLen;
52,523✔
389
          metaRsp.metaRsp = pHead->body;
52,523✔
390
          code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
52,523✔
391
          goto END;
52,523✔
392
        }
393
        code = buildBatchMeta(&btMetaRsp, pHead->msgType, pHead->bodyLen, pHead->body);
530✔
394
        fetchVer++;
530✔
395
        if (code != 0){
530✔
396
          goto END;
×
397
        }
398
        totalMetaRows++;
530✔
399
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > pRequest->timeout)) {
1,060✔
400
          SEND_BATCH_META_RSP
×
401
        }
402
        continue;
530✔
403
      }
404

405
      if (totalMetaRows > 0 && pHandle->fetchMeta != ONLY_META) {
40,495,915✔
406
        SEND_BATCH_META_RSP
×
407
      }
408

409
      // process data
410
      SPackedData submit = {
121,487,142✔
411
          .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
40,495,915✔
412
          .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
40,495,915✔
413
          .ver = pHead->version,
40,495,915✔
414
      };
415

416
      TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest));
40,495,312✔
417

418
      if (pHandle->fetchMeta == ONLY_META && taosArrayGetSize(taosxRsp.createTableReq) > 0){
40,399,678✔
419
        int32_t len = 0;
1,126✔
420
        void *pBuf = NULL;
1,126✔
421
        code = buildCreateTbBatchReqBinary(&taosxRsp, &pBuf, &len);
1,126✔
422
        if (code == 0){
1,126✔
423
          code = buildBatchMeta(&btMetaRsp, TDMT_VND_CREATE_TABLE, len, pBuf);
1,126✔
424
        }
425
        taosMemoryFree(pBuf);
1,126✔
426
        for (int i = 0; i < taosArrayGetSize(taosxRsp.createTableReq); i++) {
3,047✔
427
          void* pCreateTbReq = taosArrayGetP(taosxRsp.createTableReq, i);
1,921✔
428
          if (pCreateTbReq != NULL) {
1,921✔
429
            tDestroySVSubmitCreateTbReq(pCreateTbReq, TSDB_MSG_FLG_DECODE);
1,921✔
430
          }
431
          taosMemoryFree(pCreateTbReq);
1,921✔
432
        }
433
        taosArrayDestroy(taosxRsp.createTableReq);
1,126✔
434
        taosxRsp.createTableReq = NULL;
1,126✔
435
        fetchVer++;
1,126✔
436
        if (code != 0){
1,126✔
437
          goto END;
×
438
        }
439
        totalMetaRows++;
1,126✔
440
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) ||
1,126✔
441
            (taosGetTimestampMs() - st > pRequest->timeout) ||
1,126✔
442
            (!pRequest->enableBatchMeta && !pRequest->useSnapshot)) {
1,126✔
443
          SEND_BATCH_META_RSP
2,252✔
444
        }
445
        continue;
×
446
      }
447

448
      if ((pRequest->rawData == 0 && totalRows >= pRequest->minPollRows) ||
40,461,435✔
449
          (taosGetTimestampMs() - st > pRequest->timeout) ||
40,281,012✔
450
          (pRequest->rawData != 0 && (taosArrayGetSize(taosxRsp.blockData) > pRequest->minPollRows ||
40,318,969✔
451
                                      terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) {
×
452
        if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){
204,127✔
453
          terrno = 0;
×
454
        } else{
455
          fetchVer++;
145,523✔
456
        }
457
        SEND_DATA_RSP
291,046✔
458
      } else {
459
        fetchVer++;
40,241,353✔
460
      }
461
    }
462
  }
463

464
END:
7,103,973✔
465
  if (code != 0){
7,103,313✔
466
    tqError("tmq poll: tqTaosxScanLog error. consumerId:0x%" PRIx64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
×
467
            pRequest->subKey);
468
  }
469
  tDeleteMqBatchMetaRsp(&btMetaRsp);
7,103,313✔
470
  tDeleteSTaosxRsp(&taosxRsp);
7,103,602✔
471
  return code;
7,099,850✔
472
}
473

474
/**
 * Entry point for answering a poll request.
 *
 * Copies the request offset, resolves it first when it is a reset-type
 * offset (which may already send a response), rejects an offset of type 0,
 * and then dispatches on the handle's subscription type: the column path for
 * TOPIC_SUB_TYPE__COLUMN, the db/stb path for everything else.
 *
 * @return 0 on success, TSDB_CODE_TMQ_INVALID_MSG for NULL arguments or an
 *         offset of type 0, otherwise the error from the called helpers
 */
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
  if (!pTq || !pHandle || !pRequest || !pMsg) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  int32_t      code = 0;
  STqOffsetVal startOffset = {0};
  tOffsetCopy(&startOffset, &pRequest->reqOffset);

  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
    // reset the offset if needed
    bool rspAlreadySent = false;
    code = extractResetOffsetVal(&startOffset, pTq, pHandle, pRequest, pMsg, &rspAlreadySent);
    // quit on failure, or when an empty block has already been returned
    if (code != 0 || rspAlreadySent) {
      goto END;
    }
  } else if (startOffset.type == 0) {  // use the consumer specified offset
    uError("req offset type is 0");
    code = TSDB_CODE_TMQ_INVALID_MSG;
    goto END;
  }

  code = (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN)
             ? extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &startOffset)
             : extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &startOffset);

END:
  if (code != 0){
    uError("failed to extract data for mq, msg:%s", tstrerror(code));
  }
  tOffsetDestroy(&startOffset);
  return code;
}
513

514
// Fill the response header fields (message type, epoch, consumer id and the
// WAL start/end versions). A NULL header is silently ignored.
static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver,
                          int64_t ever) {
  if (pMsgHead != NULL) {
    pMsgHead->consumerId = consumerId;
    pMsgHead->epoch = epoch;
    pMsgHead->mqMsgType = type;
    pMsgHead->walsver = sver;
    pMsgHead->walever = ever;
  }
}
525

526
int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
5,888✔
527
                               const SMqBatchMetaRsp* pRsp, int32_t vgId) {
528
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
5,888✔
529
    return TSDB_CODE_TMQ_INVALID_MSG;
×
530
  }
531
  int32_t len = 0;
5,888✔
532
  int32_t code = 0;
5,888✔
533
  tEncodeSize(tEncodeMqBatchMetaRsp, pRsp, len, code);
5,888✔
534
  if (code < 0) {
5,888✔
535
    return TAOS_GET_TERRNO(code);
×
536
  }
537
  int32_t tlen = sizeof(SMqRspHead) + len;
5,888✔
538
  void*   buf = rpcMallocCont(tlen);
5,888✔
539
  if (buf == NULL) {
5,888✔
540
    return TAOS_GET_TERRNO(terrno);
×
541
  }
542

543
  int64_t sver = 0, ever = 0;
5,888✔
544
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
5,888✔
545
  initMqRspHead(buf, TMQ_MSG_TYPE__POLL_BATCH_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
5,888✔
546

547
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
5,888✔
548

549
  SEncoder encoder = {0};
5,888✔
550
  tEncoderInit(&encoder, abuf, len);
5,888✔
551
  code = tEncodeMqBatchMetaRsp(&encoder, pRsp);
5,888✔
552
  tEncoderClear(&encoder);
5,888✔
553
  if (code < 0) {
5,888✔
554
    rpcFreeCont(buf);
×
555
    return TAOS_GET_TERRNO(code);
×
556
  }
557
  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
5,888✔
558

559
  tmsgSendRsp(&resp);
5,888✔
560
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%ld offset type:%d",
5,888✔
561
          vgId, pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type);
562

563
  return 0;
5,888✔
564
}
565

566
/**
 * Encode a single meta response and send it as the reply to a poll request.
 *
 * The reply buffer is an SMqRspHead (type TMQ_MSG_TYPE__POLL_META_RSP, filled
 * with the request's epoch/consumer id and the WAL reader's valid version
 * range) followed by the encoded SMqMetaRsp. The buffer is freed here only
 * when encoding fails; otherwise it is handed to tmsgSendRsp().
 *
 * @return 0 on success, TSDB_CODE_TMQ_INVALID_MSG for NULL arguments, or the
 *         error derived from the failed encode/allocation
 */
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
                          int32_t vgId) {
  if (!pHandle || !pMsg || !pReq || !pRsp) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  // First pass: measure the encoded size of the response.
  int32_t code = 0;
  int32_t bodyLen = 0;
  tEncodeSize(tEncodeMqMetaRsp, pRsp, bodyLen, code);
  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Header: message type plus the WAL version range currently readable.
  int64_t walStart = 0;
  int64_t walEnd = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &walStart, &walEnd);
  initMqRspHead(pCont, TMQ_MSG_TYPE__POLL_META_RSP, pReq->epoch, pReq->consumerId, walStart, walEnd);

  // Second pass: encode the body right after the header.
  SEncoder enc = {0};
  tEncoderInit(&enc, POINTER_SHIFT(pCont, sizeof(SMqRspHead)), bodyLen);
  code = tEncodeMqMetaRsp(&enc, pRsp);
  tEncoderClear(&enc);
  if (code < 0) {
    rpcFreeCont(pCont);
    return TAOS_GET_TERRNO(code);
  }

  SRpcMsg resp = {.info = pMsg->info, .pCont = pCont, .contLen = totalLen, .code = 0};
  tmsgSendRsp(&resp);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", vgId,
          pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);

  return 0;
}
606

607
/**
 * Encode a data response and send it through the given RPC handle.
 *
 * POLL_DATA, WALINFO and POLL_RAW_DATA responses are encoded with
 * tEncodeMqDataRsp(); POLL_DATA_META responses with tEncodeSTaosxRsp(). For
 * raw-data responses withSchema is cleared on pRsp before encoding. Any other
 * type encodes nothing and results in a header-only reply.
 *
 * The reply buffer is freed here only when encoding fails; otherwise it is
 * handed to tmsgSendRsp().
 *
 * @return 0 on success, TSDB_CODE_TMQ_INVALID_MSG for NULL arguments, terrno
 *         when the reply buffer cannot be allocated, or the encode error
 */
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
                        int32_t type, int64_t sver, int64_t ever) {
  if (!pRpcHandleInfo || !pRsp) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  const bool isRawData = (type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP);
  const bool asDataRsp =
      (type == TMQ_MSG_TYPE__POLL_DATA_RSP) || (type == TMQ_MSG_TYPE__WALINFO_RSP) || isRawData;
  const bool asTaosxRsp = !asDataRsp && (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP);

  int32_t bodyLen = 0;
  int32_t code = 0;

  if (isRawData) {
    pRsp->withSchema = 0;
  }

  // First pass: measure the encoded size with the encoder matching the type.
  if (asDataRsp) {
    tEncodeSize(tEncodeMqDataRsp, pRsp, bodyLen, code);
  } else if (asTaosxRsp) {
    tEncodeSize(tEncodeSTaosxRsp, pRsp, bodyLen, code);
  }
  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return terrno;
  }

  initMqRspHead((SMqRspHead*)pCont, type, epoch, consumerId, sver, ever);

  // Second pass: encode the body right after the header.
  SEncoder enc = {0};
  tEncoderInit(&enc, POINTER_SHIFT(pCont, sizeof(SMqRspHead)), bodyLen);
  if (asDataRsp) {
    code = tEncodeMqDataRsp(&enc, pRsp);
  } else if (asTaosxRsp) {
    code = tEncodeSTaosxRsp(&enc, pRsp);
  }
  tEncoderClear(&enc);
  if (code < 0) {
    rpcFreeCont(pCont);
    return TAOS_GET_TERRNO(code);
  }

  SRpcMsg rsp = {.info = *pRpcHandleInfo, .pCont = pCont, .contLen = totalLen, .code = 0};
  tmsgSendRsp(&rsp);
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc