• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4404

30 Jun 2025 02:45AM UTC coverage: 62.241% (-0.4%) from 62.635%
#4404

push

travis-ci

web-flow
Merge pull request #31480 from taosdata/docs/3.0/TD-34215

add stmt2 docs

153837 of 315978 branches covered (48.69%)

Branch coverage included in aggregate %.

238272 of 314005 relevant lines covered (75.88%)

6134648.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.04
/source/dnode/mgmt/node_mgmt/src/dmEnv.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
// clang-format off
18
#include "dmMgmt.h"
19
#include "audit.h"
20
#include "libs/function/tudf.h"
21
#include "tgrant.h"
22
#include "tcompare.h"
23
#include "tcs.h"
24
#include "tanalytics.h"
25
// clang-format on
26

27
#define DM_INIT_AUDIT()                       \
28
  do {                                        \
29
    auditCfg.port = tsMonitorPort;            \
30
    auditCfg.server = tsMonitorFqdn;          \
31
    auditCfg.comp = tsMonitorComp;            \
32
    if ((code = auditInit(&auditCfg)) != 0) { \
33
      return code;                            \
34
    }                                         \
35
  } while (0)
36

37
static SDnode globalDnode = {0};
38

39
SDnode *dmInstance() { return &globalDnode; }
1,768,832✔
40

41
static int32_t dmCheckRepeatInit(SDnode *pDnode) {
2,675✔
42
  int32_t code = 0;
2,675✔
43
  if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
2,675!
44
    dError("env is already initialized");
×
45
    code = TSDB_CODE_REPEAT_INIT;
×
46
    return code;
×
47
  }
48
  return 0;
2,675✔
49
}
50

51
static int32_t dmInitSystem() {
2,675✔
52
  if (taosIgnSIGPIPE() != 0) {
2,675!
53
    dError("failed to ignore SIGPIPE");
×
54
  }
55

56
  if (taosBlockSIGPIPE() != 0) {
2,675!
57
    dError("failed to block SIGPIPE");
×
58
  }
59

60
  taosResolveCRC();
2,675✔
61
  return 0;
2,675✔
62
}
63

64
static int32_t dmInitMonitor() {
2,675✔
65
  int32_t code = 0;
2,675✔
66
  SMonCfg monCfg = {0};
2,675✔
67

68
  monCfg.maxLogs = tsMonitorMaxLogs;
2,675✔
69
  monCfg.port = tsMonitorPort;
2,675✔
70
  monCfg.server = tsMonitorFqdn;
2,675✔
71
  monCfg.comp = tsMonitorComp;
2,675✔
72
  if ((code = monInit(&monCfg)) != 0) {
2,675!
73
    dError("failed to init monitor since %s", tstrerror(code));
×
74
  }
75
  return code;
2,675✔
76
}
77

78
static int32_t dmInitMetrics() {
2,675✔
79
  int32_t code = 0;
2,675✔
80
  if ((code = initMetricsManager()) != 0) {
2,675!
81
    dError("failed to init metrics since %s", tstrerror(code));
×
82
  }
83
  return code;
2,675✔
84
}
85

86
static int32_t dmInitAudit() {
2,675✔
87
  SAuditCfg auditCfg = {0};
2,675✔
88
  int32_t   code = 0;
2,675✔
89

90
  DM_INIT_AUDIT();
2,675!
91

92
  return 0;
2,675✔
93
}
94

95
static bool dmDataSpaceAvailable() {
2,675✔
96
  SDnode *pDnode = dmInstance();
2,675✔
97
  if (pDnode->pTfs) {
2,675!
98
    return tfsDiskSpaceAvailable(pDnode->pTfs, 0);
2,675✔
99
  }
100
  if (!osDataSpaceAvailable()) {
×
101
    dError("data disk space unavailable, i.e. %s", tsDataDir);
×
102
    return false;
×
103
  }
104
  return true;
×
105
}
106

107
static int32_t dmCheckDiskSpace() {
2,675✔
108
  // availability
109
  int32_t code = 0;
2,675✔
110
  code = osUpdate();
2,675✔
111
  if (code != 0) {
2,675!
112
    dError("failed to update os info since %s", tstrerror(code));
×
113
    code = 0;  // ignore the error, just log it
×
114
  }
115
  if (!dmDataSpaceAvailable()) {
2,675!
116
    code = TSDB_CODE_NO_DISKSPACE;
×
117
    return code;
×
118
  }
119
  if (!osLogSpaceAvailable()) {
2,675!
120
    dError("log disk space unavailable, i.e. %s", tsLogDir);
×
121
    code = TSDB_CODE_NO_DISKSPACE;
×
122
    return code;
×
123
  }
124
  if (!osTempSpaceAvailable()) {
2,675!
125
    dError("temp disk space unavailable, i.e. %s", tsTempDir);
×
126
    code = TSDB_CODE_NO_DISKSPACE;
×
127
    return code;
×
128
  }
129
  return code;
2,675✔
130
}
131

132
int32_t dmDiskInit() {
2,684✔
133
  SDnode  *pDnode = dmInstance();
2,684✔
134
  SDiskCfg dCfg = {.level = 0, .primary = 1, .disable = 0};
2,684✔
135
  tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN);
2,684✔
136
  SDiskCfg *pDisks = tsDiskCfg;
2,684✔
137
  int32_t   numOfDisks = tsDiskCfgNum;
2,684✔
138
  if (numOfDisks <= 0 || pDisks == NULL) {
2,684!
139
    pDisks = &dCfg;
8✔
140
    numOfDisks = 1;
8✔
141
  }
142

143
  int32_t code = tfsOpen(pDisks, numOfDisks, &pDnode->pTfs);
2,684✔
144
  if (code != 0) {
2,684✔
145
    dError("failed to init tfs since %s", tstrerror(code));
9!
146
    TAOS_RETURN(code);
9✔
147
  }
148
  return 0;
2,675✔
149
}
150

151
int32_t dmDiskClose() {
2,666✔
152
  SDnode *pDnode = dmInstance();
2,666✔
153
  tfsClose(pDnode->pTfs);
2,666✔
154
  pDnode->pTfs = NULL;
2,666✔
155
  return 0;
2,666✔
156
}
157

158
static bool dmCheckDataDirVersion() {
2,675✔
159
  char checkDataDirJsonFileName[PATH_MAX] = {0};
2,675✔
160
  snprintf(checkDataDirJsonFileName, PATH_MAX, "%s/dnode/dnodeCfg.json", tsDataDir);
2,675✔
161
  if (taosCheckExistFile(checkDataDirJsonFileName)) {
2,675!
162
    dError("The default data directory %s contains old data of tdengine 2.x, please clear it before running!",
×
163
           tsDataDir);
164
    return false;
×
165
  }
166
  return true;
2,675✔
167
}
168

169
static int32_t dmCheckDataDirVersionWrapper() {
×
170
  if (!dmCheckDataDirVersion()) {
×
171
    return TSDB_CODE_INVALID_DATA_FMT;
×
172
  }
173
  return 0;
×
174
}
175

176
int32_t dmInit() {
2,684✔
177
  dInfo("start to init dnode env");
2,684!
178
  int32_t code = 0;
2,684✔
179
  if ((code = dmDiskInit()) != 0) return code;
2,684✔
180
  if (!dmCheckDataDirVersion()) {
2,675!
181
    code = TSDB_CODE_INVALID_DATA_FMT;
×
182
    return code;
×
183
  }
184
  if ((code = dmCheckDiskSpace()) != 0) return code;
2,675!
185
  if ((code = dmCheckRepeatInit(dmInstance())) != 0) return code;
2,675!
186
  if ((code = dmInitSystem()) != 0) return code;
2,675!
187
  if ((code = dmInitMonitor()) != 0) return code;
2,675!
188
  if ((code = dmInitMetrics()) != 0) return code;
2,675!
189
  if ((code = dmInitAudit()) != 0) return code;
2,675!
190
  if ((code = dmInitDnode(dmInstance())) != 0) return code;
2,675✔
191
  if ((code = InitRegexCache() != 0)) return code;
2,666!
192
#if defined(USE_S3)
193
  if ((code = tcsInit()) != 0) return code;
2,666!
194
#endif
195

196
  dInfo("dnode env is initialized");
2,666!
197
  return 0;
2,666✔
198
}
199

200
static int32_t dmCheckRepeatCleanup(SDnode *pDnode) {
2,674✔
201
  if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) {
2,674✔
202
    dError("dnode env is already cleaned up");
8!
203
    return -1;
8✔
204
  }
205
  return 0;
2,666✔
206
}
207

208
void dmCleanup() {
2,674✔
209
  dDebug("start to cleanup dnode env");
2,674✔
210
  SDnode *pDnode = dmInstance();
2,674✔
211
  if (dmCheckRepeatCleanup(pDnode) != 0) return;
2,674✔
212
  dmCleanupDnode(pDnode);
2,666✔
213
  monCleanup();
2,666✔
214
  auditCleanup();
2,666✔
215
  syncCleanUp();
2,666✔
216
  walCleanUp();
2,666✔
217
  cleanupMetrics();
2,666✔
218
  if (udfcClose() != 0) {
2,666!
219
    dError("failed to close udfc");
×
220
  }
221
  udfStopUdfd();
2,666✔
222
  taosAnalyticsCleanup();
2,666✔
223
  taosStopCacheRefreshWorker();
2,666✔
224
  (void)dmDiskClose();
2,666✔
225
  DestroyRegexCache();
2,666✔
226

227
#if defined(USE_S3)
228
  tcsUninit();
2,666✔
229
#endif
230

231
  dInfo("dnode env is cleaned up");
2,666!
232

233
  taosMemPoolClose(gMemPoolHandle);
2,666✔
234
  taosCleanupCfg();
2,666✔
235
  taosCloseLog();
2,666✔
236
}
237

238
void dmStop() {
2,666✔
239
  SDnode *pDnode = dmInstance();
2,666✔
240
  pDnode->stop = true;
2,666✔
241
}
2,666✔
242

243
int32_t dmRun() {
2,666✔
244
  SDnode *pDnode = dmInstance();
2,666✔
245
  return dmRunDnode(pDnode);
2,666✔
246
}
247

248
static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
614✔
249
  int32_t code = 0;
614✔
250
  SDnode *pDnode = dmInstance();
614✔
251

252
  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
614✔
253
  if (pWrapper != NULL) {
614✔
254
    dmReleaseWrapper(pWrapper);
2✔
255
    switch (ntype) {
2!
256
      case MNODE:
×
257
        code = TSDB_CODE_MNODE_ALREADY_DEPLOYED;
×
258
        break;
×
259
      case QNODE:
2✔
260
        code = TSDB_CODE_QNODE_ALREADY_DEPLOYED;
2✔
261
        break;
2✔
262
      case SNODE:
×
263
        code = TSDB_CODE_SNODE_ALREADY_DEPLOYED;
×
264
        break;
×
265
      case BNODE:
×
266
        code = TSDB_CODE_BNODE_ALREADY_DEPLOYED;
×
267
        break;
×
268
      default:
×
269
        code = TSDB_CODE_APP_ERROR;
×
270
    }
271
    dError("failed to create node since %s", tstrerror(code));
2!
272
    return code;
2✔
273
  }
274

275
  dInfo("start to process create-node-request");
612!
276

277
  pWrapper = &pDnode->wrappers[ntype];
612✔
278
  if (taosMkDir(pWrapper->path) != 0) {
612!
279
    dmReleaseWrapper(pWrapper);
×
280
    code = terrno;
×
281
    dError("failed to create dir:%s since %s", pWrapper->path, tstrerror(code));
×
282
    return code;
×
283
  }
284

285
  (void)taosThreadMutexLock(&pDnode->mutex);
612✔
286
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
612✔
287

288
  dInfo("node:%s, start to create", pWrapper->name);
612!
289
  code = (*pWrapper->func.createFp)(&input, pMsg);
612✔
290
  if (code != 0) {
612✔
291
    dError("node:%s, failed to create since %s", pWrapper->name, tstrerror(code));
1!
292
  } else {
293
    dInfo("node:%s, has been created", pWrapper->name);
611!
294
    code = dmOpenNode(pWrapper);
611✔
295
    if (code == 0) {
611!
296
      code = dmStartNode(pWrapper);
611✔
297
    }
298
    pWrapper->deployed = true;
611✔
299
    pWrapper->required = true;
611✔
300
  }
301

302
  (void)taosThreadMutexUnlock(&pDnode->mutex);
612✔
303
  return code;
612✔
304
}
305

306
static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
848✔
307
  int32_t code = 0;
848✔
308
  SDnode *pDnode = dmInstance();
848✔
309

310
  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
848✔
311
  if (pWrapper == NULL) {
848!
312
    dError("fail to process alter node type since node not exist");
×
313
    return TSDB_CODE_INVALID_MSG;
×
314
  }
315
  dmReleaseWrapper(pWrapper);
848✔
316

317
  dInfo("node:%s, start to process alter-node-type-request", pWrapper->name);
848!
318

319
  pWrapper = &pDnode->wrappers[ntype];
848✔
320

321
  if (pWrapper->func.nodeRoleFp != NULL) {
848!
322
    ESyncRole role = (*pWrapper->func.nodeRoleFp)(pWrapper->pMgmt);
848✔
323
    dInfo("node:%s, checking node role:%d", pWrapper->name, role);
848!
324
    if (role == TAOS_SYNC_ROLE_VOTER) {
848!
325
      dError("node:%s, failed to alter node type since node already is role:%d", pWrapper->name, role);
×
326
      code = TSDB_CODE_MNODE_ALREADY_IS_VOTER;
×
327
      return code;
×
328
    }
329
  }
330

331
  if (pWrapper->func.isCatchUpFp != NULL) {
848!
332
    dInfo("node:%s, checking node catch up", pWrapper->name);
848!
333
    if ((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1) {
848✔
334
      code = TSDB_CODE_MNODE_NOT_CATCH_UP;
741✔
335
      return code;
741✔
336
    }
337
  }
338

339
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pWrapper->name);
107!
340

341
  (void)taosThreadMutexLock(&pDnode->mutex);
107✔
342

343
  dInfo("node:%s, stopping node", pWrapper->name);
107!
344
  dmStopNode(pWrapper);
107✔
345
  dInfo("node:%s, closing node", pWrapper->name);
107!
346
  dmCloseNode(pWrapper);
107✔
347

348
  pWrapper = &pDnode->wrappers[ntype];
107✔
349
  if (taosMkDir(pWrapper->path) != 0) {
107!
350
    (void)taosThreadMutexUnlock(&pDnode->mutex);
×
351
    code = terrno;
×
352
    dError("failed to create dir:%s since %s", pWrapper->path, tstrerror(code));
×
353
    return code;
×
354
  }
355

356
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
107✔
357

358
  dInfo("node:%s, start to create", pWrapper->name);
107!
359
  code = (*pWrapper->func.createFp)(&input, pMsg);
107✔
360
  if (code != 0) {
107!
361
    dError("node:%s, failed to create since %s", pWrapper->name, tstrerror(code));
×
362
  } else {
363
    dInfo("node:%s, has been created", pWrapper->name);
107!
364
    code = dmOpenNode(pWrapper);
107✔
365
    if (code == 0) {
107!
366
      code = dmStartNode(pWrapper);
107✔
367
    }
368
    pWrapper->deployed = true;
107✔
369
    pWrapper->required = true;
107✔
370
  }
371

372
  (void)taosThreadMutexUnlock(&pDnode->mutex);
107✔
373
  return code;
107✔
374
}
375

376
static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
25✔
377
  int32_t code = 0;
25✔
378
  SDnode *pDnode = dmInstance();
25✔
379

380
  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
25✔
381
  if (pWrapper == NULL) {
25✔
382
    switch (ntype) {
2!
383
      case MNODE:
×
384
        code = TSDB_CODE_MNODE_NOT_DEPLOYED;
×
385
        break;
×
386
      case QNODE:
2✔
387
        code = TSDB_CODE_QNODE_NOT_DEPLOYED;
2✔
388
        break;
2✔
389
      case SNODE:
×
390
        code = TSDB_CODE_SNODE_NOT_DEPLOYED;
×
391
        break;
×
392
      case BNODE:
×
393
        code = TSDB_CODE_BNODE_NOT_DEPLOYED;
×
394
        break;
×
395
      default:
×
396
        code = TSDB_CODE_APP_ERROR;
×
397
    }
398

399
    dError("failed to drop node since %s", tstrerror(code));
2!
400
    return terrno = code;
2✔
401
  }
402

403
  (void)taosThreadMutexLock(&pDnode->mutex);
23✔
404
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
23✔
405

406
  dInfo("node:%s, start to drop", pWrapper->name);
23!
407
  code = (*pWrapper->func.dropFp)(&input, pMsg);
23✔
408
  if (code != 0) {
23!
409
    dError("node:%s, failed to drop since %s", pWrapper->name, tstrerror(code));
×
410
  } else {
411
    dInfo("node:%s, has been dropped", pWrapper->name);
23!
412
    pWrapper->required = false;
23✔
413
    pWrapper->deployed = false;
23✔
414
  }
415

416
  dmReleaseWrapper(pWrapper);
23✔
417

418
  if (code == 0) {
23!
419
    dmStopNode(pWrapper);
23✔
420
    dmCloseNode(pWrapper);
23✔
421
    taosRemoveDir(pWrapper->path);
23✔
422
  }
423
  (void)taosThreadMutexUnlock(&pDnode->mutex);
23✔
424
  return code;
23✔
425
}
426

427
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
24,804✔
428
  SMgmtInputOpt opt = {
24,804✔
429
      .path = pWrapper->path,
24,804✔
430
      .name = pWrapper->name,
24,804✔
431
      .pTfs = pWrapper->pDnode->pTfs,
24,804✔
432
      .pData = &pWrapper->pDnode->data,
24,804✔
433
      .processCreateNodeFp = dmProcessCreateNodeReq,
434
      .processAlterNodeTypeFp = dmProcessAlterNodeTypeReq,
435
      .processDropNodeFp = dmProcessDropNodeReq,
436
      .sendMonitorReportFp = dmSendMonitorReport,
437
      .sendMetricsReportFp = dmSendMetricsReport,
438
      .monitorCleanExpiredSamplesFp = dmMonitorCleanExpiredSamples,
439
      .metricsCleanExpiredSamplesFp = dmMetricsCleanExpiredSamples,
440
      .sendAuditRecordFp = auditSendRecordsInBatch,
441
      .getVnodeLoadsFp = dmGetVnodeLoads,
442
      .getVnodeLoadsLiteFp = dmGetVnodeLoadsLite,
443
      .getMnodeLoadsFp = dmGetMnodeLoads,
444
      .getQnodeLoadsFp = dmGetQnodeLoads,
445
      .stopDnodeFp = dmStop,
446
  };
447

448
  opt.msgCb = dmGetMsgcb(pWrapper->pDnode);
24,804✔
449
  return opt;
24,804✔
450
}
451

452
void dmReportStartup(const char *pName, const char *pDesc) {
153,182✔
453
  SStartupInfo *pStartup = &(dmInstance()->startup);
153,182✔
454
  tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
153,181✔
455
  tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
153,181✔
456
  dDebug("step:%s, %s", pStartup->name, pStartup->desc);
153,181✔
457
}
153,182✔
458

459
int64_t dmGetClusterId() { return globalDnode.data.clusterId; }
×
460

461
bool dmReadyForTest() { return dmInstance()->data.dnodeVer > 0; }
32✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc