• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mgmt/node_mgmt/src/dmMgmt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "dmNodes.h"
19
#include "index.h"
20
#include "qworker.h"
21
#include "tcompression.h"
22
#include "tglobal.h"
23
#include "tgrant.h"
24
#include "tstream.h"
25
#include "tconv.h"
26

UNCOV
27
/*
 * Ask a node's management module whether the node has to be started.
 *
 * Builds the module input options from pWrapper, invokes the module's
 * requiredFp callback and logs the outcome.
 *
 * pDnode   - owning dnode; not used by this function.
 * pWrapper - wrapper whose requiredFp callback is queried.
 *
 * Returns the `required` flag as left by the callback (false if the callback
 * never writes it).
 */
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);

  bool    required = false;
  int32_t code = (*pWrapper->func.requiredFp)(&input, &required);
  if (code != 0) {
    // The callback status used to be discarded; report it so a failed check is
    // visible. The return value is still taken from `required` only.
    // NOTE(review): assumes requiredFp returns a code tstrerror() understands - confirm.
    dError("node:%s, failed to check whether startup is required since %s", pWrapper->name, tstrerror(code));
  }

  if (required) {
    dDebug("node:%s, required to startup", pWrapper->name);
  } else {
    dDebug("node:%s, does not require startup", pWrapper->name);
  }

  return required;
}
40

UNCOV
41
/*
 * Create and initialize a dnode.
 *
 * Steps, in order: dmInitVarsWrapper(), tsCompressInit(), binding of the
 * management function tables and per-node wrapper setup (name, type, rwlock,
 * data path, required flag), dmCheckRunning(), dmInitModule(), indexInit(),
 * streamMetaInit(), dmInitStatusClient() and dmInitSyncClient().
 *
 * Returns 0 on success. On failure the code of the failing step is returned
 * (terrno when duplicating a wrapper path fails), dmClearVars() is called on
 * pDnode and the failure is logged.
 *
 * NOTE(review): on a failure after indexInit()/streamMetaInit() this function
 * does not undo them itself; presumably the caller runs dmCleanupDnode() - confirm.
 */
int32_t dmInitDnode(SDnode *pDnode) {
  dDebug("start to create dnode");
  int32_t code = 0;
  char    path[PATH_MAX + 100] = {0};

  code = dmInitVarsWrapper(pDnode);
  if (code != 0) {
    goto _OVER;
  }

  // compress module init
  tsCompressInit(tsLossyColumns, tsFPrecision, tsDPrecision, tsMaxRange, tsCurRange, (int)tsIfAdtFse, tsCompressor);

  pDnode->wrappers[DNODE].func = dmGetMgmtFunc();
  pDnode->wrappers[MNODE].func = mmGetMgmtFunc();
  pDnode->wrappers[VNODE].func = vmGetMgmtFunc();
  pDnode->wrappers[QNODE].func = qmGetMgmtFunc();
  pDnode->wrappers[SNODE].func = smGetMgmtFunc();

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
    pWrapper->pDnode = pDnode;
    pWrapper->name = dmNodeName(ntype);
    pWrapper->ntype = ntype;
    (void)taosThreadRwlockInit(&pWrapper->lock, NULL);

    // Each node keeps its data under <tsDataDir><TD_DIRSEP><node name>.
    snprintf(path, sizeof(path), "%s%s%s", tsDataDir, TD_DIRSEP, pWrapper->name);
    pWrapper->path = taosStrdup(path);
    if (pWrapper->path == NULL) {
      code = terrno;
      goto _OVER;
    }

    pWrapper->required = dmRequireNode(pDnode, pWrapper);
  }

  code = dmCheckRunning(tsDataDir, &pDnode->lockfile);
  if (code != 0) {
    goto _OVER;
  }

  code = dmInitModule(pDnode);
  if (code != 0) {
    goto _OVER;
  }

  indexInit(tsNumOfCommitThreads);
  streamMetaInit();

  code = dmInitStatusClient(pDnode);
  if (code != 0) {
    goto _OVER;
  }

  code = dmInitSyncClient(pDnode);
  if (code != 0) {
    goto _OVER;
  }

  dmReportStartup("dnode-transport", "initialized");
  dDebug("dnode is created, ptr:%p", pDnode);
  code = 0;

_OVER:
  // pDnode has already been dereferenced above, so no NULL check is needed
  // here, and resetting the by-value parameter would not reach the caller.
  if (code != 0) {
    dmClearVars(pDnode);
    dError("failed to create dnode since %s", tstrerror(code));
  }

  return code;
}
108

UNCOV
109
/*
 * Tear down a dnode: clients, server, dnode variables, then the rpc,
 * stream-meta, index, conversion and compression modules, in that order.
 *
 * pDnode - dnode to clean up; a NULL pointer makes the call a no-op.
 */
void dmCleanupDnode(SDnode *pDnode) {
  if (pDnode != NULL) {
    // transport endpoints first
    dmCleanupClient(pDnode);
    dmCleanupStatusClient(pDnode);
    dmCleanupSyncClient(pDnode);
    dmCleanupServer(pDnode);

    // dnode state, then the shared modules
    dmClearVars(pDnode);
    rpcCleanup();
    streamMetaCleanup();
    indexCleanup();
    taosConvDestroy();

    // compress destroy
    tsCompressExit();

    dDebug("dnode is closed, ptr:%p", pDnode);
  }
}
130

UNCOV
131
/*
 * Run dmInitVars() and normalize its result to an error code.
 *
 * dmInitVars() reports failure in two ways: by returning -1, or by returning
 * a non-zero error code directly. Both are propagated here; previously only
 * -1 was treated as a failure and every other non-zero code was turned into
 * success.
 *
 * Returns 0 on success, otherwise a non-zero error code.
 */
int32_t dmInitVarsWrapper(SDnode *pDnode) {
  int32_t code = dmInitVars(pDnode);
  if (code == -1) {
    // -1 means "see terrno". Not every -1 path in dmInitVars() sets terrno, so
    // fall back to a generic error rather than reporting success.
    return (terrno != 0) ? terrno : TSDB_CODE_APP_ERROR;
  }
  return code;
}
UNCOV
138
/*
 * Reset the dnode's runtime data and load its persisted state.
 *
 * Zeroes the identity/version fields, stamps rebootTime, copies the machine id
 * when tGetMachineId() yields one, creates dnodeHash, reads the endpoint file
 * through dmReadEps() and finally initializes the data rwlock, the status-info
 * mutex and the dnode mutex.
 *
 * Returns 0 on success; terrno when the hash cannot be created; the
 * dmReadEps() code when reading fails; -1 when the dnode is marked dropped.
 * In enterprise builds without GRANTS_CFG a missing machine id returns
 * TSDB_CODE_DNODE_NO_MACHINE_CODE (also stored in terrno).
 */
int32_t dmInitVars(SDnode *pDnode) {
  int32_t     code = 0;
  SDnodeData *pVars = &pDnode->data;

  pVars->dnodeId = 0;
  pVars->clusterId = 0;
  pVars->dnodeVer = 0;
  pVars->engineVer = 0;
  pVars->updateTime = 0;
  pVars->rebootTime = taosGetTimestampMs();
  pVars->dropped = 0;
  pVars->stopped = 0;

  char *machineId = NULL;
  code = tGetMachineId(&machineId);
  if (machineId != NULL) {
    tstrncpy(pVars->machineId, machineId, TSDB_MACHINE_ID_LEN + 1);
    taosMemoryFreeClear(machineId);
  } else {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    code = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    terrno = code;
    return code;
#endif
  }

  pVars->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (pVars->dnodeHash == NULL) {
    dError("failed to init dnode hash");
    return terrno;
  }

  code = dmReadEps(pVars);
  if (code != 0) {
    dError("failed to read file since %s", tstrerror(code));
    return code;
  }

#if defined(TD_ENTERPRISE)
  // Encryption settings come from the data filled in above.
  // (A commented-out block that fetched the crypt key at startup used to
  // follow here; it was inactive and is not reproduced.)
  tsiEncryptAlgorithm = pVars->encryptAlgorigthm;
  tsiEncryptScope = pVars->encryptScope;
#endif

  if (pVars->dropped) {
    dError("dnode will not start since its already dropped");
    return -1;
  }

  (void)taosThreadRwlockInit(&pVars->lock, NULL);
  (void)taosThreadMutexInit(&pVars->statusInfolock, NULL);
  (void)taosThreadMutexInit(&pDnode->mutex, NULL);
  return 0;
}
222

223
extern SMonVloadInfo tsVinfo;
224

UNCOV
225
/*
 * Release everything held in the dnode's variables.
 *
 * Frees each wrapper's path and destroys its rwlock, unlocks and closes the
 * lock file, drops the endpoint arrays and the dnode hash under the data
 * write lock, destroys that lock, frees tsVinfo.pVloads under the status-info
 * mutex, and finally destroys the status-info mutex and the dnode mutex.
 *
 * If locking or unlocking the status-info mutex fails the function logs the
 * error and returns early, leaving the remaining mutexes untouched.
 */
void dmClearVars(SDnode *pDnode) {
  // per-node wrappers
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];
    taosMemoryFreeClear(pNode->path);
    (void)taosThreadRwlockDestroy(&pNode->lock);
  }

  // lock file
  if (pDnode->lockfile != NULL) {
    if (taosUnLockFile(pDnode->lockfile) != 0) {
      dError("failed to unlock file");
    }

    (void)taosCloseFile(&pDnode->lockfile);
    pDnode->lockfile = NULL;
  }

  // endpoint data, guarded by the data rwlock
  SDnodeData *pVars = &pDnode->data;
  (void)taosThreadRwlockWrlock(&pVars->lock);

  if (pVars->oldDnodeEps != NULL) {
    // Write the endpoints out first; the dnode pairs are removed only when
    // that write succeeds.
    if (dmWriteEps(pVars) == 0) {
      dmRemoveDnodePairs(pVars);
    }
    taosArrayDestroy(pVars->oldDnodeEps);
    pVars->oldDnodeEps = NULL;
  }

  if (pVars->dnodeEps != NULL) {
    taosArrayDestroy(pVars->dnodeEps);
    pVars->dnodeEps = NULL;
  }

  if (pVars->dnodeHash != NULL) {
    taosHashCleanup(pVars->dnodeHash);
    pVars->dnodeHash = NULL;
  }

  (void)taosThreadRwlockUnlock(&pVars->lock);
  (void)taosThreadRwlockDestroy(&pVars->lock);

  // vnode load info, guarded by the status-info mutex
  dDebug("begin to lock status info when thread exit");
  if (taosThreadMutexLock(&pVars->statusInfolock) != 0) {
    dError("failed to lock status info lock");
    return;
  }

  if (tsVinfo.pVloads != NULL) {
    taosArrayDestroy(tsVinfo.pVloads);
    tsVinfo.pVloads = NULL;
  }

  if (taosThreadMutexUnlock(&pVars->statusInfolock) != 0) {
    dError("failed to unlock status info lock");
    return;
  }

  if (taosThreadMutexDestroy(&pVars->statusInfolock) != 0) {
    dError("failed to destroy status info lock");
  }
  memset(&pVars->statusInfolock, 0, sizeof(pVars->statusInfolock));

  (void)taosThreadMutexDestroy(&pDnode->mutex);
  memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
}
282

UNCOV
283
/*
 * Update the dnode's run status.
 *
 * Nothing happens when the status is already `status`; otherwise the
 * transition is logged and the new value stored.
 */
void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {
  if (pDnode->status == status) {
    return;
  }

  dDebug("dnode status set from %s to %s", dmStatStr(pDnode->status), dmStatStr(status));
  pDnode->status = status;
}
×
289

UNCOV
290
/*
 * Take a reference on the wrapper of the given node type.
 *
 * Under the wrapper's read lock: if the node is deployed its refCount is
 * incremented atomically and the wrapper is returned; otherwise NULL is
 * returned and the count is left unchanged.
 */
SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
  SMgmtWrapper *pTarget = &pDnode->wrappers[ntype];
  SMgmtWrapper *pAcquired = NULL;

  (void)taosThreadRwlockRdlock(&pTarget->lock);
  if (pTarget->deployed) {
    (void)atomic_add_fetch_32(&pTarget->refCount, 1);
    pAcquired = pTarget;
  }
  (void)taosThreadRwlockUnlock(&pTarget->lock);

  return pAcquired;
}
305

UNCOV
306
/*
 * Take a reference on an already-known wrapper.
 *
 * Under the wrapper's read lock: if the node is deployed its refCount is
 * incremented atomically and 0 is returned. Otherwise an error code chosen by
 * node type is returned:
 *   MNODE -> TSDB_CODE_MNODE_NOT_FOUND
 *   QNODE -> TSDB_CODE_QNODE_NOT_FOUND
 *   SNODE -> TSDB_CODE_SNODE_NOT_FOUND
 *   VNODE -> TSDB_CODE_VND_STOPPED
 *   other -> TSDB_CODE_APP_IS_STOPPING
 */
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
  int32_t result = 0;

  (void)taosThreadRwlockRdlock(&pWrapper->lock);
  if (pWrapper->deployed) {
    (void)atomic_add_fetch_32(&pWrapper->refCount, 1);
  } else if (pWrapper->ntype == MNODE) {
    result = TSDB_CODE_MNODE_NOT_FOUND;
  } else if (pWrapper->ntype == QNODE) {
    result = TSDB_CODE_QNODE_NOT_FOUND;
  } else if (pWrapper->ntype == SNODE) {
    result = TSDB_CODE_SNODE_NOT_FOUND;
  } else if (pWrapper->ntype == VNODE) {
    result = TSDB_CODE_VND_STOPPED;
  } else {
    result = TSDB_CODE_APP_IS_STOPPING;
  }
  (void)taosThreadRwlockUnlock(&pWrapper->lock);

  return result;
}
336

UNCOV
337
/*
 * Drop one reference from a wrapper.
 *
 * The refCount is decremented atomically while holding the wrapper's read
 * lock. A NULL wrapper is ignored.
 */
void dmReleaseWrapper(SMgmtWrapper *pWrapper) {
  if (pWrapper != NULL) {
    (void)taosThreadRwlockRdlock(&pWrapper->lock);
    (void)atomic_sub_fetch_32(&pWrapper->refCount, 1);
    (void)taosThreadRwlockUnlock(&pWrapper->lock);
  }
}
345

346
/*
 * Fill a server status response from the dnode's current run status.
 *
 * details is always reset to an empty string first. Then:
 *   DND_STAT_INIT    -> TSDB_SRV_STATUS_NETWORK_OK, details = "<startup name>: <startup desc>"
 *   DND_STAT_STOPPED -> TSDB_SRV_STATUS_EXTING
 *   anything else    -> TSDB_SRV_STATUS_SERVICE_OK
 */
static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus) {
  pStatus->details[0] = 0;

  switch (pDnode->status) {
    case DND_STAT_INIT:
      pStatus->statusCode = TSDB_SRV_STATUS_NETWORK_OK;
      snprintf(pStatus->details, sizeof(pStatus->details), "%s: %s", pDnode->startup.name, pDnode->startup.desc);
      break;
    case DND_STAT_STOPPED:
      pStatus->statusCode = TSDB_SRV_STATUS_EXTING;
      break;
    default:
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
      break;
  }
}
×
359

360
/*
 * Answer a network test request.
 *
 * The reply carries a freshly allocated payload of the same length as the
 * request (contLen is set only when the allocation succeeds; otherwise the
 * reply code is TSDB_CODE_OUT_OF_MEMORY). The request payload is freed after
 * the reply has been handed to rpcSendResponse().
 *
 * pDnode - not used by this function.
 * pMsg   - incoming request; its pCont is released here.
 */
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) {
  dDebug("msg:%p, net test req will be processed", pMsg);

  SRpcMsg reply = {.info = pMsg->info};
  reply.pCont = rpcMallocCont(pMsg->contLen);
  if (reply.pCont != NULL) {
    reply.contLen = pMsg->contLen;
  } else {
    reply.code = TSDB_CODE_OUT_OF_MEMORY;
  }

  if (rpcSendResponse(&reply) != 0) {
    dError("failed to send response, msg:%p", &reply);
  }

  rpcFreeCont(pMsg->pCont);
}
×
376

377
/*
 * Answer a server startup status request.
 *
 * Collects the status with dmGetServerStartupStatus(), serializes it into a
 * newly allocated reply payload and sends the reply. The request payload is
 * freed afterwards.
 *
 * Reply code on failure:
 *   - size query fails          -> TSDB_CODE_OUT_OF_MEMORY
 *   - payload allocation fails  -> TSDB_CODE_OUT_OF_MEMORY
 *   - serialization fails       -> TSDB_CODE_APP_ERROR
 *
 * pDnode - dnode whose status is reported.
 * pMsg   - incoming request; its pCont is released here.
 */
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) {
  dDebug("msg:%p, server startup status req will be processed", pMsg);

  SServerStatusRsp statusRsp = {0};
  dmGetServerStartupStatus(pDnode, &statusRsp);

  SRpcMsg rsp = {.info = pMsg->info};
  // First pass with a NULL buffer only computes the serialized length.
  int32_t contLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
  if (contLen < 0) {
    rsp.code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    rsp.pCont = rpcMallocCont(contLen);
    if (rsp.pCont == NULL) {
      // Previously this case left rsp.code at 0, so the client received an
      // empty "success" reply. Report the allocation failure, as
      // dmProcessNetTestReq() does.
      rsp.code = TSDB_CODE_OUT_OF_MEMORY;
    } else if (tSerializeSServerStatusRsp(rsp.pCont, contLen, &statusRsp) < 0) {
      rsp.code = TSDB_CODE_APP_ERROR;
    } else {
      rsp.contLen = contLen;
    }
  }

  if (rpcSendResponse(&rsp) != 0) {
    dError("failed to send response, msg:%p", &rsp);
  }
  rpcFreeCont(pMsg->pCont);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc