• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5060

17 May 2026 01:15AM UTC coverage: 73.425% (-0.02%) from 73.443%
#5060

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281800 of 383795 relevant lines covered (73.42%)

134332207.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.78
/source/dnode/vnode/src/tq/tqUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
// Response header initialization (shared helper)
19

20
// Populate every field of a TMQ response header.
//
// pMsgHead    header to fill in; when NULL the call does nothing
// type        stored in mqMsgType
// epoch       stored in epoch
// consumerId  stored in consumerId
// sver, ever  stored in walsver / walever respectively
void tqInitMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver,
                     int64_t ever) {
  if (pMsgHead != NULL) {
    pMsgHead->mqMsgType = type;
    pMsgHead->epoch = epoch;
    pMsgHead->consumerId = consumerId;
    pMsgHead->walsver = sver;
    pMsgHead->walever = ever;
  }
}
31

32
// Data response initialization
33

34
// Prepare a data response for use: allocate its three (initially empty) block
// arrays, seed both the request and the response offset from pOffset, and set
// the default flags (withTbName = 0, withSchema = 1).
//
// pRsp     response to initialize; NULL is rejected with TSDB_CODE_INVALID_PARA
// pOffset  offset copied into pRsp->reqOffset and pRsp->rspOffset
//
// Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA when pRsp is
// NULL, or the terrno value read right after a failed taosArrayInit().
// On an allocation failure the three array members are destroyed and reset to
// NULL, so the caller is never left holding a dangling pointer.
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  tqDebug("%s called", __FUNCTION__);
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);

  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);

  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);

  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
  pRsp->withTbName = 0;
  pRsp->withSchema = 1;

END:
  if (code != 0) {
    tqError("%s failed at:%d, code:%s", __FUNCTION__, lino, tstrerror(code));
    // The very first check jumps here with pRsp == NULL, so the cleanup must
    // not touch the struct in that case.
    if (pRsp != NULL) {
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
      taosArrayDestroy(pRsp->blockDataLen);
      pRsp->blockDataLen = NULL;
      taosArrayDestroy(pRsp->blockSchema);
      pRsp->blockSchema = NULL;
    }
  }
  return code;
}
63

64
// Send data response functions
65

66
// Serialize a poll response behind an SMqRspHead and hand it to the RPC layer.
//
// The payload encoder is chosen from `type`:
//   - POLL_DATA_RSP, WALINFO_RSP, POLL_RAW_DATA_RSP -> tEncodeMqDataRsp
//   - POLL_DATA_META_RSP                            -> tEncodeSTaosxRsp
//   - any other value                               -> no payload (header only)
// For POLL_RAW_DATA_RSP, pRsp->withSchema is cleared before encoding.
//
// pRpcHandleInfo  RPC handle copied into the outgoing message
// pRsp            response to encode (mutated only for the raw-data type)
// epoch, consumerId, type, sver, ever  forwarded to tqInitMqRspHead
//
// Returns TSDB_CODE_INVALID_PARA when pRpcHandleInfo or pRsp is NULL,
// TAOS_GET_TERRNO(code) when sizing or encoding fails, terrno when the RPC
// buffer cannot be allocated, and 0 once tmsgSendRsp() has been called.
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
                        int32_t type, int64_t sver, int64_t ever) {
  if (pRpcHandleInfo == NULL || pRsp == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Decide the payload format once; both the sizing pass and the real
  // encoding pass below must agree on it.
  const int32_t useDataEncoder = (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP ||
                                  type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP);
  const int32_t useTaosxEncoder = (!useDataEncoder && type == TMQ_MSG_TYPE__POLL_DATA_META_RSP);

  if (type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
    pRsp->withSchema = 0;
  }

  // First pass: compute the encoded payload size.
  int32_t code = 0;
  int32_t len = 0;
  if (useDataEncoder) {
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
  } else if (useTaosxEncoder) {
    tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
  }
  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  // Allocate header + payload as one contiguous RPC buffer.
  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return terrno;
  }

  tqInitMqRspHead((SMqRspHead*)buf, type, epoch, consumerId, sver, ever);

  // Second pass: encode the payload right after the header.
  void*    payload = POINTER_SHIFT(buf, sizeof(SMqRspHead));
  SEncoder enc = {0};
  tEncoderInit(&enc, payload, len);
  if (useDataEncoder) {
    code = tEncodeMqDataRsp(&enc, pRsp);
  } else if (useTaosxEncoder) {
    code = tEncodeSTaosxRsp(&enc, pRsp);
  }
  tEncoderClear(&enc);

  if (code < 0) {
    rpcFreeCont(buf);
    return TAOS_GET_TERRNO(code);
  }

  // NOTE(review): buf is not freed here after sending; presumably the RPC
  // layer takes ownership of pCont - confirm against tmsgSendRsp.
  SRpcMsg rsp = {0};
  rsp.info = *pRpcHandleInfo;
  rsp.pCont = buf;
  rsp.contLen = tlen;
  rsp.code = 0;

  tmsgSendRsp(&rsp);
  return 0;
}
118

119
// Send a poll data response for a consumer request.
//
// Reads the WAL version range through the handle's reader, logs the request
// and response offsets at debug level, then delegates the encoding and sending
// to tqDoSendDataRsp().
//
// pHandle  subscription handle; its execHandle.pTqReader->pWalReader is queried
// pMsg     incoming RPC message whose `info` is used to address the reply
// pReq     poll request supplying epoch, consumerId and reqId
// pRsp     response to send
// type     response message type, forwarded unchanged
// vgId     vnode group id, used only in the log line
//
// Returns TSDB_CODE_INVALID_PARA when pHandle, pMsg, pReq or pRsp is NULL,
// otherwise whatever tqDoSendDataRsp() returns.
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, SMqDataRsp* pRsp,
                      int32_t type, int32_t vgId) {
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int64_t sver = 0;
  int64_t ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  // Human-readable offsets, needed only for the debug log below; formatting
  // failures are deliberately ignored.
  char reqOffsetStr[TSDB_OFFSET_LEN] = {0};
  char rspOffsetStr[TSDB_OFFSET_LEN] = {0};
  (void)tFormatOffset(reqOffsetStr, sizeof(reqOffsetStr), &pRsp->reqOffset);
  (void)tFormatOffset(rspOffsetStr, sizeof(rspOffsetStr), &pRsp->rspOffset);

  tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) start to send rsp, block num:%d, req:%s, rsp:%s, QID:0x%" PRIx64,
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, reqOffsetStr, rspOffsetStr, pReq->reqId);

  int32_t code = tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc