• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4380

25 Jun 2025 06:58AM UTC coverage: 62.307% (-0.09%) from 62.393%
#4380

push

travis-ci

web-flow
feat(mqtt): mqtt subscription (#30127)

* feat(mqtt): Initial commit for mqtt

* chore(xnode/mnd): xnode message handlers for mnode

* chore(mnd/xnode): mnode part for xnode

* chore(xnode/translater): fix show commands

* fix(ast/creater): fix xnode create option

* fix(xnode/ci): fix ci & doc's error codes

* chore(xnode/sql): make create/drop/show work properly

* fix(xnode/sql): commit new files

* fix(xnode/sql): commit cmake files

* fix: fix testing cases

* fix(xnode/tsc): fix tokens

* fix(ast/anode): fix anode update decl.

* fix(xnode/error): fix xnode error codes

* fix: xnode make/destroy

* chore: xnode with option & dnode id

* chore: use taosmqtt for xnode

* chore: new error code for xnode launching

* chore(xnode): new error code

* chore: header for _xnode_mgmt_mqtt

* chore: source for _xnode_mgmt_mqtt

* chore: remove test directory from cmake

* chore: remove taosmqtt for ci to compile

* chore: remove taosudf header from xnode

* chore: new window macro

* chore: remove xnode mgmt mqtt for windows compilation

* Revert "chore: remove xnode mgmt mqtt for windows compilation"

This reverts commit 197e1640c.

* chore: cleanup code

* chore: xnode mgmt comment windows part out

* chore: mgmt/mqtt, move uv head toppest

* xnode/mnode: create xnode once per dnode

* fix(xnode/systable/test): fix column count

* xnode/sdb: renumber sdb type for xnode to make start/stop order correct

* xnode/mqtt: new param mqttPort

* fix SXnode's struct type

* transfer dnode id to mqtt subscription

* tmqtt: remove uv_a linking

* tmqtt/tools: sources for tools

* tools: fix windows compilation

* tools/producer: fix windows sleep param

* tools/producer: fix uninited var rc

* make tools only for linux

* test/mnodes: wait 1 or 2 seconds for offline to be leader

* update topic producer tool for geometry data type testing

* format tool sql statements

* show xnodes' ep

* make shell auto complete xnodes

* use usleep... (continued)

156642 of 320746 branches covered (48.84%)

Branch coverage included in aggregate %.

61 of 1020 new or added lines in 21 files covered. (5.98%)

1736 existing lines in 172 files now uncovered.

242538 of 319922 relevant lines covered (75.81%)

6277604.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.54
/source/dnode/mnode/impl/src/mndQnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "mndDnode.h"
19
#include "mndPrivilege.h"
20
#include "mndQnode.h"
21
#include "mndShow.h"
22
#include "mndTrans.h"
23
#include "mndUser.h"
24

25
#define QNODE_VER_NUMBER   1
26
#define QNODE_RESERVE_SIZE 64
27

28
static SSdbRaw *mndQnodeActionEncode(SQnodeObj *pObj);
29
static SSdbRow *mndQnodeActionDecode(SSdbRaw *pRaw);
30
static int32_t  mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj);
31
static int32_t  mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew);
32
static int32_t  mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj);
33
static int32_t  mndProcessCreateQnodeReq(SRpcMsg *pReq);
34
static int32_t  mndProcessDropQnodeReq(SRpcMsg *pReq);
35
static int32_t  mndProcessQnodeListReq(SRpcMsg *pReq);
36
static int32_t  mndRetrieveQnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
37
static void     mndCancelGetNextQnode(SMnode *pMnode, void *pIter);
38

39
// Wires the qnode module into the mnode: installs the message handlers and the
// show-table callbacks, then registers the qnode SDB table.
// Returns whatever sdbSetTable() returns.
int32_t mndInitQnode(SMnode *pMnode) {
  // Describe the SDB table first; fields not assigned here stay zeroed.
  SSdbTable qnodeTable = {0};
  qnodeTable.sdbType = SDB_QNODE;
  qnodeTable.keyType = SDB_KEY_INT32;
  qnodeTable.encodeFp = (SdbEncodeFp)mndQnodeActionEncode;
  qnodeTable.decodeFp = (SdbDecodeFp)mndQnodeActionDecode;
  qnodeTable.insertFp = (SdbInsertFp)mndQnodeActionInsert;
  qnodeTable.updateFp = (SdbUpdateFp)mndQnodeActionUpdate;
  qnodeTable.deleteFp = (SdbDeleteFp)mndQnodeActionDelete;

  // Create/drop/list requests are processed by this file; the two *_RSP
  // messages are handed to mndTransProcessRsp.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_QNODE, mndProcessCreateQnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_QNODE, mndProcessDropQnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_QNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_QNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_QNODE_LIST, mndProcessQnodeListReq);

  // Callbacks used for the qnode management table (retrieve rows / free iterator).
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QNODE, mndRetrieveQnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QNODE, mndCancelGetNextQnode);

  return sdbSetTable(pMnode->pSdb, qnodeTable);
}
61

62
// Counterpart of mndInitQnode(); currently there is nothing to tear down.
void mndCleanupQnode(SMnode *pMnode) {
  (void)pMnode;
}
2,031✔
63

64
// Looks up the qnode keyed by qnodeId through sdbAcquire().
// Returns the object, or NULL on failure. When the failure is
// TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to the qnode-specific
// TSDB_CODE_MND_QNODE_NOT_EXIST; any other terrno value is left untouched.
SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId) {
  SQnodeObj *pQnode = sdbAcquire(pMnode->pSdb, SDB_QNODE, &qnodeId);
  if (pQnode != NULL) {
    return pQnode;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_QNODE_NOT_EXIST;
  }
  return NULL;
}
71

72
// Hands a qnode object obtained from mndAcquireQnode() back to the SDB.
// pObj is forwarded to sdbRelease() as-is (callers in this file also pass NULL).
void mndReleaseQnode(SMnode *pMnode, SQnodeObj *pObj) {
  sdbRelease(pMnode->pSdb, pObj);
}
15,553✔
76

77
// Serializes a qnode object into a freshly allocated SDB raw record.
// Layout written, in order: id (int32), createdTime (int64), updateTime (int64),
// followed by QNODE_RESERVE_SIZE reserved bytes.
// Returns the raw record, or NULL after freeing it when anything failed.
static SSdbRaw *mndQnodeActionEncode(SQnodeObj *pObj) {
  // NOTE(review): code/lino are kept because the SDB_SET_* macros may reference
  // them by name - confirm against the macro definitions.
  int32_t code = 0;
  int32_t lino = 0;
  int32_t dataPos = 0;

  // terrno acts as the success flag: it is only cleared after the last field.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_QNODE, QNODE_VER_NUMBER, sizeof(SQnodeObj) + QNODE_RESERVE_SIZE);
  if (pRaw == NULL) {
    goto _OVER;
  }

  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, QNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("qnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRaw;
  }

  mError("qnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
103

104
// Rebuilds a qnode row from an SDB raw record written by mndQnodeActionEncode().
// Rejects records whose soft version differs from QNODE_VER_NUMBER
// (terrno = TSDB_CODE_SDB_INVALID_DATA_VER).
// Returns the new row, or NULL after releasing it when decoding failed.
static SSdbRow *mndQnodeActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): code/lino are kept because the SDB_GET_* macros may reference
  // them by name - confirm against the macro definitions.
  int32_t    code = 0;
  int32_t    lino = 0;
  SSdbRow   *pRow = NULL;
  SQnodeObj *pObj = NULL;
  int8_t     sver = 0;
  int32_t    dataPos = 0;

  // terrno acts as the success flag: it is only cleared after the last field.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto _OVER;
  }
  if (sver != QNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SQnodeObj));
  if (pRow == NULL) {
    goto _OVER;
  }

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) {
    goto _OVER;
  }

  // Field order mirrors the encoder.
  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, QNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("qnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRow;
  }

  // pObj may still be NULL here when the failure happened before the row existed.
  mError("qnode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
143

144
// SDB insert hook: binds the qnode to its dnode by acquiring the dnode whose
// key equals the qnode id and storing it in pObj->pDnode.
// Returns 0 on success, TSDB_CODE_MND_DNODE_NOT_EXIST when the dnode cannot be acquired.
static int32_t mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj) {
  int32_t code = 0;

  mTrace("qnode:%d, perform insert action, row:%p", pObj->id, pObj);

  pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
  if (pObj->pDnode != NULL) {
    TAOS_RETURN(code);
  }

  code = TSDB_CODE_MND_DNODE_NOT_EXIST;
  mError("qnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
  TAOS_RETURN(code);
}
156

157
// SDB delete hook: drops the dnode reference held by the qnode, if it holds one.
// Always returns 0.
static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj) {
  mTrace("qnode:%d, perform delete action, row:%p", pObj->id, pObj);

  // Nothing to release when no dnode is attached.
  if (pObj->pDnode == NULL) {
    return 0;
  }

  sdbRelease(pSdb, pObj->pDnode);
  pObj->pDnode = NULL;
  return 0;
}
166

167
// SDB update hook: the only field carried over from the new row to the
// existing one is updateTime. Always returns 0.
static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew) {
  mTrace("qnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->updateTime = pNew->updateTime;

  return 0;
}
172

173
// Encodes the qnode, appends the record to the transaction's redo log and marks
// it SDB_STATUS_CREATING.
// Returns 0 on success; on encode failure returns terrno, or
// TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0.
static int32_t mndSetCreateQnodeRedoLogs(STrans *pTrans, SQnodeObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndQnodeActionEncode(pObj);

  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_CREATING));

  TAOS_RETURN(code);
}
185

186
// Encodes the qnode, appends the record to the transaction's undo log and marks
// it SDB_STATUS_DROPPED.
// Returns 0 on success; on encode failure returns terrno, or
// TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0.
static int32_t mndSetCreateQnodeUndoLogs(STrans *pTrans, SQnodeObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndQnodeActionEncode(pObj);

  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(code);
}
198

199
// Encodes the qnode, appends the record to the transaction's commit log and
// marks it SDB_STATUS_READY.
// Returns 0 on success; on encode failure returns terrno, or
// TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0.
int32_t mndSetCreateQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndQnodeActionEncode(pObj);

  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
211

212
// Reports whether the qnode is attached to the dnode with the given id.
// A NULL qnode, or a qnode without an attached dnode (pDnode is NULL, a state
// the delete hook in this file explicitly handles), is treated as "not in that
// dnode" instead of being dereferenced.
bool mndQnodeInDnode(SQnodeObj *pQnode, int32_t dnodeId) {
  if (pQnode == NULL || pQnode->pDnode == NULL) {
    return false;
  }
  return pQnode->pDnode->id == dnodeId;
}
×
213

214
// Builds the TDMT_DND_CREATE_QNODE request for pDnode and appends it to the
// transaction as a redo action (TSDB_CODE_QNODE_ALREADY_DEPLOYED is accepted
// as a non-error reply, groupId is -1).
//
// Returns 0 on success, otherwise a non-zero error code. Once the action has
// been appended successfully the request buffer is no longer freed here.
//
// Error handling: a failed size query or a failed serialization now aborts
// with the reported code instead of appending an action whose payload was
// never (fully) written, and the out-of-memory path returns
// TSDB_CODE_OUT_OF_MEMORY rather than a bare -1.
int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) {
  int32_t          code = 0;
  SDCreateQnodeReq createReq = {0};
  createReq.dnodeId = pDnode->id;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, &createReq);
  if (contLen < 0) {
    code = contLen;
    mError("qnode:%d, failed to serialize create drop qnode request since %s", createReq.dnodeId, terrstr());
    TAOS_RETURN(code);
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TAOS_RETURN(code);
  }

  code = tSerializeSCreateDropMQSNodeReq(pReq, contLen, &createReq);
  if (code < 0) {
    mError("qnode:%d, failed to serialize create drop qnode request since %s", createReq.dnodeId, terrstr());
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);
  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_CREATE_QNODE;
  action.acceptableCode = TSDB_CODE_QNODE_ALREADY_DEPLOYED;
  action.groupId = -1;

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    // The action was not taken over, so the buffer is still ours to free.
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
244

245
// Builds the TDMT_DND_DROP_QNODE request for pDnode and appends it to the
// transaction as an undo action (TSDB_CODE_QNODE_NOT_DEPLOYED is accepted as a
// non-error reply).
//
// Returns 0 on success, otherwise a non-zero error code. Once the action has
// been appended successfully the request buffer is no longer freed here.
//
// Error handling: a failed size query or a failed serialization now aborts
// with the reported code instead of appending an action whose payload was
// never (fully) written.
static int32_t mndSetCreateQnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) {
  int32_t        code = 0;
  SDDropQnodeReq dropReq = {0};
  dropReq.dnodeId = pDnode->id;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, &dropReq);
  if (contLen < 0) {
    code = contLen;
    mError("qnode:%d, failed to serialize create drop qnode request since %s", dropReq.dnodeId, terrstr());
    TAOS_RETURN(code);
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  code = tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq);
  if (code < 0) {
    mError("qnode:%d, failed to serialize create drop qnode request since %s", dropReq.dnodeId, terrstr());
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);
  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_QNODE;
  action.acceptableCode = TSDB_CODE_QNODE_NOT_DEPLOYED;

  if ((code = mndTransAppendUndoAction(pTrans, &action)) != 0) {
    // The action was not taken over, so the buffer is still ours to free.
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
275

276
// Creates and prepares the "create-qnode" transaction for the given dnode.
// The new qnode uses the dnode id as its own id and the current time (ms) for
// both createdTime and updateTime. The transaction uses the rollback policy,
// is set to serial mode, and receives redo/undo/commit logs plus redo/undo
// actions before mndTransPrepare() is called.
// Returns 0 on success, otherwise the first error encountered.
static int32_t mndCreateQnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateQnodeReq *pCreate) {
  int32_t code = -1;

  SQnodeObj newQnode = {0};
  newQnode.id = pDnode->id;
  newQnode.createdTime = taosGetTimestampMs();
  newQnode.updateTime = newQnode.createdTime;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-qnode");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);

  mInfo("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId);

  // SDB log records.
  TAOS_CHECK_GOTO(mndSetCreateQnodeRedoLogs(pTrans, &newQnode), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateQnodeUndoLogs(pTrans, &newQnode), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateQnodeCommitLogs(pTrans, &newQnode), NULL, _OVER);

  // Messages for the dnode.
  TAOS_CHECK_GOTO(mndSetCreateQnodeRedoActions(pTrans, pDnode, &newQnode), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateQnodeUndoActions(pTrans, pDnode, &newQnode), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  // Reached on every path, including pTrans == NULL.
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
306

307
// Handler for TDMT_MND_CREATE_QNODE.
// Steps: deserialize the request, check the MND_OPER_CREATE_QNODE privilege,
// make sure no qnode with that id exists yet, acquire the target dnode, start
// the create transaction and write an audit record.
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started,
// otherwise an error code.
static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  int32_t          code = -1;
  SQnodeObj       *pObj = NULL;
  SDnodeObj       *pDnode = NULL;
  SMCreateQnodeReq createReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

  mInfo("qnode:%d, start to create", createReq.dnodeId);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_QNODE), NULL, _OVER);

  pObj = mndAcquireQnode(pMnode, createReq.dnodeId);
  if (pObj != NULL) {
    code = TSDB_CODE_MND_QNODE_ALREADY_EXIST;
    goto _OVER;
  } else if (terrno != TSDB_CODE_MND_QNODE_NOT_EXIST) {
    // The lookup failed for a reason other than "no such qnode". Report that
    // reason explicitly, as the drop handler does; previously this branch left
    // `code` at whatever the preceding checks had stored in it.
    code = (terrno != 0) ? terrno : -1;
    goto _OVER;
  }

  pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_DNODE_NOT_EXIST;
    goto _OVER;
  }

  code = mndCreateQnode(pMnode, pReq, pDnode, &createReq);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  // The audit record identifies the object by the decimal dnode id.
  char obj[33] = {0};
  (void)tsnprintf(obj, sizeof(obj), "%d", createReq.dnodeId);

  auditRecord(pReq, pMnode->clusterId, "createQnode", "", obj, createReq.sql, createReq.sqlLen);
_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("qnode:%d, failed to create since %s", createReq.dnodeId, terrstr());
  }

  // Both release calls are reached with NULL on the early-exit paths.
  mndReleaseQnode(pMnode, pObj);
  mndReleaseDnode(pMnode, pDnode);
  tFreeSMCreateQnodeReq(&createReq);
  TAOS_RETURN(code);
}
350

351
// Encodes the qnode, appends the record to the transaction's redo log under
// group -1 and marks it SDB_STATUS_DROPPING.
// Returns 0 on success; on encode failure returns terrno, or
// TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0.
static int32_t mndSetDropQnodeRedoLogs(STrans *pTrans, SQnodeObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndQnodeActionEncode(pObj);

  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendGroupRedolog(pTrans, pRaw, -1));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING));

  TAOS_RETURN(code);
}
363

364
// Encodes the qnode, appends the record to the transaction's commit log and
// marks it SDB_STATUS_DROPPED.
//
// Returns 0 on success; on encode failure returns terrno, or
// TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0.
//
// Failures of the append and of the status update are now propagated to the
// caller, matching the other *Logs helpers in this file; they used to be
// discarded, so the function reported success regardless.
static int32_t mndSetDropQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndQnodeActionEncode(pObj);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
  TAOS_RETURN(code);
}
378

379
// Builds the TDMT_DND_DROP_QNODE request for pDnode and appends it to the
// transaction as a redo action (TSDB_CODE_QNODE_NOT_DEPLOYED is accepted as a
// non-error reply, groupId is -1).
//
// Returns 0 on success, otherwise a non-zero error code. Once the action has
// been appended successfully the request buffer is no longer freed here.
//
// Error handling: a failed size query or a failed serialization now aborts
// with the reported code instead of appending an action whose payload was
// never (fully) written.
static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) {
  int32_t        code = 0;
  SDDropQnodeReq dropReq = {0};
  dropReq.dnodeId = pDnode->id;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, &dropReq);
  if (contLen < 0) {
    code = contLen;
    mError("qnode:%d, failed to serialize create drop qnode request since %s", dropReq.dnodeId, terrstr());
    TAOS_RETURN(code);
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  code = tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq);
  if (code < 0) {
    mError("qnode:%d, failed to serialize create drop qnode request since %s", dropReq.dnodeId, terrstr());
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);
  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_QNODE;
  action.acceptableCode = TSDB_CODE_QNODE_NOT_DEPLOYED;
  action.groupId = -1;

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    // The action was not taken over, so the buffer is still ours to free.
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
410

411
// Adds everything needed to drop pObj to an existing transaction: the redo
// log, the commit log and - unless `force` is set - the drop message for the
// dnode the qnode is attached to.
// A NULL pObj is a no-op. Returns 0 on success or the first error encountered.
int32_t mndSetDropQnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SQnodeObj *pObj, bool force) {
  if (pObj == NULL) {
    return 0;
  }

  TAOS_CHECK_RETURN(mndSetDropQnodeRedoLogs(pTrans, pObj));
  TAOS_CHECK_RETURN(mndSetDropQnodeCommitLogs(pTrans, pObj));

  // A forced drop skips the message to the dnode.
  if (force) {
    return 0;
  }

  TAOS_CHECK_RETURN(mndSetDropQnodeRedoActions(pTrans, pObj->pDnode, pObj));
  return 0;
}
420

421
// Creates and prepares the "drop-qnode" transaction for pObj.
// The transaction uses the retry policy, is set to serial mode, and is filled
// through mndSetDropQnodeInfoToTrans() with force == false.
// Returns 0 on success, otherwise the first error encountered.
static int32_t mndDropQnode(SMnode *pMnode, SRpcMsg *pReq, SQnodeObj *pObj) {
  int32_t code = -1;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-qnode");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);

  mInfo("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id);

  TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pObj, false), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  // Reached on every path, including pTrans == NULL.
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
442

443
// Handler for TDMT_MND_DROP_QNODE.
// Steps: deserialize the request, check the MND_OPER_DROP_QNODE privilege,
// reject non-positive dnode ids with TSDB_CODE_INVALID_MSG, acquire the qnode,
// start the drop transaction and write an audit record.
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started,
// otherwise an error code.
static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  SQnodeObj     *pObj = NULL;
  SMDropQnodeReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("qnode:%d, start to drop", dropReq.dnodeId);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_QNODE), NULL, _OVER);

  if (dropReq.dnodeId <= 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  pObj = mndAcquireQnode(pMnode, dropReq.dnodeId);
  if (pObj == NULL) {
    // Prefer the specific reason left in terrno; fall back to a generic code.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  code = mndDropQnode(pMnode, pReq, pObj);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  // The audit record identifies the object by the decimal dnode id.
  char dnodeIdStr[33] = {0};
  (void)tsnprintf(dnodeIdStr, sizeof(dnodeIdStr), "%d", dropReq.dnodeId);

  auditRecord(pReq, pMnode->clusterId, "dropQnode", "", dnodeIdStr, dropReq.sql, dropReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("qnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
  }

  // pObj is still NULL on the early-exit paths.
  mndReleaseQnode(pMnode, pObj);
  tFreeSDDropQnodeReq(&dropReq);
  TAOS_RETURN(code);
}
483

484
// Builds an array of SQueryNodeLoad entries, one per qnode in the SDB.
//
// pMnode  mnode whose SDB is scanned.
// pList   out: receives the new array on success; left untouched on failure.
// limit   maximum number of entries to collect; values <= 0 mean no limit.
//
// Returns TSDB_CODE_SUCCESS on success, otherwise a non-zero error code.
//
// Each entry uses QNODE_HANDLE as node id and a single endpoint taken from the
// fqdn/port of the dnode the qnode is attached to.
//
// On failure the partially filled array is destroyed (it used to be leaked)
// and the returned code is guaranteed to be non-zero, so a caller can never
// see "success" while *pList was not set.
int32_t mndCreateQnodeList(SMnode *pMnode, SArray **pList, int32_t limit) {
  int32_t    code = 0;
  SSdb      *pSdb = pMnode->pSdb;
  void      *pIter = NULL;
  SQnodeObj *pObj = NULL;
  int32_t    numOfRows = 0;

  SArray *qnodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
  if (NULL == qnodeList) {
    mError("failed to alloc epSet while process qnode list req");
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    TAOS_RETURN(code);
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_QNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SQueryNodeLoad nodeLoad = {0};
    nodeLoad.addr.nodeId = QNODE_HANDLE;
    nodeLoad.addr.epSet.numOfEps = 1;
    tstrncpy(nodeLoad.addr.epSet.eps[0].fqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN);
    nodeLoad.addr.epSet.eps[0].port = pObj->pDnode->port;
    nodeLoad.load = QNODE_LOAD_VALUE(pObj);

    if (taosArrayPush(qnodeList, &nodeLoad) == NULL) {
      // Capture the reason before the cleanup calls get a chance to change terrno.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      taosArrayDestroy(qnodeList);
      return code;
    }

    numOfRows++;
    sdbRelease(pSdb, pObj);

    if (limit > 0 && numOfRows >= limit) {
      // Stopping before the iterator is exhausted requires cancelling it.
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }

  *pList = qnodeList;

  return TSDB_CODE_SUCCESS;
}
529

530
// Handler for TDMT_MND_QNODE_LIST.
// Deserializes the request, collects up to qlistReq.rowNum qnode entries via
// mndCreateQnodeList(), serializes them into a buffer from rpcMallocCont() and
// attaches that buffer to pReq->info as the response.
//
// Returns 0 on success, otherwise an error code; on error no response buffer
// is attached to the request.
//
// A failed size query or a failed serialization is now reported to the caller
// (and the buffer released) instead of being logged and then answered with
// code 0 and an incompletely written response.
static int32_t mndProcessQnodeListReq(SRpcMsg *pReq) {
  int32_t       code = -1;
  SMnode       *pMnode = pReq->info.node;
  SQnodeListReq qlistReq = {0};
  SQnodeListRsp qlistRsp = {0};
  int32_t       rspLen = 0;
  void         *pRsp = NULL;

  TAOS_CHECK_GOTO(tDeserializeSQnodeListReq(pReq->pCont, pReq->contLen, &qlistReq), NULL, _OVER);

  TAOS_CHECK_GOTO(mndCreateQnodeList(pMnode, &qlistRsp.qnodeList, qlistReq.rowNum), NULL, _OVER);

  // First pass with a NULL buffer only computes the encoded length.
  rspLen = tSerializeSQnodeListRsp(NULL, 0, &qlistRsp);
  if (rspLen < 0) {
    code = rspLen;
    mError("failed to serialize qnode list response since %s", terrstr());
    goto _OVER;
  }

  pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  code = tSerializeSQnodeListRsp(pRsp, rspLen, &qlistRsp);
  if (code < 0) {
    mError("failed to serialize qnode list response since %s", terrstr());
    rpcFreeCont(pRsp);
    goto _OVER;
  }

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pRsp;
  code = 0;

_OVER:
  tFreeSQnodeListRsp(&qlistRsp);
  TAOS_RETURN(code);
}
560

561
// Show-table callback: fills pBlock with up to `rows` qnode rows, continuing
// from the iterator stored in pShow->pIter.
// Columns written per row, in order: qnode id, endpoint of the attached dnode
// (as a var-string), creation time.
// Returns the number of rows written by this call; a colDataSetVal() failure
// releases the current object and returns through TAOS_CHECK_RETURN_WITH_RELEASE.
static int32_t mndRetrieveQnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SQnodeObj *pObj = NULL;
  int32_t    numOfRows = 0;

  for (; numOfRows < rows; numOfRows++) {
    pShow->pIter = sdbFetch(pSdb, SDB_QNODE, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) {
      break;
    }

    int32_t          cols = 0;
    SColumnInfoData *pColInfo = NULL;

    // qnode id
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false), pSdb, pObj);

    // dnode endpoint; the size bound comes from the schema of this column
    char ep[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(ep, pObj->pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)ep, false), pSdb, pObj);

    // creation time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false), pSdb, pObj);

    sdbRelease(pSdb, pObj);
  }

  pShow->numOfRows += numOfRows;

  return numOfRows;
}
594

595
// Show-table callback: cancels a pending qnode iteration by forwarding the
// iterator to sdbCancelFetchByType() for the SDB_QNODE table.
static void mndCancelGetNextQnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_QNODE);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc