• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4506

15 Jul 2025 12:33AM UTC coverage: 62.026% (-0.7%) from 62.706%
#4506

push

travis-ci

web-flow
docs: update stream docs (#31874)

155391 of 320094 branches covered (48.55%)

Branch coverage included in aggregate %.

240721 of 318525 relevant lines covered (75.57%)

6529048.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.92
/source/dnode/mgmt/node_mgmt/src/dmEnv.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
// clang-format off
18
#include "dmMgmt.h"
19
#include "audit.h"
20
#include "libs/function/tudf.h"
21
#include "metrics.h"
22
#include "tgrant.h"
23
#include "tcompare.h"
24
#include "tss.h"
25
#include "tanalytics.h"
26
// clang-format on
27

28
#define DM_INIT_AUDIT()                       \
29
  do {                                        \
30
    auditCfg.port = tsMonitorPort;            \
31
    auditCfg.server = tsMonitorFqdn;          \
32
    auditCfg.comp = tsMonitorComp;            \
33
    if ((code = auditInit(&auditCfg)) != 0) { \
34
      return code;                            \
35
    }                                         \
36
  } while (0)
37

38
static SDnode globalDnode = {0};
39

40
SDnode *dmInstance() { return &globalDnode; }
1,986,941✔
41

42
static int32_t dmCheckRepeatInit(SDnode *pDnode) {
3,076✔
43
  int32_t code = 0;
3,076✔
44
  if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
3,076!
45
    dError("env is already initialized");
×
46
    code = TSDB_CODE_REPEAT_INIT;
×
47
    return code;
×
48
  }
49
  return 0;
3,076✔
50
}
51

52
static int32_t dmInitSystem() {
3,076✔
53
  if (taosIgnSIGPIPE() != 0) {
3,076!
54
    dError("failed to ignore SIGPIPE");
×
55
  }
56

57
  if (taosBlockSIGPIPE() != 0) {
3,076!
58
    dError("failed to block SIGPIPE");
×
59
  }
60

61
  taosResolveCRC();
3,076✔
62
  return 0;
3,076✔
63
}
64

65
static int32_t dmInitMonitor() {
3,076✔
66
  int32_t code = 0;
3,076✔
67
  SMonCfg monCfg = {0};
3,076✔
68

69
  monCfg.maxLogs = tsMonitorMaxLogs;
3,076✔
70
  monCfg.port = tsMonitorPort;
3,076✔
71
  monCfg.server = tsMonitorFqdn;
3,076✔
72
  monCfg.comp = tsMonitorComp;
3,076✔
73
  if ((code = monInit(&monCfg)) != 0) {
3,076!
74
    dError("failed to init monitor since %s", tstrerror(code));
×
75
  }
76
  return code;
3,076✔
77
}
78

79
static int32_t dmInitMetrics() {
3,076✔
80
  int32_t code = 0;
3,076✔
81
  if ((code = initMetricsManager()) != 0) {
3,076!
82
    dError("failed to init metrics since %s", tstrerror(code));
×
83
  }
84
  return code;
3,076✔
85
}
86

87
static int32_t dmInitAudit() {
3,076✔
88
  SAuditCfg auditCfg = {0};
3,076✔
89
  int32_t   code = 0;
3,076✔
90

91
  DM_INIT_AUDIT();
3,076!
92

93
  return 0;
3,076✔
94
}
95

96
static bool dmDataSpaceAvailable() {
3,076✔
97
  SDnode *pDnode = dmInstance();
3,076✔
98
  if (pDnode->pTfs) {
3,076!
99
    return tfsDiskSpaceAvailable(pDnode->pTfs, 0);
3,076✔
100
  }
101
  if (!osDataSpaceAvailable()) {
×
102
    dError("data disk space unavailable, i.e. %s", tsDataDir);
×
103
    return false;
×
104
  }
105
  return true;
×
106
}
107

108
static int32_t dmCheckDiskSpace() {
3,076✔
109
  // availability
110
  int32_t code = 0;
3,076✔
111
  code = osUpdate();
3,076✔
112
  if (code != 0) {
3,076!
113
    dError("failed to update os info since %s", tstrerror(code));
×
114
    code = 0;  // ignore the error, just log it
×
115
  }
116
  if (!dmDataSpaceAvailable()) {
3,076!
117
    code = TSDB_CODE_NO_DISKSPACE;
×
118
    return code;
×
119
  }
120
  if (!osLogSpaceAvailable()) {
3,076!
121
    dError("log disk space unavailable, i.e. %s", tsLogDir);
×
122
    code = TSDB_CODE_NO_DISKSPACE;
×
123
    return code;
×
124
  }
125
  if (!osTempSpaceAvailable()) {
3,076!
126
    dError("temp disk space unavailable, i.e. %s", tsTempDir);
×
127
    code = TSDB_CODE_NO_DISKSPACE;
×
128
    return code;
×
129
  }
130
  return code;
3,076✔
131
}
132

133
int32_t dmDiskInit() {
3,085✔
134
  SDnode  *pDnode = dmInstance();
3,085✔
135
  SDiskCfg dCfg = {.level = 0, .primary = 1, .disable = 0};
3,085✔
136
  tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN);
3,085✔
137
  SDiskCfg *pDisks = tsDiskCfg;
3,085✔
138
  int32_t   numOfDisks = tsDiskCfgNum;
3,085✔
139
  if (numOfDisks <= 0 || pDisks == NULL) {
3,085!
140
    pDisks = &dCfg;
8✔
141
    numOfDisks = 1;
8✔
142
  }
143

144
  int32_t code = tfsOpen(pDisks, numOfDisks, &pDnode->pTfs);
3,085✔
145
  if (code != 0) {
3,085✔
146
    dError("failed to init tfs since %s", tstrerror(code));
9!
147
    TAOS_RETURN(code);
9✔
148
  }
149
  return 0;
3,076✔
150
}
151

152
int32_t dmDiskClose() {
3,068✔
153
  SDnode *pDnode = dmInstance();
3,068✔
154
  tfsClose(pDnode->pTfs);
3,068✔
155
  pDnode->pTfs = NULL;
3,068✔
156
  return 0;
3,068✔
157
}
158

159
static bool dmCheckDataDirVersion() {
3,076✔
160
  char checkDataDirJsonFileName[PATH_MAX] = {0};
3,076✔
161
  snprintf(checkDataDirJsonFileName, PATH_MAX, "%s/dnode/dnodeCfg.json", tsDataDir);
3,076✔
162
  if (taosCheckExistFile(checkDataDirJsonFileName)) {
3,076!
163
    dError("The default data directory %s contains old data of tdengine 2.x, please clear it before running!",
×
164
           tsDataDir);
165
    return false;
×
166
  }
167
  return true;
3,076✔
168
}
169

170
static int32_t dmCheckDataDirVersionWrapper() {
×
171
  if (!dmCheckDataDirVersion()) {
×
172
    return TSDB_CODE_INVALID_DATA_FMT;
×
173
  }
174
  return 0;
×
175
}
176

177
int32_t dmInit() {
3,085✔
178
  dInfo("start to init dnode env");
3,085!
179
  int32_t code = 0;
3,085✔
180

181
#ifdef USE_SHARED_STORAGE
182
  if (tsSsEnabled) {
3,085!
183
    if ((code = tssInit()) != 0) return code;
×
184
    if ((code = tssCreateDefaultInstance()) != 0) return code;
×
185
  }
186
#endif
187

188
  if ((code = dmDiskInit()) != 0) return code;
3,085✔
189
  if (!dmCheckDataDirVersion()) {
3,076!
190
    code = TSDB_CODE_INVALID_DATA_FMT;
×
191
    return code;
×
192
  }
193
  if ((code = dmCheckDiskSpace()) != 0) return code;
3,076!
194
  if ((code = dmCheckRepeatInit(dmInstance())) != 0) return code;
3,076!
195
  if ((code = dmInitSystem()) != 0) return code;
3,076!
196
  if ((code = dmInitMonitor()) != 0) return code;
3,076!
197
  if ((code = dmInitMetrics()) != 0) return code;
3,076!
198
  if ((code = dmInitAudit()) != 0) return code;
3,076!
199
  if ((code = dmInitDnode(dmInstance())) != 0) return code;
3,076✔
200
  if ((code = InitRegexCache() != 0)) return code;
3,068!
201

202
  dInfo("dnode env is initialized");
3,068!
203
  return 0;
3,068✔
204
}
205

206
static int32_t dmCheckRepeatCleanup(SDnode *pDnode) {
3,076✔
207
  if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) {
3,076✔
208
    dError("dnode env is already cleaned up");
8!
209
    return -1;
8✔
210
  }
211
  return 0;
3,068✔
212
}
213

214
void dmCleanup() {
3,076✔
215
  dDebug("start to cleanup dnode env");
3,076✔
216
  SDnode *pDnode = dmInstance();
3,076✔
217
  if (dmCheckRepeatCleanup(pDnode) != 0) return;
3,076✔
218
  dmCleanupDnode(pDnode);
3,068✔
219
  monCleanup();
3,068✔
220
  auditCleanup();
3,068✔
221
  syncCleanUp();
3,068✔
222
  walCleanUp();
3,068✔
223
  cleanupMetrics();
3,068✔
224
  if (udfcClose() != 0) {
3,068!
225
    dError("failed to close udfc");
×
226
  }
227
  udfStopUdfd();
3,068✔
228
  taosAnalyticsCleanup();
3,068✔
229
  taosStopCacheRefreshWorker();
3,068✔
230
  (void)dmDiskClose();
3,068✔
231
  DestroyRegexCache();
3,068✔
232

233
#ifdef USE_SHARED_STORAGE
234
  if (tsSsEnabled) {
3,068!
235
    tssCloseDefaultInstance();
×
236
    tssUninit();
×
237
  }
238
#endif
239

240
  dInfo("dnode env is cleaned up");
3,068!
241

242
  taosMemPoolClose(gMemPoolHandle);
3,068✔
243
  taosCleanupCfg();
3,068✔
244
  taosCloseLog();
3,068✔
245
}
246

247
void dmStop() {
3,068✔
248
  SDnode *pDnode = dmInstance();
3,068✔
249
  pDnode->stop = true;
3,068✔
250
}
3,068✔
251

252
int32_t dmRun() {
3,068✔
253
  SDnode *pDnode = dmInstance();
3,068✔
254
  return dmRunDnode(pDnode);
3,068✔
255
}
256

257
static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
707✔
258
  int32_t code = 0;
707✔
259
  SDnode *pDnode = dmInstance();
707✔
260

261
  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
707✔
262
  if (pWrapper != NULL) {
707✔
263
    dmReleaseWrapper(pWrapper);
2✔
264
    switch (ntype) {
2!
265
      case MNODE:
×
266
        code = TSDB_CODE_MNODE_ALREADY_DEPLOYED;
×
267
        break;
×
268
      case QNODE:
2✔
269
        code = TSDB_CODE_QNODE_ALREADY_DEPLOYED;
2✔
270
        break;
2✔
271
      case SNODE:
×
272
        code = TSDB_CODE_SNODE_ALREADY_DEPLOYED;
×
273
        break;
×
274
      case BNODE:
×
275
        code = TSDB_CODE_BNODE_ALREADY_DEPLOYED;
×
276
        break;
×
277
      default:
×
278
        code = TSDB_CODE_APP_ERROR;
×
279
    }
280
    dError("failed to create node since %s", tstrerror(code));
2!
281
    return code;
2✔
282
  }
283

284
  dInfo("start to process create-node-request");
705!
285

286
  pWrapper = &pDnode->wrappers[ntype];
705✔
287
  if (taosMkDir(pWrapper->path) != 0) {
705!
288
    dmReleaseWrapper(pWrapper);
×
289
    code = terrno;
×
290
    dError("failed to create dir:%s since %s", pWrapper->path, tstrerror(code));
×
291
    return code;
×
292
  }
293

294
  (void)taosThreadMutexLock(&pDnode->mutex);
705✔
295
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
705✔
296

297
  dInfo("node:%s, start to create", pWrapper->name);
705!
298
  code = (*pWrapper->func.createFp)(&input, pMsg);
705✔
299
  if (code != 0) {
705✔
300
    dError("node:%s, failed to create since %s", pWrapper->name, tstrerror(code));
1!
301
  } else {
302
    dInfo("node:%s, has been created", pWrapper->name);
704!
303
    code = dmOpenNode(pWrapper);
704✔
304
    if (code == 0) {
704!
305
      code = dmStartNode(pWrapper);
704✔
306
    }
307
    pWrapper->deployed = true;
704✔
308
    pWrapper->required = true;
704✔
309
  }
310

311
  (void)taosThreadMutexUnlock(&pDnode->mutex);
705✔
312
  return code;
705✔
313
}
314

315
static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
1,068✔
316
  int32_t code = 0;
1,068✔
317
  SDnode *pDnode = dmInstance();
1,068✔
318

319
  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
1,068✔
320
  if (pWrapper == NULL) {
1,068!
321
    dError("fail to process alter node type since node not exist");
×
322
    return TSDB_CODE_INVALID_MSG;
×
323
  }
324
  dmReleaseWrapper(pWrapper);
1,068✔
325

326
  dInfo("node:%s, start to process alter-node-type-request", pWrapper->name);
1,068!
327

328
  pWrapper = &pDnode->wrappers[ntype];
1,068✔
329

330
  if (pWrapper->func.nodeRoleFp != NULL) {
1,068!
331
    ESyncRole role = (*pWrapper->func.nodeRoleFp)(pWrapper->pMgmt);
1,068✔
332
    dInfo("node:%s, checking node role:%d", pWrapper->name, role);
1,068!
333
    if (role == TAOS_SYNC_ROLE_VOTER) {
1,068!
334
      dError("node:%s, failed to alter node type since node already is role:%d", pWrapper->name, role);
×
335
      code = TSDB_CODE_MNODE_ALREADY_IS_VOTER;
×
336
      return code;
×
337
    }
338
  }
339

340
  if (pWrapper->func.isCatchUpFp != NULL) {
1,068!
341
    dInfo("node:%s, checking node catch up", pWrapper->name);
1,068!
342
    if ((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1) {
1,068✔
343
      code = TSDB_CODE_MNODE_NOT_CATCH_UP;
954✔
344
      return code;
954✔
345
    }
346
  }
347

348
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pWrapper->name);
114!
349

350
  (void)taosThreadMutexLock(&pDnode->mutex);
114✔
351

352
  dInfo("node:%s, stopping node", pWrapper->name);
114!
353
  dmStopNode(pWrapper);
114✔
354
  dInfo("node:%s, closing node", pWrapper->name);
114!
355
  dmCloseNode(pWrapper);
114✔
356

357
  pWrapper = &pDnode->wrappers[ntype];
114✔
358
  if (taosMkDir(pWrapper->path) != 0) {
114!
359
    (void)taosThreadMutexUnlock(&pDnode->mutex);
×
360
    code = terrno;
×
361
    dError("failed to create dir:%s since %s", pWrapper->path, tstrerror(code));
×
362
    return code;
×
363
  }
364

365
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
114✔
366

367
  dInfo("node:%s, start to create", pWrapper->name);
114!
368
  code = (*pWrapper->func.createFp)(&input, pMsg);
114✔
369
  if (code != 0) {
114!
370
    dError("node:%s, failed to create since %s", pWrapper->name, tstrerror(code));
×
371
  } else {
372
    dInfo("node:%s, has been created", pWrapper->name);
114!
373
    code = dmOpenNode(pWrapper);
114✔
374
    if (code == 0) {
114!
375
      code = dmStartNode(pWrapper);
114✔
376
    }
377
    pWrapper->deployed = true;
114✔
378
    pWrapper->required = true;
114✔
379
  }
380

381
  (void)taosThreadMutexUnlock(&pDnode->mutex);
114✔
382
  return code;
114✔
383
}
384

385
static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
111✔
386
  int32_t code = 0;
111✔
387
  SDnode *pDnode = dmInstance();
111✔
388

389
  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
111✔
390
  if (pWrapper == NULL) {
111✔
391
    switch (ntype) {
2!
392
      case MNODE:
×
393
        code = TSDB_CODE_MNODE_NOT_DEPLOYED;
×
394
        break;
×
395
      case QNODE:
2✔
396
        code = TSDB_CODE_QNODE_NOT_DEPLOYED;
2✔
397
        break;
2✔
398
      case SNODE:
×
399
        code = TSDB_CODE_SNODE_NOT_DEPLOYED;
×
400
        break;
×
401
      case BNODE:
×
402
        code = TSDB_CODE_BNODE_NOT_DEPLOYED;
×
403
        break;
×
404
      default:
×
405
        code = TSDB_CODE_APP_ERROR;
×
406
    }
407

408
    dError("failed to drop node since %s", tstrerror(code));
2!
409
    return terrno = code;
2✔
410
  }
411

412
  (void)taosThreadMutexLock(&pDnode->mutex);
109✔
413
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
109✔
414

415
  dInfo("node:%s, start to drop", pWrapper->name);
109!
416
  code = (*pWrapper->func.dropFp)(&input, pMsg);
109✔
417
  if (code != 0) {
109!
418
    dError("node:%s, failed to drop since %s", pWrapper->name, tstrerror(code));
×
419
  } else {
420
    dInfo("node:%s, has been dropped", pWrapper->name);
109!
421
    pWrapper->required = false;
109✔
422
    pWrapper->deployed = false;
109✔
423
  }
424

425
  dmReleaseWrapper(pWrapper);
109✔
426

427
  if (code == 0) {
109!
428
    dmStopNode(pWrapper);
109✔
429
    dmCloseNode(pWrapper);
109✔
430
    taosRemoveDir(pWrapper->path);
109✔
431
  }
432
  (void)taosThreadMutexUnlock(&pDnode->mutex);
109✔
433
  return code;
109✔
434
}
435

436
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
28,605✔
437
  SMgmtInputOpt opt = {
28,605✔
438
      .path = pWrapper->path,
28,605✔
439
      .name = pWrapper->name,
28,605✔
440
      .pTfs = pWrapper->pDnode->pTfs,
28,605✔
441
      .pData = &pWrapper->pDnode->data,
28,605✔
442
      .processCreateNodeFp = dmProcessCreateNodeReq,
443
      .processAlterNodeTypeFp = dmProcessAlterNodeTypeReq,
444
      .processDropNodeFp = dmProcessDropNodeReq,
445
      .sendMonitorReportFp = dmSendMonitorReport,
446
      .sendMetricsReportFp = dmSendMetricsReport,
447
      .monitorCleanExpiredSamplesFp = dmMonitorCleanExpiredSamples,
448
      .metricsCleanExpiredSamplesFp = dmMetricsCleanExpiredSamples,
449
      .sendAuditRecordFp = auditSendRecordsInBatch,
450
      .getVnodeLoadsFp = dmGetVnodeLoads,
451
      .getVnodeLoadsLiteFp = dmGetVnodeLoadsLite,
452
      .getMnodeLoadsFp = dmGetMnodeLoads,
453
      .getQnodeLoadsFp = dmGetQnodeLoads,
454
      .stopDnodeFp = dmStop,
455
  };
456

457
  opt.msgCb = dmGetMsgcb(pWrapper->pDnode);
28,605✔
458
  return opt;
28,605✔
459
}
460

461
void dmReportStartup(const char *pName, const char *pDesc) {
182,208✔
462
  SStartupInfo *pStartup = &(dmInstance()->startup);
182,208✔
463
  tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
182,208✔
464
  tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
182,208✔
465
  dDebug("step:%s, %s", pStartup->name, pStartup->desc);
182,208✔
466
}
182,208✔
467

468
int64_t dmGetClusterId() { return globalDnode.data.clusterId; }
×
469

470
bool dmReadyForTest() { return dmInstance()->data.dnodeVer > 0; }
32✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc