• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3524

08 Nov 2024 04:27AM UTC coverage: 60.898% (+5.0%) from 55.861%
#3524

push

travis-ci

web-flow
Merge pull request #28647 from taosdata/fix/3.0/TD-32519_drop_ctb

fix TD-32519 drop child table with tsma caused crash

118687 of 248552 branches covered (47.75%)

Branch coverage included in aggregate %.

286 of 337 new or added lines in 18 files covered. (84.87%)

9647 existing lines in 190 files now uncovered.

199106 of 273291 relevant lines covered (72.85%)

15236719.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.57
/source/dnode/mgmt/node_mgmt/src/dmMgmt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
18
#include "dmNodes.h"
19
#include "index.h"
20
#include "qworker.h"
21
#include "tcompression.h"
22
#include "tglobal.h"
23
#include "tgrant.h"
24
#include "tstream.h"
25

26
/**
 * Ask a node's management module whether the node must be started.
 *
 * Builds the management input options for the wrapper, invokes the module's
 * `requiredFp` callback and logs the outcome.
 *
 * @param pDnode   Owning dnode; not used by this function.
 * @param pWrapper Wrapper whose `func.requiredFp` callback is queried.
 * @return The `required` flag as left by the callback (false if the callback
 *         did not touch it).
 */
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);

  bool    required = false;
  int32_t code = (*pWrapper->func.requiredFp)(&input, &required);
  if (code != 0) {
    // The callback's status used to be dropped silently. Surface it so a
    // failed probe is visible in the log; the flag itself is returned exactly
    // as the callback left it, so callers see no change in the result.
    dError("node:%s, failed to check whether startup is required since %s", pWrapper->name, tstrerror(code));
  }

  if (!required) {
    dDebug("node:%s, does not require startup", pWrapper->name);
  } else {
    dDebug("node:%s, required to startup", pWrapper->name);
  }

  return required;
}
39

40
/**
 * Create the dnode: initialise its variables, wire up the per-node-type
 * management wrappers and bring up the supporting modules and clients.
 *
 * Steps, in order:
 *   1. dmInitVarsWrapper() resets/loads the dnode data.
 *   2. The compression module is initialised from the global settings.
 *   3. Each wrapper gets its function table, back pointer, name, type, lock,
 *      data path ("<tsDataDir><TD_DIRSEP><name>") and `required` flag.
 *   4. dmCheckRunning() is called with the data directory and the dnode's
 *      lock-file slot.
 *   5. dmInitModule(), indexInit(), streamMetaInit(), then the status and
 *      sync clients.
 *
 * On any failure the dnode variables are released with dmClearVars() and the
 * error is logged.
 *
 * @param pDnode Dnode instance to initialise.
 * @return 0 on success, otherwise the error code of the step that failed.
 */
int32_t dmInitDnode(SDnode *pDnode) {
  dDebug("start to create dnode");
  int32_t code = 0;
  char    path[PATH_MAX + 100] = {0};

  if ((code = dmInitVarsWrapper(pDnode)) != 0) {
    goto _OVER;
  }

  // compress module init
  tsCompressInit(tsLossyColumns, tsFPrecision, tsDPrecision, tsMaxRange, tsCurRange, (int)tsIfAdtFse, tsCompressor);

  pDnode->wrappers[DNODE].func = dmGetMgmtFunc();
  pDnode->wrappers[MNODE].func = mmGetMgmtFunc();
  pDnode->wrappers[VNODE].func = vmGetMgmtFunc();
  pDnode->wrappers[QNODE].func = qmGetMgmtFunc();
  pDnode->wrappers[SNODE].func = smGetMgmtFunc();

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
    pWrapper->pDnode = pDnode;
    pWrapper->name = dmNodeName(ntype);
    pWrapper->ntype = ntype;
    (void)taosThreadRwlockInit(&pWrapper->lock, NULL);

    snprintf(path, sizeof(path), "%s%s%s", tsDataDir, TD_DIRSEP, pWrapper->name);
    pWrapper->path = taosStrdup(path);
    if (pWrapper->path == NULL) {
      code = terrno;
      goto _OVER;
    }

    // must run after name/path are set: the requirement probe is built from the wrapper
    pWrapper->required = dmRequireNode(pDnode, pWrapper);
  }

  code = dmCheckRunning(tsDataDir, &pDnode->lockfile);
  if (code != 0) {
    goto _OVER;
  }

  if ((code = dmInitModule(pDnode)) != 0) {
    goto _OVER;
  }

  indexInit(tsNumOfCommitThreads);
  streamMetaInit();

  if ((code = dmInitStatusClient(pDnode)) != 0) {
    goto _OVER;
  }
  if ((code = dmInitSyncClient(pDnode)) != 0) {
    goto _OVER;
  }

  dmReportStartup("dnode-transport", "initialized");
  dDebug("dnode is created, ptr:%p", pDnode);
  code = 0;

_OVER:
  if (code != 0 && pDnode != NULL) {
    // pDnode is passed by value, so there is no caller-side pointer to reset
    // here; only the contained resources are released.
    dmClearVars(pDnode);
    dError("failed to create dnode since %s", tstrerror(code));
  }

  return code;
}
107

108
/**
 * Shut a dnode down: clients and server first, then the dnode variables,
 * then the process-wide modules (rpc, stream meta, index, charset conversion,
 * compression).
 *
 * @param pDnode Dnode to clean up; a NULL pointer is ignored.
 */
void dmCleanupDnode(SDnode *pDnode) {
  if (pDnode != NULL) {
    // transport endpoints
    dmCleanupClient(pDnode);
    dmCleanupStatusClient(pDnode);
    dmCleanupSyncClient(pDnode);
    dmCleanupServer(pDnode);

    // dnode-owned state, then the shared modules
    dmClearVars(pDnode);
    rpcCleanup();
    streamMetaCleanup();
    indexCleanup();
    taosConvDestroy();

    // compress destroy
    tsCompressExit();

    dDebug("dnode is closed, ptr:%p", pDnode);
  }
}
129

130
/**
 * Run dmInitVars() and normalise its result into a plain error code.
 *
 * dmInitVars() reports failures in two ways: it returns -1 (the detail, if
 * any, is in terrno) or it returns the error code directly. Both forms are
 * propagated; previously every failure other than -1 was turned into 0, so
 * the caller carried on with a partially initialised dnode.
 *
 * @param pDnode Dnode whose variables are initialised.
 * @return 0 on success, otherwise a non-zero error code.
 */
int32_t dmInitVarsWrapper(SDnode *pDnode) {
  int32_t code = dmInitVars(pDnode);
  if (code == -1) {
    // -1 means "see terrno"; if terrno was never set, keep -1 so the failure
    // is not mistaken for success.
    return (terrno != 0) ? terrno : code;
  }
  return code;
}
137
/**
 * Reset the dnode's runtime data and load its persisted endpoint information.
 *
 * Clears the identity/version/state fields, stamps the reboot time, copies
 * the machine id when one is available, creates the dnode hash, reads the
 * endpoint file via dmReadEps() and finally initialises the locks.
 *
 * @param pDnode Dnode whose `data` member is initialised.
 * @return 0 on success;
 *         terrno if the dnode hash cannot be created;
 *         the dmReadEps() code if reading fails;
 *         -1 if the dnode is flagged as dropped after the read;
 *         TSDB_CODE_DNODE_NO_MACHINE_CODE (also stored in terrno) when no
 *         machine id is available in TD_ENTERPRISE builds without GRANTS_CFG.
 */
int32_t dmInitVars(SDnode *pDnode) {
  SDnodeData *pData = &pDnode->data;
  int32_t     code = 0;

  // start from a blank state; only the reboot time carries a real value
  pData->dnodeId = 0;
  pData->clusterId = 0;
  pData->dnodeVer = 0;
  pData->engineVer = 0;
  pData->updateTime = 0;
  pData->rebootTime = taosGetTimestampMs();
  pData->dropped = 0;
  pData->stopped = 0;

  char *machineId = NULL;
  code = tGetMachineId(&machineId);
  if (machineId == NULL) {
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
    code = TSDB_CODE_DNODE_NO_MACHINE_CODE;
    return terrno = code;
#endif
  } else {
    tstrncpy(pData->machineId, machineId, TSDB_MACHINE_ID_LEN + 1);
    taosMemoryFreeClear(machineId);
  }

  pData->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (pData->dnodeHash == NULL) {
    dError("failed to init dnode hash");
    return terrno;
  }

  code = dmReadEps(pData);
  if (code != 0) {
    dError("failed to read file since %s", tstrerror(code));
    return code;
  }

#if defined(TD_ENTERPRISE)
  tsiEncryptAlgorithm = pData->encryptAlgorigthm;
  tsiEncryptScope = pData->encryptScope;
  /*
  if(tsiEncryptAlgorithm != 0) {
    if(pData->machineId != NULL && strlen(pData->machineId) > 0){
      dInfo("get crypt key at startup, machineId:%s", pData->machineId);
      int32_t code = 0;

      //code = taosGetCryptKey(tsAuthCode, pData->machineId, tsCryptKey);
      code = 0;
      strncpy(tsEncryptKey, tsAuthCode, 16);

      if (code != 0) {
        if(code == -1){
          terrno = TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
          dError("machine code changed, can't get crypt key");
        }
        if(code == -2){
          terrno = TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
          dError("failed to get crypt key");
        }
        return -1;
      }

      if(strlen(tsEncryptKey) == 0){
        terrno = TSDB_CODE_DNODE_NO_ENCRYPT_KEY;
        dError("failed to get crypt key at startup since key is null, machineId:%s", pData->machineId);
        return -1;
      }
    }
    else{
      terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
      dError("failed to get crypt key at startup, machineId:%s", pData->machineId);
      return -1;
    }
  }
  */
#endif

  // `dropped` was cleared above, so a non-zero value here was set during the
  // endpoint read (presumably by dmReadEps -- confirm against its definition)
  if (pData->dropped) {
    dError("dnode will not start since its already dropped");
    return -1;
  }

  (void)taosThreadRwlockInit(&pData->lock, NULL);
  (void)taosThreadMutexInit(&pData->statusInfolock, NULL);
  (void)taosThreadMutexInit(&pDnode->mutex, NULL);
  return 0;
}
221

222
extern SMonVloadInfo tsVinfo;
223
void dmClearVars(SDnode *pDnode) {
2,407✔
224
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
14,442✔
225
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
12,035✔
226
    taosMemoryFreeClear(pWrapper->path);
12,035!
227
    (void)taosThreadRwlockDestroy(&pWrapper->lock);
12,035✔
228
  }
229
  if (pDnode->lockfile != NULL) {
2,407✔
230
    if (taosUnLockFile(pDnode->lockfile) != 0) {
2,406!
231
      dError("failed to unlock file");
×
232
    }
233

234
    (void)taosCloseFile(&pDnode->lockfile);
2,406✔
235
    pDnode->lockfile = NULL;
2,406✔
236
  }
237

238
  SDnodeData *pData = &pDnode->data;
2,407✔
239
  (void)taosThreadRwlockWrlock(&pData->lock);
2,407✔
240
  if (pData->oldDnodeEps != NULL) {
2,407!
241
    if (dmWriteEps(pData) == 0) {
×
242
      dmRemoveDnodePairs(pData);
×
243
    }
244
    taosArrayDestroy(pData->oldDnodeEps);
×
245
    pData->oldDnodeEps = NULL;
×
246
  }
247
  if (pData->dnodeEps != NULL) {
2,407!
248
    taosArrayDestroy(pData->dnodeEps);
2,407✔
249
    pData->dnodeEps = NULL;
2,407✔
250
  }
251
  if (pData->dnodeHash != NULL) {
2,407!
252
    taosHashCleanup(pData->dnodeHash);
2,407✔
253
    pData->dnodeHash = NULL;
2,407✔
254
  }
255
  (void)taosThreadRwlockUnlock(&pData->lock);
2,407✔
256

257
  (void)taosThreadRwlockDestroy(&pData->lock);
2,407✔
258

259
  dDebug("begin to lock status info when thread exit");
2,407✔
260
  if (taosThreadMutexLock(&pData->statusInfolock) != 0) {
2,407!
261
    dError("failed to lock status info lock");
×
262
    return;
×
263
  }
264
  if (tsVinfo.pVloads != NULL) {
2,407✔
265
    taosArrayDestroy(tsVinfo.pVloads);
484✔
266
    tsVinfo.pVloads = NULL;
484✔
267
  }
268
  if (taosThreadMutexUnlock(&pData->statusInfolock) != 0) {
2,407!
269
    dError("failed to unlock status info lock");
×
270
    return;
×
271
  }
272
  if (taosThreadMutexDestroy(&pData->statusInfolock) != 0) {
2,407!
273
    dError("failed to destroy status info lock");
×
274
  }
275
  memset(&pData->statusInfolock, 0, sizeof(pData->statusInfolock));
2,407✔
276

277
  (void)taosThreadMutexDestroy(&pDnode->mutex);
2,407✔
278
  memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
2,407✔
279
}
280

281
/**
 * Move the dnode to a new run status, logging the transition.
 * Nothing happens when the dnode is already in the requested status.
 *
 * @param pDnode Dnode to update.
 * @param status Status to switch to.
 */
void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {
  if (pDnode->status == status) {
    return;
  }

  dDebug("dnode status set from %s to %s", dmStatStr(pDnode->status), dmStatStr(status));
  pDnode->status = status;
}
4,810✔
287

288
/**
 * Take a reference on the wrapper of the given node type, if it is deployed.
 *
 * The deployed check and the reference-count increment are done while the
 * wrapper's lock is held for reading.
 *
 * @param pDnode Dnode owning the wrappers.
 * @param ntype  Node type used to index `pDnode->wrappers`.
 * @return The wrapper with its reference count incremented, or NULL when the
 *         node is not deployed.
 */
SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
  SMgmtWrapper *pTarget = &pDnode->wrappers[ntype];
  SMgmtWrapper *pAcquired = NULL;

  (void)taosThreadRwlockRdlock(&pTarget->lock);
  if (pTarget->deployed) {
    (void)atomic_add_fetch_32(&pTarget->refCount, 1);
    pAcquired = pTarget;
  }
  (void)taosThreadRwlockUnlock(&pTarget->lock);

  return pAcquired;
}
303

304
/**
 * Take a reference on a wrapper that the caller already holds a pointer to.
 *
 * While the wrapper's lock is held for reading: if the node is deployed its
 * reference count is incremented; otherwise an error code chosen by node type
 * is produced and the count is left alone.
 *
 * @param pWrapper Wrapper to mark.
 * @return 0 when the reference was taken; otherwise
 *         TSDB_CODE_MNODE_NOT_FOUND / TSDB_CODE_QNODE_NOT_FOUND /
 *         TSDB_CODE_SNODE_NOT_FOUND / TSDB_CODE_VND_STOPPED for the matching
 *         node type, or TSDB_CODE_APP_IS_STOPPING for any other type.
 */
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
  int32_t result = 0;

  (void)taosThreadRwlockRdlock(&pWrapper->lock);
  if (!pWrapper->deployed) {
    // not deployed: map the node type to its "not available" code
    if (pWrapper->ntype == MNODE) {
      result = TSDB_CODE_MNODE_NOT_FOUND;
    } else if (pWrapper->ntype == QNODE) {
      result = TSDB_CODE_QNODE_NOT_FOUND;
    } else if (pWrapper->ntype == SNODE) {
      result = TSDB_CODE_SNODE_NOT_FOUND;
    } else if (pWrapper->ntype == VNODE) {
      result = TSDB_CODE_VND_STOPPED;
    } else {
      result = TSDB_CODE_APP_IS_STOPPING;
    }
  } else {
    (void)atomic_add_fetch_32(&pWrapper->refCount, 1);
  }
  (void)taosThreadRwlockUnlock(&pWrapper->lock);

  return result;
}
334

335
/**
 * Drop one reference from a wrapper.
 *
 * The reference count is decremented while the wrapper's lock is held for
 * reading.
 *
 * @param pWrapper Wrapper to release; a NULL pointer is ignored.
 */
void dmReleaseWrapper(SMgmtWrapper *pWrapper) {
  if (pWrapper != NULL) {
    (void)taosThreadRwlockRdlock(&pWrapper->lock);
    (void)atomic_sub_fetch_32(&pWrapper->refCount, 1);
    (void)taosThreadRwlockUnlock(&pWrapper->lock);
  }
}
343

344
/**
 * Fill a server-status response from the dnode's current run status.
 *
 * - DND_STAT_INIT:    network-ok status, details set to "<startup name>: <startup desc>".
 * - DND_STAT_STOPPED: exiting status, empty details.
 * - anything else:    service-ok status, empty details.
 *
 * @param pDnode  Dnode whose status and startup information are read.
 * @param pStatus Response to fill; `details` is always reset to an empty string first.
 */
static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus) {
  pStatus->details[0] = 0;

  switch (pDnode->status) {
    case DND_STAT_INIT:
      pStatus->statusCode = TSDB_SRV_STATUS_NETWORK_OK;
      snprintf(pStatus->details, sizeof(pStatus->details), "%s: %s", pDnode->startup.name, pDnode->startup.desc);
      break;
    case DND_STAT_STOPPED:
      pStatus->statusCode = TSDB_SRV_STATUS_EXTING;
      break;
    default:
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
      break;
  }
}
×
357

358
/**
 * Answer a network-test request.
 *
 * The response reuses the request's connection info and carries a buffer of
 * the same length as the request content; this function does not write any
 * payload bytes into it. If the buffer cannot be allocated the response
 * carries TSDB_CODE_OUT_OF_MEMORY instead. The request content is freed
 * before returning.
 *
 * @param pDnode Dnode handling the request; not used by this function.
 * @param pMsg   Incoming request.
 */
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) {
  dDebug("msg:%p, net test req will be processed", pMsg);

  SRpcMsg rsp = {.info = pMsg->info};
  rsp.pCont = rpcMallocCont(pMsg->contLen);
  if (rsp.pCont != NULL) {
    rsp.contLen = pMsg->contLen;
  } else {
    rsp.code = TSDB_CODE_OUT_OF_MEMORY;
  }

  if (rpcSendResponse(&rsp) != 0) {
    dError("failed to send response, msg:%p", &rsp);
  }

  rpcFreeCont(pMsg->pCont);
}
×
374

375
/**
 * Answer a server-startup-status request.
 *
 * Collects the current status with dmGetServerStartupStatus(), serialises it
 * into a freshly allocated response buffer and sends it back on the
 * request's connection. The request content is freed before returning.
 *
 * Error reporting in the response:
 *   - size probe fails (negative length): TSDB_CODE_OUT_OF_MEMORY
 *   - buffer allocation fails:            TSDB_CODE_OUT_OF_MEMORY
 *   - serialisation into the buffer fails: TSDB_CODE_APP_ERROR
 *
 * @param pDnode Dnode whose startup status is reported.
 * @param pMsg   Incoming request.
 */
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) {
  dDebug("msg:%p, server startup status req will be processed", pMsg);

  SServerStatusRsp statusRsp = {0};
  dmGetServerStartupStatus(pDnode, &statusRsp);

  SRpcMsg rsp = {.info = pMsg->info};
  // first pass with a NULL buffer only computes the encoded length
  int32_t contLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
  if (contLen < 0) {
    rsp.code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    rsp.pCont = rpcMallocCont(contLen);
    if (rsp.pCont == NULL) {
      // Without this the response went out with code 0 and no content, i.e.
      // an empty "success"; report the failure like dmProcessNetTestReq does.
      rsp.code = TSDB_CODE_OUT_OF_MEMORY;
    } else if (tSerializeSServerStatusRsp(rsp.pCont, contLen, &statusRsp) < 0) {
      rsp.code = TSDB_CODE_APP_ERROR;
    } else {
      rsp.contLen = contLen;
    }
  }

  if (rpcSendResponse(&rsp) != 0) {
    dError("failed to send response, msg:%p", &rsp);
  }
  rpcFreeCont(pMsg->pCont);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc