• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.15
/source/dnode/mgmt/node_mgmt/src/dmEnv.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
// clang-format off
18
//#include "storageapi.h"
19
#include "dmMgmt.h"
20
#include "audit.h"
21
#include "libs/function/tudf.h"
22
#include "metrics.h"
23
#include "tgrant.h"
24
#include "tcompare.h"
25
#include "tss.h"
26
#include "tanalytics.h"
27
#include "stream.h"
28
// clang-format on
29

30
/*
 * Copies the monitor port/fqdn/comp settings into `auditCfg` and calls
 * auditInit(). Must be expanded inside a function that declares locals named
 * `auditCfg` and `code`: when auditInit() returns non-zero the macro executes
 * `return code;` in the enclosing function.
 */
#define DM_INIT_AUDIT()              \
  do {                               \
    auditCfg.port = tsMonitorPort;   \
    auditCfg.server = tsMonitorFqdn; \
    auditCfg.comp = tsMonitorComp;   \
    code = auditInit(&auditCfg);     \
    if (code != 0) {                 \
      return code;                   \
    }                                \
  } while (0)
39

40
// File-scope dnode object shared by every function in this file.
static SDnode globalDnode = {0};

// Returns the address of the file-scope dnode object.
SDnode *dmInstance() {
  SDnode *pInstance = &globalDnode;
  return pInstance;
}
1,446,281✔
43

44
// Switches pDnode->once from DND_ENV_INIT to DND_ENV_READY with a compare-and-exchange.
// Returns 0 when the previous value was DND_ENV_INIT, TSDB_CODE_REPEAT_INIT otherwise.
static int32_t dmCheckRepeatInit(SDnode *pDnode) {
  if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_INIT, DND_ENV_READY) == DND_ENV_INIT) {
    return 0;
  }

  dError("env is already initialized");
  return TSDB_CODE_REPEAT_INIT;
}
53

54
// Applies the SIGPIPE settings and resolves the CRC implementation.
// SIGPIPE failures are only logged; the function always returns 0.
static int32_t dmInitSystem() {
  int32_t ignoreRet = taosIgnSIGPIPE();
  if (ignoreRet != 0) {
    dError("failed to ignore SIGPIPE");
  }

  int32_t blockRet = taosBlockSIGPIPE();
  if (blockRet != 0) {
    dError("failed to block SIGPIPE");
  }

  taosResolveCRC();
  return 0;
}
66

67
static int32_t dmInitMonitor() {
2,392✔
68
  int32_t code = 0;
2,392✔
69
  SMonCfg monCfg = {0};
2,392✔
70

71
  monCfg.maxLogs = tsMonitorMaxLogs;
2,392✔
72
  monCfg.port = tsMonitorPort;
2,392✔
73
  monCfg.server = tsMonitorFqdn;
2,392✔
74
  monCfg.comp = tsMonitorComp;
2,392✔
75
  if ((code = monInit(&monCfg)) != 0) {
2,392!
76
    dError("failed to init monitor since %s", tstrerror(code));
×
77
  }
78
  return code;
2,392✔
79
}
80

81
// Calls initMetricsManager() and returns its result, logging it when non-zero.
static int32_t dmInitMetrics() {
  int32_t code = initMetricsManager();
  if (code == 0) {
    return 0;
  }

  dError("failed to init metrics since %s", tstrerror(code));
  return code;
}
88

89
static int32_t dmInitAudit() {
2,392✔
90
  SAuditCfg auditCfg = {0};
2,392✔
91
  int32_t   code = 0;
2,392✔
92

93
  DM_INIT_AUDIT();
2,392!
94

95
  return 0;
2,392✔
96
}
97

98
static bool dmDataSpaceAvailable() {
2,392✔
99
  SDnode *pDnode = dmInstance();
2,392✔
100
  if (pDnode->pTfs) {
2,392!
101
    return tfsDiskSpaceAvailable(pDnode->pTfs, 0);
2,392✔
102
  }
103
  if (!osDataSpaceAvailable()) {
×
104
    dError("data disk space unavailable, i.e. %s", tsDataDir);
×
105
    return false;
×
106
  }
107
  return true;
×
108
}
109

110
static int32_t dmCheckDiskSpace() {
2,392✔
111
  // availability
112
  int32_t code = 0;
2,392✔
113
  code = osUpdate();
2,392✔
114
  if (code != 0) {
2,392!
115
    dError("failed to update os info since %s", tstrerror(code));
×
116
    code = 0;  // ignore the error, just log it
×
117
  }
118
  if (!dmDataSpaceAvailable()) {
2,392!
119
    code = TSDB_CODE_NO_DISKSPACE;
×
120
    return code;
×
121
  }
122
  if (!osLogSpaceAvailable()) {
2,392!
123
    dError("log disk space unavailable, i.e. %s", tsLogDir);
×
124
    code = TSDB_CODE_NO_DISKSPACE;
×
125
    return code;
×
126
  }
127
  if (!osTempSpaceAvailable()) {
2,392!
128
    dError("temp disk space unavailable, i.e. %s", tsTempDir);
×
129
    code = TSDB_CODE_NO_DISKSPACE;
×
130
    return code;
×
131
  }
132
  return code;
2,392✔
133
}
134

135
int32_t dmDiskInit() {
2,407✔
136
  SDnode  *pDnode = dmInstance();
2,407✔
137
  SDiskCfg dCfg = {.level = 0, .primary = 1, .disable = 0};
2,407✔
138
  tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN);
2,407✔
139
  SDiskCfg *pDisks = tsDiskCfg;
2,407✔
140
  int32_t   numOfDisks = tsDiskCfgNum;
2,407✔
141
  if (numOfDisks <= 0 || pDisks == NULL) {
2,407!
142
    pDisks = &dCfg;
24✔
143
    numOfDisks = 1;
24✔
144
  }
145

146
  int32_t code = tfsOpen(pDisks, numOfDisks, &pDnode->pTfs);
2,407✔
147
  if (code != 0) {
2,407✔
148
    dError("failed to init tfs since %s", tstrerror(code));
15!
149
    TAOS_RETURN(code);
15✔
150
  }
151
  return 0;
2,392✔
152
}
153

154
int32_t dmDiskClose() {
2,391✔
155
  SDnode *pDnode = dmInstance();
2,391✔
156
  tfsClose(pDnode->pTfs);
2,391✔
157
  pDnode->pTfs = NULL;
2,391✔
158
  return 0;
2,391✔
159
}
160

161
static bool dmCheckDataDirVersion() {
2,392✔
162
  char checkDataDirJsonFileName[PATH_MAX] = {0};
2,392✔
163
  snprintf(checkDataDirJsonFileName, PATH_MAX, "%s/dnode/dnodeCfg.json", tsDataDir);
2,392✔
164
  if (taosCheckExistFile(checkDataDirJsonFileName)) {
2,392!
165
    dError("The default data directory %s contains old data of tdengine 2.x, please clear it before running!",
×
166
           tsDataDir);
167
    return false;
×
168
  }
169
  return true;
2,392✔
170
}
171

172
static int32_t dmCheckDataDirVersionWrapper() {
×
173
  if (!dmCheckDataDirVersion()) {
×
174
    return TSDB_CODE_INVALID_DATA_FMT;
×
175
  }
176
  return 0;
×
177
}
178

179
int32_t dmInit() {
2,407✔
180
  dInfo("start to init dnode env");
2,407!
181
  int32_t code = 0;
2,407✔
182

183
#ifdef USE_SHARED_STORAGE
184
  if (tsSsEnabled) {
2,407!
185
    if ((code = tssInit()) != 0) return code;
×
186
    if ((code = tssCreateDefaultInstance()) != 0) return code;
×
187
  }
188
#endif
189

190
  if ((code = dmDiskInit()) != 0) return code;
2,407✔
191
  if (!dmCheckDataDirVersion()) {
2,392!
192
    code = TSDB_CODE_INVALID_DATA_FMT;
×
193
    return code;
×
194
  }
195
  SDnode* pDnode = dmInstance();
2,392✔
196
  if ((code = dmCheckDiskSpace()) != 0) return code;
2,392!
197
  if ((code = dmCheckRepeatInit(pDnode)) != 0) return code;
2,392!
198
  if ((code = dmInitSystem()) != 0) return code;
2,392!
199
  if ((code = dmInitMonitor()) != 0) return code;
2,392!
200
  if ((code = dmInitMetrics()) != 0) return code;
2,392!
201
  if ((code = dmInitAudit()) != 0) return code;
2,392!
202
  if ((code = dmInitDnode(pDnode)) != 0) return code;
2,392✔
203
  if ((code = InitRegexCache() != 0)) return code;
2,391!
204

205
  gExecInfoInit(&pDnode->data, (getDnodeId_f)dmGetDnodeId, dmGetMnodeEpSet);
2,391✔
206
  if ((code = streamInit(&pDnode->data, (getDnodeId_f)dmGetDnodeId, dmGetMnodeEpSet, dmGetSynEpset)) != 0) return code;
2,391!
207

208
  dInfo("dnode env is initialized");
2,391!
209
  return 0;
2,391✔
210
}
211

212
// Switches pDnode->once from DND_ENV_READY to DND_ENV_CLEANUP with a compare-and-exchange.
// Returns 0 when the previous value was DND_ENV_READY, -1 (after logging) otherwise.
static int32_t dmCheckRepeatCleanup(SDnode *pDnode) {
  if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_READY, DND_ENV_CLEANUP) == DND_ENV_READY) {
    return 0;
  }

  dError("dnode env is already cleaned up");
  return -1;
}
219

220
void dmCleanup() {
2,415✔
221
  dDebug("start to cleanup dnode env");
2,415✔
222
  SDnode *pDnode = dmInstance();
2,415✔
223
  if (dmCheckRepeatCleanup(pDnode) != 0) return;
2,415✔
224
  dmCleanupDnode(pDnode);
2,391✔
225
  monCleanup();
2,391✔
226
  auditCleanup();
2,391✔
227
  syncCleanUp();
2,391✔
228
  walCleanUp();
2,391✔
229
  cleanupMetrics();
2,391✔
230
  if (udfcClose() != 0) {
2,391!
231
    dError("failed to close udfc");
×
232
  }
233
  udfStopUdfd();
2,391✔
234
  taosAnalyticsCleanup();
2,391✔
235
  taosStopCacheRefreshWorker();
2,391✔
236
  (void)dmDiskClose();
2,391✔
237
  DestroyRegexCache();
2,391✔
238

239
#ifdef USE_SHARED_STORAGE
240
  if (tsSsEnabled) {
2,391!
241
    tssCloseDefaultInstance();
×
242
    tssUninit();
×
243
  }
244
#endif
245

246
  dInfo("dnode env is cleaned up");
2,391!
247

248
  taosMemPoolClose(gMemPoolHandle);
2,391✔
249
  taosCleanupCfg();
2,391✔
250
  taosCloseLog();
2,391✔
251
}
252

253
void dmStop() {
2,391✔
254
  SDnode *pDnode = dmInstance();
2,391✔
255
  pDnode->stop = true;
2,391✔
256
}
2,391✔
257

258
// Runs the dnode object and returns the result of dmRunDnode().
int32_t dmRun() {
  return dmRunDnode(dmInstance());
}
262

263
/*
 * Handles a create-node request for node type `ntype`.
 *
 * When dmAcquireWrapper() returns a wrapper for `ntype` the request is rejected
 * with the type-specific *_ALREADY_DEPLOYED code (TSDB_CODE_APP_ERROR for any
 * other type). Otherwise the node directory is created and, while holding
 * pDnode->mutex, the wrapper's createFp is invoked with `pMsg`, followed by
 * dmOpenNode() and dmStartNode().
 *
 * Returns 0 on success, or the first non-zero code produced by one of the steps.
 */
static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
  int32_t code = 0;
  SDnode *pDnode = dmInstance();

  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
  if (pWrapper != NULL) {
    dmReleaseWrapper(pWrapper);
    switch (ntype) {
      case MNODE:
        code = TSDB_CODE_MNODE_ALREADY_DEPLOYED;
        break;
      case QNODE:
        code = TSDB_CODE_QNODE_ALREADY_DEPLOYED;
        break;
      case SNODE:
        code = TSDB_CODE_SNODE_ALREADY_DEPLOYED;
        break;
      case BNODE:
        code = TSDB_CODE_BNODE_ALREADY_DEPLOYED;
        break;
      default:
        code = TSDB_CODE_APP_ERROR;
    }
    dError("failed to create node since %s", tstrerror(code));
    return code;
  }

  dInfo("start to process create-node-request");

  // dmAcquireWrapper() returned NULL, so no reference is held from here on:
  // the wrapper slot is used directly and must not be passed to dmReleaseWrapper().
  pWrapper = &pDnode->wrappers[ntype];

  if (taosMulMkDir(pWrapper->path) != 0) {
    code = terrno;  // read right after the failing call, before anything else runs
    dError("failed to create dir:%s since %s", pWrapper->path, tstrerror(code));
    return code;
  }

  dInfo("path %s created", pWrapper->path);

  (void)taosThreadMutexLock(&pDnode->mutex);
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);

  dInfo("node:%s, start to create", pWrapper->name);
  code = (*pWrapper->func.createFp)(&input, pMsg);
  if (code != 0) {
    dError("node:%s, failed to create since %s", pWrapper->name, tstrerror(code));
  } else {
    dInfo("node:%s, has been created", pWrapper->name);
    code = dmOpenNode(pWrapper);
    if (code == 0) {
      code = dmStartNode(pWrapper);
    }
    // Both flags are set once createFp succeeded, whatever open/start returned.
    pWrapper->deployed = true;
    pWrapper->required = true;
  }

  (void)taosThreadMutexUnlock(&pDnode->mutex);
  return code;
}
323

324

325
/*
 * Handles an alter-node request. Only SNODE is accepted; any other node type
 * is rejected with TSDB_CODE_INVALID_MSG.
 *
 * While holding pDnode->mutex the wrapper's createFp is invoked with `pMsg`
 * to apply the update. Returns the createFp result.
 */
static int32_t dmProcessAlterNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
  int32_t code = 0;
  if (SNODE != ntype) {
    dError("failed to process msgType %d since node type is NOT snode", pMsg->msgType);
    return TSDB_CODE_INVALID_MSG;
  }

  SDnode *pDnode = dmInstance();
  // Keep the acquisition result separately: it may be NULL, and only a
  // successful acquisition may be paired with a release at the end.
  SMgmtWrapper *pAcquired = dmAcquireWrapper(pDnode, ntype);

  dInfo("start to process alter-node-request");

  SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];

  (void)taosThreadMutexLock(&pDnode->mutex);
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);

  dInfo("node:%s, start to update", pWrapper->name);
  code = (*pWrapper->func.createFp)(&input, pMsg);
  if (code != 0) {
    dError("node:%s, failed to update since %s", pWrapper->name, tstrerror(code));
  } else {
    dInfo("node:%s, has been updated", pWrapper->name);
  }

  (void)taosThreadMutexUnlock(&pDnode->mutex);

  if (pAcquired != NULL) {
    dmReleaseWrapper(pAcquired);
  }

  return code;
}
356

357

358
/*
 * Handles an alter-node-type request for node type `ntype`.
 *
 * Preconditions checked before anything is changed:
 *   - a wrapper must be obtainable via dmAcquireWrapper(), else TSDB_CODE_INVALID_MSG;
 *   - when nodeRoleFp is set, the reported role must not be TAOS_SYNC_ROLE_VOTER,
 *     else TSDB_CODE_MNODE_ALREADY_IS_VOTER;
 *   - when isCatchUpFp is set, it must return 1, else TSDB_CODE_MNODE_NOT_CATCH_UP.
 *
 * Then, while holding pDnode->mutex, the node is stopped and closed, its
 * directory is (re)created, and it is created, opened and started again.
 * Returns 0 on success or the code of the failing step.
 */
static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
  int32_t code = 0;
  SDnode *pDnode = dmInstance();

  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
  if (pWrapper == NULL) {
    dError("fail to process alter node type since node not exist");
    return TSDB_CODE_INVALID_MSG;
  }
  // The acquisition only served as an existence check.
  dmReleaseWrapper(pWrapper);

  dInfo("node:%s, start to process alter-node-type-request", pWrapper->name);

  pWrapper = &pDnode->wrappers[ntype];

  if (pWrapper->func.nodeRoleFp != NULL) {
    ESyncRole role = (*pWrapper->func.nodeRoleFp)(pWrapper->pMgmt);
    dInfo("node:%s, checking node role:%d", pWrapper->name, role);
    if (role == TAOS_SYNC_ROLE_VOTER) {
      dError("node:%s, failed to alter node type since node already is role:%d", pWrapper->name, role);
      return TSDB_CODE_MNODE_ALREADY_IS_VOTER;
    }
  }

  if (pWrapper->func.isCatchUpFp != NULL) {
    dInfo("node:%s, checking node catch up", pWrapper->name);
    if ((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1) {
      return TSDB_CODE_MNODE_NOT_CATCH_UP;
    }
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pWrapper->name);

  (void)taosThreadMutexLock(&pDnode->mutex);

  dInfo("node:%s, stopping node", pWrapper->name);
  dmStopNode(pWrapper);
  dInfo("node:%s, closing node", pWrapper->name);
  dmCloseNode(pWrapper);

  if (taosMkDir(pWrapper->path) != 0) {
    (void)taosThreadMutexUnlock(&pDnode->mutex);
    code = terrno;
    dError("failed to create dir:%s since %s", pWrapper->path, tstrerror(code));
    return code;
  }

  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);

  dInfo("node:%s, start to create", pWrapper->name);
  code = (*pWrapper->func.createFp)(&input, pMsg);
  if (code == 0) {
    dInfo("node:%s, has been created", pWrapper->name);
    code = dmOpenNode(pWrapper);
    if (code == 0) {
      code = dmStartNode(pWrapper);
    }
    // Both flags are set once createFp succeeded, whatever open/start returned.
    pWrapper->deployed = true;
    pWrapper->required = true;
  } else {
    dError("node:%s, failed to create since %s", pWrapper->name, tstrerror(code));
  }

  (void)taosThreadMutexUnlock(&pDnode->mutex);
  return code;
}
427

428
/*
 * Handles a drop-node request for node type `ntype`.
 *
 * When dmAcquireWrapper() returns NULL the request fails with the type-specific
 * *_NOT_DEPLOYED code (TSDB_CODE_APP_ERROR for any other type), which is also
 * stored in terrno. Otherwise, while holding pDnode->mutex, the wrapper's dropFp
 * is invoked with `pMsg`; on success the node is marked as not required/deployed,
 * the wrapper is released, and the node is stopped, closed and its directory removed.
 *
 * Returns 0 on success or the non-zero code described above / returned by dropFp.
 */
static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
  int32_t code = 0;
  SDnode *pDnode = dmInstance();

  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
  if (pWrapper == NULL) {
    switch (ntype) {
      case MNODE:
        code = TSDB_CODE_MNODE_NOT_DEPLOYED;
        break;
      case QNODE:
        code = TSDB_CODE_QNODE_NOT_DEPLOYED;
        break;
      case SNODE:
        code = TSDB_CODE_SNODE_NOT_DEPLOYED;
        break;
      case BNODE:
        code = TSDB_CODE_BNODE_NOT_DEPLOYED;
        break;
      default:
        code = TSDB_CODE_APP_ERROR;
    }

    dError("failed to drop node since %s", tstrerror(code));
    terrno = code;
    return code;
  }

  (void)taosThreadMutexLock(&pDnode->mutex);
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);

  dInfo("node:%s, start to drop", pWrapper->name);
  code = (*pWrapper->func.dropFp)(&input, pMsg);
  if (code == 0) {
    dInfo("node:%s, has been dropped", pWrapper->name);
    pWrapper->required = false;
    pWrapper->deployed = false;
  } else {
    dError("node:%s, failed to drop since %s", pWrapper->name, tstrerror(code));
  }

  // The reference taken above is given back before the node is stopped and closed.
  dmReleaseWrapper(pWrapper);

  if (code == 0) {
    dmStopNode(pWrapper);
    dmCloseNode(pWrapper);
    taosRemoveDir(pWrapper->path);
  }

  (void)taosThreadMutexUnlock(&pDnode->mutex);
  return code;
}
478

479
/*
 * Builds the SMgmtInputOpt passed to a node's management callbacks.
 * The path and name come from `pWrapper`; the tfs handle, shared data and
 * message callback come from the wrapper's dnode; the remaining fields are
 * the dnode-level callback functions listed below.
 */
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
  SDnode *pOwner = pWrapper->pDnode;

  SMgmtInputOpt opt = {
      // identity and shared resources
      .path = pWrapper->path,
      .name = pWrapper->name,
      .pTfs = pOwner->pTfs,
      .pData = &pOwner->data,
      // node lifecycle requests
      .processCreateNodeFp = dmProcessCreateNodeReq,
      .processAlterNodeFp = dmProcessAlterNodeReq,
      .processAlterNodeTypeFp = dmProcessAlterNodeTypeReq,
      .processDropNodeFp = dmProcessDropNodeReq,
      // monitor, metrics and audit reporting
      .sendMonitorReportFp = dmSendMonitorReport,
      .sendMetricsReportFp = dmSendMetricsReport,
      .monitorCleanExpiredSamplesFp = dmMonitorCleanExpiredSamples,
      .metricsCleanExpiredSamplesFp = dmMetricsCleanExpiredSamples,
      .sendAuditRecordFp = auditSendRecordsInBatch,
      // per-node-type load and sync-timeout hooks
      .getVnodeLoadsFp = dmGetVnodeLoads,
      .getVnodeLoadsLiteFp = dmGetVnodeLoadsLite,
      .setVnodeSyncTimeoutFp = dmSetVnodeSyncTimeout,
      .getMnodeLoadsFp = dmGetMnodeLoads,
      .setMnodeSyncTimeoutFp = dmSetMnodeSyncTimeout,
      .getQnodeLoadsFp = dmGetQnodeLoads,
      .stopDnodeFp = dmStop,
  };

  opt.msgCb = dmGetMsgcb(pOwner);
  return opt;
}
506

507
void dmReportStartup(const char *pName, const char *pDesc) {
146,584✔
508
  SStartupInfo *pStartup = &(dmInstance()->startup);
146,584✔
509
  tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
146,584✔
510
  tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
146,584✔
511
  dDebug("step:%s, %s", pStartup->name, pStartup->desc);
146,584✔
512
}
146,584✔
513

514
int64_t dmGetClusterId() { return globalDnode.data.clusterId; }
×
515

516
bool dmReadyForTest() { return dmInstance()->data.dnodeVer > 0; }
96✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc