• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4912

04 Jan 2026 09:05AM UTC coverage: 64.888% (-0.1%) from 65.028%
#4912

push

travis-ci

web-flow
merge: from main to 3.0 branch #34156

1206 of 4524 new or added lines in 22 files covered. (26.66%)

5351 existing lines in 123 files now uncovered.

194856 of 300296 relevant lines covered (64.89%)

118198896.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.09
/source/dnode/mgmt/node_mgmt/src/dmMgmt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "dmNodes.h"
19
#include "index.h"
20
#include "qworker.h"
21
#include "tcompression.h"
22
#include "tconv.h"
23
#include "tglobal.h"
24
#include "tgrant.h"
25
#include "tconv.h"
26
#include "stream.h"
27
#if defined(TD_ENTERPRISE) && defined(TD_HAS_TAOSK)
28
#include "taoskInt.h"
29
#endif
30

31
// Ask a node's management module whether the node has to be started.
//
// Builds the management input options for pWrapper, stamps them with the
// current dnode id and invokes the wrapper's requiredFp callback.
//
// Returns the `required` flag as left by the callback (false when the callback
// does not set it). A non-zero callback result is logged; it does not change
// the returned value.
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
  input.dnodeId = pDnode->data.dnodeId;

  bool    required = false;
  int32_t code = (*pWrapper->func.requiredFp)(&input, &required);
  if (code != 0) {
    // Do not drop the callback result silently: a failed probe would otherwise
    // be indistinguishable from "node not required".
    dError("node:%s, failed to check whether startup is required since %s", pWrapper->name, tstrerror(code));
  }

  if (!required) {
    dDebug("node:%s, does not require startup", pWrapper->name);
  } else {
    dDebug("node:%s, required to startup", pWrapper->name);
  }

  return required;
}
45

46
// Create the dnode: initialize its variables, register the management
// functions of every node type, fill in each wrapper (name, type, lock, data
// path, required flag), take the data-dir lock file, initialize the modules
// and the status/sync clients.
//
// Returns 0 on success. On failure the dnode variables are cleared through
// dmClearVars(), the error is logged and the failing code is returned.
int32_t dmInitDnode(SDnode *pDnode) {
  dDebug("start to create dnode");
  int32_t code = -1;
  char    nodePath[PATH_MAX + 100] = {0};

  code = dmInitVarsWrapper(pDnode);
  if (code != 0) goto _OVER;

  // compress module init
  tsCompressInit(tsLossyColumns, tsFPrecision, tsDPrecision, tsMaxRange, tsCurRange, (int)tsIfAdtFse, tsCompressor);

  // one management function table per node type
  pDnode->wrappers[DNODE].func = dmGetMgmtFunc();
  pDnode->wrappers[MNODE].func = mmGetMgmtFunc();
  pDnode->wrappers[VNODE].func = vmGetMgmtFunc();
  pDnode->wrappers[QNODE].func = qmGetMgmtFunc();
  pDnode->wrappers[SNODE].func = smGetMgmtFunc();
  pDnode->wrappers[BNODE].func = bmGetMgmtFunc();
  pDnode->wrappers[XNODE].func = xmGetMgmtFunc();

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];
    pNode->pDnode = pDnode;
    pNode->name = dmNodeName(ntype);
    pNode->ntype = ntype;
    (void)taosThreadRwlockInit(&pNode->lock, NULL);

    // each node keeps its files under <tsDataDir>/<node name>
    snprintf(nodePath, sizeof(nodePath), "%s%s%s", tsDataDir, TD_DIRSEP, pNode->name);
    pNode->path = taosStrdup(nodePath);
    if (pNode->path == NULL) {
      code = terrno;
      goto _OVER;
    }

    pNode->required = dmRequireNode(pDnode, pNode);
  }

  code = dmCheckRunning(tsDataDir, &pDnode->lockfile);
  if (code != 0) goto _OVER;

  code = dmInitModule(pDnode);
  if (code != 0) goto _OVER;

  indexInit(tsNumOfCommitThreads);

  code = dmInitStatusClient(pDnode);
  if (code != 0) goto _OVER;

  code = dmInitSyncClient(pDnode);
  if (code != 0) goto _OVER;

  // code is 0 here: the last initialization step succeeded
  dmReportStartup("dnode-transport", "initialized");
  dDebug("dnode is created, ptr:%p", pDnode);

_OVER:
  if (code != 0 && pDnode != NULL) {
    dmClearVars(pDnode);
    dError("failed to create dnode since %s", tstrerror(code));
  }

  return code;
}
114

115
// Shut the dnode down. A NULL pDnode is ignored.
//
// Cleans up the client, status client, sync client and server, clears the
// dnode variables, then calls the rpc, stream, index, conversion and
// compression cleanup routines, in that order.
void dmCleanupDnode(SDnode *pDnode) {
  if (pDnode != NULL) {
    dmCleanupClient(pDnode);
    dmCleanupStatusClient(pDnode);
    dmCleanupSyncClient(pDnode);
    dmCleanupServer(pDnode);

    dmClearVars(pDnode);
    rpcCleanup();
    streamCleanup();
    indexCleanup();
    taosConvDestroy();

    // compress destroy
    tsCompressExit();

    dDebug("dnode is closed, ptr:%p", pDnode);
  }
}
136

137
// Run dmInitVars() and normalize its result into an error code.
//
// dmInitVars() reports failures in several ways: a concrete error code, the
// current terrno, or a bare -1. Every non-zero result is a failure and is
// propagated; a bare -1 is replaced by terrno when terrno holds a code.
//
// Returns 0 only when dmInitVars() returned 0.
int32_t dmInitVarsWrapper(SDnode *pDnode) {
  int32_t code = dmInitVars(pDnode);
  if (code == -1) {
    // Prefer the detailed code in terrno, but never turn a failure into
    // success when terrno happens to be 0.
    return (terrno != 0) ? terrno : code;
  }
  // Any other non-zero value is already an error code and must reach the caller.
  return code;
}
144
// Reset the dnode's runtime data and load its persisted endpoint state.
//
// Steps: zero the identity/version fields, record the reboot time, copy the
// machine id (when one is available), create the dnode hash, read the endpoint
// file via dmReadEps(), publish the encryption settings (enterprise builds),
// try to load the local encryption keys, and finally initialize the locks.
//
// Returns 0 on success; on failure returns an error code, terrno, or -1 when
// the dnode is marked as dropped.
int32_t dmInitVars(SDnode *pDnode) {
  int32_t     code = 0;
  SDnodeData *pVars = &pDnode->data;

  pVars->dnodeId = 0;
  pVars->clusterId = 0;
  pVars->dnodeVer = 0;
  pVars->engineVer = 0;
  pVars->updateTime = 0;
  pVars->rebootTime = taosGetTimestampMs();
  pVars->dropped = 0;
  pVars->stopped = 0;

  char *machineId = NULL;
  code = tGetMachineId(&machineId);
  if (machineId != NULL) {
    tstrncpy(pVars->machineId, machineId, TSDB_MACHINE_ID_LEN + 1);
    taosMemoryFreeClear(machineId);
  } else {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    // this build configuration refuses to start without a machine id
    terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    return TSDB_CODE_DNODE_NO_MACHINE_CODE;
#endif
  }

  pVars->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (pVars->dnodeHash == NULL) {
    dError("failed to init dnode hash");
    return terrno;
  }

  code = dmReadEps(pVars);
  if (code != 0) {
    dError("failed to read file since %s", tstrerror(code));
    return code;
  }

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tsiEncryptAlgorithm = pVars->encryptAlgorigthm;
  tsiEncryptScope = pVars->encryptScope;

#if defined(TD_HAS_TAOSK) || defined(TD_ASTRA_TODO)
  // Load local encryption keys and initialize key version
  {
    char masterPath[PATH_MAX] = {0};
    char derivedPath[PATH_MAX] = {0};
    snprintf(masterPath, sizeof(masterPath), "%s%sdnode%sconfig%smaster.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
             TD_DIRSEP);
    snprintf(derivedPath, sizeof(derivedPath), "%s%sdnode%sconfig%sderived.bin", tsDataDir, TD_DIRSEP, TD_DIRSEP,
             TD_DIRSEP);

    // output buffers for the loader; only the key version is used below
    char    svrKey[129] = {0};
    char    dbKey[129] = {0};
    char    cfgKey[129] = {0};
    char    metaKey[129] = {0};
    char    dataKey[129] = {0};
    int32_t algorithm = 0;
    int32_t cfgAlgorithm = 0;
    int32_t metaAlgorithm = 0;
    int32_t fileVersion = 0;
    int32_t keyVersion = 0;
    int64_t createTime = 0;
    int64_t svrKeyUpdateTime = 0;
    int64_t dbKeyUpdateTime = 0;

    code = taoskLoadEncryptKeys(masterPath, derivedPath, svrKey, dbKey, cfgKey, metaKey, dataKey, &algorithm,
                                &cfgAlgorithm, &metaAlgorithm, &fileVersion, &keyVersion, &createTime,
                                &svrKeyUpdateTime, &dbKeyUpdateTime);
    if (code != 0) {
      // a load failure is not fatal for startup: version 0 is recorded instead
      tsLocalKeyVersion = 0;
      dInfo("no local encryption keys found or failed to load, will sync from mnode");
    } else {
      tsLocalKeyVersion = keyVersion;
      dInfo("loaded local encryption keys, version:%d", tsLocalKeyVersion);
    }
  }
#endif
#endif

  if (pVars->dropped) {
    dError("dnode will not start since its already dropped");
    return -1;
  }

  (void)taosThreadRwlockInit(&pVars->lock, NULL);
  (void)taosThreadMutexInit(&pVars->statusInfolock, NULL);
  (void)taosThreadMutexInit(&pDnode->mutex, NULL);
  return 0;
}
265

266
extern SMonVloadInfo tsVinfo;
267

268
// Release everything held by the dnode variables.
//
// In order: frees every wrapper's path and destroys its lock; unlocks and
// closes the lock file; under the data write lock, flushes and destroys the
// old endpoint list and destroys the endpoint array and hash; destroys the
// data lock; under statusInfolock, destroys tsVinfo.pVloads; finally destroys
// and zeroes statusInfolock and the dnode mutex.
//
// If statusInfolock cannot be locked or unlocked, the function logs the error
// and returns without destroying the two mutexes.
void dmClearVars(SDnode *pDnode) {
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];
    taosMemoryFreeClear(pNode->path);
    (void)taosThreadRwlockDestroy(&pNode->lock);
  }

  if (pDnode->lockfile != NULL) {
    int32_t unlockRet = taosUnLockFile(pDnode->lockfile);
    if (unlockRet != 0) {
      dError("failed to unlock file");
    }

    // the file is closed even when unlocking failed
    (void)taosCloseFile(&pDnode->lockfile);
    pDnode->lockfile = NULL;
  }

  SDnodeData *pVars = &pDnode->data;

  (void)taosThreadRwlockWrlock(&pVars->lock);
  if (pVars->oldDnodeEps != NULL) {
    // dnode pairs are removed only after the endpoints were written successfully
    if (dmWriteEps(pVars) == 0) {
      dmRemoveDnodePairs(pVars);
    }
    taosArrayDestroy(pVars->oldDnodeEps);
    pVars->oldDnodeEps = NULL;
  }
  if (pVars->dnodeEps != NULL) {
    taosArrayDestroy(pVars->dnodeEps);
    pVars->dnodeEps = NULL;
  }
  if (pVars->dnodeHash != NULL) {
    taosHashCleanup(pVars->dnodeHash);
    pVars->dnodeHash = NULL;
  }
  (void)taosThreadRwlockUnlock(&pVars->lock);

  (void)taosThreadRwlockDestroy(&pVars->lock);

  dDebug("begin to lock status info when thread exit");
  if (taosThreadMutexLock(&pVars->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    return;
  }

  if (tsVinfo.pVloads != NULL) {
    taosArrayDestroy(tsVinfo.pVloads);
    tsVinfo.pVloads = NULL;
  }

  if (taosThreadMutexUnlock(&pVars->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    return;
  }

  if (taosThreadMutexDestroy(&pVars->statusInfolock) != 0) {
    dError("failed to destroy status info lock");
  }
  memset(&pVars->statusInfolock, 0, sizeof(pVars->statusInfolock));

  (void)taosThreadMutexDestroy(&pDnode->mutex);
  memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
}
325

326
// Change the dnode run status. Nothing is logged or written when the status
// is already the requested one.
void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {
  if (pDnode->status == status) {
    return;
  }

  dDebug("dnode status set from %s to %s", dmStatStr(pDnode->status), dmStatStr(status));
  pDnode->status = status;
}
1,071,332✔
332

333
// Take a reference on the wrapper of the given node type.
//
// Under the wrapper's read lock: when the wrapper is deployed, its refCount is
// atomically incremented and the wrapper is returned; otherwise NULL is
// returned and the refCount is left untouched.
SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
  SMgmtWrapper *pNode = &pDnode->wrappers[ntype];
  SMgmtWrapper *pAcquired = NULL;

  (void)taosThreadRwlockRdlock(&pNode->lock);
  if (pNode->deployed) {
    (void)atomic_add_fetch_32(&pNode->refCount, 1);
    pAcquired = pNode;
  }
  (void)taosThreadRwlockUnlock(&pNode->lock);

  return pAcquired;
}
348

349
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
2,147,483,647✔
350
  int32_t code = 0;
2,147,483,647✔
351

352
  (void)taosThreadRwlockRdlock(&pWrapper->lock);
2,147,483,647✔
353
  if (pWrapper->deployed) {
2,147,483,647✔
354
    int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
2,147,483,647✔
355
    // dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount);
356
  } else {
357
    switch (pWrapper->ntype) {
56,801,018✔
358
      case MNODE:
16,493,637✔
359
        code = TSDB_CODE_MNODE_NOT_FOUND;
16,493,637✔
360
        break;
16,493,637✔
361
      case QNODE:
40,189,647✔
362
        code = TSDB_CODE_QNODE_NOT_FOUND;
40,189,647✔
363
        break;
40,189,647✔
364
      case SNODE:
281✔
365
        code = TSDB_CODE_SNODE_NOT_FOUND;
281✔
366
        break;
281✔
UNCOV
367
      case BNODE:
×
368
        code = TSDB_CODE_BNODE_NOT_FOUND;
×
369
        break;
×
NEW
370
      case XNODE:
×
NEW
371
        code = TSDB_CODE_XNODE_NOT_FOUND;
×
NEW
372
        break;
×
373
      case VNODE:
117,340✔
374
        code = TSDB_CODE_VND_STOPPED;
117,340✔
375
        break;
117,340✔
376
      default:
×
UNCOV
377
        code = TSDB_CODE_APP_IS_STOPPING;
×
UNCOV
378
        break;
×
379
    }
380
  }
381
  (void)taosThreadRwlockUnlock(&pWrapper->lock);
2,147,483,647✔
382

383
  return code;
2,147,483,647✔
384
}
385

386
// Drop one reference from a wrapper. A NULL wrapper is ignored.
// The refCount is decremented atomically while the wrapper's read lock is held.
void dmReleaseWrapper(SMgmtWrapper *pWrapper) {
  if (pWrapper != NULL) {
    (void)taosThreadRwlockRdlock(&pWrapper->lock);
    (void)atomic_sub_fetch_32(&pWrapper->refCount, 1);
    (void)taosThreadRwlockUnlock(&pWrapper->lock);
  }
}
394

UNCOV
395
// Fill pStatus from the dnode run status.
//
//   DND_STAT_INIT    -> TSDB_SRV_STATUS_NETWORK_OK, details = "<startup name>: <startup desc>"
//   DND_STAT_STOPPED -> TSDB_SRV_STATUS_EXTING, empty details
//   anything else    -> TSDB_SRV_STATUS_SERVICE_OK, empty details
static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus) {
  pStatus->details[0] = 0;

  switch (pDnode->status) {
    case DND_STAT_INIT:
      pStatus->statusCode = TSDB_SRV_STATUS_NETWORK_OK;
      snprintf(pStatus->details, sizeof(pStatus->details), "%s: %s", pDnode->startup.name, pDnode->startup.desc);
      break;
    case DND_STAT_STOPPED:
      pStatus->statusCode = TSDB_SRV_STATUS_EXTING;
      break;
    default:
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
      break;
  }
}
×
408

409
// Answer a network test request with a response body of the same length as
// the request body.
//
// When the response buffer cannot be allocated, the response carries
// TSDB_CODE_OUT_OF_MEMORY and no body. The request content is released after
// the response has been handed to rpcSendResponse().
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) {
  dDebug("msg:%p, net test req will be processed", pMsg);

  SRpcMsg rsp = {.info = pMsg->info};
  rsp.pCont = rpcMallocCont(pMsg->contLen);
  if (rsp.pCont != NULL) {
    rsp.contLen = pMsg->contLen;
  } else {
    rsp.code = TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t sendRet = rpcSendResponse(&rsp);
  if (sendRet != 0) {
    dError("failed to send response, msg:%p", &rsp);
  }
  rpcFreeCont(pMsg->pCont);
}
×
425

UNCOV
426
// Answer a server startup status request.
//
// Collects the current status through dmGetServerStartupStatus(), serializes
// it into a freshly allocated response buffer and sends the response. The
// request content is released after the response has been handed to
// rpcSendResponse().
//
// Response code on failure:
//   - size query returns a negative length -> TSDB_CODE_OUT_OF_MEMORY
//   - response buffer cannot be allocated  -> TSDB_CODE_OUT_OF_MEMORY
//   - serialization into the buffer fails  -> TSDB_CODE_APP_ERROR
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) {
  dDebug("msg:%p, server startup status req will be processed", pMsg);

  SServerStatusRsp statusRsp = {0};
  dmGetServerStartupStatus(pDnode, &statusRsp);

  SRpcMsg rsp = {.info = pMsg->info};
  // First pass with a NULL buffer only computes the serialized length.
  int32_t contLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
  if (contLen < 0) {
    rsp.code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    rsp.pCont = rpcMallocCont(contLen);
    if (rsp.pCont == NULL) {
      // Without this the caller would receive an empty response flagged as
      // success; report the allocation failure like dmProcessNetTestReq does.
      rsp.code = TSDB_CODE_OUT_OF_MEMORY;
    } else if (tSerializeSServerStatusRsp(rsp.pCont, contLen, &statusRsp) < 0) {
      rsp.code = TSDB_CODE_APP_ERROR;
    } else {
      rsp.contLen = contLen;
    }
  }

  if (rpcSendResponse(&rsp) != 0) {
    dError("failed to send response, msg:%p", &rsp);
  }
  rpcFreeCont(pMsg->pCont);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc