• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4788

14 Oct 2025 11:21AM UTC coverage: 60.992% (-2.3%) from 63.264%
#4788

push

travis-ci

web-flow
Merge 7ca9b50f9 into 19574fe21

154868 of 324306 branches covered (47.75%)

Branch coverage included in aggregate %.

207304 of 269498 relevant lines covered (76.92%)

125773493.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.97
/source/dnode/mgmt/node_mgmt/src/dmMgmt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "dmNodes.h"
19
#include "index.h"
20
#include "qworker.h"
21
#include "tcompression.h"
22
#include "tconv.h"
23
#include "tglobal.h"
24
#include "tgrant.h"
25
#include "tconv.h"
26
#include "stream.h"
27

28
/*
 * Asks the node type's `requiredFp` callback whether this node must be started on
 * this dnode.
 *
 * The callback receives the wrapper's management input options with `dnodeId` filled
 * in from pDnode->data. Returns the `required` flag the callback produced; the flag
 * starts as false, so a callback that never sets it yields "not required".
 */
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
  input.dnodeId = pDnode->data.dnodeId;

  bool    required = false;
  int32_t code = (*pWrapper->func.requiredFp)(&input, &required);
  if (code != 0) {
    // Surface the probe failure instead of silently dropping the code; the decision
    // below still uses whatever value the callback left in `required`.
    dError("node:%s, failed to check whether startup is required since %s", pWrapper->name, tstrerror(code));
  }

  if (!required) {
    dDebug("node:%s, does not require startup", pWrapper->name);
  } else {
    dDebug("node:%s, required to startup", pWrapper->name);
  }

  return required;
}
42

43
/*
 * Creates the dnode runtime.
 *
 * Steps, in order:
 *   1. initialize dnode variables (dmInitVarsWrapper) and the compression module;
 *   2. bind each node type's management function table;
 *   3. for every node type, set name/type, init its rwlock, build its data path
 *      "<tsDataDir><sep><name>" and ask whether it must be started;
 *   4. take the data-dir running lock (dmCheckRunning);
 *   5. initialize modules, the index, and the status and sync clients.
 *
 * Returns 0 on success. On failure, dmClearVars() is called on pDnode and the error
 * code is returned.
 */
int32_t dmInitDnode(SDnode *pDnode) {
  dDebug("start to create dnode");
  int32_t code = -1;
  char    path[PATH_MAX + 100] = {0};

  if ((code = dmInitVarsWrapper(pDnode)) != 0) {
    goto _OVER;
  }

  // compress module init
  tsCompressInit(tsLossyColumns, tsFPrecision, tsDPrecision, tsMaxRange, tsCurRange, (int)tsIfAdtFse, tsCompressor);

  pDnode->wrappers[DNODE].func = dmGetMgmtFunc();
  pDnode->wrappers[MNODE].func = mmGetMgmtFunc();
  pDnode->wrappers[VNODE].func = vmGetMgmtFunc();
  pDnode->wrappers[QNODE].func = qmGetMgmtFunc();
  pDnode->wrappers[SNODE].func = smGetMgmtFunc();
  pDnode->wrappers[BNODE].func = bmGetMgmtFunc();

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
    pWrapper->pDnode = pDnode;
    pWrapper->name = dmNodeName(ntype);
    pWrapper->ntype = ntype;
    (void)taosThreadRwlockInit(&pWrapper->lock, NULL);

    snprintf(path, sizeof(path), "%s%s%s", tsDataDir, TD_DIRSEP, pWrapper->name);
    pWrapper->path = taosStrdup(path);
    if (pWrapper->path == NULL) {
      code = terrno;
      goto _OVER;
    }

    pWrapper->required = dmRequireNode(pDnode, pWrapper);
  }

  code = dmCheckRunning(tsDataDir, &pDnode->lockfile);
  if (code != 0) {
    goto _OVER;
  }

  if ((code = dmInitModule(pDnode)) != 0) {
    goto _OVER;
  }

  indexInit(tsNumOfCommitThreads);

  if ((code = dmInitStatusClient(pDnode)) != 0) {
    goto _OVER;
  }
  if ((code = dmInitSyncClient(pDnode)) != 0) {
    goto _OVER;
  }

  dmReportStartup("dnode-transport", "initialized");
  dDebug("dnode is created, ptr:%p", pDnode);
  code = 0;

_OVER:
  if (code != 0 && pDnode != NULL) {
    // pDnode is owned by the caller; only its contents are released here.
    dmClearVars(pDnode);
    dError("failed to create dnode since %s", tstrerror(code));
  }

  return code;
}
110

111
/*
 * Tears down a dnode built by dmInitDnode().
 *
 * First closes the dnode's client, status client, sync client and server, then
 * releases the dnode variables (dmClearVars), and finally shuts down rpc, stream,
 * index, taosConv and the compression module. Passing NULL does nothing.
 */
void dmCleanupDnode(SDnode *pDnode) {
  if (pDnode != NULL) {
    // per-dnode transport endpoints
    dmCleanupClient(pDnode);
    dmCleanupStatusClient(pDnode);
    dmCleanupSyncClient(pDnode);
    dmCleanupServer(pDnode);

    // per-dnode state, then process-wide subsystems
    dmClearVars(pDnode);
    rpcCleanup();
    streamCleanup();
    indexCleanup();
    taosConvDestroy();
    tsCompressExit();  // compress destroy

    dDebug("dnode is closed, ptr:%p", pDnode);
  }
}
132

133
/*
 * Runs dmInitVars() and normalizes its result into a single error code.
 *
 * dmInitVars() signals failure in two ways: some paths return -1 (reason, if any, in
 * terrno) and others return a TSDB error code directly. Both are propagated as failure
 * here. Returns 0 only when dmInitVars() returned 0.
 */
int32_t dmInitVarsWrapper(SDnode *pDnode) {
  int32_t code = dmInitVars(pDnode);
  if (code == -1) {
    // -1 carries its reason in terrno; never let an unset terrno turn it into success.
    return (terrno != 0) ? terrno : TSDB_CODE_APP_ERROR;
  }
  return code;
}
140
/*
 * Initializes pDnode->data and the dnode's synchronization primitives.
 *
 * Steps:
 *   - resets the dnode id, cluster id, versions, update time and the dropped/stopped
 *     flags, and records the reboot time;
 *   - initializes pData->lock, pData->statusInfolock and pDnode->mutex;
 *   - copies the machine id from tGetMachineId() (enterprise builds without GRANTS_CFG
 *     fail if none is available);
 *   - creates pData->dnodeHash (int keys) and loads endpoints via dmReadEps();
 *   - in enterprise builds, copies the encryption algorithm/scope read by dmReadEps().
 *
 * Returns 0 on success. On failure it returns either a TSDB error code or -1 with
 * terrno set (the "already dropped" case).
 */
int32_t dmInitVars(SDnode *pDnode) {
  int32_t     code = 0;
  SDnodeData *pData = &pDnode->data;
  pData->dnodeId = 0;
  pData->clusterId = 0;
  pData->dnodeVer = 0;
  pData->engineVer = 0;
  pData->updateTime = 0;
  pData->rebootTime = taosGetTimestampMs();
  pData->dropped = 0;
  pData->stopped = 0;

  // Initialize the locks before any early return: on failure dmInitDnode() tears down
  // with dmClearVars(), which locks and destroys these unconditionally.
  (void)taosThreadRwlockInit(&pData->lock, NULL);
  (void)taosThreadMutexInit(&pData->statusInfolock, NULL);
  (void)taosThreadMutexInit(&pDnode->mutex, NULL);

  char *machineId = NULL;
  code = tGetMachineId(&machineId);
  if (machineId) {
    tstrncpy(pData->machineId, machineId, TSDB_MACHINE_ID_LEN + 1);
    taosMemoryFreeClear(machineId);
  } else {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    code = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    return terrno = code;
#endif
  }

  pData->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (pData->dnodeHash == NULL) {
    dError("failed to init dnode hash");
    return terrno;
  }

  if ((code = dmReadEps(pData)) != 0) {
    dError("failed to read file since %s", tstrerror(code));
    return code;
  }

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tsiEncryptAlgorithm = pData->encryptAlgorigthm;
  tsiEncryptScope = pData->encryptScope;
  // Startup crypt-key validation is not performed here; only algorithm and scope are applied.
#endif

  if (pData->dropped) {
    dError("dnode will not start since its already dropped");
    // Callers translate -1 through terrno, so make sure it names a failure.
    terrno = TSDB_CODE_APP_ERROR;
    return -1;
  }

  return 0;
}
224

225
extern SMonVloadInfo tsVinfo;
226

227
void dmClearVars(SDnode *pDnode) {
747,700✔
228
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
5,233,900✔
229
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
4,486,200✔
230
    taosMemoryFreeClear(pWrapper->path);
4,486,200!
231
    (void)taosThreadRwlockDestroy(&pWrapper->lock);
4,486,200✔
232
  }
233
  if (pDnode->lockfile != NULL) {
747,700!
234
    if (taosUnLockFile(pDnode->lockfile) != 0) {
747,700!
235
      dError("failed to unlock file");
×
236
    }
237

238
    (void)taosCloseFile(&pDnode->lockfile);
747,700✔
239
    pDnode->lockfile = NULL;
747,700✔
240
  }
241

242
  SDnodeData *pData = &pDnode->data;
747,700✔
243
  (void)taosThreadRwlockWrlock(&pData->lock);
747,700✔
244
  if (pData->oldDnodeEps != NULL) {
747,700!
245
    if (dmWriteEps(pData) == 0) {
×
246
      dmRemoveDnodePairs(pData);
×
247
    }
248
    taosArrayDestroy(pData->oldDnodeEps);
×
249
    pData->oldDnodeEps = NULL;
×
250
  }
251
  if (pData->dnodeEps != NULL) {
747,700!
252
    taosArrayDestroy(pData->dnodeEps);
747,700✔
253
    pData->dnodeEps = NULL;
747,700✔
254
  }
255
  if (pData->dnodeHash != NULL) {
747,700!
256
    taosHashCleanup(pData->dnodeHash);
747,700✔
257
    pData->dnodeHash = NULL;
747,700✔
258
  }
259
  (void)taosThreadRwlockUnlock(&pData->lock);
747,700✔
260

261
  (void)taosThreadRwlockDestroy(&pData->lock);
747,700✔
262

263
  dDebug("begin to lock status info when thread exit");
747,700✔
264
  if (taosThreadMutexLock(&pData->statusInfolock) != 0) {
747,700!
265
    dError("failed to lock status info lock");
×
266
    return;
×
267
  }
268
  if (tsVinfo.pVloads != NULL) {
747,700✔
269
    taosArrayDestroy(tsVinfo.pVloads);
197,055✔
270
    tsVinfo.pVloads = NULL;
197,055✔
271
  }
272
  if (taosThreadMutexUnlock(&pData->statusInfolock) != 0) {
747,700!
273
    dError("failed to unlock status info lock");
×
274
    return;
×
275
  }
276
  if (taosThreadMutexDestroy(&pData->statusInfolock) != 0) {
747,700!
277
    dError("failed to destroy status info lock");
×
278
  }
279
  memset(&pData->statusInfolock, 0, sizeof(pData->statusInfolock));
747,700!
280

281
  (void)taosThreadMutexDestroy(&pDnode->mutex);
747,700✔
282
  memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
747,700!
283
}
284

285
/* Moves the dnode to `status`, logging the transition; does nothing if already there. */
void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {
  if (pDnode->status == status) {
    return;
  }

  dDebug("dnode status set from %s to %s", dmStatStr(pDnode->status), dmStatStr(status));
  pDnode->status = status;
}
291

292
/*
 * Returns the wrapper for `ntype` with its refCount incremented if the node is
 * deployed; returns NULL (no reference taken) otherwise. The reference is dropped by
 * dmReleaseWrapper(), which decrements refCount.
 */
SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
  SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
  SMgmtWrapper *pRetWrapper = NULL;

  // The read lock spans the `deployed` check and the increment (assumes writers toggle
  // `deployed` under the write lock -- not visible here).
  (void)taosThreadRwlockRdlock(&pWrapper->lock);
  if (pWrapper->deployed) {
    (void)atomic_add_fetch_32(&pWrapper->refCount, 1);
    pRetWrapper = pWrapper;
  }
  (void)taosThreadRwlockUnlock(&pWrapper->lock);

  return pRetWrapper;
}
307

308
/*
 * Takes a reference on pWrapper if its node is deployed.
 *
 * Returns 0 when the reference was taken (undo with dmReleaseWrapper()). Otherwise no
 * reference is taken and a node-type-specific code is returned:
 *   MNODE -> TSDB_CODE_MNODE_NOT_FOUND, QNODE -> TSDB_CODE_QNODE_NOT_FOUND,
 *   SNODE -> TSDB_CODE_SNODE_NOT_FOUND, BNODE -> TSDB_CODE_BNODE_NOT_FOUND,
 *   VNODE -> TSDB_CODE_VND_STOPPED, anything else -> TSDB_CODE_APP_IS_STOPPING.
 */
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
  int32_t code = 0;

  (void)taosThreadRwlockRdlock(&pWrapper->lock);
  if (pWrapper->deployed) {
    (void)atomic_add_fetch_32(&pWrapper->refCount, 1);
  } else {
    switch (pWrapper->ntype) {
      case MNODE:
        code = TSDB_CODE_MNODE_NOT_FOUND;
        break;
      case QNODE:
        code = TSDB_CODE_QNODE_NOT_FOUND;
        break;
      case SNODE:
        code = TSDB_CODE_SNODE_NOT_FOUND;
        break;
      case BNODE:
        code = TSDB_CODE_BNODE_NOT_FOUND;
        break;
      case VNODE:
        code = TSDB_CODE_VND_STOPPED;
        break;
      default:
        code = TSDB_CODE_APP_IS_STOPPING;
        break;
    }
  }
  (void)taosThreadRwlockUnlock(&pWrapper->lock);

  return code;
}
341

342
/*
 * Drops one reference taken by dmAcquireWrapper()/dmMarkWrapper() by decrementing
 * refCount under the wrapper's read lock. NULL is accepted and ignored.
 */
void dmReleaseWrapper(SMgmtWrapper *pWrapper) {
  if (pWrapper == NULL) {
    return;
  }

  (void)taosThreadRwlockRdlock(&pWrapper->lock);
  (void)atomic_sub_fetch_32(&pWrapper->refCount, 1);
  (void)taosThreadRwlockUnlock(&pWrapper->lock);
}
350

351
/*
 * Fills pStatus from the dnode run status:
 *   DND_STAT_INIT    -> TSDB_SRV_STATUS_NETWORK_OK, details "<startup.name>: <startup.desc>";
 *   DND_STAT_STOPPED -> TSDB_SRV_STATUS_EXTING;
 *   anything else    -> TSDB_SRV_STATUS_SERVICE_OK.
 * `details` is reset to an empty string first.
 */
static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus) {
  pStatus->details[0] = 0;

  if (pDnode->status == DND_STAT_INIT) {
    pStatus->statusCode = TSDB_SRV_STATUS_NETWORK_OK;
    snprintf(pStatus->details, sizeof(pStatus->details), "%s: %s", pDnode->startup.name, pDnode->startup.desc);
  } else if (pDnode->status == DND_STAT_STOPPED) {
    pStatus->statusCode = TSDB_SRV_STATUS_EXTING;
  } else {
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
  }
}
364

365
/*
 * Answers a network test request with a response whose body has the same length as
 * the request (or TSDB_CODE_OUT_OF_MEMORY if that buffer cannot be allocated), then
 * frees the request content.
 */
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) {
  dDebug("msg:%p, net test req will be processed", pMsg);

  SRpcMsg rsp = {.info = pMsg->info};
  rsp.pCont = rpcMallocCont(pMsg->contLen);
  if (rsp.pCont == NULL) {
    rsp.code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    rsp.contLen = pMsg->contLen;
  }

  if (rpcSendResponse(&rsp) != 0) {
    // Log the request pointer so this line correlates with the "processed" log above.
    dError("failed to send response, msg:%p", pMsg);
  }
  rpcFreeCont(pMsg->pCont);
}
381

382
/*
 * Answers a server-startup-status request with a serialized SServerStatusRsp built by
 * dmGetServerStartupStatus(), then frees the request content.
 *
 * Response code on failure: TSDB_CODE_OUT_OF_MEMORY when sizing or allocating the body
 * fails, TSDB_CODE_APP_ERROR when serialization into the buffer fails.
 */
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) {
  dDebug("msg:%p, server startup status req will be processed", pMsg);

  SServerStatusRsp statusRsp = {0};
  dmGetServerStartupStatus(pDnode, &statusRsp);

  SRpcMsg rsp = {.info = pMsg->info};
  int32_t contLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
  if (contLen < 0) {
    rsp.code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    rsp.pCont = rpcMallocCont(contLen);
    if (rsp.pCont == NULL) {
      // Without this the peer would receive an empty body with code 0 ("success").
      rsp.code = TSDB_CODE_OUT_OF_MEMORY;
    } else if (tSerializeSServerStatusRsp(rsp.pCont, contLen, &statusRsp) < 0) {
      rsp.code = TSDB_CODE_APP_ERROR;
    } else {
      rsp.contLen = contLen;
    }
  }

  if (rpcSendResponse(&rsp) != 0) {
    // Log the request pointer so this line correlates with the "processed" log above.
    dError("failed to send response, msg:%p", pMsg);
  }
  rpcFreeCont(pMsg->pCont);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc