• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4380

25 Jun 2025 06:58AM UTC coverage: 62.307% (-0.09%) from 62.393%
#4380

push

travis-ci

web-flow
feat(mqtt): mqtt subscription (#30127)

* feat(mqtt): Initial commit for mqtt

* chore(xnode/mnd): xnode message handlers for mnode

* chore(mnd/xnode): mnode part for xnode

* chore(xnode/translater): fix show commands

* fix(ast/creater): fix xnode create option

* fix(xnode/ci): fix ci & doc's error codes

* chore(xnode/sql): make create/drop/show work properly

* fix(xnode/sql): commit new files

* fix(xnode/sql): commit cmake files

* fix: fix testing cases

* fix(xnode/tsc): fix tokens

* fix(ast/anode): fix anode update decl.

* fix(xnode/error): fix xnode error codes

* fix: xnode make/destroy

* chore: xnode with option & dnode id

* chore: use taosmqtt for xnode

* chore: new error code for xnode launching

* chore(xnode): new error code

* chore: header for _xnode_mgmt_mqtt

* chore: source for _xnode_mgmt_mqtt

* chore: remove test directory from cmake

* chore: remove taosmqtt for ci to compile

* chore: remove taosudf header from xnode

* chore: new window macro

* chore: remove xnode mgmt mqtt for windows compilation

* Revert "chore: remove xnode mgmt mqtt for windows compilation"

This reverts commit 197e1640c.

* chore: cleanup code

* chore: xnode mgmt comment windows part out

* chore: mgmt/mqtt, move the uv header include to the very top

* xnode/mnode: create xnode once per dnode

* fix(xnode/systable/test): fix column count

* xnode/sdb: renumber sdb type for xnode to make start/stop order correct

* xnode/mqtt: new param mqttPort

* fix SXnode's struct type

* transfer dnode id to mqtt subscription

* tmqtt: remove uv_a linking

* tmqtt/tools: sources for tools

* tools: fix windows compilation

* tools/producer: fix windows sleep param

* tools/producer: fix uninited var rc

* make tools only for linux

* test/mnodes: wait 1 or 2 seconds for offline to be leader

* update topic producer tool for geometry data type testing

* format tool sql statements

* show xnodes' ep

* make shell auto complete xnodes

* use usleep... (continued)

156642 of 320746 branches covered (48.84%)

Branch coverage included in aggregate %.

61 of 1020 new or added lines in 21 files covered. (5.98%)

1736 existing lines in 172 files now uncovered.

242538 of 319922 relevant lines covered (75.81%)

6277604.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.79
/source/dnode/mnode/impl/src/mndBnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndBnode.h"
18
#include "mndDnode.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndTrans.h"
22
#include "mndUser.h"
23

24
#define BNODE_VER_NUMBER   1
25
#define BNODE_RESERVE_SIZE 64
26

NEW
27
// Serialize a bnode object into a freshly allocated SDB raw record.
//
// Fields are written in this order: id (int32), proto (int32), createdTime (int64),
// updateTime (int64), then a reserve region of BNODE_RESERVE_SIZE.
//
// Returns the raw record on success. Returns NULL when terrno is non-zero at the
// exit label; in that case the (possibly NULL) record is passed to sdbFreeRaw.
static SSdbRaw *mndBnodeActionEncode(SBnodeObj *pObj) {
  // NOTE(review): `code` and `lino` look unused here, but the SDB_SET_* macros may
  // reference them implicitly -- confirm against their definition before removing.
  int32_t code = 0;
  int32_t lino = 0;

  // Start from a failure value; it is cleared only after every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_BNODE, BNODE_VER_NUMBER, sizeof(SBnodeObj) + BNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pObj->proto, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, BNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("bnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRaw;
  }

  mError("bnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
55

NEW
56
// Rebuild a bnode row from an SDB raw record.
//
// The record's soft version must equal BNODE_VER_NUMBER; otherwise terrno is set to
// TSDB_CODE_SDB_INVALID_DATA_VER. Fields are read in the same order the encoder
// writes them: id, proto, createdTime, updateTime, reserve region.
//
// Returns the allocated row on success, or NULL (row freed) when terrno is
// non-zero at the exit label.
static SSdbRow *mndBnodeActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): `code` and `lino` may be referenced implicitly by the SDB_GET_*
  // macros -- confirm before removing.
  int32_t    code = 0;
  int32_t    lino = 0;
  SSdbRow   *pRow = NULL;
  SBnodeObj *pObj = NULL;
  int8_t     sver = 0;

  // Start from a failure value; cleared only once decoding has fully succeeded.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver != BNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SBnodeObj));
  if (pRow == NULL) goto _OVER;

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pObj->proto, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, BNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("bnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRow;
  }

  // pObj may still be NULL if the failure happened before the row was allocated.
  mError("bnode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
97

NEW
98
// SDB insert hook: bind the bnode row to its dnode.
//
// The bnode id is used as the key for the SDB_DNODE lookup; the acquired dnode
// is stored in pObj->pDnode. Returns 0 on success, or -1 with terrno set to
// TSDB_CODE_MND_DNODE_NOT_EXIST when the dnode cannot be acquired.
static int32_t mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj) {
  mTrace("bnode:%d, perform insert action, row:%p", pObj->id, pObj);

  pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
  if (pObj->pDnode != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
  mError("bnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
  return -1;
}
109

NEW
110
// SDB delete hook: release the dnode held in pObj->pDnode, if any, and clear
// the pointer. Always returns 0.
static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj) {
  mTrace("bnode:%d, perform delete action, row:%p", pObj->id, pObj);

  if (pObj->pDnode == NULL) {
    return 0;
  }

  sdbRelease(pSdb, pObj->pDnode);
  pObj->pDnode = NULL;
  return 0;
}
119

NEW
120
// SDB update hook: copy updateTime from the new row into the existing row.
// No other field of pOld is modified. Always returns 0.
static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew) {
  mTrace("bnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  // Only the timestamp is carried over.
  pOld->updateTime = pNew->updateTime;

  return 0;
}
125

NEW
126
// Encode the bnode, append it to the transaction as a redo log, and mark the
// raw record SDB_STATUS_CREATING.
//
// Returns 0 on success. If encoding yields NULL, returns terrno when it is
// non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
static int32_t mndSetCreateBnodeRedoLogs(STrans *pTrans, SBnodeObj *pObj) {
  int32_t code = 0;

  SSdbRaw *pRedoRaw = mndBnodeActionEncode(pObj);
  if (pRedoRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));

  TAOS_RETURN(code);
}
138

NEW
139
// Encode the bnode, append it to the transaction as an undo log, and mark the
// raw record SDB_STATUS_DROPPED.
//
// Returns 0 on success. If encoding yields NULL, returns terrno when it is
// non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
static int32_t mndSetCreateBnodeUndoLogs(STrans *pTrans, SBnodeObj *pObj) {
  int32_t code = 0;

  SSdbRaw *pUndoRaw = mndBnodeActionEncode(pObj);
  if (pUndoRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(code);
}
151

NEW
152
// Encode the bnode, append it to the transaction as a commit log, and mark the
// raw record SDB_STATUS_READY.
//
// Returns 0 on success. If encoding yields NULL, returns terrno when it is
// non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
static int32_t mndSetCreateBnodeCommitLogs(STrans *pTrans, SBnodeObj *pObj) {
  int32_t code = 0;

  SSdbRaw *pCommitRaw = mndBnodeActionEncode(pObj);
  if (pCommitRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
164

NEW
165
// Queue the redo action that asks the dnode to create the bnode.
//
// Builds an SDCreateBnodeReq (dnode id + protocol), serializes it into a heap
// buffer, and appends a TDMT_DND_CREATE_BNODE action addressed to the dnode's
// endpoint set. TSDB_CODE_BNODE_ALREADY_DEPLOYED is set as the action's
// acceptable code.
//
// The buffer is freed here on every failure path.
// NOTE(review): on success the buffer is presumably owned by the transaction
// action -- confirm against mndTransAppendRedoAction.
static int32_t mndSetCreateBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBnodeObj *pObj) {
  int32_t          code = 0;
  SDCreateBnodeReq createReq = {.dnodeId = pDnode->id, .bnodeProto = pObj->proto};

  // First call (NULL buffer) yields the length; the second call fills the buffer.
  int32_t contLen = tSerializeSMCreateBnodeReq(NULL, 0, &createReq);
  void   *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  code = tSerializeSMCreateBnodeReq(pReq, contLen, &createReq);
  if (code < 0) {
    mError("bnode:%d, failed to serialize create drop bnode request since %s", createReq.dnodeId, terrstr());

    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  STransAction action = {
      .epSet = mndGetDnodeEpset(pDnode),
      .pCont = pReq,
      .contLen = contLen,
      .msgType = TDMT_DND_CREATE_BNODE,
      .acceptableCode = TSDB_CODE_BNODE_ALREADY_DEPLOYED,
  };

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pReq);
  }

  TAOS_RETURN(code);
}
199

NEW
200
// Queue the undo action that asks the dnode to drop the bnode again if the
// create transaction is rolled back.
//
// Builds an SDDropBnodeReq for the dnode, serializes it into a heap buffer, and
// appends a TDMT_DND_DROP_BNODE action addressed to the dnode's endpoint set.
// TSDB_CODE_BNODE_NOT_DEPLOYED is set as the action's acceptable code.
//
// Returns 0 on success, or the failing step's error code. The buffer is freed
// here on every failure path.
// NOTE(review): on success the buffer is presumably owned by the transaction
// action -- confirm against mndTransAppendUndoAction.
static int32_t mndSetCreateBnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SBnodeObj *pObj) {
  int32_t        code = 0;
  SDDropBnodeReq dropReq = {0};
  dropReq.dnodeId = pDnode->id;

  // First call (NULL buffer) yields the length; the second call fills the buffer.
  int32_t contLen = tSerializeSMDropBnodeReq(NULL, 0, &dropReq);
  void   *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  code = tSerializeSMDropBnodeReq(pReq, contLen, &dropReq);
  if (code < 0) {
    mError("bnode:%d, failed to serialize create drop bnode request since %s", dropReq.dnodeId, terrstr());

    // Fix: previously the failure was only logged and the unusable buffer was
    // still queued as an action. Bail out like the redo-action builder does.
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);
  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_BNODE;
  action.acceptableCode = TSDB_CODE_BNODE_NOT_DEPLOYED;

  if ((code = mndTransAppendUndoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
230

NEW
231
// Build and prepare the "create-bnode" transaction for the given dnode.
//
// A stack SBnodeObj is filled with the dnode id, the requested protocol and the
// current timestamp (used for both createdTime and updateTime). The transaction
// uses TRN_POLICY_ROLLBACK / TRN_CONFLICT_NOTHING and is switched to serial mode.
// Redo/undo/commit logs and redo/undo actions are then attached and the
// transaction is prepared.
//
// Returns 0 on success or the first failing step's error code. The local
// transaction handle is dropped on every path.
static int32_t mndCreateBnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateBnodeReq *pCreate) {
  int32_t code = -1;

  const int64_t now = taosGetTimestampMs();
  SBnodeObj     bnodeObj = {
          .id = pDnode->id,
          .proto = pCreate->bnodeProto,
          .createdTime = now,
          .updateTime = now,
  };

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-bnode");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);

  mInfo("trans:%d, to create bnode:%d %d", pTrans->id, pCreate->dnodeId, pCreate->bnodeProto);

  // SDB log entries first, then the dnode-facing actions, then prepare.
  TAOS_CHECK_GOTO(mndSetCreateBnodeRedoLogs(pTrans, &bnodeObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateBnodeUndoLogs(pTrans, &bnodeObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateBnodeCommitLogs(pTrans, &bnodeObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateBnodeRedoActions(pTrans, pDnode, &bnodeObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateBnodeUndoActions(pTrans, pDnode, &bnodeObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
263

NEW
264
// Message handler for TDMT_MND_CREATE_BNODE.
//
// Steps: deserialize the request, check the MND_OPER_CREATE_BNODE privilege,
// reject the request if a bnode already exists for the dnode, acquire the
// dnode, then start the create transaction.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started, or an
// error code otherwise.
//
// Fix: the error path used to return before cleanup. That leaked the acquired
// dnode and the deserialized request, and the bnode acquired in the "already
// exists" case was never released. All paths now share one cleanup tail.
static int32_t mndProcessCreateBnodeReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  int32_t          code = -1;
  SBnodeObj       *pObj = NULL;
  SDnodeObj       *pDnode = NULL;
  SMCreateBnodeReq createReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSMCreateBnodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

  mInfo("bnode:%d, start to create", createReq.dnodeId);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_BNODE), NULL, _OVER);

  pObj = mndAcquireBnode(pMnode, createReq.dnodeId);
  if (pObj != NULL) {
    // A bnode is already registered for this dnode.
    code = terrno = TSDB_CODE_MND_BNODE_ALREADY_EXIST;
    goto _OVER;
  } else if (terrno != TSDB_CODE_MND_BNODE_NOT_EXIST) {
    // The lookup failed for a reason other than "not found".
    code = terrno;
    goto _OVER;
  }

  pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_DNODE_NOT_EXIST;
    goto _OVER;
  }

  code = mndCreateBnode(pMnode, pReq, pDnode, &createReq);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("bnode:%d, failed to create since %s", createReq.dnodeId, tstrerror(code));
  }

  // Release only what was actually acquired above.
  if (pObj != NULL) mndReleaseBnode(pMnode, pObj);
  if (pDnode != NULL) mndReleaseDnode(pMnode, pDnode);
  tFreeSMCreateBnodeReq(&createReq);
  TAOS_RETURN(code);
}
305

NEW
306
// Encode the bnode, append it to the transaction as a redo log, and mark the
// raw record SDB_STATUS_DROPPING.
//
// Returns 0 on success. If encoding yields NULL, returns terrno when it is
// non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
static int32_t mndSetDropBnodeRedoLogs(STrans *pTrans, SBnodeObj *pObj) {
  int32_t code = 0;

  SSdbRaw *pRedoRaw = mndBnodeActionEncode(pObj);
  if (pRedoRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));

  TAOS_RETURN(code);
}
318

NEW
319
// Encode the bnode, append it to the transaction as a commit log, and mark the
// raw record SDB_STATUS_DROPPED.
//
// Returns 0 on success. If encoding yields NULL, returns terrno when it is
// non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
static int32_t mndSetDropBnodeCommitLogs(STrans *pTrans, SBnodeObj *pObj) {
  int32_t code = 0;

  SSdbRaw *pCommitRaw = mndBnodeActionEncode(pObj);
  if (pCommitRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(code);
}
331

NEW
332
// Queue the redo action that asks the dnode to drop its bnode.
//
// Builds an SDDropBnodeReq for the dnode, serializes it into a heap buffer, and
// appends a TDMT_DND_DROP_BNODE action addressed to the dnode's endpoint set.
// TSDB_CODE_BNODE_NOT_DEPLOYED is set as the action's acceptable code.
//
// Returns 0 on success, or the failing step's error code. The buffer is freed
// here on every failure path.
// NOTE(review): on success the buffer is presumably owned by the transaction
// action -- confirm against mndTransAppendRedoAction.
static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBnodeObj *pObj) {
  int32_t        code = 0;
  SDDropBnodeReq dropReq = {0};
  dropReq.dnodeId = pDnode->id;

  // First call (NULL buffer) yields the length; the second call fills the buffer.
  int32_t contLen = tSerializeSMDropBnodeReq(NULL, 0, &dropReq);
  void   *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  code = tSerializeSMDropBnodeReq(pReq, contLen, &dropReq);
  if (code < 0) {
    mError("bnode:%d, failed to serialize create drop bnode request since %s", dropReq.dnodeId, terrstr());

    // Fix: previously the failure was only logged and the unusable buffer was
    // still queued as an action. Bail out like the create redo-action builder.
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);
  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_BNODE;
  action.acceptableCode = TSDB_CODE_BNODE_NOT_DEPLOYED;

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
362

NEW
363
// Attach everything needed to drop a bnode to an existing transaction.
//
// A NULL pObj is treated as "nothing to drop" and returns 0. Otherwise the
// redo and commit logs are appended; the dnode-facing drop action is appended
// only when `force` is false.
//
// Returns 0 on success, or the first failing step's error code.
int32_t mndSetDropBnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SBnodeObj *pObj, bool force) {
  if (pObj == NULL) {
    return 0;
  }

  TAOS_CHECK_RETURN(mndSetDropBnodeRedoLogs(pTrans, pObj));
  TAOS_CHECK_RETURN(mndSetDropBnodeCommitLogs(pTrans, pObj));

  // A forced drop skips the request to the dnode.
  if (force) {
    return 0;
  }

  TAOS_CHECK_RETURN(mndSetDropBnodeRedoActions(pTrans, pObj->pDnode, pObj));
  return 0;
}
372

NEW
373
// Build and prepare the "drop-bnode" transaction for the given bnode.
//
// The transaction uses TRN_POLICY_RETRY / TRN_CONFLICT_NOTHING and is switched
// to serial mode. Drop info is attached with force=false, so the dnode-facing
// drop action is included.
//
// Returns 0 on success or the first failing step's error code. The local
// transaction handle is dropped on every path.
static int32_t mndDropBnode(SMnode *pMnode, SRpcMsg *pReq, SBnodeObj *pObj) {
  int32_t code = -1;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-bnode");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);

  mInfo("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id);

  TAOS_CHECK_GOTO(mndSetDropBnodeInfoToTrans(pMnode, pTrans, pObj, false), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
394

NEW
395
// Message handler for TDMT_MND_DROP_BNODE.
//
// Steps: deserialize the request, check the MND_OPER_DROP_BNODE privilege,
// validate the dnode id (> 0), require the dnode to exist and be online,
// acquire the bnode, then start the drop transaction.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started, or an
// error code otherwise.
//
// Fix: the dnode obtained from mndAcquireDnode was never released on any path;
// it is now released in the shared cleanup tail.
static int32_t mndProcessDropBnodeReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  SBnodeObj     *pObj = NULL;
  SDnodeObj     *pDnode = NULL;
  SMDropBnodeReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSMDropBnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("bnode:%d, start to drop", dropReq.dnodeId);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_BNODE), NULL, _OVER);

  if (dropReq.dnodeId <= 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_DNODE_NOT_EXIST;
    goto _OVER;
  }

  // The drop transaction sends a request to the dnode, so it must be online.
  if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) {
    code = TSDB_CODE_DNODE_OFFLINE;
    goto _OVER;
  }

  pObj = mndAcquireBnode(pMnode, dropReq.dnodeId);
  if (pObj == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  // check deletable
  code = mndDropBnode(pMnode, pReq, pObj);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("bnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseBnode(pMnode, pObj);
  if (pDnode != NULL) mndReleaseDnode(pMnode, pDnode);
  tFreeSMDropBnodeReq(&dropReq);
  TAOS_RETURN(code);
}
443

NEW
444
static const char *mndBnodeProtoStr(int32_t proto) {
×
NEW
445
  switch (proto) {
×
NEW
446
    case TSDB_BNODE_OPT_PROTO_MQTT:
×
NEW
447
      return TSDB_BNODE_OPT_PROTO_STR_MQTT;
×
NEW
448
    default:
×
NEW
449
      break;
×
450
  }
NEW
451
  return "unknown";
×
452
}
453

NEW
454
// Show-retrieve handler for the bnode table.
//
// Iterates SDB_BNODE rows starting from pShow->pIter and fills up to `rows`
// rows of pBlock with four columns, in order: bnode id, endpoint
// ("<dnode fqdn>:<tsMqttPort>"), protocol name, and created time.
//
// Returns the number of rows written, which is also added to pShow->numOfRows.
// If setting a column value fails, the current row is released and the macro
// returns early (NOTE(review): exact return value is defined by
// TAOS_CHECK_RETURN_WITH_RELEASE -- confirm).
static int32_t mndRetrieveBnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SBnodeObj *pObj = NULL;
  int32_t    numOfRows = 0;
  char       protoVar[TSDB_EP_LEN + VARSTR_HEADER_SIZE];

  for (; numOfRows < rows; numOfRows++) {
    pShow->pIter = sdbFetch(pSdb, SDB_BNODE, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) break;

    int32_t          cols = 0;
    SColumnInfoData *pColInfo = NULL;

    // Column: bnode id.
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false), pSdb, pObj);

    // Column: endpoint, formatted as "<fqdn>:<port>" and capped to the schema width
    // of this column (cols still indexes it here, before the next cols++).
    char endpointStr[TSDB_EP_LEN] = {0};
    char endpointVar[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};

    TAOS_UNUSED(tsnprintf(endpointStr, TSDB_EP_LEN - 1, "%s:%hu", pObj->pDnode->fqdn, tsMqttPort));
    STR_WITH_MAXSIZE_TO_VARSTR(endpointVar, endpointStr, pShow->pMeta->pSchemas[cols].bytes);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)endpointVar, false), pSdb, pObj);

    // Column: protocol name.
    STR_TO_VARSTR(protoVar, mndBnodeProtoStr(pObj->proto));
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)protoVar, false), pSdb, pObj);

    // Column: created time.
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false), pSdb,
                                   pObj);

    sdbRelease(pSdb, pObj);
  }

  pShow->numOfRows += numOfRows;

  return numOfRows;
}
495

NEW
496
// Show free-iterator handler: cancel an in-progress SDB_BNODE fetch for pIter.
static void mndCancelGetNextBnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_BNODE);
}
×
500

NEW
501
// Build a single-endpoint SEpSet from the dnode attached to a bnode.
// The endpoint takes the dnode's fqdn (TSDB_FQDN_LEN bytes copied) and port.
// pBnode->pDnode is dereferenced without a NULL check.
SEpSet mndAcquireEpFromBnode(SMnode *pMnode, const SBnodeObj *pBnode) {
  const SDnodeObj *pDnode = pBnode->pDnode;

  SEpSet epSet = {0};
  epSet.numOfEps = 1;
  epSet.inUse = 0;

  memcpy(epSet.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
  epSet.eps[0].port = pDnode->port;

  return epSet;
}
507

508
// Acquire the bnode row keyed by dnodeId from the SDB.
// Returns the row, or NULL on failure. When the lookup fails with
// TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
// TSDB_CODE_MND_BNODE_NOT_EXIST; any other terrno is left untouched.
SBnodeObj *mndAcquireBnode(SMnode *pMnode, int32_t dnodeId) {
  SBnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_BNODE, &dnodeId);
  if (pObj != NULL) {
    return pObj;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_BNODE_NOT_EXIST;
  }
  return NULL;
}
515

NEW
516
// Release a bnode row by passing it to sdbRelease on the mnode's SDB.
void mndReleaseBnode(SMnode *pMnode, SBnodeObj *pObj) {
  sdbRelease(pMnode->pSdb, pObj);
}
×
520

521
// Register the bnode module with the mnode:
//   - message handlers for create/drop requests and the dnode responses,
//   - show-table retrieve and iterator-cancel handlers,
//   - the SDB_BNODE table (int32 key) with its encode/decode/insert/update/delete hooks.
// Returns the result of sdbSetTable.
int32_t mndInitBnode(SMnode *pMnode) {
  SSdbTable table = {0};
  table.sdbType = SDB_BNODE;
  table.keyType = SDB_KEY_INT32;
  table.encodeFp = (SdbEncodeFp)mndBnodeActionEncode;
  table.decodeFp = (SdbDecodeFp)mndBnodeActionDecode;
  table.insertFp = (SdbInsertFp)mndBnodeActionInsert;
  table.updateFp = (SdbUpdateFp)mndBnodeActionUpdate;
  table.deleteFp = (SdbDeleteFp)mndBnodeActionDelete;

  // Client-facing requests.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_BNODE, mndProcessCreateBnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_BNODE, mndProcessDropBnodeReq);
  // Dnode responses are routed to the transaction response handler.
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_BNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_BNODE_RSP, mndTransProcessRsp);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndRetrieveBnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndCancelGetNextBnode);

  return sdbSetTable(pMnode->pSdb, table);
}
542

543
// Cleanup hook for the bnode module; currently does nothing.
void mndCleanupBnode(SMnode *pMnode) {
  // Intentionally empty.
}
2,031✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc