• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4380

25 Jun 2025 06:58AM UTC coverage: 62.307% (-0.09%) from 62.393%
#4380

push

travis-ci

web-flow
feat(mqtt): mqtt subscription (#30127)

* feat(mqtt): Initial commit for mqtt

* chore(xnode/mnd): xnode message handlers for mnode

* chore(mnd/xnode): mnode part for xnode

* chore(xnode/translater): fix show commands

* fix(ast/creater): fix xnode create option

* fix(xnode/ci): fix ci & doc's error codes

* chore(xnode/sql): make create/drop/show work properly

* fix(xnode/sql): commit new files

* fix(xnode/sql): commit cmake files

* fix: fix testing cases

* fix(xnode/tsc): fix tokens

* fix(ast/anode): fix anode update decl.

* fix(xnode/error): fix xnode error codes

* fix: xnode make/destroy

* chore: xnode with option & dnode id

* chore: use taosmqtt for xnode

* chore: new error code for xnode launching

* chore(xnode): new error code

* chore: header for _xnode_mgmt_mqtt

* chore: source for _xnode_mgmt_mqtt

* chore: remove test directory from cmake

* chore: remove taosmqtt for ci to compile

* chore: remove taosudf header from xnode

* chore: new window macro

* chore: remove xnode mgmt mqtt for windows compilation

* Revert "chore: remove xnode mgmt mqtt for windows compilation"

This reverts commit 197e1640c.

* chore: cleanup code

* chore: xnode mgmt comment windows part out

* chore: mgmt/mqtt, move uv head toppest

* xnode/mnode: create xnode once per dnode

* fix(xnode/systable/test): fix column count

* xnode/sdb: renumber sdb type for xnode to make start/stop order correct

* xnode/mqtt: new param mqttPort

* fix SXnode's struct type

* transfer dnode id to mqtt subscription

* tmqtt: remove uv_a linking

* tmqtt/tools: sources for tools

* tools: fix windows compilation

* tools/producer: fix windows sleep param

* tools/producer: fix uninited var rc

* make tools only for linux

* test/mnodes: wait 1 or 2 seconds for offline to be leader

* update topic producer tool for geometry data type testing

* format tool sql statements

* show xnodes' ep

* make shell auto complete xnodes

* use usleep... (continued)

156642 of 320746 branches covered (48.84%)

Branch coverage included in aggregate %.

61 of 1020 new or added lines in 21 files covered. (5.98%)

1736 existing lines in 172 files now uncovered.

242538 of 319922 relevant lines covered (75.81%)

6277604.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

5.66
/source/dnode/mgmt/mgmt_bnode/src/bmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "bmInt.h"
18
#include "tjson.h"
19

20
static int32_t bmRequire(const SMgmtInputOpt *pInput, bool *required) {
2,443✔
21
  return dmReadFile(pInput->path, pInput->name, required);
2,443✔
22
}
23

NEW
24
/*
 * Seed the bnode open options from the management object: the dnode id held
 * in pMgmt->pData and the message callback table stored in pMgmt.
 */
static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
  pOption->dnodeId = pMgmt->pData->dnodeId;
  pOption->msgCb = pMgmt->msgCb;
}
×
28

NEW
29
/*
 * Tear down the bnode management object: close the bnode if one is open,
 * clear the pointer, then release pMgmt itself. pMgmt must not be used after
 * this call.
 */
static void bmClose(SBnodeMgmt *pMgmt) {
  SBnode *pOpened = pMgmt->pBnode;

  if (pOpened != NULL) {
    // Worker shutdown is not wired in yet: bmStopWorker(pMgmt);
    bndClose(pOpened);
    pMgmt->pBnode = NULL;
  }

  taosMemoryFree(pMgmt);
}
×
38

NEW
39
/* Thin pass-through to bndOpen(); returns its result code unchanged. */
static int32_t bndOpenWrapper(SBnodeOpt *pOption, SBnode **pBnode) {
  return bndOpen(pOption, pBnode);
}
43

NEW
44
/**
 * Hand an RPC message to the bnode.
 *
 * A queue item is allocated, the message header is rewritten, and the message
 * is copied into the queue item. No bnode queue is wired in yet, so every
 * queue type is currently rejected with TSDB_CODE_INVALID_PARA.
 *
 * On every path the message content is released here and pRpc->pCont is set
 * to NULL, so the caller must not free or reuse it afterwards.
 *
 * @param pMgmt  bnode management object; pMgmt->pBnode must be open.
 * @param qtype  target queue type.
 * @param pRpc   message to enqueue.
 * @return 0 on success, otherwise a non-zero error code. A failure is never
 *         reported as 0, even when terrno happens to be unset.
 */
int32_t bmPutMsgToQueue(SBnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  int32_t  code = 0;
  SRpcMsg *pMsg = NULL;

  code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg);
  if (code != 0) {
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
    // Prefer terrno when it is set, but never turn a failed allocation into success.
    return (terrno != 0) ? terrno : code;
  }

  if (pMgmt->pBnode == NULL) {
    // terrno may be stale or zero here; fall back to a definite error code.
    code = (terrno != 0) ? terrno : TSDB_CODE_INVALID_PARA;
    // pMsg has not been filled in yet, so the message type is taken from pRpc.
    dError("msg:%p failed to put into bnode queue since %s, type:%s qtype:%d len:%d", pMsg, tstrerror(code),
           TMSG_INFO(pRpc->msgType), qtype, pRpc->contLen);
    taosFreeQitem(pMsg);
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
    return code;
  }

  SMsgHead *pHead = pRpc->pCont;
  if (pHead != NULL) {
    pHead->contLen = htonl(pHead->contLen);
    // NOTE(review): SNODE_HANDLE looks inherited from the snode code path;
    // confirm whether a bnode-specific handle should be used here.
    pHead->vgId = SNODE_HANDLE;
  }
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  pRpc->pCont = NULL;  // the content is now reachable only through pMsg

  switch (qtype) {
    case WRITE_QUEUE:
      // TODO: code = bmPutNodeMsgToWriteQueue(pMgmt, pMsg); once the write queue exists.
      /* fallthrough */
    default:
      code = TSDB_CODE_INVALID_PARA;
      break;
  }

  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
  return code;
}
84

NEW
85
/* Read the integer "proto" field of the parsed bnode file into *proto. */
static int32_t bmDecodeFile(SJson *pJson, int32_t *proto) {
  return tjsonGetIntValue(pJson, "proto", proto);
}
91

NEW
92
static int32_t bmReadFile(const char *path, const char *name, int32_t *proto) {
×
NEW
93
  int32_t   code = -1;
×
NEW
94
  TdFilePtr pFile = NULL;
×
NEW
95
  char     *content = NULL;
×
NEW
96
  SJson    *pJson = NULL;
×
NEW
97
  char      file[PATH_MAX] = {0};
×
NEW
98
  int32_t   nBytes = snprintf(file, sizeof(file), "%s%s%s.json", path, TD_DIRSEP, name);
×
NEW
99
  if (nBytes <= 0 || nBytes >= PATH_MAX) {
×
NEW
100
    code = TSDB_CODE_OUT_OF_BUFFER;
×
NEW
101
    goto _OVER;
×
102
  }
103

NEW
104
  if (taosStatFile(file, NULL, NULL, NULL) < 0) {
×
NEW
105
    dInfo("file:%s not exist", file);
×
NEW
106
    code = 0;
×
NEW
107
    goto _OVER;
×
108
  }
109

NEW
110
  pFile = taosOpenFile(file, TD_FILE_READ);
×
NEW
111
  if (pFile == NULL) {
×
NEW
112
    code = terrno;
×
NEW
113
    dError("failed to open file:%s since %s", file, tstrerror(code));
×
NEW
114
    goto _OVER;
×
115
  }
116

NEW
117
  int64_t size = 0;
×
NEW
118
  code = taosFStatFile(pFile, &size, NULL);
×
NEW
119
  if (code != 0) {
×
NEW
120
    dError("failed to fstat file:%s since %s", file, tstrerror(code));
×
NEW
121
    goto _OVER;
×
122
  }
123

NEW
124
  content = taosMemoryMalloc(size + 1);
×
NEW
125
  if (content == NULL) {
×
NEW
126
    code = terrno;
×
NEW
127
    goto _OVER;
×
128
  }
129

NEW
130
  if (taosReadFile(pFile, content, size) != size) {
×
NEW
131
    code = terrno;
×
NEW
132
    dError("failed to read file:%s since %s", file, tstrerror(code));
×
NEW
133
    goto _OVER;
×
134
  }
135

NEW
136
  content[size] = '\0';
×
137

NEW
138
  pJson = tjsonParse(content);
×
NEW
139
  if (pJson == NULL) {
×
NEW
140
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
NEW
141
    goto _OVER;
×
142
  }
143

NEW
144
  if (bmDecodeFile(pJson, proto) < 0) {
×
NEW
145
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
NEW
146
    goto _OVER;
×
147
  }
148

NEW
149
  code = 0;
×
NEW
150
  dInfo("succceed to read bnode file %s", file);
×
151

NEW
152
_OVER:
×
NEW
153
  if (content != NULL) taosMemoryFree(content);
×
NEW
154
  if (pJson != NULL) cJSON_Delete(pJson);
×
NEW
155
  if (pFile != NULL) taosCloseFile(&pFile);
×
156

NEW
157
  if (code != 0) {
×
NEW
158
    dError("failed to read bnode file:%s since %s", file, tstrerror(code));
×
159
  }
NEW
160
  return code;
×
161
}
162

NEW
163
/*
 * Open callback for the bnode management module.
 *
 * Allocates the management object, copies the input settings into it,
 * installs bmPutMsgToQueue as the queue callback, reads the persisted "proto"
 * value and opens the bnode. On success the object is published through
 * pOutput->pMgmt; on any failure it is released via bmClose() and the error
 * code is returned.
 */
static int32_t bmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  SBnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SBnodeMgmt));
  if (pMgmt == NULL) {
    return terrno;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  // Start from the caller's callback table, then override the queue entry points.
  pMgmt->msgCb = pInput->msgCb;
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)bmPutMsgToQueue;
  pMgmt->msgCb.mgmt = pMgmt;

  SBnodeOpt bnodeOpt = {0};
  bmInitOption(pMgmt, &bnodeOpt);

  int32_t rc = bmReadFile(pInput->path, pInput->name, &bnodeOpt.proto);
  if (rc != 0) {
    dError("failed to read bnode since %s", tstrerror(rc));
    bmClose(pMgmt);
    return rc;
  }

  if ((rc = bndOpenWrapper(&bnodeOpt, &pMgmt->pBnode)) != 0) {
    dError("failed to open bnode since %s", tstrerror(rc));
    bmClose(pMgmt);
    return rc;
  }
  tmsgReportStartup("bnode-impl", "initialized");

  // The bnode worker (bmStartWorker) is not started here yet.

  pOutput->pMgmt = pMgmt;
  return rc;
}
206

NEW
207
/*
 * Add the "deployed" and "proto" fields to the JSON object, in that order.
 * Returns 0 on success, or TSDB_CODE_INVALID_JSON_FORMAT as soon as either
 * field cannot be added.
 */
static int32_t bmEncodeFile(SJson *pJson, bool deployed, int32_t proto) {
  if (tjsonAddDoubleToObject(pJson, "deployed", deployed) < 0 ||
      tjsonAddIntegerToObject(pJson, "proto", proto) < 0) {
    return TSDB_CODE_INVALID_JSON_FORMAT;
  }
  return 0;
}
218

NEW
219
static int32_t bmWriteFile(const char *path, const char *name, bool deployed, int32_t proto) {
×
NEW
220
  int32_t   code = -1;
×
NEW
221
  char     *buffer = NULL;
×
NEW
222
  SJson    *pJson = NULL;
×
NEW
223
  TdFilePtr pFile = NULL;
×
NEW
224
  char      file[PATH_MAX] = {0};
×
NEW
225
  char      realfile[PATH_MAX] = {0};
×
226

NEW
227
  int32_t nBytes = snprintf(file, sizeof(file), "%s%s%s.json", path, TD_DIRSEP, name);
×
NEW
228
  if (nBytes <= 0 || nBytes >= PATH_MAX) {
×
NEW
229
    code = TSDB_CODE_OUT_OF_BUFFER;
×
NEW
230
    goto _OVER;
×
231
  }
232

NEW
233
  nBytes = snprintf(realfile, sizeof(realfile), "%s%s%s.json", path, TD_DIRSEP, name);
×
NEW
234
  if (nBytes <= 0 || nBytes >= PATH_MAX) {
×
NEW
235
    code = TSDB_CODE_OUT_OF_BUFFER;
×
NEW
236
    goto _OVER;
×
237
  }
238

NEW
239
  pJson = tjsonCreateObject();
×
NEW
240
  if (pJson == NULL) {
×
NEW
241
    code = terrno;
×
NEW
242
    goto _OVER;
×
243
  }
244

NEW
245
  if ((code = bmEncodeFile(pJson, deployed, proto)) != 0) goto _OVER;
×
246

NEW
247
  buffer = tjsonToString(pJson);
×
NEW
248
  if (buffer == NULL) {
×
NEW
249
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
NEW
250
    goto _OVER;
×
251
  }
252

NEW
253
  pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
×
NEW
254
  if (pFile == NULL) {
×
NEW
255
    code = terrno;
×
NEW
256
    goto _OVER;
×
257
  }
258

NEW
259
  int32_t len = strlen(buffer);
×
NEW
260
  if (taosWriteFile(pFile, buffer, len) <= 0) {
×
NEW
261
    code = terrno;
×
NEW
262
    goto _OVER;
×
263
  }
NEW
264
  if (taosFsyncFile(pFile) < 0) {
×
NEW
265
    code = terrno;
×
NEW
266
    goto _OVER;
×
267
  }
268

NEW
269
  if (taosCloseFile(&pFile) != 0) {
×
NEW
270
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
NEW
271
    goto _OVER;
×
272
  }
NEW
273
  TAOS_CHECK_GOTO(taosRenameFile(file, realfile), NULL, _OVER);
×
274

NEW
275
  dInfo("succeed to write file:%s, deloyed:%d", realfile, deployed);
×
276

NEW
277
_OVER:
×
NEW
278
  if (pJson != NULL) tjsonDelete(pJson);
×
NEW
279
  if (buffer != NULL) taosMemoryFree(buffer);
×
NEW
280
  if (pFile != NULL) taosCloseFile(&pFile);
×
281

NEW
282
  if (code != 0) {
×
NEW
283
    dError("failed to write file:%s since %s, deloyed:%d", realfile, tstrerror(code), deployed);
×
284
  }
NEW
285
  return code;
×
286
}
287

NEW
288
/*
 * Handle a "create bnode" request.
 *
 * Returns TSDB_CODE_INVALID_MSG when the request cannot be deserialized,
 * TSDB_CODE_INVALID_OPTION when the local dnode id is non-zero and differs
 * from the requested one, or the result of writing the bnode file with
 * deployed=true and the requested proto.
 */
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
  SDCreateBnodeReq createReq = {0};
  if (tDeserializeSMCreateBnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t code = 0;
  if (pInput->pData->dnodeId != 0 && createReq.dnodeId != pInput->pData->dnodeId) {
    code = TSDB_CODE_INVALID_OPTION;
    dError("failed to create bnode since %s", tstrerror(code));
  } else {
    code = bmWriteFile(pInput->path, pInput->name, true, createReq.bnodeProto);
    if (code != 0) {
      dError("failed to write bnode file since %s", tstrerror(code));
    }
  }

  // Single release point for the deserialized request.
  tFreeSMCreateBnodeReq(&createReq);
  return code;
}
316

NEW
317
/*
 * Handle a "drop bnode" request.
 *
 * Returns TSDB_CODE_INVALID_MSG when the request cannot be deserialized,
 * TSDB_CODE_INVALID_OPTION when the local dnode id is non-zero and differs
 * from the requested one, or the result of dmWriteFile() called with
 * deployed=false.
 */
int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
  SDDropBnodeReq dropReq = {0};
  if (tDeserializeSMDropBnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t code = 0;
  if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
    code = TSDB_CODE_INVALID_OPTION;
    dError("failed to drop bnode since %s", tstrerror(code));
  } else {
    code = dmWriteFile(pInput->path, pInput->name, false);
    if (code != 0) {
      dError("failed to write bnode file since %s", tstrerror(code));
    }
  }

  // Single release point for the deserialized request.
  tFreeSMDropBnodeReq(&dropReq);
  return code;
}
346

347
SArray *bmGetMsgHandles() {
2,441✔
348
  int32_t code = -1;
2,441✔
349
  SArray *pArray = taosArrayInit(4, sizeof(SMgmtHandle));
2,441✔
350
  if (pArray == NULL) goto _OVER;
2,441!
351

352
  code = 0;
2,441✔
353
_OVER:
2,441✔
354
  if (code != 0) {
2,441!
NEW
355
    taosArrayDestroy(pArray);
×
NEW
356
    return NULL;
×
357
  } else {
358
    return pArray;
2,441✔
359
  }
360
}
361

362
SMgmtFunc bmGetMgmtFunc() {
2,443✔
363
  SMgmtFunc mgmtFunc = {0};
2,443✔
364
  mgmtFunc.openFp = bmOpen;
2,443✔
365
  mgmtFunc.closeFp = (NodeCloseFp)bmClose;
2,443✔
366
  mgmtFunc.createFp = (NodeCreateFp)bmProcessCreateReq;
2,443✔
367
  mgmtFunc.dropFp = (NodeDropFp)bmProcessDropReq;
2,443✔
368
  mgmtFunc.requiredFp = bmRequire;
2,443✔
369
  mgmtFunc.getHandlesFp = bmGetMsgHandles;
2,443✔
370

371
  return mgmtFunc;
2,443✔
372
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc