• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3534

21 Nov 2024 07:36AM UTC coverage: 60.825% (+2.0%) from 58.848%
#3534

push

travis-ci

web-flow
Merge pull request #28810 from taosdata/ehn/add-sync-heartbeat-sent-time-to-log

ehn:add-sync-heartbeat-sent-time-to-log

120023 of 252376 branches covered (47.56%)

Branch coverage included in aggregate %.

43 of 47 new or added lines in 3 files covered. (91.49%)

2254 existing lines in 162 files now uncovered.

200876 of 275203 relevant lines covered (72.99%)

16110754.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.2
/source/dnode/mgmt/node_util/src/dmEps.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmUtil.h"
18
#include "tjson.h"
19
#include "tmisce.h"
20

21
typedef struct {
22
  int32_t  id;
23
  uint16_t oldPort;
24
  uint16_t newPort;
25
  char     oldFqdn[TSDB_FQDN_LEN];
26
  char     newFqdn[TSDB_FQDN_LEN];
27
} SDnodeEpPair;
28

29
static void    dmPrintEps(SDnodeData *pData);
30
static bool    dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep);
31
static void    dmResetEps(SDnodeData *pData, SArray *dnodeEps);
32
static int32_t dmReadDnodePairs(SDnodeData *pData);
33

34
void dmGetDnodeEp(void *data, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
×
35
  SDnodeData *pData = data;
×
36
  (void)taosThreadRwlockRdlock(&pData->lock);
×
37

38
  SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
×
39
  if (pDnodeEp != NULL) {
×
40
    if (pPort != NULL) {
×
41
      *pPort = pDnodeEp->ep.port;
×
42
    }
43
    if (pFqdn != NULL) {
×
44
      tstrncpy(pFqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
×
45
    }
46
    if (pEp != NULL) {
×
47
      snprintf(pEp, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
×
48
    }
49
  }
50

51
  (void)taosThreadRwlockUnlock(&pData->lock);
×
52
}
×
53

54
static int32_t dmDecodeEps(SJson *pJson, SDnodeData *pData) {
473✔
55
  int32_t code = 0;
473✔
56

57
  tjsonGetInt32ValueFromDouble(pJson, "dnodeId", pData->dnodeId, code);
473✔
58
  if (code < 0) return -1;
473!
59
  tjsonGetNumberValue(pJson, "dnodeVer", pData->dnodeVer, code);
473✔
60
  if (code < 0) return -1;
473!
61
  tjsonGetNumberValue(pJson, "engineVer", pData->engineVer, code);
473✔
62
  if (code < 0) return -1;
473!
63
  tjsonGetNumberValue(pJson, "clusterId", pData->clusterId, code);
473✔
64
  if (code < 0) return -1;
473!
65
  tjsonGetInt32ValueFromDouble(pJson, "dropped", pData->dropped, code);
473✔
66
  if (code < 0) return -1;
473!
67
#ifdef TD_ENTERPRISE
68
  tjsonGetInt32ValueFromDouble(pJson, "encryptAlgor", pData->encryptAlgorigthm, code);
473✔
69
  if (code < 0) return -1;
473!
70
  tjsonGetInt32ValueFromDouble(pJson, "encryptScope", pData->encryptScope, code);
473✔
71
  if (code < 0) return -1;
473!
72
#endif
73
  SJson *dnodes = tjsonGetObjectItem(pJson, "dnodes");
473✔
74
  if (dnodes == NULL) return 0;
473!
75
  int32_t numOfDnodes = tjsonGetArraySize(dnodes);
473✔
76

77
  for (int32_t i = 0; i < numOfDnodes; ++i) {
1,978✔
78
    SJson *dnode = tjsonGetArrayItem(dnodes, i);
1,505✔
79
    if (dnode == NULL) return -1;
1,505!
80

81
    SDnodeEp dnodeEp = {0};
1,505✔
82
    tjsonGetInt32ValueFromDouble(dnode, "id", dnodeEp.id, code);
1,505✔
83
    if (code < 0) return -1;
1,505!
84
    code = tjsonGetStringValue(dnode, "fqdn", dnodeEp.ep.fqdn);
1,505✔
85
    if (code < 0) return -1;
1,505!
86
    tjsonGetUInt16ValueFromDouble(dnode, "port", dnodeEp.ep.port, code);
1,505✔
87
    if (code < 0) return -1;
1,505!
88
    tjsonGetInt8ValueFromDouble(dnode, "isMnode", dnodeEp.isMnode, code);
1,505✔
89
    if (code < 0) return -1;
1,505!
90

91
    if (taosArrayPush(pData->dnodeEps, &dnodeEp) == NULL) return -1;
3,010!
92
  }
93

94
  return 0;
473✔
95
}
96

97
int dmOccurrences(char *str, char *toSearch) {
×
98
  int   count = 0;
×
99
  char *ptr = str;
×
100
  while ((ptr = strstr(ptr, toSearch)) != NULL) {
×
101
    count++;
×
102
    ptr++;
×
103
  }
104
  return count;
×
105
}
106

107
void dmSplitStr(char **arr, char *str, const char *del) {
×
108
  char *lasts;
109
  char *s = strsep(&str, del);
×
110
  while (s != NULL) {
×
111
    *arr++ = s;
×
112
    s = strsep(&str, del);
×
113
  }
114
}
×
115

116
int32_t dmReadEps(SDnodeData *pData) {
2,407✔
117
  int32_t   code = -1;
2,407✔
118
  TdFilePtr pFile = NULL;
2,407✔
119
  char     *content = NULL;
2,407✔
120
  SJson    *pJson = NULL;
2,407✔
121
  char      file[PATH_MAX] = {0};
2,407✔
122
  snprintf(file, sizeof(file), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
2,407✔
123

124
  pData->dnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
2,407✔
125
  if (pData->dnodeEps == NULL) {
2,407!
126
    code = terrno;
×
127
    dError("failed to calloc dnodeEp array since %s", terrstr());
×
128
    goto _OVER;
×
129
  }
130

131
  if (taosStatFile(file, NULL, NULL, NULL) < 0) {
2,407✔
132
    dInfo("dnode file:%s not exist", file);
1,934!
133

134
#ifdef TD_ENTERPRISE
135
    if (strlen(tsEncryptAlgorithm) > 0) {
1,934!
136
      if (strcmp(tsEncryptAlgorithm, "sm4") == 0) {
×
137
        pData->encryptAlgorigthm = DND_CA_SM4;
×
138
      } else {
139
        terrno = TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG;
×
140
        dError("invalid tsEncryptAlgorithm:%s", tsEncryptAlgorithm);
×
141
        goto _OVER;
×
142
      }
143

144
      dInfo("start to parse encryptScope:%s", tsEncryptScope);
×
145
      int32_t scopeLen = strlen(tsEncryptScope);
×
146
      if (scopeLen == 0) {
×
147
        terrno = TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG;
×
148
        dError("invalid tsEncryptScope:%s", tsEncryptScope);
×
149
        goto _OVER;
×
150
      }
151

152
      char *tmp = taosMemoryMalloc(scopeLen + 1);
×
153
      if (tmp == NULL) {
×
154
        dError("failed to malloc memory for tsEncryptScope:%s", tsEncryptScope);
×
155
        goto _OVER;
×
156
      }
157
      memset(tmp, 0, scopeLen + 1);
×
158
      memcpy(tmp, tsEncryptScope, scopeLen);
×
159

160
      int32_t count = dmOccurrences(tmp, ",");
×
161

162
      char **array = taosMemoryMalloc(sizeof(char *) * (count + 1));
×
163
      memset(array, 0, sizeof(char *) * (count + 1));
×
164
      dmSplitStr(array, tmp, ",");
×
165

166
      for (int32_t i = 0; i < count + 1; i++) {
×
167
        char *str = *(array + i);
×
168

169
        bool success = false;
×
170

171
        if (strcasecmp(str, "tsdb") == 0 || strcasecmp(str, "all") == 0) {
×
172
          pData->encryptScope |= DND_CS_TSDB;
×
173
          success = true;
×
174
        }
175
        if (strcasecmp(str, "vnode_wal") == 0 || strcasecmp(str, "all") == 0) {
×
176
          pData->encryptScope |= DND_CS_VNODE_WAL;
×
177
          success = true;
×
178
        }
179
        if (strcasecmp(str, "sdb") == 0 || strcasecmp(str, "all") == 0) {
×
180
          pData->encryptScope |= DND_CS_SDB;
×
181
          success = true;
×
182
        }
183
        if (strcasecmp(str, "mnode_wal") == 0 || strcasecmp(str, "all") == 0) {
×
184
          pData->encryptScope |= DND_CS_MNODE_WAL;
×
185
          success = true;
×
186
        }
187

188
        if (!success) {
×
189
          terrno = TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG;
×
190
          taosMemoryFree(tmp);
×
191
          taosMemoryFree(array);
×
192
          dError("invalid tsEncryptScope:%s", tsEncryptScope);
×
193
          goto _OVER;
×
194
        }
195
      }
196

197
      taosMemoryFree(tmp);
×
198
      taosMemoryFree(array);
×
199

200
      dInfo("set tsCryptAlgorithm:%s, tsCryptScope:%s from cfg", tsEncryptAlgorithm, tsEncryptScope);
×
201
    }
202

203
#endif
204
    code = 0;
1,934✔
205
    goto _OVER;
1,934✔
206
  }
207

208
  pFile = taosOpenFile(file, TD_FILE_READ);
473✔
209
  if (pFile == NULL) {
473!
210
    code = terrno;
×
211
    dError("failed to open dnode file:%s since %s", file, terrstr());
×
212
    goto _OVER;
×
213
  }
214

215
  int64_t size = 0;
473✔
216
  code = taosFStatFile(pFile, &size, NULL);
473✔
217
  if (code != 0) {
473!
218
    dError("failed to fstat dnode file:%s since %s", file, terrstr());
×
219
    goto _OVER;
×
220
  }
221

222
  content = taosMemoryMalloc(size + 1);
473✔
223
  if (content == NULL) {
473!
224
    code = terrno;
×
225
    goto _OVER;
×
226
  }
227

228
  if (taosReadFile(pFile, content, size) != size) {
473!
229
    code = terrno;
×
230
    dError("failed to read dnode file:%s since %s", file, terrstr());
×
231
    goto _OVER;
×
232
  }
233

234
  content[size] = '\0';
473✔
235

236
  pJson = tjsonParse(content);
473✔
237
  if (pJson == NULL) {
473!
238
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
239
    goto _OVER;
×
240
  }
241

242
  if ((code = dmDecodeEps(pJson, pData)) < 0) {
473!
243
    goto _OVER;
×
244
  }
245

246
  code = 0;
473✔
247
  dInfo("succceed to read dnode file %s", file);
473!
248

249
_OVER:
×
250
  if (content != NULL) taosMemoryFree(content);
2,407✔
251
  if (pJson != NULL) cJSON_Delete(pJson);
2,407✔
252
  if (pFile != NULL) taosCloseFile(&pFile);
2,407✔
253

254
  if (code != 0) {
2,407!
255
    dError("failed to read dnode file:%s since %s", file, terrstr());
×
256
    return terrno = code;
×
257
  }
258

259
  if (taosArrayGetSize(pData->dnodeEps) == 0) {
2,407✔
260
    SDnodeEp dnodeEp = {0};
1,934✔
261
    dnodeEp.isMnode = 1;
1,934✔
262
    if (taosGetFqdnPortFromEp(tsFirst, &dnodeEp.ep) != 0) {
1,934!
263
      dError("failed to get fqdn and port from ep:%s", tsFirst);
×
264
    }
265
    if (taosArrayPush(pData->dnodeEps, &dnodeEp) == NULL) {
3,868!
266
      return terrno;
×
267
    }
268
  }
269

270
  if ((code = dmReadDnodePairs(pData)) != 0) {
2,407!
271
    return terrno = code;
×
272
  }
273

274
  dDebug("reset dnode list on startup");
2,407✔
275
  dmResetEps(pData, pData->dnodeEps);
2,407✔
276

277
  if (pData->oldDnodeEps == NULL && dmIsEpChanged(pData, pData->dnodeId, tsLocalEp)) {
2,407!
278
    dError("localEp %s different with %s and need to be reconfigured", tsLocalEp, file);
×
279
    code = TSDB_CODE_INVALID_CFG;
×
280
    return terrno = code;
×
281
  }
282

283
  return code;
2,407✔
284
}
285

286
static int32_t dmEncodeEps(SJson *pJson, SDnodeData *pData) {
5,687✔
287
  if (tjsonAddDoubleToObject(pJson, "dnodeId", pData->dnodeId) < 0) return -1;
5,687!
288
  if (tjsonAddIntegerToObject(pJson, "dnodeVer", pData->dnodeVer) < 0) return -1;
5,687!
289
  if (tjsonAddIntegerToObject(pJson, "engineVer", pData->engineVer) < 0) return -1;
5,687!
290
  if (tjsonAddIntegerToObject(pJson, "clusterId", pData->clusterId) < 0) return -1;
5,687!
291
  if (tjsonAddDoubleToObject(pJson, "dropped", pData->dropped) < 0) return -1;
5,687!
292
#ifdef TD_ENTERPRISE
293
  if (tjsonAddDoubleToObject(pJson, "encryptAlgor", pData->encryptAlgorigthm) < 0) return -1;
5,687!
294
  if (tjsonAddDoubleToObject(pJson, "encryptScope", pData->encryptScope) < 0) return -1;
5,687!
295
#endif
296
  SJson *dnodes = tjsonCreateArray();
5,687✔
297
  if (dnodes == NULL) return -1;
5,687!
298
  if (tjsonAddItemToObject(pJson, "dnodes", dnodes) < 0) return -1;
5,687!
299

300
  int32_t numOfEps = (int32_t)taosArrayGetSize(pData->dnodeEps);
5,687✔
301
  for (int32_t i = 0; i < numOfEps; ++i) {
19,671✔
302
    SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, i);
13,984✔
303
    SJson    *dnode = tjsonCreateObject();
13,984✔
304
    if (dnode == NULL) return -1;
13,984!
305

306
    if (tjsonAddDoubleToObject(dnode, "id", pDnodeEp->id) < 0) return -1;
13,984!
307
    if (tjsonAddStringToObject(dnode, "fqdn", pDnodeEp->ep.fqdn) < 0) return -1;
13,984!
308
    if (tjsonAddDoubleToObject(dnode, "port", pDnodeEp->ep.port) < 0) return -1;
13,984!
309
    if (tjsonAddDoubleToObject(dnode, "isMnode", pDnodeEp->isMnode) < 0) return -1;
13,984!
310
    if (tjsonAddItemToArray(dnodes, dnode) < 0) return -1;
13,984!
311
  }
312

313
  return 0;
5,687✔
314
}
315

316
int32_t dmWriteEps(SDnodeData *pData) {
5,687✔
317
  int32_t   code = 0;
5,687✔
318
  char     *buffer = NULL;
5,687✔
319
  SJson    *pJson = NULL;
5,687✔
320
  TdFilePtr pFile = NULL;
5,687✔
321
  char      file[PATH_MAX] = {0};
5,687✔
322
  char      realfile[PATH_MAX] = {0};
5,687✔
323
  snprintf(file, sizeof(file), "%s%sdnode%sdnode.json.bak", tsDataDir, TD_DIRSEP, TD_DIRSEP);
5,687✔
324
  snprintf(realfile, sizeof(realfile), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
5,687✔
325

326
  // if ((code == dmInitDndInfo(pData)) != 0) goto _OVER;
327
  TAOS_CHECK_GOTO(dmInitDndInfo(pData), NULL, _OVER);
5,687!
328

329
  pJson = tjsonCreateObject();
5,687✔
330
  if (pJson == NULL) TAOS_CHECK_GOTO(terrno, NULL, _OVER);
5,687!
331

332
  pData->engineVer = tsVersion;
5,687✔
333

334
  TAOS_CHECK_GOTO(dmEncodeEps(pJson, pData), NULL, _OVER);  // dmEncodeEps(pJson, pData) != 0) goto _OVER;
5,687!
335

336
  buffer = tjsonToString(pJson);
5,687✔
337
  if (buffer == NULL) {
5,687!
338
    TAOS_CHECK_GOTO(terrno, NULL, _OVER);
×
339
  }
340

341
  pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
5,687✔
342
  if (pFile == NULL) TAOS_CHECK_GOTO(terrno, NULL, _OVER);
5,687!
343

344
  int32_t len = strlen(buffer);
5,687✔
345
  if (taosWriteFile(pFile, buffer, len) <= 0) TAOS_CHECK_GOTO(terrno, NULL, _OVER);
5,687!
346
  if (taosFsyncFile(pFile) < 0) TAOS_CHECK_GOTO(terrno, NULL, _OVER);
5,687!
347

348
  (void)taosCloseFile(&pFile);
5,687✔
349
  TAOS_CHECK_GOTO(taosRenameFile(file, realfile), NULL, _OVER);
5,687!
350

351
  pData->updateTime = taosGetTimestampMs();
5,687✔
352
  dInfo("succeed to write dnode file:%s, num:%d ver:%" PRId64, realfile, (int32_t)taosArrayGetSize(pData->dnodeEps),
5,687!
353
        pData->dnodeVer);
354

355
_OVER:
×
356
  if (pJson != NULL) tjsonDelete(pJson);
5,687!
357
  if (buffer != NULL) taosMemoryFree(buffer);
5,687!
358
  if (pFile != NULL) (void)taosCloseFile(&pFile);
5,687!
359

360
  if (code != 0) {
5,687!
361
    dError("failed to write dnode file:%s since %s, dnodeVer:%" PRId64, realfile, tstrerror(code), pData->dnodeVer);
×
362
  }
363
  return code;
5,687✔
364
}
365

366
int32_t dmGetDnodeSize(SDnodeData *pData) {
×
367
  int32_t size = 0;
×
368
  (void)taosThreadRwlockRdlock(&pData->lock);
×
369
  size = taosArrayGetSize(pData->dnodeEps);
×
370
  (void)taosThreadRwlockUnlock(&pData->lock);
×
371
  return size;
×
372
}
373

374
void dmUpdateEps(SDnodeData *pData, SArray *eps) {
3,763✔
375
  if (taosThreadRwlockWrlock(&pData->lock) != 0) {
3,763!
376
    dError("failed to lock dnode lock");
×
377
  }
378

379
  dDebug("new dnode list get from mnode, dnodeVer:%" PRId64, pData->dnodeVer);
3,763✔
380
  dmResetEps(pData, eps);
3,763✔
381
  if (dmWriteEps(pData) != 0) {
3,763!
382
    dError("failed to write dnode file");
×
383
  }
384

385
  if (taosThreadRwlockUnlock(&pData->lock) != 0) {
3,763!
386
    dError("failed to unlock dnode lock");
×
387
  }
388
}
3,763✔
389

390
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) {
6,170✔
391
  if (pData->dnodeEps != dnodeEps) {
6,170✔
392
    SArray *tmp = pData->dnodeEps;
3,763✔
393
    pData->dnodeEps = taosArrayDup(dnodeEps, NULL);
3,763✔
394
    taosArrayDestroy(tmp);
3,763✔
395
  }
396

397
  pData->mnodeEps.inUse = 0;
6,170✔
398
  pData->mnodeEps.numOfEps = 0;
6,170✔
399

400
  int32_t mIndex = 0;
6,170✔
401
  int32_t numOfEps = (int32_t)taosArrayGetSize(dnodeEps);
6,170✔
402

403
  for (int32_t i = 0; i < numOfEps; i++) {
21,669✔
404
    SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
15,499✔
405
    if (!pDnodeEp->isMnode) continue;
15,499✔
406
    if (mIndex >= TSDB_MAX_REPLICA) continue;
7,612!
407
    pData->mnodeEps.numOfEps++;
7,612✔
408

409
    pData->mnodeEps.eps[mIndex] = pDnodeEp->ep;
7,612✔
410
    mIndex++;
7,612✔
411
  }
412
  epsetSort(&pData->mnodeEps);
6,170✔
413

414
  for (int32_t i = 0; i < numOfEps; i++) {
21,669✔
415
    SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
15,499✔
416
    int32_t   code = taosHashPut(pData->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp));
15,499✔
417
    if (code) {
15,499!
418
      dError("dnode:%d, fqdn:%s port:%u isMnode:%d failed to put into hash, reason:%s", pDnodeEp->id, pDnodeEp->ep.fqdn,
×
419
             pDnodeEp->ep.port, pDnodeEp->isMnode, tstrerror(code));
420
    }
421
  }
422

423
  pData->validMnodeEps = true;
6,170✔
424

425
  dmPrintEps(pData);
6,170✔
426
}
6,170✔
427

428
static void dmPrintEps(SDnodeData *pData) {
6,170✔
429
  int32_t numOfEps = (int32_t)taosArrayGetSize(pData->dnodeEps);
6,170✔
430
  dDebug("print dnode list, num:%d", numOfEps);
6,170✔
431
  for (int32_t i = 0; i < numOfEps; i++) {
21,669✔
432
    SDnodeEp *pEp = taosArrayGet(pData->dnodeEps, i);
15,499✔
433
    dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->ep.fqdn, pEp->ep.port, pEp->isMnode);
15,499✔
434
  }
435
}
6,170✔
436

437
static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) {
2,407✔
438
  bool changed = false;
2,407✔
439
  if (dnodeId == 0) return changed;
2,407✔
440
  (void)taosThreadRwlockRdlock(&pData->lock);
473✔
441

442
  SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
473✔
443
  if (pDnodeEp != NULL) {
473!
444
    char epstr[TSDB_EP_LEN + 1] = {0};
473✔
445
    snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
473✔
446
    changed = (strcmp(ep, epstr) != 0);
473✔
447
    if (changed) {
473!
448
      dError("dnode:%d, localEp %s different from %s", dnodeId, ep, epstr);
×
449
    }
450
  }
451

452
  (void)taosThreadRwlockUnlock(&pData->lock);
473✔
453
  return changed;
473✔
454
}
455

456
void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
104,400✔
457
  (void)taosThreadRwlockRdlock(&pData->lock);
104,400✔
458
  *pEpSet = pData->mnodeEps;
104,405✔
459
  (void)taosThreadRwlockUnlock(&pData->lock);
104,405✔
460
}
104,401✔
461

UNCOV
462
void dmEpSetToStr(char *buf, int32_t len, SEpSet *epSet) {
×
UNCOV
463
  int32_t n = 0;
×
UNCOV
464
  n += tsnprintf(buf + n, len - n, "%s", "{");
×
UNCOV
465
  for (int i = 0; i < epSet->numOfEps; i++) {
×
UNCOV
466
    n += tsnprintf(buf + n, len - n, "%s:%d%s", epSet->eps[i].fqdn, epSet->eps[i].port,
×
UNCOV
467
                  (i + 1 < epSet->numOfEps ? ", " : ""));
×
468
  }
UNCOV
469
  n += tsnprintf(buf + n, len - n, "%s", "}");
×
UNCOV
470
}
×
471

472
static FORCE_INLINE void dmSwapEps(SEp *epLhs, SEp *epRhs) {
473
  SEp epTmp;
474

UNCOV
475
  epTmp.port = epLhs->port;
×
UNCOV
476
  tstrncpy(epTmp.fqdn, epLhs->fqdn, tListLen(epTmp.fqdn));
×
477

UNCOV
478
  epLhs->port = epRhs->port;
×
UNCOV
479
  tstrncpy(epLhs->fqdn, epRhs->fqdn, tListLen(epLhs->fqdn));
×
480

UNCOV
481
  epRhs->port = epTmp.port;
×
UNCOV
482
  tstrncpy(epRhs->fqdn, epTmp.fqdn, tListLen(epRhs->fqdn));
×
UNCOV
483
}
×
484

UNCOV
485
void dmRotateMnodeEpSet(SDnodeData *pData) {
×
UNCOV
486
  (void)taosThreadRwlockRdlock(&pData->lock);
×
UNCOV
487
  SEpSet *pEpSet = &pData->mnodeEps;
×
UNCOV
488
  for (int i = 1; i < pEpSet->numOfEps; i++) {
×
UNCOV
489
    dmSwapEps(&pEpSet->eps[i - 1], &pEpSet->eps[i]);
×
490
  }
UNCOV
491
  (void)taosThreadRwlockUnlock(&pData->lock);
×
UNCOV
492
}
×
493

494
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet) {
265✔
495
  if (!pData->validMnodeEps) return;
265!
496
  dmGetMnodeEpSet(pData, pEpSet);
265✔
497
  dTrace("msg is redirected, handle:%p num:%d use:%d", pMsg->info.handle, pEpSet->numOfEps, pEpSet->inUse);
265!
498
  for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
686✔
499
    dTrace("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
421!
500
    if (strcmp(pEpSet->eps[i].fqdn, tsLocalFqdn) == 0 && pEpSet->eps[i].port == tsServerPort) {
421✔
501
      pEpSet->inUse = (i + 1) % pEpSet->numOfEps;
18✔
502
    }
503
  }
504
}
505

506
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
270✔
507
  if (memcmp(pEpSet, &pData->mnodeEps, sizeof(SEpSet)) == 0) return;
270!
508
  (void)taosThreadRwlockWrlock(&pData->lock);
270✔
509
  pData->mnodeEps = *pEpSet;
270✔
510
  (void)taosThreadRwlockUnlock(&pData->lock);
270✔
511

512
  dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
270!
513
  for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
1,058✔
514
    dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
788!
515
  }
516
}
517

518
bool dmUpdateDnodeInfo(void *data, int32_t *did, int64_t *clusterId, char *fqdn, uint16_t *port) {
1,159,196✔
519
  bool        updated = false;
1,159,196✔
520
  SDnodeData *pData = data;
1,159,196✔
521
  int32_t     dnodeId = -1;
1,159,196✔
522
  if (did != NULL) dnodeId = *did;
1,159,196✔
523

524
  (void)taosThreadRwlockRdlock(&pData->lock);
1,159,196✔
525

526
  if (pData->oldDnodeEps != NULL) {
1,159,246!
527
    int32_t size = (int32_t)taosArrayGetSize(pData->oldDnodeEps);
×
528
    for (int32_t i = 0; i < size; ++i) {
×
529
      SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i);
×
530
      if (strcmp(pair->oldFqdn, fqdn) == 0 && pair->oldPort == *port) {
×
531
        dInfo("dnode:%d, update ep:%s:%u to %s:%u", dnodeId, fqdn, *port, pair->newFqdn, pair->newPort);
×
532
        tstrncpy(fqdn, pair->newFqdn, TSDB_FQDN_LEN);
×
533
        *port = pair->newPort;
×
534
        updated = true;
×
535
      }
536
    }
537
  }
538

539
  if (did != NULL && dnodeId <= 0) {
1,159,246✔
540
    int32_t size = (int32_t)taosArrayGetSize(pData->dnodeEps);
86✔
541
    for (int32_t i = 0; i < size; ++i) {
378✔
542
      SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, i);
292✔
543
      if (strcmp(pDnodeEp->ep.fqdn, fqdn) == 0 && pDnodeEp->ep.port == *port) {
292!
544
        dInfo("dnode:%s:%u, update dnodeId to dnode:%d", fqdn, *port, pDnodeEp->id);
86!
545
        *did = pDnodeEp->id;
86✔
546
        if (clusterId != NULL) *clusterId = pData->clusterId;
86!
547
      }
548
    }
549
  }
550

551
  if (dnodeId > 0) {
1,159,246✔
552
    SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
69,958✔
553
    if (pDnodeEp) {
69,983✔
554
      if (strcmp(pDnodeEp->ep.fqdn, fqdn) != 0 || pDnodeEp->ep.port != *port) {
63,468!
555
        dInfo("dnode:%d, update ep:%s:%u to %s:%u", dnodeId, fqdn, *port, pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
22!
556
        tstrncpy(fqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
×
557
        *port = pDnodeEp->ep.port;
×
558
        updated = true;
×
559
      }
560
      if (clusterId != NULL) *clusterId = pData->clusterId;
63,446✔
561
    }
562
  }
563

564
  (void)taosThreadRwlockUnlock(&pData->lock);
1,159,249✔
565
  return updated;
1,159,254✔
566
}
567

568
static int32_t dmDecodeEpPairs(SJson *pJson, SDnodeData *pData) {
×
569
  int32_t code = 0;
×
570

571
  SJson *dnodes = tjsonGetObjectItem(pJson, "dnodes");
×
572
  if (dnodes == NULL) return TSDB_CODE_INVALID_CFG_VALUE;
×
573
  int32_t numOfDnodes = tjsonGetArraySize(dnodes);
×
574

575
  for (int32_t i = 0; i < numOfDnodes; ++i) {
×
576
    SJson *dnode = tjsonGetArrayItem(dnodes, i);
×
577
    if (dnode == NULL) return TSDB_CODE_INVALID_CFG_VALUE;
×
578

579
    SDnodeEpPair pair = {0};
×
580
    tjsonGetInt32ValueFromDouble(dnode, "id", pair.id, code);
×
581
    if (code < 0) return TSDB_CODE_INVALID_CFG_VALUE;
×
582
    code = tjsonGetStringValue(dnode, "fqdn", pair.oldFqdn);
×
583
    if (code < 0) return TSDB_CODE_INVALID_CFG_VALUE;
×
584
    tjsonGetUInt16ValueFromDouble(dnode, "port", pair.oldPort, code);
×
585
    if (code < 0) return TSDB_CODE_INVALID_CFG_VALUE;
×
586
    code = tjsonGetStringValue(dnode, "new_fqdn", pair.newFqdn);
×
587
    if (code < 0) return TSDB_CODE_INVALID_CFG_VALUE;
×
588
    tjsonGetUInt16ValueFromDouble(dnode, "new_port", pair.newPort, code);
×
589
    if (code < 0) return TSDB_CODE_INVALID_CFG_VALUE;
×
590

591
    if (taosArrayPush(pData->oldDnodeEps, &pair) == NULL) return terrno;
×
592
  }
593

594
  return code;
×
595
}
596

597
void dmRemoveDnodePairs(SDnodeData *pData) {
×
598
  char file[PATH_MAX] = {0};
×
599
  char bak[PATH_MAX] = {0};
×
600
  snprintf(file, sizeof(file), "%s%sdnode%sep.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
×
601
  snprintf(bak, sizeof(bak), "%s%sdnode%sep.json.bak", tsDataDir, TD_DIRSEP, TD_DIRSEP);
×
602
  dInfo("dnode file:%s is rename to bak file", file);
×
603
  if (taosRenameFile(file, bak) != 0) {
×
604
    dError("failed to rename dnode file:%s to bak file:%s since %s", file, bak, tstrerror(terrno));
×
605
  }
606
}
×
607

608
static int32_t dmReadDnodePairs(SDnodeData *pData) {
2,407✔
609
  int32_t   code = -1;
2,407✔
610
  TdFilePtr pFile = NULL;
2,407✔
611
  char     *content = NULL;
2,407✔
612
  SJson    *pJson = NULL;
2,407✔
613
  char      file[PATH_MAX] = {0};
2,407✔
614
  snprintf(file, sizeof(file), "%s%sdnode%sep.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
2,407✔
615

616
  if (taosStatFile(file, NULL, NULL, NULL) < 0) {
2,407!
617
    code = terrno;
2,407✔
618
    dDebug("dnode file:%s not exist, reason:%s", file, tstrerror(code));
2,407✔
619
    code = 0;
2,407✔
620
    goto _OVER;
2,407✔
621
  }
622

623
  pFile = taosOpenFile(file, TD_FILE_READ);
×
624
  if (pFile == NULL) {
×
625
    code = terrno;
×
626
    dError("failed to open dnode file:%s since %s", file, terrstr());
×
627
    goto _OVER;
×
628
  }
629

630
  int64_t size = 0;
×
631
  code = taosFStatFile(pFile, &size, NULL);
×
632
  if (code != 0) {
×
633
    dError("failed to fstat dnode file:%s since %s", file, terrstr());
×
634
    goto _OVER;
×
635
  }
636

637
  content = taosMemoryMalloc(size + 1);
×
638
  if (content == NULL) {
×
639
    code = terrno;
×
640
    goto _OVER;
×
641
  }
642

643
  if (taosReadFile(pFile, content, size) != size) {
×
644
    terrno = terrno;
×
645
    dError("failed to read dnode file:%s since %s", file, terrstr());
×
646
    goto _OVER;
×
647
  }
648

649
  content[size] = '\0';
×
650

651
  pJson = tjsonParse(content);
×
652
  if (pJson == NULL) {
×
653
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
654
    goto _OVER;
×
655
  }
656

657
  pData->oldDnodeEps = taosArrayInit(1, sizeof(SDnodeEpPair));
×
658
  if (pData->oldDnodeEps == NULL) {
×
659
    code = terrno;
×
660
    dError("failed to calloc dnodeEp array since %s", strerror(errno));
×
661
    goto _OVER;
×
662
  }
663

664
  if (dmDecodeEpPairs(pJson, pData) < 0) {
×
665
    taosArrayDestroy(pData->oldDnodeEps);
×
666
    pData->oldDnodeEps = NULL;
×
667

668
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
669
    goto _OVER;
×
670
  }
671

672
  code = 0;
×
673
  dInfo("succceed to read dnode file %s", file);
×
674

675
_OVER:
×
676
  if (content != NULL) taosMemoryFree(content);
2,407!
677
  if (pJson != NULL) cJSON_Delete(pJson);
2,407!
678
  if (pFile != NULL) taosCloseFile(&pFile);
2,407!
679

680
  if (code != 0) {
2,407!
681
    dError("failed to read dnode file:%s since %s", file, tstrerror(code));
×
682
    return code;
×
683
  }
684

685
  // update old fqdn and port
686
  for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pData->oldDnodeEps); ++i) {
2,407!
687
    SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i);
×
688
    for (int32_t j = 0; j < (int32_t)taosArrayGetSize(pData->dnodeEps); ++j) {
×
689
      SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, j);
×
690
      if (pDnodeEp->id == pair->id) {
×
691
        tstrncpy(pair->oldFqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
×
692
        pair->oldPort = pDnodeEp->ep.port;
×
693
      }
694
    }
695
  }
696

697
  // check new fqdn and port
698
  for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pData->oldDnodeEps); ++i) {
2,407!
699
    SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i);
×
700
    for (int32_t j = 0; j < (int32_t)taosArrayGetSize(pData->dnodeEps); ++j) {
×
701
      SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, j);
×
702
      if (pDnodeEp->id != pair->id &&
×
703
          (strcmp(pDnodeEp->ep.fqdn, pair->newFqdn) == 0 && pDnodeEp->ep.port == pair->newPort)) {
×
704
        dError("dnode:%d, can't update ep:%s:%u to %s:%u since already exists as dnode:%d", pair->id, pair->oldFqdn,
×
705
               pair->oldPort, pair->newFqdn, pair->newPort, pDnodeEp->id);
706
        taosArrayDestroy(pData->oldDnodeEps);
×
707
        pData->oldDnodeEps = NULL;
×
708
        code = TSDB_CODE_INVALID_CFG;
×
709
        return code;
×
710
      }
711
    }
712
  }
713

714
  for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pData->oldDnodeEps); ++i) {
2,407!
715
    SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i);
×
716
    for (int32_t j = 0; j < (int32_t)taosArrayGetSize(pData->dnodeEps); ++j) {
×
717
      SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, j);
×
718
      if (strcmp(pDnodeEp->ep.fqdn, pair->oldFqdn) == 0 && pDnodeEp->ep.port == pair->oldPort) {
×
719
        dInfo("dnode:%d, will update ep:%s:%u to %s:%u", pDnodeEp->id, pDnodeEp->ep.fqdn, pDnodeEp->ep.port,
×
720
              pair->newFqdn, pair->newPort);
721
        tstrncpy(pDnodeEp->ep.fqdn, pair->newFqdn, TSDB_FQDN_LEN);
×
722
        pDnodeEp->ep.port = pair->newPort;
×
723
      }
724
    }
725
  }
726

727
  pData->dnodeVer = 0;
2,407✔
728
  return 0;
2,407✔
729
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc