• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5046

07 May 2026 07:27AM UTC coverage: 73.144% (-0.02%) from 73.161%
#5046

push

travis-ci

web-flow
feat(parser): add support for QUARTER duration alias and related tests (#35268)

6 of 6 new or added lines in 2 files covered. (100.0%)

590 existing lines in 139 files now uncovered.

277660 of 379607 relevant lines covered (73.14%)

134951918.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.78
/source/dnode/vnode/src/tq/tqUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
// Response header initialization (shared helper)
19

20
void tqInitMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver,
338,042,624✔
21
                   int64_t ever) {
22
  if (pMsgHead == NULL) {
338,042,624✔
23
    return;
×
24
  }
25
  pMsgHead->consumerId = consumerId;
338,042,624✔
26
  pMsgHead->epoch = epoch;
338,054,089✔
27
  pMsgHead->mqMsgType = type;
338,053,049✔
28
  pMsgHead->walsver = sver;
338,044,018✔
29
  pMsgHead->walever = ever;
338,050,486✔
30
}
31

32
// Data response initialization
33

34
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
184,374,898✔
35
  int32_t code = TSDB_CODE_SUCCESS;
184,374,898✔
36
  int32_t lino = 0;
184,374,898✔
37
  tqDebug("%s called", __FUNCTION__);
184,374,898✔
38
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
184,375,286✔
39

40
  pRsp->blockData = taosArrayInit(0, sizeof(void*));
184,375,286✔
41
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);
184,374,249✔
42

43
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
184,372,216✔
44
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);
184,370,846✔
45

46
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
184,370,389✔
47
  TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);
184,370,212✔
48

49
  tOffsetCopy(&pRsp->reqOffset, &pOffset);
184,367,104✔
50
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
184,373,050✔
51
  pRsp->withTbName = 0;
184,374,439✔
52
  pRsp->withSchema = 1;
184,375,063✔
53

54
END:
184,373,298✔
55
  if (code != 0) {
184,373,298✔
56
    tqError("%s failed at:%d, code:%s", __FUNCTION__, lino, tstrerror(code));
×
57
    taosArrayDestroy(pRsp->blockData);
×
58
    taosArrayDestroy(pRsp->blockDataLen);
×
59
    taosArrayDestroy(pRsp->blockSchema);
×
60
  }
61
  return code;
184,363,554✔
62
}
63

64
// Send data response functions
65

66
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
337,977,771✔
67
                        int32_t type, int64_t sver, int64_t ever) {
68
  if (pRpcHandleInfo == NULL || pRsp == NULL) {
337,977,771✔
UNCOV
69
    return TSDB_CODE_INVALID_PARA;
×
70
  }
71
  int32_t len = 0;
337,979,626✔
72
  int32_t code = 0;
337,979,626✔
73

74
  if (type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
337,979,626✔
75
    pRsp->withSchema = 0;
×
76
  }
77
  if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP ||
337,979,626✔
78
      type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
79
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
337,973,623✔
80
  } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
6,003✔
81
    tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
6,003✔
82
  }
83

84
  if (code < 0) {
337,962,638✔
85
    return TAOS_GET_TERRNO(code);
×
86
  }
87

88
  int32_t tlen = sizeof(SMqRspHead) + len;
337,962,638✔
89
  void*   buf = rpcMallocCont(tlen);
337,962,638✔
90
  if (buf == NULL) {
337,947,305✔
91
    return terrno;
×
92
  }
93

94
  SMqRspHead* pHead = (SMqRspHead*)buf;
337,947,305✔
95
  tqInitMqRspHead(pHead, type, epoch, consumerId, sver, ever);
337,947,305✔
96

97
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
337,948,370✔
98

99
  SEncoder encoder = {0};
337,962,277✔
100
  tEncoderInit(&encoder, abuf, len);
337,971,393✔
101

102
  if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP ||
337,956,128✔
103
      type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
104
    code = tEncodeMqDataRsp(&encoder, pRsp);
337,950,125✔
105
  } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
6,003✔
106
    code = tEncodeSTaosxRsp(&encoder, pRsp);
6,003✔
107
  }
108
  tEncoderClear(&encoder);
337,945,166✔
109
  if (code < 0) {
337,917,086✔
110
    rpcFreeCont(buf);
×
111
    return TAOS_GET_TERRNO(code);
×
112
  }
113
  SRpcMsg rsp = {.info = *pRpcHandleInfo, .pCont = buf, .contLen = tlen, .code = 0};
337,917,086✔
114

115
  tmsgSendRsp(&rsp);
337,921,780✔
116
  return 0;
337,914,224✔
117
}
118

119
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, SMqDataRsp* pRsp,
337,973,225✔
120
                      int32_t type, int32_t vgId) {
121
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
337,973,225✔
122
    return TSDB_CODE_INVALID_PARA;
×
123
  }
124
  int64_t sver = 0, ever = 0;
337,979,339✔
125
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
337,979,029✔
126

127
  char buf1[TSDB_OFFSET_LEN] = {0};
337,977,366✔
128
  char buf2[TSDB_OFFSET_LEN] = {0};
337,977,366✔
129
  (void)tFormatOffset(buf1, TSDB_OFFSET_LEN, &(pRsp->reqOffset));
337,977,372✔
130
  (void)tFormatOffset(buf2, TSDB_OFFSET_LEN, &(pRsp->rspOffset));
337,972,677✔
131

132
  tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) start to send rsp, block num:%d, req:%s, rsp:%s, QID:0x%" PRIx64,
337,977,463✔
133
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
134

135
  return tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);
337,980,406✔
136
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc