• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4380

25 Jun 2025 06:58AM UTC coverage: 62.307% (-0.09%) from 62.393%
#4380

push

travis-ci

web-flow
feat(mqtt): mqtt subscription (#30127)

* feat(mqtt): Initial commit for mqtt

* chore(xnode/mnd): xnode message handlers for mnode

* chore(mnd/xnode): mnode part for xnode

* chore(xnode/translater): fix show commands

* fix(ast/creater): fix xnode create option

* fix(xnode/ci): fix ci & doc's error codes

* chore(xnode/sql): make create/drop/show work properly

* fix(xnode/sql): commit new files

* fix(xnode/sql): commit cmake files

* fix: fix testing cases

* fix(xnode/tsc): fix tokens

* fix(ast/anode): fix anode update decl.

* fix(xnode/error): fix xnode error codes

* fix: xnode make/destroy

* chore: xnode with option & dnode id

* chore: use taosmqtt for xnode

* chore: new error code for xnode launching

* chore(xnode): new error code

* chore: header for _xnode_mgmt_mqtt

* chore: source for _xnode_mgmt_mqtt

* chore: remove test directory from cmake

* chore: remove taosmqtt for ci to compile

* chore: remove taosudf header from xnode

* chore: new window macro

* chore: remove xnode mgmt mqtt for windows compilation

* Revert "chore: remove xnode mgmt mqtt for windows compilation"

This reverts commit 197e1640c.

* chore: cleanup code

* chore: xnode mgmt comment windows part out

* chore: mgmt/mqtt, move uv head toppest

* xnode/mnode: create xnode once per dnode

* fix(xnode/systable/test): fix column count

* xnode/sdb: renumber sdb type for xnode to make start/stop order correct

* xnode/mqtt: new param mqttPort

* fix SXnode's struct type

* transfer dnode id to mqtt subscription

* tmqtt: remove uv_a linking

* tmqtt/tools: sources for tools

* tools: fix windows compilation

* tools/producer: fix windows sleep param

* tools/producer: fix uninited var rc

* make tools only for linux

* test/mnodes: wail 1 or 2 seconds for offline to be leader

* update topic producer tool for geometry data type testing

* format tool sql statements

* show xnodes' ep

* make shell auto complete xnodes

* use usleep... (continued)

156642 of 320746 branches covered (48.84%)

Branch coverage included in aggregate %.

61 of 1020 new or added lines in 21 files covered. (5.98%)

1736 existing lines in 172 files now uncovered.

242538 of 319922 relevant lines covered (75.81%)

6277604.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.13
/source/dnode/mgmt/node_mgmt/src/dmMgmt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "dmNodes.h"
19
#include "index.h"
20
#include "qworker.h"
21
#include "tcompression.h"
22
#include "tconv.h"
23
#include "tglobal.h"
24
#include "tgrant.h"
25
#include "tstream.h"
26

27
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
14,658✔
28
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
14,658✔
29

30
  bool    required = false;
14,658✔
31
  int32_t code = (*pWrapper->func.requiredFp)(&input, &required);
14,658✔
32
  if (!required) {
14,658✔
33
    dDebug("node:%s, does not require startup", pWrapper->name);
7,919✔
34
  } else {
35
    dDebug("node:%s, required to startup", pWrapper->name);
6,739✔
36
  }
37

38
  return required;
14,658✔
39
}
40

41
int32_t dmInitDnode(SDnode *pDnode) {
2,443✔
42
  dDebug("start to create dnode");
2,443✔
43
  int32_t code = -1;
2,443✔
44
  char    path[PATH_MAX + 100] = {0};
2,443✔
45

46
  if ((code = dmInitVarsWrapper(pDnode)) != 0) {
2,443!
47
    goto _OVER;
×
48
  }
49

50
  // compress module init
51
  tsCompressInit(tsLossyColumns, tsFPrecision, tsDPrecision, tsMaxRange, tsCurRange, (int)tsIfAdtFse, tsCompressor);
2,443✔
52

53
  pDnode->wrappers[DNODE].func = dmGetMgmtFunc();
2,443✔
54
  pDnode->wrappers[MNODE].func = mmGetMgmtFunc();
2,443✔
55
  pDnode->wrappers[VNODE].func = vmGetMgmtFunc();
2,443✔
56
  pDnode->wrappers[QNODE].func = qmGetMgmtFunc();
2,443✔
57
  pDnode->wrappers[SNODE].func = smGetMgmtFunc();
2,443✔
58
  pDnode->wrappers[BNODE].func = bmGetMgmtFunc();
2,443✔
59

60
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
17,101✔
61
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
14,658✔
62
    pWrapper->pDnode = pDnode;
14,658✔
63
    pWrapper->name = dmNodeName(ntype);
14,658✔
64
    pWrapper->ntype = ntype;
14,658✔
65
    (void)taosThreadRwlockInit(&pWrapper->lock, NULL);
14,658✔
66

67
    snprintf(path, sizeof(path), "%s%s%s", tsDataDir, TD_DIRSEP, pWrapper->name);
14,658✔
68
    pWrapper->path = taosStrdup(path);
14,658!
69
    if (pWrapper->path == NULL) {
14,658!
70
      code = terrno;
×
71
      goto _OVER;
×
72
    }
73

74
    pWrapper->required = dmRequireNode(pDnode, pWrapper);
14,658✔
75
  }
76

77
  code = dmCheckRunning(tsDataDir, &pDnode->lockfile);
2,443✔
78
  if (code != 0) {
2,443✔
79
    goto _OVER;
2✔
80
  }
81

82
  if ((code = dmInitModule(pDnode)) != 0) {
2,441✔
83
    goto _OVER;
2✔
84
  }
85

86
  indexInit(tsNumOfCommitThreads);
2,439✔
87
  streamMetaInit();
2,439✔
88

89
  if ((code = dmInitStatusClient(pDnode)) != 0) {
2,439!
90
    goto _OVER;
×
91
  }
92
  if ((code = dmInitSyncClient(pDnode)) != 0) {
2,439!
93
    goto _OVER;
×
94
  }
95

96
  dmReportStartup("dnode-transport", "initialized");
2,439✔
97
  dDebug("dnode is created, ptr:%p", pDnode);
2,439✔
98
  code = 0;
2,439✔
99

100
_OVER:
2,443✔
101
  if (code != 0 && pDnode != NULL) {
2,443!
102
    dmClearVars(pDnode);
4✔
103
    pDnode = NULL;
4✔
104
    dError("failed to create dnode since %s", tstrerror(code));
4!
105
  }
106

107
  return code;
2,443✔
108
}
109

110
void dmCleanupDnode(SDnode *pDnode) {
2,439✔
111
  if (pDnode == NULL) {
2,439!
112
    return;
×
113
  }
114

115
  dmCleanupClient(pDnode);
2,439✔
116
  dmCleanupStatusClient(pDnode);
2,439✔
117
  dmCleanupSyncClient(pDnode);
2,439✔
118
  dmCleanupServer(pDnode);
2,439✔
119

120
  dmClearVars(pDnode);
2,439✔
121
  rpcCleanup();
2,439✔
122
  streamMetaCleanup();
2,439✔
123
  indexCleanup();
2,439✔
124
  taosConvDestroy();
2,439✔
125

126
  // compress destroy
127
  tsCompressExit();
2,439✔
128

129
  dDebug("dnode is closed, ptr:%p", pDnode);
2,439✔
130
}
131

132
int32_t dmInitVarsWrapper(SDnode *pDnode) {
2,443✔
133
  int32_t code = dmInitVars(pDnode);
2,443✔
134
  if (code == -1) {
2,443!
135
    return terrno;
×
136
  }
137
  return 0;
2,443✔
138
}
139
int32_t dmInitVars(SDnode *pDnode) {
2,443✔
140
  int32_t     code = 0;
2,443✔
141
  SDnodeData *pData = &pDnode->data;
2,443✔
142
  pData->dnodeId = 0;
2,443✔
143
  pData->clusterId = 0;
2,443✔
144
  pData->dnodeVer = 0;
2,443✔
145
  pData->engineVer = 0;
2,443✔
146
  pData->updateTime = 0;
2,443✔
147
  pData->rebootTime = taosGetTimestampMs();
2,443✔
148
  pData->dropped = 0;
2,443✔
149
  pData->stopped = 0;
2,443✔
150
  char *machineId = NULL;
2,443✔
151
  code = tGetMachineId(&machineId);
2,443✔
152
  if (machineId) {
2,443!
153
    tstrncpy(pData->machineId, machineId, TSDB_MACHINE_ID_LEN + 1);
2,443✔
154
    taosMemoryFreeClear(machineId);
2,443!
155
  } else {
156
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
157
    code = TSDB_CODE_DNODE_NO_MACHINE_CODE;
×
158
    return terrno = code;
×
159
#endif
160
  }
161

162
  pData->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
2,443✔
163
  if (pData->dnodeHash == NULL) {
2,443!
164
    dError("failed to init dnode hash");
×
165
    return terrno;
×
166
  }
167

168
  if ((code = dmReadEps(pData)) != 0) {
2,443!
169
    dError("failed to read file since %s", tstrerror(code));
×
170
    return code;
×
171
  }
172

173
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
174
  tsiEncryptAlgorithm = pData->encryptAlgorigthm;
2,443✔
175
  tsiEncryptScope = pData->encryptScope;
2,443✔
176
  /*
177
  if(tsiEncryptAlgorithm != 0) {
178
    if(pData->machineId != NULL && strlen(pData->machineId) > 0){
179
      dInfo("get crypt key at startup, machineId:%s", pData->machineId);
180
      int32_t code = 0;
181

182
      //code = taosGetCryptKey(tsAuthCode, pData->machineId, tsCryptKey);
183
      code = 0;
184
      tstrncpy(tsEncryptKey, tsAuthCode, 16);
185

186
      if (code != 0) {
187
        if(code == -1){
188
          terrno = TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
189
          dError("machine code changed, can't get crypt key");
190
        }
191
        if(code == -2){
192
          terrno = TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
193
          dError("failed to get crypt key");
194
        }
195
        return -1;
196
      }
197

198
      if(strlen(tsEncryptKey) == 0){
199
        terrno = TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
200
        dError("failed to get crypt key at startup since key is null, machineId:%s", pData->machineId);
201
        return -1;
202
      }
203
    }
204
    else{
205
      terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
206
      dError("failed to get crypt key at startup, machineId:%s", pData->machineId);
207
      return -1;
208
    }
209
  }
210
  */
211
#endif
212

213
  if (pData->dropped) {
2,443!
214
    dError("dnode will not start since its already dropped");
×
215
    return -1;
×
216
  }
217

218
  (void)taosThreadRwlockInit(&pData->lock, NULL);
2,443✔
219
  (void)taosThreadMutexInit(&pData->statusInfolock, NULL);
2,443✔
220
  (void)taosThreadMutexInit(&pDnode->mutex, NULL);
2,443✔
221
  return 0;
2,443✔
222
}
223

224
extern SMonVloadInfo tsVinfo;
225

226
void dmClearVars(SDnode *pDnode) {
2,443✔
227
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
17,101✔
228
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
14,658✔
229
    taosMemoryFreeClear(pWrapper->path);
14,658!
230
    (void)taosThreadRwlockDestroy(&pWrapper->lock);
14,658✔
231
  }
232
  if (pDnode->lockfile != NULL) {
2,443✔
233
    if (taosUnLockFile(pDnode->lockfile) != 0) {
2,442!
234
      dError("failed to unlock file");
×
235
    }
236

237
    (void)taosCloseFile(&pDnode->lockfile);
2,442✔
238
    pDnode->lockfile = NULL;
2,442✔
239
  }
240

241
  SDnodeData *pData = &pDnode->data;
2,443✔
242
  (void)taosThreadRwlockWrlock(&pData->lock);
2,443✔
243
  if (pData->oldDnodeEps != NULL) {
2,443!
244
    if (dmWriteEps(pData) == 0) {
×
245
      dmRemoveDnodePairs(pData);
×
246
    }
247
    taosArrayDestroy(pData->oldDnodeEps);
×
248
    pData->oldDnodeEps = NULL;
×
249
  }
250
  if (pData->dnodeEps != NULL) {
2,443!
251
    taosArrayDestroy(pData->dnodeEps);
2,443✔
252
    pData->dnodeEps = NULL;
2,443✔
253
  }
254
  if (pData->dnodeHash != NULL) {
2,443!
255
    taosHashCleanup(pData->dnodeHash);
2,443✔
256
    pData->dnodeHash = NULL;
2,443✔
257
  }
258
  (void)taosThreadRwlockUnlock(&pData->lock);
2,443✔
259

260
  (void)taosThreadRwlockDestroy(&pData->lock);
2,443✔
261

262
  dDebug("begin to lock status info when thread exit");
2,443✔
263
  if (taosThreadMutexLock(&pData->statusInfolock) != 0) {
2,443!
264
    dError("failed to lock status info lock");
×
265
    return;
×
266
  }
267
  if (tsVinfo.pVloads != NULL) {
2,443✔
268
    taosArrayDestroy(tsVinfo.pVloads);
396✔
269
    tsVinfo.pVloads = NULL;
396✔
270
  }
271
  if (taosThreadMutexUnlock(&pData->statusInfolock) != 0) {
2,443!
272
    dError("failed to unlock status info lock");
×
273
    return;
×
274
  }
275
  if (taosThreadMutexDestroy(&pData->statusInfolock) != 0) {
2,443!
276
    dError("failed to destroy status info lock");
×
277
  }
278
  memset(&pData->statusInfolock, 0, sizeof(pData->statusInfolock));
2,443✔
279

280
  (void)taosThreadMutexDestroy(&pDnode->mutex);
2,443✔
281
  memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
2,443✔
282
}
283

284
void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {
4,878✔
285
  if (pDnode->status != status) {
4,878!
286
    dDebug("dnode status set from %s to %s", dmStatStr(pDnode->status), dmStatStr(status));
4,878✔
287
    pDnode->status = status;
4,878✔
288
  }
289
}
4,878✔
290

291
SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
1,509✔
292
  SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
1,509✔
293
  SMgmtWrapper *pRetWrapper = pWrapper;
1,509✔
294

295
  (void)taosThreadRwlockRdlock(&pWrapper->lock);
1,509✔
296
  if (pWrapper->deployed) {
1,509✔
297
    int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
912✔
298
    // dTrace("node:%s, is acquired, ref:%d", pWrapper->name, refCount);
299
  } else {
300
    pRetWrapper = NULL;
597✔
301
  }
302
  (void)taosThreadRwlockUnlock(&pWrapper->lock);
1,509✔
303

304
  return pRetWrapper;
1,509✔
305
}
306

307
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
10,935,289✔
308
  int32_t code = 0;
10,935,289✔
309

310
  (void)taosThreadRwlockRdlock(&pWrapper->lock);
10,935,289✔
311
  if (pWrapper->deployed) {
10,948,456✔
312
    int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
10,838,281✔
313
    // dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount);
314
  } else {
315
    switch (pWrapper->ntype) {
110,175!
316
      case MNODE:
34,239✔
317
        code = TSDB_CODE_MNODE_NOT_FOUND;
34,239✔
318
        break;
34,239✔
319
      case QNODE:
73,379✔
320
        code = TSDB_CODE_QNODE_NOT_FOUND;
73,379✔
321
        break;
73,379✔
322
      case SNODE:
23✔
323
        code = TSDB_CODE_SNODE_NOT_FOUND;
23✔
324
        break;
23✔
NEW
325
      case BNODE:
×
NEW
326
        code = TSDB_CODE_BNODE_NOT_FOUND;
×
NEW
327
        break;
×
328
      case VNODE:
2,534✔
329
        code = TSDB_CODE_VND_STOPPED;
2,534✔
330
        break;
2,534✔
331
      default:
×
332
        code = TSDB_CODE_APP_IS_STOPPING;
×
333
        break;
×
334
    }
335
  }
336
  (void)taosThreadRwlockUnlock(&pWrapper->lock);
10,949,255✔
337

338
  return code;
10,939,505✔
339
}
340

341
void dmReleaseWrapper(SMgmtWrapper *pWrapper) {
10,909,044✔
342
  if (pWrapper == NULL) return;
10,909,044✔
343

344
  (void)taosThreadRwlockRdlock(&pWrapper->lock);
10,835,642✔
345
  int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1);
10,840,278✔
346
  (void)taosThreadRwlockUnlock(&pWrapper->lock);
10,840,299✔
347
  // dTrace("node:%s, is released, ref:%d", pWrapper->name, refCount);
348
}
349

350
static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus) {
×
351
  SDnodeMgmt *pMgmt = pDnode->wrappers[DNODE].pMgmt;
×
352
  pStatus->details[0] = 0;
×
353

354
  if (pDnode->status == DND_STAT_INIT) {
×
355
    pStatus->statusCode = TSDB_SRV_STATUS_NETWORK_OK;
×
356
    snprintf(pStatus->details, sizeof(pStatus->details), "%s: %s", pDnode->startup.name, pDnode->startup.desc);
×
357
  } else if (pDnode->status == DND_STAT_STOPPED) {
×
358
    pStatus->statusCode = TSDB_SRV_STATUS_EXTING;
×
359
  } else {
360
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
×
361
  }
362
}
×
363

364
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) {
×
365
  dDebug("msg:%p, net test req will be processed", pMsg);
×
366

367
  SRpcMsg rsp = {.info = pMsg->info};
×
368
  rsp.pCont = rpcMallocCont(pMsg->contLen);
×
369
  if (rsp.pCont == NULL) {
×
370
    rsp.code = TSDB_CODE_OUT_OF_MEMORY;
×
371
  } else {
372
    rsp.contLen = pMsg->contLen;
×
373
  }
374

375
  if (rpcSendResponse(&rsp) != 0) {
×
376
    dError("failed to send response, msg:%p", &rsp);
×
377
  }
378
  rpcFreeCont(pMsg->pCont);
×
379
}
×
380

381
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) {
×
382
  dDebug("msg:%p, server startup status req will be processed", pMsg);
×
383

384
  SServerStatusRsp statusRsp = {0};
×
385
  dmGetServerStartupStatus(pDnode, &statusRsp);
×
386

387
  SRpcMsg rsp = {.info = pMsg->info};
×
388
  int32_t contLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
×
389
  if (contLen < 0) {
×
390
    rsp.code = TSDB_CODE_OUT_OF_MEMORY;
×
391
  } else {
392
    rsp.pCont = rpcMallocCont(contLen);
×
393
    if (rsp.pCont != NULL) {
×
394
      if (tSerializeSServerStatusRsp(rsp.pCont, contLen, &statusRsp) < 0) {
×
395
        rsp.code = TSDB_CODE_APP_ERROR;
×
396
      } else {
397
        rsp.contLen = contLen;
×
398
      }
399
    }
400
  }
401

402
  if (rpcSendResponse(&rsp) != 0) {
×
403
    dError("failed to send response, msg:%p", &rsp);
×
404
  }
405
  rpcFreeCont(pMsg->pCont);
×
406
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc