• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4380

25 Jun 2025 06:58AM UTC coverage: 62.307% (-0.09%) from 62.393%
#4380

push

travis-ci

web-flow
feat(mqtt): mqtt subscription (#30127)

* feat(mqtt): Initial commit for mqtt

* chore(xnode/mnd): xnode message handlers for mnode

* chore(mnd/xnode): mnode part for xnode

* chore(xnode/translater): fix show commands

* fix(ast/creater): fix xnode create option

* fix(xnode/ci): fix ci & doc's error codes

* chore(xnode/sql): make create/drop/show work properly

* fix(xnode/sql): commit new files

* fix(xnode/sql): commit cmake files

* fix: fix testing cases

* fix(xnode/tsc): fix tokens

* fix(ast/anode): fix anode update decl.

* fix(xnode/error): fix xnode error codes

* fix: xnode make/destroy

* chore: xnode with option & dnode id

* chore: use taosmqtt for xnode

* chore: new error code for xnode launching

* chore(xnode): new error code

* chore: header for _xnode_mgmt_mqtt

* chore: source for _xnode_mgmt_mqtt

* chore: remove test directory from cmake

* chore: remove taosmqtt for ci to compile

* chore: remove taosudf header from xnode

* chore: new window macro

* chore: remove xnode mgmt mqtt for windows compilation

* Revert "chore: remove xnode mgmt mqtt for windows compilation"

This reverts commit 197e1640c.

* chore: cleanup code

* chore: xnode mgmt comment windows part out

* chore: mgmt/mqtt, move uv header to the very top

* xnode/mnode: create xnode once per dnode

* fix(xnode/systable/test): fix column count

* xnode/sdb: renumber sdb type for xnode to make start/stop order correct

* xnode/mqtt: new param mqttPort

* fix SXnode's struct type

* transfer dnode id to mqtt subscription

* tmqtt: remove uv_a linking

* tmqtt/tools: sources for tools

* tools: fix windows compilation

* tools/producer: fix windows sleep param

* tools/producer: fix uninited var rc

* make tools only for linux

* test/mnodes: wait 1 or 2 seconds for offline to be leader

* update topic producer tool for geometry data type testing

* format tool sql statements

* show xnodes' ep

* make shell auto complete xnodes

* use usleep... (continued)

156642 of 320746 branches covered (48.84%)

Branch coverage included in aggregate %.

61 of 1020 new or added lines in 21 files covered. (5.98%)

1736 existing lines in 172 files now uncovered.

242538 of 319922 relevant lines covered (75.81%)

6277604.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.78
/source/dnode/snode/src/snode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "rsync.h"
18
#include "sndInt.h"
19
#include "tqCommon.h"
20
#include "tuuid.h"
21

22
// clang-format off
23
// Error-level snode log: forwards the printf-style arguments to taosPrintLog with the "SND ERROR " prefix, but only
// when the DEBUG_ERROR bit is set in sndDebugFlag. Wrapped in do { } while (0) so it is usable as one statement.
#define sndError(...) do {  if (sndDebugFlag & DEBUG_ERROR) { taosPrintLog("SND ERROR ", DEBUG_ERROR, sndDebugFlag, __VA_ARGS__);}} while (0)
24
// Info-level snode log: forwards the printf-style arguments to taosPrintLog with the "SND INFO  " prefix, but only
// when the DEBUG_INFO bit is set in sndDebugFlag. Wrapped in do { } while (0) so it is usable as one statement.
#define sndInfo(...)  do {  if (sndDebugFlag & DEBUG_INFO)  { taosPrintLog("SND INFO  ", DEBUG_INFO,  sndDebugFlag, __VA_ARGS__);}} while (0)
25
// Debug-level snode log: forwards the printf-style arguments to taosPrintLog with the "SND DEBUG " prefix, but only
// when the DEBUG_DEBUG bit is set in sndDebugFlag. Wrapped in do { } while (0) so it is usable as one statement.
#define sndDebug(...) do {  if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND DEBUG ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0)
26
// clang-format on
27

28
/*
 * Build-callback used by the snode when a stream task is loaded or deployed.
 *
 * Accepts only tasks whose level is TASK_LEVEL__AGG and whose upstream list is non-empty; any other task is
 * rejected with TSDB_CODE_INVALID_PARA. For an accepted task it runs streamTaskInit(), clears pTask->pBackend,
 * opens all upstream inputs, resets the upstream stage info, applies the restore-version info and finally logs a
 * one-line summary of the task.
 *
 * @param pSnode          snode that owns the stream meta and the message callbacks handed to streamTaskInit()
 * @param pTask           task to initialise
 * @param nextProcessVer  forwarded unchanged to streamTaskInit()
 * @return 0 on success, TSDB_CODE_INVALID_PARA for an unsupported task, or the code reported by streamTaskInit()
 */
int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) {
  // Same short-circuit order as a negated conjunction: the upstream list is only inspected for AGG-level tasks.
  if (pTask->info.taskLevel != TASK_LEVEL__AGG || taosArrayGetSize(pTask->upstreamInfo.pList) == 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t initCode = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer);
  if (initCode != TSDB_CODE_SUCCESS) {
    return initCode;
  }

  pTask->pBackend = NULL;
  streamTaskOpenAllUpstreamInput(pTask);
  streamTaskResetUpstreamStageInfo(pTask);
  tqSetRestoreVersionInfo(pTask);

  SCheckpointInfo *pCkpt = &pTask->chkInfo;
  char            *statusName = streamTaskGetStatus(pTask).name;

  // The two log lines differ only in which related task id is printed and how it is labelled.
  if (pTask->info.fillHistory != STREAM_HISTORY_TASK) {
    sndInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64
            " nextProcessVer:%" PRId64
            " child id:%d, level:%d, status:%s taskType:%d, related helper-task:0x%x trigger:%" PRId64 " ms",
            SNODE_HANDLE, pTask->id.idStr, pTask, pCkpt->checkpointId, pCkpt->checkpointVer, pCkpt->nextProcessVer,
            pTask->info.selfChildId, pTask->info.taskLevel, statusName, pTask->info.fillHistory,
            (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam);
  } else {
    sndInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64
            " nextProcessVer:%" PRId64
            " child id:%d, level:%d, status:%s taskType:%d, related stream task:0x%x trigger:%" PRId64 " ms",
            SNODE_HANDLE, pTask->id.idStr, pTask, pCkpt->checkpointId, pCkpt->checkpointVer, pCkpt->nextProcessVer,
            pTask->info.selfChildId, pTask->info.taskLevel, statusName, pTask->info.fillHistory,
            (int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam);
  }

  return 0;
}
63

64
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
14✔
65
  int32_t code = 0;
14✔
66
  SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode));
14!
67
  if (pSnode == NULL) {
14!
68
    return NULL;
×
69
  }
70

71
  stopRsync();
14✔
72
  code = startRsync();
14✔
73
  if (code != 0) {
14!
74
    terrno = code;
×
75
    goto _ERR;
×
76
  }
77

78
  pSnode->msgCb = pOption->msgCb;
14✔
79
  code = streamMetaOpen(path, pSnode, (FTaskBuild *)sndBuildStreamTask, tqExpandStreamTask, SNODE_HANDLE,
28✔
80
                        taosGetTimestampMs(), tqStartTaskCompleteCallback, &pSnode->pMeta);
81
  if (code != TSDB_CODE_SUCCESS) {
14!
82
    terrno = code;
×
83
    goto _ERR;
×
84
  }
85

86
  streamMetaLoadAllTasks(pSnode->pMeta);
14✔
87
  return pSnode;
14✔
88

89
_ERR:
×
90
  taosMemoryFree(pSnode);
×
91
  return NULL;
×
92
}
93

94
/*
 * Schedule the "start all tasks" action for this snode's stream meta.
 *
 * A scheduling failure is only logged; the function reports success either way.
 *
 * @param pSnode  opened snode; its msgCb and pMeta->vgId are passed to streamTaskSchedTask()
 * @return always 0
 */
int32_t sndInit(SSnode *pSnode) {
  if (streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_START_ALL_TASKS, false) == 0) {
    return 0;
  }

  // Best effort: the failure is recorded but not propagated to the caller.
  sndError("failed to start all tasks");
  return 0;
}
100

101
/*
 * Shut down an snode and release it.
 *
 * Stops rsync, notifies the stream meta that it is closing, commits it (a commit failure is logged and otherwise
 * ignored), closes it and frees the SSnode itself.
 *
 * Passing NULL is allowed: rsync is still stopped and nothing else is done. This makes it safe to call on the
 * result of a failed sndOpen().
 *
 * @param pSnode  snode returned by sndOpen(), or NULL
 */
void sndClose(SSnode *pSnode) {
  stopRsync();

  if (pSnode == NULL) {
    return;
  }

  streamMetaNotifyClose(pSnode->pMeta);
  if (streamMetaCommit(pSnode->pMeta) != 0) {
    sndError("failed to commit stream meta");
  }
  streamMetaClose(pSnode->pMeta);
  taosMemoryFree(pSnode);
}
14✔
111

112
/*
 * Dispatch a stream-queue message to the matching tq stream handler.
 *
 * Every handled message type is forwarded to one handler together with the snode's stream meta, and the
 * handler's return value becomes the result. TDMT_STREAM_RETRIEVE_RSP is accepted but needs no processing.
 * Unknown message types are logged and rejected.
 *
 * @param pSnode  snode whose pMeta (and, for dispatch requests, msgCb) is handed to the handler
 * @param pMsg    incoming message; pMsg->msgType selects the handler
 * @return the handler's code, 0 for TDMT_STREAM_RETRIEVE_RSP, or TSDB_CODE_INVALID_MSG for an unknown type
 */
int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
  int32_t code = 0;

  switch (pMsg->msgType) {
    // Run requests and checkpoint-exec requests share the same handler and arguments.
    case TDMT_STREAM_TASK_RUN:
    case TDMT_STREAM_CHKPT_EXEC:
      code = tqStreamTaskProcessRunReq(pSnode->pMeta, pMsg, true);
      break;

    case TDMT_STREAM_TASK_DISPATCH:
      code = tqStreamTaskProcessDispatchReq(pSnode->pMeta, pMsg, &pSnode->msgCb);
      break;

    case TDMT_STREAM_TASK_DISPATCH_RSP:
      code = tqStreamTaskProcessDispatchRsp(pSnode->pMeta, pMsg);
      break;

    case TDMT_STREAM_RETRIEVE:
      code = tqStreamTaskProcessRetrieveReq(pSnode->pMeta, pMsg);
      break;

    case TDMT_STREAM_RETRIEVE_RSP:  // 1036
      // Nothing to do for this response; report success.
      break;

    case TDMT_VND_STREAM_TASK_CHECK:
      code = tqStreamTaskProcessCheckReq(pSnode->pMeta, pMsg);
      break;

    case TDMT_VND_STREAM_TASK_CHECK_RSP:
      code = tqStreamTaskProcessCheckRsp(pSnode->pMeta, pMsg, true);
      break;

    case TDMT_STREAM_TASK_CHECKPOINT_READY:
      code = tqStreamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg);
      break;

    case TDMT_MND_STREAM_HEARTBEAT_RSP:
      code = tqStreamProcessStreamHbRsp(pSnode->pMeta, pMsg);
      break;

    case TDMT_MND_STREAM_REQ_CHKPT_RSP:
      code = tqStreamProcessReqCheckpointRsp(pSnode->pMeta, pMsg);
      break;

    case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
      code = tqStreamProcessCheckpointReadyRsp(pSnode->pMeta, pMsg);
      break;

    case TDMT_MND_STREAM_CHKPT_REPORT_RSP:
      code = tqStreamProcessChkptReportRsp(pSnode->pMeta, pMsg);
      break;

    case TDMT_STREAM_RETRIEVE_TRIGGER:
      code = tqStreamTaskProcessRetrieveTriggerReq(pSnode->pMeta, pMsg);
      break;

    case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
      code = tqStreamTaskProcessRetrieveTriggerRsp(pSnode->pMeta, pMsg);
      break;

    default:
      sndError("invalid snode msg:%d", pMsg->msgType);
      code = TSDB_CODE_INVALID_MSG;
      break;
  }

  return code;
}
150

151
/*
 * Dispatch a write-queue message to the matching tq stream handler.
 *
 * Each handled message type is forwarded to one handler together with the snode's stream meta, and the handler's
 * return value is returned unchanged. Unknown message types are rejected with TSDB_CODE_INVALID_MSG.
 *
 * For TDMT_STREAM_TASK_DEPLOY the payload is expected to start with an SMsgHead; the handler receives the bytes
 * after that header. A message with no content, or one shorter than the header, is rejected with
 * TSDB_CODE_INVALID_MSG instead of producing a negative length and a pointer past the end of the buffer.
 *
 * @param pSnode  snode whose pMeta and msgCb are handed to the handlers
 * @param pMsg    incoming message; pMsg->msgType selects the handler
 * @param pRsp    not used by this function
 * @return the handler's code, or TSDB_CODE_INVALID_MSG for an unknown type or a malformed deploy message
 */
int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
  switch (pMsg->msgType) {
    case TDMT_STREAM_TASK_DEPLOY: {
      const int32_t headLen = (int32_t)sizeof(SMsgHead);
      // Validate before stripping the header: contLen - headLen must not go negative.
      if (pMsg->pCont == NULL || pMsg->contLen < headLen) {
        sndError("invalid deploy msg, contLen:%d", pMsg->contLen);
        return TSDB_CODE_INVALID_MSG;
      }

      void   *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
      int32_t len = pMsg->contLen - headLen;
      return tqStreamTaskProcessDeployReq(pSnode->pMeta, &pSnode->msgCb, pMsg->info.conn.applyIndex, pReq, len, true,
                                          true);
    }

    case TDMT_STREAM_TASK_DROP:
      return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen);
    case TDMT_VND_STREAM_TASK_UPDATE:
      return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true, true);
    case TDMT_VND_STREAM_TASK_RESET:
      return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg->pCont);
    case TDMT_VND_STREAM_ALL_STOP:
      return tqStreamTaskProcessAllTaskStopReq(pSnode->pMeta, &pSnode->msgCb, pMsg);
    case TDMT_STREAM_TASK_PAUSE:
      return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont);
    case TDMT_STREAM_TASK_RESUME:
      return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false);
    // NOTE(review): STOP and START are both routed to the pause handler, same as TDMT_STREAM_TASK_PAUSE.
    // Left unchanged here; confirm this is intended, in particular for START.
    case TDMT_STREAM_TASK_STOP:
      return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont);
    case TDMT_STREAM_TASK_START:
      return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont);
    case TDMT_STREAM_TASK_UPDATE_CHKPT:
      return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont);
    case TDMT_STREAM_CONSEN_CHKPT:
      return tqStreamTaskProcessConsenChkptIdReq(pSnode->pMeta, pMsg);
    default:
      return TSDB_CODE_INVALID_MSG;
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc