• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4488

12 Jul 2025 07:47AM UTC coverage: 62.207% (-0.7%) from 62.948%
#4488

push

travis-ci

web-flow
docs: update stream docs (#31822)

157961 of 324087 branches covered (48.74%)

Branch coverage included in aggregate %.

244465 of 322830 relevant lines covered (75.73%)

6561668.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.04
/source/dnode/mgmt/node_mgmt/src/dmEnv.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
// clang-format off
18
#include "dmMgmt.h"
19
#include "audit.h"
20
#include "libs/function/tudf.h"
21
#include "metrics.h"
22
#include "tgrant.h"
23
#include "tcompare.h"
24
#include "tcs.h"
25
#include "tanalytics.h"
26
// clang-format on
27

28
#define DM_INIT_AUDIT()                       \
29
  do {                                        \
30
    auditCfg.port = tsMonitorPort;            \
31
    auditCfg.server = tsMonitorFqdn;          \
32
    auditCfg.comp = tsMonitorComp;            \
33
    if ((code = auditInit(&auditCfg)) != 0) { \
34
      return code;                            \
35
    }                                         \
36
  } while (0)
37

38
static SDnode globalDnode = {0};
39

40
SDnode *dmInstance() { return &globalDnode; }
1,939,028✔
41

42
static int32_t dmCheckRepeatInit(SDnode *pDnode) {
3,063✔
43
  int32_t code = 0;
3,063✔
44
  if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
3,063!
45
    dError("env is already initialized");
×
46
    code = TSDB_CODE_REPEAT_INIT;
×
47
    return code;
×
48
  }
49
  return 0;
3,063✔
50
}
51

52
static int32_t dmInitSystem() {
3,063✔
53
  if (taosIgnSIGPIPE() != 0) {
3,063!
54
    dError("failed to ignore SIGPIPE");
×
55
  }
56

57
  if (taosBlockSIGPIPE() != 0) {
3,063!
58
    dError("failed to block SIGPIPE");
×
59
  }
60

61
  taosResolveCRC();
3,063✔
62
  return 0;
3,063✔
63
}
64

65
static int32_t dmInitMonitor() {
3,063✔
66
  int32_t code = 0;
3,063✔
67
  SMonCfg monCfg = {0};
3,063✔
68

69
  monCfg.maxLogs = tsMonitorMaxLogs;
3,063✔
70
  monCfg.port = tsMonitorPort;
3,063✔
71
  monCfg.server = tsMonitorFqdn;
3,063✔
72
  monCfg.comp = tsMonitorComp;
3,063✔
73
  if ((code = monInit(&monCfg)) != 0) {
3,063!
74
    dError("failed to init monitor since %s", tstrerror(code));
×
75
  }
76
  return code;
3,063✔
77
}
78

79
static int32_t dmInitMetrics() {
3,063✔
80
  int32_t code = 0;
3,063✔
81
  if ((code = initMetricsManager()) != 0) {
3,063!
82
    dError("failed to init metrics since %s", tstrerror(code));
×
83
  }
84
  return code;
3,063✔
85
}
86

87
static int32_t dmInitAudit() {
3,063✔
88
  SAuditCfg auditCfg = {0};
3,063✔
89
  int32_t   code = 0;
3,063✔
90

91
  DM_INIT_AUDIT();
3,063!
92

93
  return 0;
3,063✔
94
}
95

96
static bool dmDataSpaceAvailable() {
3,063✔
97
  SDnode *pDnode = dmInstance();
3,063✔
98
  if (pDnode->pTfs) {
3,063!
99
    return tfsDiskSpaceAvailable(pDnode->pTfs, 0);
3,063✔
100
  }
101
  if (!osDataSpaceAvailable()) {
×
102
    dError("data disk space unavailable, i.e. %s", tsDataDir);
×
103
    return false;
×
104
  }
105
  return true;
×
106
}
107

108
static int32_t dmCheckDiskSpace() {
3,063✔
109
  // availability
110
  int32_t code = 0;
3,063✔
111
  code = osUpdate();
3,063✔
112
  if (code != 0) {
3,063!
113
    dError("failed to update os info since %s", tstrerror(code));
×
114
    code = 0;  // ignore the error, just log it
×
115
  }
116
  if (!dmDataSpaceAvailable()) {
3,063!
117
    code = TSDB_CODE_NO_DISKSPACE;
×
118
    return code;
×
119
  }
120
  if (!osLogSpaceAvailable()) {
3,063!
121
    dError("log disk space unavailable, i.e. %s", tsLogDir);
×
122
    code = TSDB_CODE_NO_DISKSPACE;
×
123
    return code;
×
124
  }
125
  if (!osTempSpaceAvailable()) {
3,063!
126
    dError("temp disk space unavailable, i.e. %s", tsTempDir);
×
127
    code = TSDB_CODE_NO_DISKSPACE;
×
128
    return code;
×
129
  }
130
  return code;
3,063✔
131
}
132

133
int32_t dmDiskInit() {
3,072✔
134
  SDnode  *pDnode = dmInstance();
3,072✔
135
  SDiskCfg dCfg = {.level = 0, .primary = 1, .disable = 0};
3,072✔
136
  tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN);
3,072✔
137
  SDiskCfg *pDisks = tsDiskCfg;
3,072✔
138
  int32_t   numOfDisks = tsDiskCfgNum;
3,072✔
139
  if (numOfDisks <= 0 || pDisks == NULL) {
3,072!
140
    pDisks = &dCfg;
8✔
141
    numOfDisks = 1;
8✔
142
  }
143

144
  int32_t code = tfsOpen(pDisks, numOfDisks, &pDnode->pTfs);
3,072✔
145
  if (code != 0) {
3,072✔
146
    dError("failed to init tfs since %s", tstrerror(code));
9!
147
    TAOS_RETURN(code);
9✔
148
  }
149
  return 0;
3,063✔
150
}
151

152
int32_t dmDiskClose() {
3,056✔
153
  SDnode *pDnode = dmInstance();
3,056✔
154
  tfsClose(pDnode->pTfs);
3,056✔
155
  pDnode->pTfs = NULL;
3,056✔
156
  return 0;
3,056✔
157
}
158

159
static bool dmCheckDataDirVersion() {
3,063✔
160
  char checkDataDirJsonFileName[PATH_MAX] = {0};
3,063✔
161
  snprintf(checkDataDirJsonFileName, PATH_MAX, "%s/dnode/dnodeCfg.json", tsDataDir);
3,063✔
162
  if (taosCheckExistFile(checkDataDirJsonFileName)) {
3,063!
163
    dError("The default data directory %s contains old data of tdengine 2.x, please clear it before running!",
×
164
           tsDataDir);
165
    return false;
×
166
  }
167
  return true;
3,063✔
168
}
169

170
static int32_t dmCheckDataDirVersionWrapper() {
×
171
  if (!dmCheckDataDirVersion()) {
×
172
    return TSDB_CODE_INVALID_DATA_FMT;
×
173
  }
174
  return 0;
×
175
}
176

177
int32_t dmInit() {
3,072✔
178
  dInfo("start to init dnode env");
3,072!
179
  int32_t code = 0;
3,072✔
180
  if ((code = dmDiskInit()) != 0) return code;
3,072✔
181
  if (!dmCheckDataDirVersion()) {
3,063!
182
    code = TSDB_CODE_INVALID_DATA_FMT;
×
183
    return code;
×
184
  }
185
  if ((code = dmCheckDiskSpace()) != 0) return code;
3,063!
186
  if ((code = dmCheckRepeatInit(dmInstance())) != 0) return code;
3,063!
187
  if ((code = dmInitSystem()) != 0) return code;
3,063!
188
  if ((code = dmInitMonitor()) != 0) return code;
3,063!
189
  if ((code = dmInitMetrics()) != 0) return code;
3,063!
190
  if ((code = dmInitAudit()) != 0) return code;
3,063!
191
  if ((code = dmInitDnode(dmInstance())) != 0) return code;
3,063✔
192
  if ((code = InitRegexCache() != 0)) return code;
3,056!
193
#if defined(USE_S3)
194
  if ((code = tcsInit()) != 0) return code;
3,056!
195
#endif
196

197
  dInfo("dnode env is initialized");
3,056!
198
  return 0;
3,056✔
199
}
200

201
static int32_t dmCheckRepeatCleanup(SDnode *pDnode) {
3,064✔
202
  if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) {
3,064✔
203
    dError("dnode env is already cleaned up");
8!
204
    return -1;
8✔
205
  }
206
  return 0;
3,056✔
207
}
208

209
void dmCleanup() {
3,064✔
210
  dDebug("start to cleanup dnode env");
3,064✔
211
  SDnode *pDnode = dmInstance();
3,064✔
212
  if (dmCheckRepeatCleanup(pDnode) != 0) return;
3,064✔
213
  dmCleanupDnode(pDnode);
3,056✔
214
  monCleanup();
3,056✔
215
  auditCleanup();
3,056✔
216
  syncCleanUp();
3,056✔
217
  walCleanUp();
3,056✔
218
  cleanupMetrics();
3,056✔
219
  if (udfcClose() != 0) {
3,056!
220
    dError("failed to close udfc");
×
221
  }
222
  udfStopUdfd();
3,056✔
223
  taosAnalyticsCleanup();
3,056✔
224
  taosStopCacheRefreshWorker();
3,056✔
225
  (void)dmDiskClose();
3,056✔
226
  DestroyRegexCache();
3,056✔
227

228
#if defined(USE_S3)
229
  tcsUninit();
3,056✔
230
#endif
231

232
  dInfo("dnode env is cleaned up");
3,056!
233

234
  taosMemPoolClose(gMemPoolHandle);
3,056✔
235
  taosCleanupCfg();
3,056✔
236
  taosCloseLog();
3,056✔
237
}
238

239
void dmStop() {
3,057✔
240
  SDnode *pDnode = dmInstance();
3,057✔
241
  pDnode->stop = true;
3,057✔
242
}
3,057✔
243

244
int32_t dmRun() {
3,056✔
245
  SDnode *pDnode = dmInstance();
3,056✔
246
  return dmRunDnode(pDnode);
3,056✔
247
}
248

249
static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
707✔
250
  int32_t code = 0;
707✔
251
  SDnode *pDnode = dmInstance();
707✔
252

253
  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
707✔
254
  if (pWrapper != NULL) {
707✔
255
    dmReleaseWrapper(pWrapper);
2✔
256
    switch (ntype) {
2!
257
      case MNODE:
×
258
        code = TSDB_CODE_MNODE_ALREADY_DEPLOYED;
×
259
        break;
×
260
      case QNODE:
2✔
261
        code = TSDB_CODE_QNODE_ALREADY_DEPLOYED;
2✔
262
        break;
2✔
263
      case SNODE:
×
264
        code = TSDB_CODE_SNODE_ALREADY_DEPLOYED;
×
265
        break;
×
266
      case BNODE:
×
267
        code = TSDB_CODE_BNODE_ALREADY_DEPLOYED;
×
268
        break;
×
269
      default:
×
270
        code = TSDB_CODE_APP_ERROR;
×
271
    }
272
    dError("failed to create node since %s", tstrerror(code));
2!
273
    return code;
2✔
274
  }
275

276
  dInfo("start to process create-node-request");
705!
277

278
  pWrapper = &pDnode->wrappers[ntype];
705✔
279
  if (taosMkDir(pWrapper->path) != 0) {
705!
280
    dmReleaseWrapper(pWrapper);
×
281
    code = terrno;
×
282
    dError("failed to create dir:%s since %s", pWrapper->path, tstrerror(code));
×
283
    return code;
×
284
  }
285

286
  (void)taosThreadMutexLock(&pDnode->mutex);
705✔
287
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
705✔
288

289
  dInfo("node:%s, start to create", pWrapper->name);
705!
290
  code = (*pWrapper->func.createFp)(&input, pMsg);
705✔
291
  if (code != 0) {
705✔
292
    dError("node:%s, failed to create since %s", pWrapper->name, tstrerror(code));
1!
293
  } else {
294
    dInfo("node:%s, has been created", pWrapper->name);
704!
295
    code = dmOpenNode(pWrapper);
704✔
296
    if (code == 0) {
704!
297
      code = dmStartNode(pWrapper);
704✔
298
    }
299
    pWrapper->deployed = true;
704✔
300
    pWrapper->required = true;
704✔
301
  }
302

303
  (void)taosThreadMutexUnlock(&pDnode->mutex);
705✔
304
  return code;
705✔
305
}
306

307
static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
783✔
308
  int32_t code = 0;
783✔
309
  SDnode *pDnode = dmInstance();
783✔
310

311
  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
783✔
312
  if (pWrapper == NULL) {
783!
313
    dError("fail to process alter node type since node not exist");
×
314
    return TSDB_CODE_INVALID_MSG;
×
315
  }
316
  dmReleaseWrapper(pWrapper);
783✔
317

318
  dInfo("node:%s, start to process alter-node-type-request", pWrapper->name);
783!
319

320
  pWrapper = &pDnode->wrappers[ntype];
783✔
321

322
  if (pWrapper->func.nodeRoleFp != NULL) {
783!
323
    ESyncRole role = (*pWrapper->func.nodeRoleFp)(pWrapper->pMgmt);
783✔
324
    dInfo("node:%s, checking node role:%d", pWrapper->name, role);
783!
325
    if (role == TAOS_SYNC_ROLE_VOTER) {
783!
326
      dError("node:%s, failed to alter node type since node already is role:%d", pWrapper->name, role);
×
327
      code = TSDB_CODE_MNODE_ALREADY_IS_VOTER;
×
328
      return code;
×
329
    }
330
  }
331

332
  if (pWrapper->func.isCatchUpFp != NULL) {
783!
333
    dInfo("node:%s, checking node catch up", pWrapper->name);
783!
334
    if ((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1) {
783✔
335
      code = TSDB_CODE_MNODE_NOT_CATCH_UP;
668✔
336
      return code;
668✔
337
    }
338
  }
339

340
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pWrapper->name);
115!
341

342
  (void)taosThreadMutexLock(&pDnode->mutex);
115✔
343

344
  dInfo("node:%s, stopping node", pWrapper->name);
115!
345
  dmStopNode(pWrapper);
115✔
346
  dInfo("node:%s, closing node", pWrapper->name);
115!
347
  dmCloseNode(pWrapper);
115✔
348

349
  pWrapper = &pDnode->wrappers[ntype];
115✔
350
  if (taosMkDir(pWrapper->path) != 0) {
115!
351
    (void)taosThreadMutexUnlock(&pDnode->mutex);
×
352
    code = terrno;
×
353
    dError("failed to create dir:%s since %s", pWrapper->path, tstrerror(code));
×
354
    return code;
×
355
  }
356

357
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
115✔
358

359
  dInfo("node:%s, start to create", pWrapper->name);
115!
360
  code = (*pWrapper->func.createFp)(&input, pMsg);
115✔
361
  if (code != 0) {
115!
362
    dError("node:%s, failed to create since %s", pWrapper->name, tstrerror(code));
×
363
  } else {
364
    dInfo("node:%s, has been created", pWrapper->name);
115!
365
    code = dmOpenNode(pWrapper);
115✔
366
    if (code == 0) {
115!
367
      code = dmStartNode(pWrapper);
115✔
368
    }
369
    pWrapper->deployed = true;
115✔
370
    pWrapper->required = true;
115✔
371
  }
372

373
  (void)taosThreadMutexUnlock(&pDnode->mutex);
115✔
374
  return code;
115✔
375
}
376

377
static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
111✔
378
  int32_t code = 0;
111✔
379
  SDnode *pDnode = dmInstance();
111✔
380

381
  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
111✔
382
  if (pWrapper == NULL) {
111✔
383
    switch (ntype) {
2!
384
      case MNODE:
×
385
        code = TSDB_CODE_MNODE_NOT_DEPLOYED;
×
386
        break;
×
387
      case QNODE:
2✔
388
        code = TSDB_CODE_QNODE_NOT_DEPLOYED;
2✔
389
        break;
2✔
390
      case SNODE:
×
391
        code = TSDB_CODE_SNODE_NOT_DEPLOYED;
×
392
        break;
×
393
      case BNODE:
×
394
        code = TSDB_CODE_BNODE_NOT_DEPLOYED;
×
395
        break;
×
396
      default:
×
397
        code = TSDB_CODE_APP_ERROR;
×
398
    }
399

400
    dError("failed to drop node since %s", tstrerror(code));
2!
401
    return terrno = code;
2✔
402
  }
403

404
  (void)taosThreadMutexLock(&pDnode->mutex);
109✔
405
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
109✔
406

407
  dInfo("node:%s, start to drop", pWrapper->name);
109!
408
  code = (*pWrapper->func.dropFp)(&input, pMsg);
109✔
409
  if (code != 0) {
109!
410
    dError("node:%s, failed to drop since %s", pWrapper->name, tstrerror(code));
×
411
  } else {
412
    dInfo("node:%s, has been dropped", pWrapper->name);
109!
413
    pWrapper->required = false;
109✔
414
    pWrapper->deployed = false;
109✔
415
  }
416

417
  dmReleaseWrapper(pWrapper);
109✔
418

419
  if (code == 0) {
109!
420
    dmStopNode(pWrapper);
109✔
421
    dmCloseNode(pWrapper);
109✔
422
    taosRemoveDir(pWrapper->path);
109✔
423
  }
424
  (void)taosThreadMutexUnlock(&pDnode->mutex);
109✔
425
  return code;
109✔
426
}
427

428
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
28,493✔
429
  SMgmtInputOpt opt = {
28,493✔
430
      .path = pWrapper->path,
28,493✔
431
      .name = pWrapper->name,
28,493✔
432
      .pTfs = pWrapper->pDnode->pTfs,
28,493✔
433
      .pData = &pWrapper->pDnode->data,
28,493✔
434
      .processCreateNodeFp = dmProcessCreateNodeReq,
435
      .processAlterNodeTypeFp = dmProcessAlterNodeTypeReq,
436
      .processDropNodeFp = dmProcessDropNodeReq,
437
      .sendMonitorReportFp = dmSendMonitorReport,
438
      .sendMetricsReportFp = dmSendMetricsReport,
439
      .monitorCleanExpiredSamplesFp = dmMonitorCleanExpiredSamples,
440
      .metricsCleanExpiredSamplesFp = dmMetricsCleanExpiredSamples,
441
      .sendAuditRecordFp = auditSendRecordsInBatch,
442
      .getVnodeLoadsFp = dmGetVnodeLoads,
443
      .getVnodeLoadsLiteFp = dmGetVnodeLoadsLite,
444
      .getMnodeLoadsFp = dmGetMnodeLoads,
445
      .getQnodeLoadsFp = dmGetQnodeLoads,
446
      .stopDnodeFp = dmStop,
447
  };
448

449
  opt.msgCb = dmGetMsgcb(pWrapper->pDnode);
28,493✔
450
  return opt;
28,493✔
451
}
452

453
void dmReportStartup(const char *pName, const char *pDesc) {
179,060✔
454
  SStartupInfo *pStartup = &(dmInstance()->startup);
179,060✔
455
  tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
179,060✔
456
  tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
179,060✔
457
  dDebug("step:%s, %s", pStartup->name, pStartup->desc);
179,060✔
458
}
179,059✔
459

460
int64_t dmGetClusterId() { return globalDnode.data.clusterId; }
×
461

462
bool dmReadyForTest() { return dmInstance()->data.dnodeVer > 0; }
32✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc