• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4822

27 Oct 2025 05:42AM UTC coverage: 59.732% (+1.0%) from 58.728%
#4822

push

travis-ci

web-flow
Merge pull request #33377 from taosdata/fix/main/rename-udf-path

fix: update UDF example links to correct file paths

121214 of 258518 branches covered (46.89%)

Branch coverage included in aggregate %.

193636 of 268583 relevant lines covered (72.1%)

4002399.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.51
/source/dnode/mgmt/node_util/src/dmEps.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmUtil.h"
18
#include "tjson.h"
19
#include "tmisce.h"
20

21
// One endpoint-rename entry decoded from dnode/ep.json: the endpoint a dnode
// used to have and the endpoint that should replace it.
typedef struct {
22
  int32_t  id;  // dnode id; compared against SDnodeEp.id when pairs are applied
23
  uint16_t oldPort;  // decoded from "port", later refreshed from the dnode list entry with the same id
24
  uint16_t newPort;  // decoded from "new_port"
25
  char     oldFqdn[TSDB_FQDN_LEN];  // decoded from "fqdn", later refreshed from the dnode list entry with the same id
26
  char     newFqdn[TSDB_FQDN_LEN];  // decoded from "new_fqdn"
27
} SDnodeEpPair;
28

29
static void    dmPrintEps(SDnodeData *pData);
30
static bool    dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep);
31
static void    dmResetEps(SDnodeData *pData, SArray *dnodeEps);
32
static int32_t dmReadDnodePairs(SDnodeData *pData);
33

34
void dmGetDnodeEp(void *data, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
×
35
  SDnodeData *pData = data;
×
36
  (void)taosThreadRwlockRdlock(&pData->lock);
×
37

38
  SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
×
39
  if (pDnodeEp != NULL) {
×
40
    if (pPort != NULL) {
×
41
      *pPort = pDnodeEp->ep.port;
×
42
    }
43
    if (pFqdn != NULL) {
×
44
      tstrncpy(pFqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
×
45
    }
46
    if (pEp != NULL) {
×
47
      snprintf(pEp, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
×
48
    }
49
  }
50

51
  (void)taosThreadRwlockUnlock(&pData->lock);
×
52
}
×
53

54
/*
 * Fill `pData` from a parsed dnode.json document.
 *
 * Reads the scalar fields (dnodeId, dnodeVer, engineVer, clusterId, dropped
 * and, in enterprise builds, encryptAlgor / encryptScope), then appends one
 * SDnodeEp to pData->dnodeEps for every element of the "dnodes" array.
 *
 * Returns 0 on success (also when "dnodes" is absent) and -1 as soon as any
 * field cannot be read or an entry cannot be appended.
 */
static int32_t dmDecodeEps(SJson *pJson, SDnodeData *pData) {
  int32_t code = 0;

  tjsonGetInt32ValueFromDouble(pJson, "dnodeId", pData->dnodeId, code);
  if (code < 0) {
    return -1;
  }
  tjsonGetNumberValue(pJson, "dnodeVer", pData->dnodeVer, code);
  if (code < 0) {
    return -1;
  }
  tjsonGetNumberValue(pJson, "engineVer", pData->engineVer, code);
  if (code < 0) {
    return -1;
  }
  tjsonGetNumberValue(pJson, "clusterId", pData->clusterId, code);
  if (code < 0) {
    return -1;
  }
  tjsonGetInt32ValueFromDouble(pJson, "dropped", pData->dropped, code);
  if (code < 0) {
    return -1;
  }
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tjsonGetInt32ValueFromDouble(pJson, "encryptAlgor", pData->encryptAlgorigthm, code);
  if (code < 0) {
    return -1;
  }
  tjsonGetInt32ValueFromDouble(pJson, "encryptScope", pData->encryptScope, code);
  if (code < 0) {
    return -1;
  }
#endif

  SJson *pDnodeList = tjsonGetObjectItem(pJson, "dnodes");
  if (pDnodeList == NULL) {
    return 0;
  }

  int32_t total = tjsonGetArraySize(pDnodeList);
  int32_t idx = 0;
  while (idx < total) {
    SJson *pItem = tjsonGetArrayItem(pDnodeList, idx);
    if (pItem == NULL) {
      return -1;
    }

    SDnodeEp entry = {0};
    tjsonGetInt32ValueFromDouble(pItem, "id", entry.id, code);
    if (code < 0) {
      return -1;
    }
    code = tjsonGetStringValue(pItem, "fqdn", entry.ep.fqdn);
    if (code < 0) {
      return -1;
    }
    tjsonGetUInt16ValueFromDouble(pItem, "port", entry.ep.port, code);
    if (code < 0) {
      return -1;
    }
    tjsonGetInt8ValueFromDouble(pItem, "isMnode", entry.isMnode, code);
    if (code < 0) {
      return -1;
    }

    if (taosArrayPush(pData->dnodeEps, &entry) == NULL) {
      return -1;
    }
    ++idx;
  }

  return 0;
}
96

97
/*
 * Count how many times `toSearch` occurs in `str`.
 *
 * Matches may overlap: the scan resumes one character after the start of the
 * previous match, so "aa" is found twice in "aaa".
 *
 * Returns 0 when either argument is NULL or when `toSearch` is empty. An
 * empty needle matches at every position, which previously drove the scan
 * pointer past the string terminator.
 */
int dmOccurrences(char *str, char *toSearch) {
  if (str == NULL || toSearch == NULL || toSearch[0] == '\0') {
    return 0;
  }

  int count = 0;
  for (char *hit = strstr(str, toSearch); hit != NULL; hit = strstr(hit + 1, toSearch)) {
    count++;
  }
  return count;
}
106

107
/*
 * Split `str` in place on any character of `del` and store a pointer to each
 * token in consecutive slots of `arr`.
 *
 * Every delimiter found is overwritten with '\0'. Adjacent delimiters yield
 * empty tokens, and the text after the last delimiter is always stored as a
 * token, so a string with k delimiters produces k + 1 entries.
 *
 * The caller must provide enough slots in `arr`; no bound is checked here.
 * Nothing is written when `str` is NULL. NULL `arr` or `del` is ignored.
 */
void dmSplitStr(char **arr, char *str, const char *del) {
  if (arr == NULL || del == NULL) {
    return;
  }

  char *cursor = str;
  while (cursor != NULL) {
    char *token = cursor;
    char *sep = strpbrk(cursor, del);
    if (sep != NULL) {
      *sep = '\0';
      cursor = sep + 1;
    } else {
      cursor = NULL;  // last token: stop after storing it
    }
    *arr++ = token;
  }
}
×
115

116
/*
 * Load the dnode endpoint list from <tsDataDir>/dnode/dnode.json into pData.
 *
 * - Allocates pData->dnodeEps.
 * - If the file does not exist, enterprise builds derive encryptAlgorigthm and
 *   encryptScope from tsEncryptAlgorithm / tsEncryptScope; the list stays empty.
 * - Otherwise the file is read, parsed and decoded with dmDecodeEps().
 * - An empty list is seeded with a single mnode entry built from tsFirst.
 * - Endpoint renames from ep.json are applied (dmReadDnodePairs), the derived
 *   mnode set / hash are rebuilt (dmResetEps), and the local endpoint is
 *   checked against the stored one when no rename file was loaded.
 *
 * Returns 0 on success. On failure returns a non-zero code and, for the paths
 * that go through _OVER, also stores that code in terrno.
 */
int32_t dmReadEps(SDnodeData *pData) {
  int32_t   code = -1;
  TdFilePtr pFile = NULL;
  char     *content = NULL;
  SJson    *pJson = NULL;
  char      file[PATH_MAX] = {0};
  snprintf(file, sizeof(file), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);

  pData->dnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
  if (pData->dnodeEps == NULL) {
    code = terrno;
    dError("failed to calloc dnodeEp array since %s", terrstr());
    goto _OVER;
  }

  if (taosStatFile(file, NULL, NULL, NULL) < 0) {
    dInfo("dnode file:%s not exist", file);

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
    if (strlen(tsEncryptAlgorithm) > 0) {
      if (strcmp(tsEncryptAlgorithm, "sm4") == 0) {
        pData->encryptAlgorigthm = DND_CA_SM4;
      } else {
        // keep the specific error in `code`; _OVER stores `code` into terrno
        code = TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG;
        terrno = code;
        dError("invalid tsEncryptAlgorithm:%s", tsEncryptAlgorithm);
        goto _OVER;
      }

      dInfo("start to parse encryptScope:%s", tsEncryptScope);
      int32_t scopeLen = strlen(tsEncryptScope);
      if (scopeLen == 0) {
        code = TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG;
        terrno = code;
        dError("invalid tsEncryptScope:%s", tsEncryptScope);
        goto _OVER;
      }

      // work on a private copy: dmSplitStr() overwrites the separators
      char *tmp = taosMemoryMalloc(scopeLen + 1);
      if (tmp == NULL) {
        code = terrno;
        dError("failed to malloc memory for tsEncryptScope:%s", tsEncryptScope);
        goto _OVER;
      }
      memset(tmp, 0, scopeLen + 1);
      memcpy(tmp, tsEncryptScope, scopeLen);

      int32_t count = dmOccurrences(tmp, ",");

      // count separators => count + 1 tokens
      char **array = taosMemoryMalloc(sizeof(char *) * (count + 1));
      if (array == NULL) {
        code = terrno;
        taosMemoryFree(tmp);
        dError("failed to malloc memory for tsEncryptScope:%s", tsEncryptScope);
        goto _OVER;
      }
      memset(array, 0, sizeof(char *) * (count + 1));
      dmSplitStr(array, tmp, ",");

      for (int32_t i = 0; i < count + 1; i++) {
        char *str = *(array + i);

        bool success = false;

        // "all" turns on every scope bit; an unknown token is a config error
        if (strcasecmp(str, "tsdb") == 0 || strcasecmp(str, "all") == 0) {
          pData->encryptScope |= DND_CS_TSDB;
          success = true;
        }
        if (strcasecmp(str, "vnode_wal") == 0 || strcasecmp(str, "all") == 0) {
          pData->encryptScope |= DND_CS_VNODE_WAL;
          success = true;
        }
        if (strcasecmp(str, "sdb") == 0 || strcasecmp(str, "all") == 0) {
          pData->encryptScope |= DND_CS_SDB;
          success = true;
        }
        if (strcasecmp(str, "mnode_wal") == 0 || strcasecmp(str, "all") == 0) {
          pData->encryptScope |= DND_CS_MNODE_WAL;
          success = true;
        }

        if (!success) {
          code = TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG;
          terrno = code;
          taosMemoryFree(tmp);
          taosMemoryFree(array);
          dError("invalid tsEncryptScope:%s", tsEncryptScope);
          goto _OVER;
        }
      }

      taosMemoryFree(tmp);
      taosMemoryFree(array);

      dInfo("set tsCryptAlgorithm:%s, tsCryptScope:%s from cfg", tsEncryptAlgorithm, tsEncryptScope);
    }

#endif
    code = 0;
    goto _OVER;
  }

  pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    code = terrno;
    dError("failed to open dnode file:%s since %s", file, terrstr());
    goto _OVER;
  }

  int64_t size = 0;
  code = taosFStatFile(pFile, &size, NULL);
  if (code != 0) {
    dError("failed to fstat dnode file:%s since %s", file, terrstr());
    goto _OVER;
  }

  content = taosMemoryMalloc(size + 1);
  if (content == NULL) {
    code = terrno;
    goto _OVER;
  }

  if (taosReadFile(pFile, content, size) != size) {
    code = terrno;
    dError("failed to read dnode file:%s since %s", file, terrstr());
    goto _OVER;
  }

  content[size] = '\0';

  pJson = tjsonParse(content);
  if (pJson == NULL) {
    code = TSDB_CODE_INVALID_JSON_FORMAT;
    goto _OVER;
  }

  if ((code = dmDecodeEps(pJson, pData)) < 0) {
    goto _OVER;
  }

  code = 0;
  dInfo("succceed to read dnode file %s", file);

_OVER:
  if (content != NULL) taosMemoryFree(content);
  if (pJson != NULL) tjsonDelete(pJson);
  if (pFile != NULL) taosCloseFile(&pFile);

  if (code != 0) {
    // report the code that is actually returned, not whatever terrno holds now
    dError("failed to read dnode file:%s since %s", file, tstrerror(code));
    return terrno = code;
  }

  if (taosArrayGetSize(pData->dnodeEps) == 0) {
    // nothing stored yet: start from the configured first endpoint as an mnode
    SDnodeEp dnodeEp = {0};
    dnodeEp.isMnode = 1;
    if (taosGetFqdnPortFromEp(tsFirst, &dnodeEp.ep) != 0) {
      dError("failed to get fqdn and port from ep:%s", tsFirst);
    }
    if (taosArrayPush(pData->dnodeEps, &dnodeEp) == NULL) {
      return terrno;
    }
  }

  if ((code = dmReadDnodePairs(pData)) != 0) {
    return terrno = code;
  }

  dDebug("reset dnode list on startup");
  dmResetEps(pData, pData->dnodeEps);

  if (pData->oldDnodeEps == NULL && dmIsEpChanged(pData, pData->dnodeId, tsLocalEp)) {
    dError("localEp %s different with %s and need to be reconfigured", tsLocalEp, file);
    code = TSDB_CODE_INVALID_CFG;
    return terrno = code;
  }

  return code;
}
285

286
/*
 * Serialize `pData` into the JSON object `pJson` using the dnode.json layout:
 * the scalar fields first, then a "dnodes" array with one object per entry of
 * pData->dnodeEps (id, fqdn, port, isMnode).
 *
 * Returns 0 on success and -1 on the first failure. A JSON node created here
 * that has not been attached to its parent yet is released before returning,
 * so an early exit does not leak it.
 */
static int32_t dmEncodeEps(SJson *pJson, SDnodeData *pData) {
  if (tjsonAddDoubleToObject(pJson, "dnodeId", pData->dnodeId) < 0) return -1;
  if (tjsonAddIntegerToObject(pJson, "dnodeVer", pData->dnodeVer) < 0) return -1;
  if (tjsonAddIntegerToObject(pJson, "engineVer", pData->engineVer) < 0) return -1;
  if (tjsonAddIntegerToObject(pJson, "clusterId", pData->clusterId) < 0) return -1;
  if (tjsonAddDoubleToObject(pJson, "dropped", pData->dropped) < 0) return -1;
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  if (tjsonAddDoubleToObject(pJson, "encryptAlgor", pData->encryptAlgorigthm) < 0) return -1;
  if (tjsonAddDoubleToObject(pJson, "encryptScope", pData->encryptScope) < 0) return -1;
#endif
  SJson *dnodes = tjsonCreateArray();
  if (dnodes == NULL) return -1;
  if (tjsonAddItemToObject(pJson, "dnodes", dnodes) < 0) {
    // the array was not attached, so nobody else will free it
    tjsonDelete(dnodes);
    return -1;
  }

  int32_t numOfEps = (int32_t)taosArrayGetSize(pData->dnodeEps);
  for (int32_t i = 0; i < numOfEps; ++i) {
    SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, i);
    SJson    *dnode = tjsonCreateObject();
    if (dnode == NULL) return -1;

    // `dnode` is owned locally until it has been appended to `dnodes`
    if (tjsonAddDoubleToObject(dnode, "id", pDnodeEp->id) < 0 ||
        tjsonAddStringToObject(dnode, "fqdn", pDnodeEp->ep.fqdn) < 0 ||
        tjsonAddDoubleToObject(dnode, "port", pDnodeEp->ep.port) < 0 ||
        tjsonAddDoubleToObject(dnode, "isMnode", pDnodeEp->isMnode) < 0 ||
        tjsonAddItemToArray(dnodes, dnode) < 0) {
      tjsonDelete(dnode);
      return -1;
    }
  }

  return 0;
}
315

316
/*
 * Persist the dnode list held in `pData`.
 *
 * The JSON text is written to <tsDataDir>/dnode/dnode.json.bak, fsynced and
 * closed, and the .bak file is then renamed to dnode.json. pData->engineVer
 * is set to tsVersion before encoding and pData->updateTime is refreshed
 * after a successful rename.
 *
 * Returns 0 on success, otherwise the code of the step that failed (also
 * logged).
 */
int32_t dmWriteEps(SDnodeData *pData) {
  int32_t   code = 0;
  TdFilePtr pFile = NULL;
  SJson    *pJson = NULL;
  char     *buffer = NULL;
  char      file[PATH_MAX] = {0};
  char      realfile[PATH_MAX] = {0};

  snprintf(file, sizeof(file), "%s%sdnode%sdnode.json.bak", tsDataDir, TD_DIRSEP, TD_DIRSEP);
  snprintf(realfile, sizeof(realfile), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);

  TAOS_CHECK_GOTO(dmInitDndInfo(pData), NULL, _OVER);

  pJson = tjsonCreateObject();
  if (pJson == NULL) {
    TAOS_CHECK_GOTO(terrno, NULL, _OVER);
  }

  pData->engineVer = tsVersion;

  TAOS_CHECK_GOTO(dmEncodeEps(pJson, pData), NULL, _OVER);

  buffer = tjsonToString(pJson);
  if (buffer == NULL) {
    TAOS_CHECK_GOTO(terrno, NULL, _OVER);
  }

  pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
  if (pFile == NULL) {
    TAOS_CHECK_GOTO(terrno, NULL, _OVER);
  }

  int32_t nBytes = strlen(buffer);
  if (taosWriteFile(pFile, buffer, nBytes) <= 0) {
    TAOS_CHECK_GOTO(terrno, NULL, _OVER);
  }
  if (taosFsyncFile(pFile) < 0) {
    TAOS_CHECK_GOTO(terrno, NULL, _OVER);
  }

  // close before renaming the temporary file over the real one
  (void)taosCloseFile(&pFile);
  TAOS_CHECK_GOTO(taosRenameFile(file, realfile), NULL, _OVER);

  pData->updateTime = taosGetTimestampMs();
  dInfo("succeed to write dnode file:%s, num:%d ver:%" PRId64, realfile, (int32_t)taosArrayGetSize(pData->dnodeEps),
        pData->dnodeVer);

_OVER:
  if (pJson != NULL) {
    tjsonDelete(pJson);
  }
  if (buffer != NULL) {
    taosMemoryFree(buffer);
  }
  if (pFile != NULL) {
    (void)taosCloseFile(&pFile);
  }

  if (code != 0) {
    dError("failed to write dnode file:%s since %s, dnodeVer:%" PRId64, realfile, tstrerror(code), pData->dnodeVer);
  }
  return code;
}
365

366
/* Return the number of entries in pData->dnodeEps, read under the read lock. */
int32_t dmGetDnodeSize(SDnodeData *pData) {
  (void)taosThreadRwlockRdlock(&pData->lock);
  int32_t numOfDnodes = taosArrayGetSize(pData->dnodeEps);
  (void)taosThreadRwlockUnlock(&pData->lock);

  return numOfDnodes;
}
373

374
/*
 * Replace the dnode list with `eps` and write it to disk, all while holding
 * the write lock. Lock, unlock and write failures are logged only; the
 * function carries on in each case.
 */
void dmUpdateEps(SDnodeData *pData, SArray *eps) {
  int32_t lockRc = taosThreadRwlockWrlock(&pData->lock);
  if (lockRc != 0) {
    dError("failed to lock dnode lock");
  }

  dDebug("new dnode list get from mnode, dnodeVer:%" PRId64, pData->dnodeVer);
  dmResetEps(pData, eps);

  int32_t writeRc = dmWriteEps(pData);
  if (writeRc != 0) {
    dError("failed to write dnode file");
  }

  int32_t unlockRc = taosThreadRwlockUnlock(&pData->lock);
  if (unlockRc != 0) {
    dError("failed to unlock dnode lock");
  }
}
7,505✔
389

390
/*
 * Make `dnodeEps` the current dnode list and rebuild everything derived from it.
 *
 * - When `dnodeEps` is not already pData->dnodeEps, a copy of it replaces the
 *   stored array. If the copy cannot be made the previous array is kept (and
 *   the failure logged) instead of being destroyed and replaced by NULL.
 * - pData->mnodeEps is rebuilt from the entries flagged isMnode, capped at
 *   TSDB_MAX_REPLICA, then sorted with epsetSort().
 * - Every entry is put into pData->dnodeHash keyed by dnode id. Entries that
 *   are already in the hash but absent from `dnodeEps` are not removed here.
 * - validMnodeEps is set and the list is printed at debug level.
 *
 * No locking is done here; dmUpdateEps() calls this with the write lock held.
 */
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) {
  if (pData->dnodeEps != dnodeEps) {
    SArray *pCopy = taosArrayDup(dnodeEps, NULL);
    if (pCopy != NULL) {
      taosArrayDestroy(pData->dnodeEps);
      pData->dnodeEps = pCopy;
    } else {
      dError("failed to dup dnode list since %s, keep the previous list", terrstr());
    }
  }

  pData->mnodeEps.inUse = 0;
  pData->mnodeEps.numOfEps = 0;

  int32_t mIndex = 0;
  int32_t numOfEps = (int32_t)taosArrayGetSize(dnodeEps);

  for (int32_t i = 0; i < numOfEps; i++) {
    SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
    if (!pDnodeEp->isMnode) continue;
    if (mIndex >= TSDB_MAX_REPLICA) continue;  // the ep set has no room for more mnodes
    pData->mnodeEps.numOfEps++;

    pData->mnodeEps.eps[mIndex] = pDnodeEp->ep;
    mIndex++;
  }
  epsetSort(&pData->mnodeEps);

  for (int32_t i = 0; i < numOfEps; i++) {
    SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
    int32_t   code = taosHashPut(pData->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp));
    if (code) {
      dError("dnode:%d, fqdn:%s port:%u isMnode:%d failed to put into hash, reason:%s", pDnodeEp->id, pDnodeEp->ep.fqdn,
             pDnodeEp->ep.port, pDnodeEp->isMnode, tstrerror(code));
    }
  }

  pData->validMnodeEps = true;

  dmPrintEps(pData);
}
9,264✔
427

428
/* Dump the current dnode list (id, fqdn, port, isMnode) at debug log level. */
static void dmPrintEps(SDnodeData *pData) {
  int32_t total = (int32_t)taosArrayGetSize(pData->dnodeEps);
  dDebug("print dnode list, num:%d", total);

  int32_t idx = 0;
  while (idx < total) {
    SDnodeEp *pItem = taosArrayGet(pData->dnodeEps, idx);
    dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pItem->id, pItem->ep.fqdn, pItem->ep.port, pItem->isMnode);
    idx++;
  }
}
9,264✔
436

437
/*
 * Tell whether `ep` ("fqdn:port") differs from the endpoint stored in the
 * dnode hash for `dnodeId`.
 *
 * Returns false when dnodeId is 0 or when the id is not in the hash. A
 * mismatch is logged as an error. The lookup runs under the read lock.
 */
static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) {
  if (dnodeId == 0) {
    return false;
  }

  bool isDifferent = false;
  (void)taosThreadRwlockRdlock(&pData->lock);

  SDnodeEp *pStored = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
  if (pStored != NULL) {
    char storedEp[TSDB_EP_LEN + 1] = {0};
    snprintf(storedEp, TSDB_EP_LEN, "%s:%u", pStored->ep.fqdn, pStored->ep.port);
    if (strcmp(ep, storedEp) != 0) {
      isDifferent = true;
      dError("dnode:%d, localEp %s different from %s", dnodeId, ep, storedEp);
    }
  }

  (void)taosThreadRwlockUnlock(&pData->lock);
  return isDifferent;
}
455

456
/*
 * Copy the current mnode endpoint set into *pEpSet while holding the read
 * lock. `data` is the SDnodeData instance passed as an opaque pointer.
 */
void dmGetMnodeEpSet(void* data, SEpSet *pEpSet) {
  SDnodeData *pDnodeData = data;

  (void)taosThreadRwlockRdlock(&pDnodeData->lock);
  *pEpSet = pDnodeData->mnodeEps;
  (void)taosThreadRwlockUnlock(&pDnodeData->lock);
}
124,216✔
462

463
/*
 * Format `epSet` into `buf` (capacity `len`) as "{fqdn:port, fqdn:port}".
 * Each piece is appended at the offset accumulated from the tsnprintf()
 * return values of the previous pieces.
 */
void dmEpSetToStr(char *buf, int32_t len, SEpSet *epSet) {
  int32_t used = tsnprintf(buf, len, "%s", "{");

  for (int i = 0; i < epSet->numOfEps; i++) {
    bool        hasNext = (i + 1 < epSet->numOfEps);
    const char *sep = hasNext ? ", " : "";
    used += tsnprintf(buf + used, len - used, "%s:%d%s", epSet->eps[i].fqdn, epSet->eps[i].port, sep);
  }

  (void)tsnprintf(buf + used, len - used, "%s", "}");
}
2,115✔
472

473
/*
 * Exchange the fqdn and port of two endpoints through a temporary SEp.
 * Only the fqdn string and the port are copied (field by field with
 * tstrncpy), not the whole struct.
 */
static FORCE_INLINE void dmSwapEps(SEp *epLhs, SEp *epRhs) {
  SEp saved;

  // saved <- lhs
  tstrncpy(saved.fqdn, epLhs->fqdn, tListLen(saved.fqdn));
  saved.port = epLhs->port;

  // lhs <- rhs
  tstrncpy(epLhs->fqdn, epRhs->fqdn, tListLen(epLhs->fqdn));
  epLhs->port = epRhs->port;

  // rhs <- saved
  tstrncpy(epRhs->fqdn, saved.fqdn, tListLen(epRhs->fqdn));
  epRhs->port = saved.port;
}
139✔
485

486
/*
 * Rotate the mnode endpoint list left by one position: the first endpoint
 * moves to the end and every other endpoint moves one slot forward.
 *
 * The rotation rewrites pData->mnodeEps in place, so it is done under the
 * write lock. The read lock used before would let concurrent readers copy the
 * set in the middle of a swap.
 */
void dmRotateMnodeEpSet(SDnodeData *pData) {
  (void)taosThreadRwlockWrlock(&pData->lock);
  SEpSet *pEpSet = &pData->mnodeEps;
  // bubbling the first element through adjacent swaps rotates the list
  for (int i = 1; i < pEpSet->numOfEps; i++) {
    dmSwapEps(&pEpSet->eps[i - 1], &pEpSet->eps[i]);
  }
  (void)taosThreadRwlockUnlock(&pData->lock);
}
325✔
494

495
/*
 * Build the endpoint set returned with a redirect.
 *
 * Does nothing unless pData->validMnodeEps is set. Otherwise copies the mnode
 * set into *pEpSet and, for each entry that equals this node's own
 * tsLocalFqdn / tsServerPort, points inUse at the entry after it (wrapping
 * around). The scan does not stop at the first match.
 */
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet) {
  if (!pData->validMnodeEps) {
    return;
  }

  dmGetMnodeEpSet(pData, pEpSet);
  dTrace("msg is redirected, handle:%p num:%d use:%d", pMsg->info.handle, pEpSet->numOfEps, pEpSet->inUse);

  for (int32_t idx = 0; idx < pEpSet->numOfEps; ++idx) {
    dTrace("mnode index:%d %s:%u", idx, pEpSet->eps[idx].fqdn, pEpSet->eps[idx].port);

    bool isLocal = (strcmp(pEpSet->eps[idx].fqdn, tsLocalFqdn) == 0) && (pEpSet->eps[idx].port == tsServerPort);
    if (isLocal) {
      pEpSet->inUse = (idx + 1) % pEpSet->numOfEps;
    }
  }
}
506

507
/*
 * Store *pEpSet as the current mnode endpoint set if it differs from the one
 * already held, and log the new set.
 *
 * The comparison and the assignment both happen under the write lock. The
 * set is only touched under pData->lock elsewhere in this file, so comparing
 * it without the lock could read a half-updated value.
 */
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
  (void)taosThreadRwlockWrlock(&pData->lock);
  bool changed = (memcmp(pEpSet, &pData->mnodeEps, sizeof(SEpSet)) != 0);
  if (changed) {
    pData->mnodeEps = *pEpSet;
  }
  (void)taosThreadRwlockUnlock(&pData->lock);

  if (!changed) return;

  dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
  for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
    dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
  }
}
518

519
bool dmUpdateDnodeInfo(void *data, int32_t *did, int64_t *clusterId, char *fqdn, uint16_t *port) {
896,996✔
520
  bool        updated = false;
896,996✔
521
  SDnodeData *pData = data;
896,996✔
522
  int32_t     dnodeId = -1;
896,996✔
523
  if (did != NULL) dnodeId = *did;
896,996✔
524

525
  (void)taosThreadRwlockRdlock(&pData->lock);
896,996✔
526

527
  if (pData->oldDnodeEps != NULL) {
897,012!
528
    int32_t size = (int32_t)taosArrayGetSize(pData->oldDnodeEps);
×
529
    for (int32_t i = 0; i < size; ++i) {
×
530
      SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i);
×
531
      if (strcmp(pair->oldFqdn, fqdn) == 0 && pair->oldPort == *port) {
×
532
        dInfo("dnode:%d, update ep:%s:%u to %s:%u", dnodeId, fqdn, *port, pair->newFqdn, pair->newPort);
×
533
        tstrncpy(fqdn, pair->newFqdn, TSDB_FQDN_LEN);
×
534
        *port = pair->newPort;
×
535
        updated = true;
×
536
      }
537
    }
538
  }
539

540
  if (did != NULL && dnodeId <= 0) {
897,012✔
541
    int32_t size = (int32_t)taosArrayGetSize(pData->dnodeEps);
65✔
542
    for (int32_t i = 0; i < size; ++i) {
259✔
543
      SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, i);
194✔
544
      if (strcmp(pDnodeEp->ep.fqdn, fqdn) == 0 && pDnodeEp->ep.port == *port) {
194!
545
        dInfo("dnode:%s:%u, update dnodeId to dnode:%d", fqdn, *port, pDnodeEp->id);
65!
546
        *did = pDnodeEp->id;
65✔
547
        if (clusterId != NULL) *clusterId = pData->clusterId;
65!
548
      }
549
    }
550
  }
551

552
  if (dnodeId > 0) {
897,012✔
553
    SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
61,035✔
554
    if (pDnodeEp) {
61,041✔
555
      if (strcmp(pDnodeEp->ep.fqdn, fqdn) != 0 || pDnodeEp->ep.port != *port) {
57,090!
556
        dInfo("dnode:%d, update ep:%s:%u to %s:%u", dnodeId, fqdn, *port, pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
×
557
        tstrncpy(fqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
×
558
        *port = pDnodeEp->ep.port;
×
559
        updated = true;
×
560
      }
561
      if (clusterId != NULL) *clusterId = pData->clusterId;
57,091✔
562
    }
563
  }
564

565
  (void)taosThreadRwlockUnlock(&pData->lock);
897,019✔
566
  return updated;
897,010✔
567
}
568

569
/*
 * Decode the "dnodes" array of a parsed ep.json document into
 * pData->oldDnodeEps, one SDnodeEpPair per element (id, fqdn, port, new_fqdn,
 * new_port).
 *
 * Returns TSDB_CODE_INVALID_CFG_VALUE when the array or any field is missing,
 * terrno when an entry cannot be appended, and otherwise the result of the
 * last field read (0 when the array is empty).
 */
static int32_t dmDecodeEpPairs(SJson *pJson, SDnodeData *pData) {
  int32_t code = 0;

  SJson *pPairList = tjsonGetObjectItem(pJson, "dnodes");
  if (pPairList == NULL) {
    return TSDB_CODE_INVALID_CFG_VALUE;
  }

  int32_t total = tjsonGetArraySize(pPairList);
  int32_t idx = 0;
  while (idx < total) {
    SJson *pItem = tjsonGetArrayItem(pPairList, idx);
    if (pItem == NULL) {
      return TSDB_CODE_INVALID_CFG_VALUE;
    }

    SDnodeEpPair entry = {0};
    tjsonGetInt32ValueFromDouble(pItem, "id", entry.id, code);
    if (code < 0) {
      return TSDB_CODE_INVALID_CFG_VALUE;
    }
    code = tjsonGetStringValue(pItem, "fqdn", entry.oldFqdn);
    if (code < 0) {
      return TSDB_CODE_INVALID_CFG_VALUE;
    }
    tjsonGetUInt16ValueFromDouble(pItem, "port", entry.oldPort, code);
    if (code < 0) {
      return TSDB_CODE_INVALID_CFG_VALUE;
    }
    code = tjsonGetStringValue(pItem, "new_fqdn", entry.newFqdn);
    if (code < 0) {
      return TSDB_CODE_INVALID_CFG_VALUE;
    }
    tjsonGetUInt16ValueFromDouble(pItem, "new_port", entry.newPort, code);
    if (code < 0) {
      return TSDB_CODE_INVALID_CFG_VALUE;
    }

    if (taosArrayPush(pData->oldDnodeEps, &entry) == NULL) {
      return terrno;
    }
    ++idx;
  }

  return code;
}
597

598
/*
 * Rename <tsDataDir>/dnode/ep.json to ep.json.bak. A failed rename is logged
 * and otherwise ignored. `pData` is not used.
 */
void dmRemoveDnodePairs(SDnodeData *pData) {
  char file[PATH_MAX] = {0};
  char bak[PATH_MAX] = {0};

  snprintf(file, sizeof(file), "%s%sdnode%sep.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
  snprintf(bak, sizeof(bak), "%s%sdnode%sep.json.bak", tsDataDir, TD_DIRSEP, TD_DIRSEP);

  dInfo("dnode file:%s is rename to bak file", file);

  int32_t renameRc = taosRenameFile(file, bak);
  if (renameRc != 0) {
    dError("failed to rename dnode file:%s to bak file:%s since %s", file, bak, tstrerror(terrno));
  }
}
×
608

609
/*
 * Load endpoint renames from <tsDataDir>/dnode/ep.json and apply them to
 * pData->dnodeEps.
 *
 * - A missing file is not an error: nothing is loaded.
 * - Otherwise the file is read, parsed and decoded into pData->oldDnodeEps.
 * - Each pair's old endpoint is then replaced by the endpoint currently stored
 *   for the same dnode id.
 * - If a pair's new endpoint already belongs to a different dnode, the pairs
 *   are discarded and TSDB_CODE_INVALID_CFG is returned.
 * - Every dnode whose endpoint equals a pair's old endpoint is rewritten to
 *   the new endpoint.
 * - pData->dnodeVer is reset to 0 on every successful return, including the
 *   "file missing" case.
 *
 * Returns 0 on success, otherwise a non-zero error code.
 */
static int32_t dmReadDnodePairs(SDnodeData *pData) {
  int32_t   code = -1;
  TdFilePtr pFile = NULL;
  char     *content = NULL;
  SJson    *pJson = NULL;
  char      file[PATH_MAX] = {0};
  snprintf(file, sizeof(file), "%s%sdnode%sep.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);

  if (taosStatFile(file, NULL, NULL, NULL) < 0) {
    code = terrno;
    dDebug("dnode file:%s not exist, reason:%s", file, tstrerror(code));
    code = 0;
    goto _OVER;
  }

  pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    code = terrno;
    dError("failed to open dnode file:%s since %s", file, terrstr());
    goto _OVER;
  }

  int64_t size = 0;
  code = taosFStatFile(pFile, &size, NULL);
  if (code != 0) {
    dError("failed to fstat dnode file:%s since %s", file, terrstr());
    goto _OVER;
  }

  content = taosMemoryMalloc(size + 1);
  if (content == NULL) {
    code = terrno;
    goto _OVER;
  }

  if (taosReadFile(pFile, content, size) != size) {
    // `code` is 0 here (fstat succeeded), so it must be set or the failure
    // would be reported as success
    code = terrno;
    if (code == 0) {
      // short read without a recorded error: the content is incomplete
      code = TSDB_CODE_INVALID_JSON_FORMAT;
    }
    dError("failed to read dnode file:%s since %s", file, terrstr());
    goto _OVER;
  }

  content[size] = '\0';

  pJson = tjsonParse(content);
  if (pJson == NULL) {
    code = TSDB_CODE_INVALID_JSON_FORMAT;
    goto _OVER;
  }

  pData->oldDnodeEps = taosArrayInit(1, sizeof(SDnodeEpPair));
  if (pData->oldDnodeEps == NULL) {
    code = terrno;
    dError("failed to calloc dnodeEp array since %s", tstrerror(code));
    goto _OVER;
  }

  if (dmDecodeEpPairs(pJson, pData) < 0) {
    taosArrayDestroy(pData->oldDnodeEps);
    pData->oldDnodeEps = NULL;

    code = TSDB_CODE_INVALID_JSON_FORMAT;
    goto _OVER;
  }

  code = 0;
  dInfo("succceed to read dnode file %s", file);

_OVER:
  if (content != NULL) taosMemoryFree(content);
  if (pJson != NULL) tjsonDelete(pJson);
  if (pFile != NULL) taosCloseFile(&pFile);

  if (code != 0) {
    dError("failed to read dnode file:%s since %s", file, tstrerror(code));
    return code;
  }

  // neither array changes size below, so read the sizes once
  int32_t numOfPairs = (int32_t)taosArrayGetSize(pData->oldDnodeEps);
  int32_t numOfEps = (int32_t)taosArrayGetSize(pData->dnodeEps);

  // update old fqdn and port
  for (int32_t i = 0; i < numOfPairs; ++i) {
    SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i);
    for (int32_t j = 0; j < numOfEps; ++j) {
      SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, j);
      if (pDnodeEp->id == pair->id) {
        tstrncpy(pair->oldFqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
        pair->oldPort = pDnodeEp->ep.port;
      }
    }
  }

  // check new fqdn and port
  for (int32_t i = 0; i < numOfPairs; ++i) {
    SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i);
    for (int32_t j = 0; j < numOfEps; ++j) {
      SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, j);
      if (pDnodeEp->id != pair->id &&
          (strcmp(pDnodeEp->ep.fqdn, pair->newFqdn) == 0 && pDnodeEp->ep.port == pair->newPort)) {
        dError("dnode:%d, can't update ep:%s:%u to %s:%u since already exists as dnode:%d", pair->id, pair->oldFqdn,
               pair->oldPort, pair->newFqdn, pair->newPort, pDnodeEp->id);
        taosArrayDestroy(pData->oldDnodeEps);
        pData->oldDnodeEps = NULL;
        code = TSDB_CODE_INVALID_CFG;
        return code;
      }
    }
  }

  // apply the renames to the dnode list
  for (int32_t i = 0; i < numOfPairs; ++i) {
    SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i);
    for (int32_t j = 0; j < numOfEps; ++j) {
      SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, j);
      if (strcmp(pDnodeEp->ep.fqdn, pair->oldFqdn) == 0 && pDnodeEp->ep.port == pair->oldPort) {
        dInfo("dnode:%d, will update ep:%s:%u to %s:%u", pDnodeEp->id, pDnodeEp->ep.fqdn, pDnodeEp->ep.port,
              pair->newFqdn, pair->newPort);
        tstrncpy(pDnodeEp->ep.fqdn, pair->newFqdn, TSDB_FQDN_LEN);
        pDnodeEp->ep.port = pair->newPort;
      }
    }
  }

  pData->dnodeVer = 0;
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc