• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3559

18 Dec 2024 12:59AM UTC coverage: 59.805% (+0.03%) from 59.778%
#3559

push

travis-ci

web-flow
Merge pull request #29187 from taosdata/merge/mainto3.0

merge: main to 3.0 branch

132705 of 287544 branches covered (46.15%)

Branch coverage included in aggregate %.

87 of 95 new or added lines in 19 files covered. (91.58%)

1132 existing lines in 133 files now uncovered.

209591 of 284807 relevant lines covered (73.59%)

8125235.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.14
/source/dnode/mgmt/node_mgmt/src/dmEnv.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
// clang-format off
18
#include "dmMgmt.h"
19
#include "audit.h"
20
#include "libs/function/tudf.h"
21
#include "tgrant.h"
22
#include "tcompare.h"
23
#include "tcs.h"
24
#include "tanalytics.h"
25
// clang-format on
26

27
#define DM_INIT_AUDIT()                       \
28
  do {                                        \
29
    auditCfg.port = tsMonitorPort;            \
30
    auditCfg.server = tsMonitorFqdn;          \
31
    auditCfg.comp = tsMonitorComp;            \
32
    if ((code = auditInit(&auditCfg)) != 0) { \
33
      return code;                            \
34
    }                                         \
35
  } while (0)
36

37
static SDnode globalDnode = {0};
38

39
SDnode *dmInstance() { return &globalDnode; }
6,377,042✔
40

41
static int32_t dmCheckRepeatInit(SDnode *pDnode) {
1,883✔
42
  int32_t code = 0;
1,883✔
43
  if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
1,883!
44
    dError("env is already initialized");
×
45
    code = TSDB_CODE_REPEAT_INIT;
×
46
    return code;
×
47
  }
48
  return 0;
1,883✔
49
}
50

51
static int32_t dmInitSystem() {
1,883✔
52
  if (taosIgnSIGPIPE() != 0) {
1,883!
53
    dError("failed to ignore SIGPIPE");
×
54
  }
55

56
  if (taosBlockSIGPIPE() != 0) {
1,883!
57
    dError("failed to block SIGPIPE");
×
58
  }
59

60
  taosResolveCRC();
1,883✔
61
  return 0;
1,883✔
62
}
63

64
static int32_t dmInitMonitor() {
1,883✔
65
  int32_t code = 0;
1,883✔
66
  SMonCfg monCfg = {0};
1,883✔
67

68
  monCfg.maxLogs = tsMonitorMaxLogs;
1,883✔
69
  monCfg.port = tsMonitorPort;
1,883✔
70
  monCfg.server = tsMonitorFqdn;
1,883✔
71
  monCfg.comp = tsMonitorComp;
1,883✔
72
  if ((code = monInit(&monCfg)) != 0) {
1,883!
73
    dError("failed to init monitor since %s", tstrerror(code));
×
74
  }
75
  return code;
1,883✔
76
}
77

78
static int32_t dmInitAudit() {
1,883✔
79
  SAuditCfg auditCfg = {0};
1,883✔
80
  int32_t   code = 0;
1,883✔
81

82
  DM_INIT_AUDIT();
1,883!
83

84
  return 0;
1,883✔
85
}
86

87
static bool dmDataSpaceAvailable() {
1,883✔
88
  SDnode *pDnode = dmInstance();
1,883✔
89
  if (pDnode->pTfs) {
1,883!
90
    return tfsDiskSpaceAvailable(pDnode->pTfs, 0);
1,883✔
91
  }
92
  if (!osDataSpaceAvailable()) {
×
93
    dError("data disk space unavailable, i.e. %s", tsDataDir);
×
94
    return false;
×
95
  }
96
  return true;
×
97
}
98

99
static int32_t dmCheckDiskSpace() {
1,883✔
100
  // availability
101
  int32_t code = 0;
1,883✔
102
  code = osUpdate();
1,883✔
103
  if (code != 0) {
1,883!
104
    code = 0;  // ignore the error, just log it
×
105
    dError("failed to update os info since %s", tstrerror(code));
×
106
  }
107
  if (!dmDataSpaceAvailable()) {
1,883!
108
    code = TSDB_CODE_NO_DISKSPACE;
×
109
    return code;
×
110
  }
111
  if (!osLogSpaceAvailable()) {
1,883!
112
    dError("log disk space unavailable, i.e. %s", tsLogDir);
×
113
    code = TSDB_CODE_NO_DISKSPACE;
×
114
    return code;
×
115
  }
116
  if (!osTempSpaceAvailable()) {
1,883!
117
    dError("temp disk space unavailable, i.e. %s", tsTempDir);
×
118
    code = TSDB_CODE_NO_DISKSPACE;
×
119
    return code;
×
120
  }
121
  return code;
1,883✔
122
}
123

124
int32_t dmDiskInit() {
1,883✔
125
  SDnode  *pDnode = dmInstance();
1,883✔
126
  SDiskCfg dCfg = {.level = 0, .primary = 1, .disable = 0};
1,883✔
127
  tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN);
1,883✔
128
  SDiskCfg *pDisks = tsDiskCfg;
1,883✔
129
  int32_t   numOfDisks = tsDiskCfgNum;
1,883✔
130
  if (numOfDisks <= 0 || pDisks == NULL) {
1,883!
131
    pDisks = &dCfg;
8✔
132
    numOfDisks = 1;
8✔
133
  }
134

135
  int32_t code = tfsOpen(pDisks, numOfDisks, &pDnode->pTfs);
1,883✔
136
  if (code != 0) {
1,883!
137
    dError("failed to init tfs since %s", tstrerror(code));
×
138
    TAOS_RETURN(code);
×
139
  }
140
  return 0;
1,883✔
141
}
142

143
int32_t dmDiskClose() {
1,879✔
144
  SDnode *pDnode = dmInstance();
1,879✔
145
  tfsClose(pDnode->pTfs);
1,879✔
146
  pDnode->pTfs = NULL;
1,879✔
147
  return 0;
1,879✔
148
}
149

150
static bool dmCheckDataDirVersion() {
1,883✔
151
  char checkDataDirJsonFileName[PATH_MAX] = {0};
1,883✔
152
  snprintf(checkDataDirJsonFileName, PATH_MAX, "%s/dnode/dnodeCfg.json", tsDataDir);
1,883✔
153
  if (taosCheckExistFile(checkDataDirJsonFileName)) {
1,883!
154
    dError("The default data directory %s contains old data of tdengine 2.x, please clear it before running!",
×
155
           tsDataDir);
156
    return false;
×
157
  }
158
  return true;
1,883✔
159
}
160

161
static int32_t dmCheckDataDirVersionWrapper() {
×
162
  if (!dmCheckDataDirVersion()) {
×
163
    return TSDB_CODE_INVALID_DATA_FMT;
×
164
  }
165
  return 0;
×
166
}
167

168
int32_t dmInit() {
1,883✔
169
  dInfo("start to init dnode env");
1,883!
170
  int32_t code = 0;
1,883✔
171
  if ((code = dmDiskInit()) != 0) return code;
1,883!
172
  if (!dmCheckDataDirVersion()) {
1,883!
173
    code = TSDB_CODE_INVALID_DATA_FMT;
×
174
    return code;
×
175
  }
176
  if ((code = dmCheckDiskSpace()) != 0) return code;
1,883!
177
  if ((code = dmCheckRepeatInit(dmInstance())) != 0) return code;
1,883!
178
  if ((code = dmInitSystem()) != 0) return code;
1,883!
179
  if ((code = dmInitMonitor()) != 0) return code;
1,883!
180
  if ((code = dmInitAudit()) != 0) return code;
1,883!
181
  if ((code = dmInitDnode(dmInstance())) != 0) return code;
1,883✔
182
  if ((code = InitRegexCache() != 0)) return code;
1,879!
183
#if defined(USE_S3)
184
  if ((code = tcsInit()) != 0) return code;
1,879!
185
#endif
186

187
  dInfo("dnode env is initialized");
1,879!
188
  return 0;
1,879✔
189
}
190

191
static int32_t dmCheckRepeatCleanup(SDnode *pDnode) {
1,887✔
192
  if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) {
1,887✔
193
    dError("dnode env is already cleaned up");
8!
194
    return -1;
8✔
195
  }
196
  return 0;
1,879✔
197
}
198

199
void dmCleanup() {
1,887✔
200
  dDebug("start to cleanup dnode env");
1,887✔
201
  SDnode *pDnode = dmInstance();
1,887✔
202
  if (dmCheckRepeatCleanup(pDnode) != 0) return;
1,887✔
203
  dmCleanupDnode(pDnode);
1,879✔
204
  monCleanup();
1,879✔
205
  auditCleanup();
1,879✔
206
  syncCleanUp();
1,879✔
207
  walCleanUp();
1,879✔
208
  if (udfcClose() != 0) {
1,879!
209
    dError("failed to close udfc");
×
210
  }
211
  udfStopUdfd();
1,879✔
212
  taosAnalyticsCleanup();
1,879✔
213
  taosStopCacheRefreshWorker();
1,879✔
214
  (void)dmDiskClose();
1,879✔
215
  DestroyRegexCache();
1,879✔
216

217
#if defined(USE_S3)
218
  tcsUninit();
1,879✔
219
#endif
220

221
  dInfo("dnode env is cleaned up");
1,879!
222

223
  taosMemPoolClose(gMemPoolHandle);
1,879✔
224
  taosCleanupCfg();
1,879✔
225
  taosCloseLog();
1,879✔
226
}
227

228
void dmStop() {
1,882✔
229
  SDnode *pDnode = dmInstance();
1,882✔
230
  pDnode->stop = true;
1,882✔
231
}
1,882✔
232

233
int32_t dmRun() {
1,879✔
234
  SDnode *pDnode = dmInstance();
1,879✔
235
  return dmRunDnode(pDnode);
1,879✔
236
}
237

238
static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
530✔
239
  int32_t code = 0;
530✔
240
  SDnode *pDnode = dmInstance();
530✔
241

242
  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
530✔
243
  if (pWrapper != NULL) {
530✔
244
    dmReleaseWrapper(pWrapper);
2✔
245
    switch (ntype) {
2!
UNCOV
246
      case MNODE:
×
UNCOV
247
        code = TSDB_CODE_MNODE_ALREADY_DEPLOYED;
×
UNCOV
248
        break;
×
249
      case QNODE:
2✔
250
        code = TSDB_CODE_QNODE_ALREADY_DEPLOYED;
2✔
251
        break;
2✔
252
      case SNODE:
×
253
        code = TSDB_CODE_SNODE_ALREADY_DEPLOYED;
×
254
        break;
×
255
      default:
×
256
        code = TSDB_CODE_APP_ERROR;
×
257
    }
258
    dError("failed to create node since %s", tstrerror(code));
2!
259
    return code;
2✔
260
  }
261

262
  dInfo("start to process create-node-request");
528!
263

264
  pWrapper = &pDnode->wrappers[ntype];
528✔
265
  if (taosMkDir(pWrapper->path) != 0) {
528!
266
    dmReleaseWrapper(pWrapper);
×
267
    code = terrno;
×
268
    dError("failed to create dir:%s since %s", pWrapper->path, tstrerror(code));
×
269
    return code;
×
270
  }
271

272
  (void)taosThreadMutexLock(&pDnode->mutex);
528✔
273
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
528✔
274

275
  dInfo("node:%s, start to create", pWrapper->name);
528!
276
  code = (*pWrapper->func.createFp)(&input, pMsg);
528✔
277
  if (code != 0) {
528✔
278
    dError("node:%s, failed to create since %s", pWrapper->name, tstrerror(code));
2!
279
  } else {
280
    dInfo("node:%s, has been created", pWrapper->name);
526!
281
    code = dmOpenNode(pWrapper);
526✔
282
    if (code == 0) {
526!
283
      code = dmStartNode(pWrapper);
526✔
284
    }
285
    pWrapper->deployed = true;
526✔
286
    pWrapper->required = true;
526✔
287
  }
288

289
  (void)taosThreadMutexUnlock(&pDnode->mutex);
528✔
290
  return code;
528✔
291
}
292

293
static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
796✔
294
  int32_t code = 0;
796✔
295
  SDnode *pDnode = dmInstance();
796✔
296

297
  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
796✔
298
  if (pWrapper == NULL) {
796!
299
    dError("fail to process alter node type since node not exist");
×
300
    return TSDB_CODE_INVALID_MSG;
×
301
  }
302
  dmReleaseWrapper(pWrapper);
796✔
303

304
  dInfo("node:%s, start to process alter-node-type-request", pWrapper->name);
796!
305

306
  pWrapper = &pDnode->wrappers[ntype];
796✔
307

308
  if (pWrapper->func.nodeRoleFp != NULL) {
796!
309
    ESyncRole role = (*pWrapper->func.nodeRoleFp)(pWrapper->pMgmt);
796✔
310
    dInfo("node:%s, checking node role:%d", pWrapper->name, role);
796!
311
    if (role == TAOS_SYNC_ROLE_VOTER) {
796!
312
      dError("node:%s, failed to alter node type since node already is role:%d", pWrapper->name, role);
×
313
      code = TSDB_CODE_MNODE_ALREADY_IS_VOTER;
×
314
      return code;
×
315
    }
316
  }
317

318
  if (pWrapper->func.isCatchUpFp != NULL) {
796!
319
    dInfo("node:%s, checking node catch up", pWrapper->name);
796!
320
    if ((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1) {
796✔
321
      code = TSDB_CODE_MNODE_NOT_CATCH_UP;
725✔
322
      return code;
725✔
323
    }
324
  }
325

326
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pWrapper->name);
71!
327

328
  (void)taosThreadMutexLock(&pDnode->mutex);
71✔
329

330
  dInfo("node:%s, stopping node", pWrapper->name);
71!
331
  dmStopNode(pWrapper);
71✔
332
  dInfo("node:%s, closing node", pWrapper->name);
71!
333
  dmCloseNode(pWrapper);
71✔
334

335
  pWrapper = &pDnode->wrappers[ntype];
71✔
336
  if (taosMkDir(pWrapper->path) != 0) {
71!
337
    (void)taosThreadMutexUnlock(&pDnode->mutex);
×
338
    code = terrno;
×
339
    dError("failed to create dir:%s since %s", pWrapper->path, tstrerror(code));
×
340
    return code;
×
341
  }
342

343
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
71✔
344

345
  dInfo("node:%s, start to create", pWrapper->name);
71!
346
  code = (*pWrapper->func.createFp)(&input, pMsg);
71✔
347
  if (code != 0) {
71!
348
    dError("node:%s, failed to create since %s", pWrapper->name, tstrerror(code));
×
349
  } else {
350
    dInfo("node:%s, has been created", pWrapper->name);
71!
351
    code = dmOpenNode(pWrapper);
71✔
352
    if (code == 0) {
71!
353
      code = dmStartNode(pWrapper);
71✔
354
    }
355
    pWrapper->deployed = true;
71✔
356
    pWrapper->required = true;
71✔
357
  }
358

359
  (void)taosThreadMutexUnlock(&pDnode->mutex);
71✔
360
  return code;
71✔
361
}
362

363
static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
16✔
364
  int32_t code = 0;
16✔
365
  SDnode *pDnode = dmInstance();
16✔
366

367
  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
16✔
368
  if (pWrapper == NULL) {
16✔
369
    switch (ntype) {
2!
370
      case MNODE:
×
371
        code = TSDB_CODE_MNODE_NOT_DEPLOYED;
×
372
        break;
×
373
      case QNODE:
2✔
374
        code = TSDB_CODE_QNODE_NOT_DEPLOYED;
2✔
375
        break;
2✔
376
      case SNODE:
×
377
        code = TSDB_CODE_SNODE_NOT_DEPLOYED;
×
378
        break;
×
379
      default:
×
380
        code = TSDB_CODE_APP_ERROR;
×
381
    }
382

383
    dError("failed to drop node since %s", tstrerror(code));
2!
384
    return terrno = code;
2✔
385
  }
386

387
  (void)taosThreadMutexLock(&pDnode->mutex);
14✔
388
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
14✔
389

390
  dInfo("node:%s, start to drop", pWrapper->name);
14!
391
  code = (*pWrapper->func.dropFp)(&input, pMsg);
14✔
392
  if (code != 0) {
14!
393
    dError("node:%s, failed to drop since %s", pWrapper->name, tstrerror(code));
×
394
  } else {
395
    dInfo("node:%s, has been dropped", pWrapper->name);
14!
396
    pWrapper->required = false;
14✔
397
    pWrapper->deployed = false;
14✔
398
  }
399

400
  dmReleaseWrapper(pWrapper);
14✔
401

402
  if (code == 0) {
14!
403
    dmStopNode(pWrapper);
14✔
404
    dmCloseNode(pWrapper);
14✔
405
    taosRemoveDir(pWrapper->path);
14✔
406
  }
407
  (void)taosThreadMutexUnlock(&pDnode->mutex);
14✔
408
  return code;
14✔
409
}
410

411
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
15,807✔
412
  SMgmtInputOpt opt = {
15,807✔
413
      .path = pWrapper->path,
15,807✔
414
      .name = pWrapper->name,
15,807✔
415
      .pTfs = pWrapper->pDnode->pTfs,
15,807✔
416
      .pData = &pWrapper->pDnode->data,
15,807✔
417
      .processCreateNodeFp = dmProcessCreateNodeReq,
418
      .processAlterNodeTypeFp = dmProcessAlterNodeTypeReq,
419
      .processDropNodeFp = dmProcessDropNodeReq,
420
      .sendMonitorReportFp = dmSendMonitorReport,
421
      .monitorCleanExpiredSamplesFp = dmMonitorCleanExpiredSamples,
422
      .sendAuditRecordFp = auditSendRecordsInBatch,
423
      .getVnodeLoadsFp = dmGetVnodeLoads,
424
      .getVnodeLoadsLiteFp = dmGetVnodeLoadsLite,
425
      .getMnodeLoadsFp = dmGetMnodeLoads,
426
      .getQnodeLoadsFp = dmGetQnodeLoads,
427
      .stopDnodeFp = dmStop,
428
  };
429

430
  opt.msgCb = dmGetMsgcb(pWrapper->pDnode);
15,807✔
431
  return opt;
15,807✔
432
}
433

434
void dmReportStartup(const char *pName, const char *pDesc) {
104,168✔
435
  SStartupInfo *pStartup = &(dmInstance()->startup);
104,168✔
436
  tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
104,168✔
437
  tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
104,168✔
438
  dDebug("step:%s, %s", pStartup->name, pStartup->desc);
104,168✔
439
}
104,168✔
440

441
int64_t dmGetClusterId() { return globalDnode.data.clusterId; }
×
442

443
bool dmReadyForTest() { return dmInstance()->data.dnodeVer > 0; }
32✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc