• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4910

30 Dec 2025 10:52AM UTC coverage: 65.864% (+0.3%) from 65.542%
#4910

push

travis-ci

web-flow
enh: drop multi-stream (#33962)

60 of 106 new or added lines in 4 files covered. (56.6%)

999 existing lines in 108 files now uncovered.

194877 of 295877 relevant lines covered (65.86%)

121300574.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.63
/source/dnode/mgmt/node_util/src/dmEps.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "dmUtil.h"
18
#include "tencrypt.h"
19
#include "tjson.h"
20
#include "tmisce.h"
21

22
typedef struct {
23
  int32_t  id;
24
  uint16_t oldPort;
25
  uint16_t newPort;
26
  char     oldFqdn[TSDB_FQDN_LEN];
27
  char     newFqdn[TSDB_FQDN_LEN];
28
} SDnodeEpPair;
29

30
static void    dmPrintEps(SDnodeData *pData);
31
static bool    dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep);
32
static void    dmResetEps(SDnodeData *pData, SArray *dnodeEps);
33
static int32_t dmReadDnodePairs(SDnodeData *pData);
34

35
void dmGetDnodeEp(void *data, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
×
36
  SDnodeData *pData = data;
×
37
  (void)taosThreadRwlockRdlock(&pData->lock);
×
38

39
  SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
×
40
  if (pDnodeEp != NULL) {
×
41
    if (pPort != NULL) {
×
42
      *pPort = pDnodeEp->ep.port;
×
43
    }
44
    if (pFqdn != NULL) {
×
45
      tstrncpy(pFqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
×
46
    }
47
    if (pEp != NULL) {
×
48
      snprintf(pEp, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
×
49
    }
50
  }
51

52
  (void)taosThreadRwlockUnlock(&pData->lock);
×
53
}
×
54

55
/*
 * Fill pData from the parsed dnode.json document.
 *
 * Reads the scalar fields (dnodeId, dnodeVer, engineVer, clusterId, dropped
 * and, in enterprise builds, the encryption settings) and then appends one
 * SDnodeEp per element of the "dnodes" array to pData->dnodeEps.
 *
 * Returns 0 on success (including when "dnodes" is absent) and -1 as soon as
 * any field cannot be read or an entry cannot be appended.
 */
static int32_t dmDecodeEps(SJson *pJson, SDnodeData *pData) {
  int32_t code = 0;

  tjsonGetInt32ValueFromDouble(pJson, "dnodeId", pData->dnodeId, code);
  if (code < 0) {
    return -1;
  }
  tjsonGetNumberValue(pJson, "dnodeVer", pData->dnodeVer, code);
  if (code < 0) {
    return -1;
  }
  tjsonGetNumberValue(pJson, "engineVer", pData->engineVer, code);
  if (code < 0) {
    return -1;
  }
  tjsonGetNumberValue(pJson, "clusterId", pData->clusterId, code);
  if (code < 0) {
    return -1;
  }
  tjsonGetInt32ValueFromDouble(pJson, "dropped", pData->dropped, code);
  if (code < 0) {
    return -1;
  }
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tjsonGetInt32ValueFromDouble(pJson, "encryptAlgor", pData->encryptAlgorigthm, code);
  if (code < 0) {
    return -1;
  }
  tjsonGetInt32ValueFromDouble(pJson, "encryptScope", pData->encryptScope, code);
  if (code < 0) {
    return -1;
  }
#endif

  SJson *pList = tjsonGetObjectItem(pJson, "dnodes");
  if (pList == NULL) {
    return 0;
  }

  int32_t total = tjsonGetArraySize(pList);
  for (int32_t idx = 0; idx < total; ++idx) {
    SJson *pItem = tjsonGetArrayItem(pList, idx);
    if (pItem == NULL) {
      return -1;
    }

    SDnodeEp entry = {0};
    tjsonGetInt32ValueFromDouble(pItem, "id", entry.id, code);
    if (code < 0) {
      return -1;
    }
    code = tjsonGetStringValue(pItem, "fqdn", entry.ep.fqdn);
    if (code < 0) {
      return -1;
    }
    tjsonGetUInt16ValueFromDouble(pItem, "port", entry.ep.port, code);
    if (code < 0) {
      return -1;
    }
    tjsonGetInt8ValueFromDouble(pItem, "isMnode", entry.isMnode, code);
    if (code < 0) {
      return -1;
    }

    if (taosArrayPush(pData->dnodeEps, &entry) == NULL) {
      return -1;
    }
  }

  return 0;
}
97

98
/*
 * Count how many times toSearch occurs in str.
 *
 * Matches may overlap: after each hit the scan resumes one character further,
 * so "aa" is found twice in "aaa".
 *
 * Returns 0 when either argument is NULL or when toSearch is empty. An empty
 * needle matches at every position, including the terminator, so advancing
 * past each match would walk off the end of str.
 */
int dmOccurrences(char *str, char *toSearch) {
  if (str == NULL || toSearch == NULL || toSearch[0] == '\0') {
    return 0;
  }

  int count = 0;
  for (char *hit = strstr(str, toSearch); hit != NULL; hit = strstr(hit + 1, toSearch)) {
    count++;
  }
  return count;
}
107

108
/*
 * Split str in place on any character contained in del and store a pointer to
 * each token in arr, in order.
 *
 * Every delimiter found is overwritten with '\0'. Adjacent delimiters produce
 * empty tokens, and a string without delimiters produces a single token, so a
 * string with k delimiters always yields k + 1 tokens. The caller must supply
 * an arr with room for all of them; this function has no way to bound writes.
 *
 * Nothing is written when arr or del is NULL, or when str is NULL.
 */
void dmSplitStr(char **arr, char *str, const char *del) {
  if (arr == NULL || del == NULL) {
    return;
  }

  char *cursor = str;
  while (cursor != NULL) {
    char *token = cursor;
    char *end = token + strcspn(token, del);

    if (*end != '\0') {
      // Terminate this token and continue right after the delimiter.
      *end = '\0';
      cursor = end + 1;
    } else {
      // Reached the end of the input: this is the last token.
      cursor = NULL;
    }

    *arr++ = token;
  }
}
×
116

117
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
/*
 * Initialise pData->encryptAlgorigthm / pData->encryptScope from the
 * tsEncryptAlgorithm and tsEncryptScope configuration strings.
 *
 * tsEncryptScope is a comma separated list of "tsdb", "vnode_wal", "sdb",
 * "mnode_wal" or "all" (case-insensitive); every token must be recognised.
 *
 * Returns 0 on success or when no algorithm is configured,
 * TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG (also stored in terrno) for an
 * invalid setting, and -1 when a scratch buffer cannot be allocated.
 */
static int32_t dmLoadEncryptFromCfg(SDnodeData *pData) {
  if (strlen(tsEncryptAlgorithm) == 0) {
    return 0;
  }

  if (strcmp(tsEncryptAlgorithm, "sm4") != 0) {
    terrno = TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG;
    dError("invalid tsEncryptAlgorithm:%s", tsEncryptAlgorithm);
    return TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG;
  }
  pData->encryptAlgorigthm = DND_CA_SM4;

  dInfo("start to parse encryptScope:%s", tsEncryptScope);
  int32_t scopeLen = strlen(tsEncryptScope);
  if (scopeLen == 0) {
    terrno = TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG;
    dError("invalid tsEncryptScope:%s", tsEncryptScope);
    return TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG;
  }

  // Work on a private copy because splitting overwrites the delimiters.
  char *tmp = taosMemoryMalloc(scopeLen + 1);
  if (tmp == NULL) {
    dError("failed to malloc memory for tsEncryptScope:%s", tsEncryptScope);
    return -1;
  }
  memset(tmp, 0, scopeLen + 1);
  memcpy(tmp, tsEncryptScope, scopeLen);

  // k commas always yield k + 1 tokens.
  int32_t count = dmOccurrences(tmp, ",");
  char  **array = taosMemoryMalloc(sizeof(char *) * (count + 1));
  if (array == NULL) {
    dError("failed to malloc memory for tsEncryptScope array:%s", tsEncryptScope);
    taosMemoryFree(tmp);
    return -1;
  }
  memset(array, 0, sizeof(char *) * (count + 1));
  dmSplitStr(array, tmp, ",");

  int32_t code = 0;
  for (int32_t i = 0; i < count + 1; i++) {
    char *str = array[i];
    bool  isAll = (strcasecmp(str, "all") == 0);
    bool  success = false;

    if (isAll || strcasecmp(str, "tsdb") == 0) {
      pData->encryptScope |= DND_CS_TSDB;
      success = true;
    }
    if (isAll || strcasecmp(str, "vnode_wal") == 0) {
      pData->encryptScope |= DND_CS_VNODE_WAL;
      success = true;
    }
    if (isAll || strcasecmp(str, "sdb") == 0) {
      pData->encryptScope |= DND_CS_SDB;
      success = true;
    }
    if (isAll || strcasecmp(str, "mnode_wal") == 0) {
      pData->encryptScope |= DND_CS_MNODE_WAL;
      success = true;
    }

    if (!success) {
      terrno = TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG;
      dError("invalid tsEncryptScope:%s", tsEncryptScope);
      code = TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG;
      break;
    }
  }

  if (code == 0) {
    dInfo("set tsCryptAlgorithm:%s, tsCryptScope:%s from cfg", tsEncryptAlgorithm, tsEncryptScope);
  }

  taosMemoryFree(tmp);
  taosMemoryFree(array);
  return code;
}
#endif

/*
 * Load the dnode endpoint list from <dataDir>/dnode/dnode.json into pData.
 *
 * Steps:
 *  1. Allocate pData->dnodeEps.
 *  2. If dnode.json does not exist, (enterprise builds) take the encryption
 *     settings from the configuration; otherwise read and decode the file.
 *  3. If the list is still empty, seed it with tsFirst marked as mnode.
 *  4. Apply the optional ep.json remapping (dmReadDnodePairs).
 *  5. Rebuild the mnode ep set / dnode hash (dmResetEps).
 *  6. Unless a remapping is active, reject a tsLocalEp that differs from the
 *     endpoint stored for this dnode id.
 *
 * Returns 0 on success. On failure a non-zero code is returned and, for the
 * paths that go through _OVER, also stored in terrno. An invalid encryption
 * configuration is reported as TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG rather
 * than being flattened to -1.
 */
int32_t dmReadEps(SDnodeData *pData) {
  int32_t code = -1;
  int32_t contentLen = 0;
  char   *content = NULL;
  SJson  *pJson = NULL;
  char    file[PATH_MAX] = {0};
  snprintf(file, sizeof(file), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);

  pData->dnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
  if (pData->dnodeEps == NULL) {
    code = terrno;
    dError("failed to calloc dnodeEp array since %s", terrstr());
    goto _OVER;
  }

  if (taosStatFile(file, NULL, NULL, NULL) < 0) {
    dInfo("dnode file:%s not exist", file);

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
    code = dmLoadEncryptFromCfg(pData);
    if (code != 0) {
      goto _OVER;
    }
#endif
    code = 0;
    goto _OVER;
  }

  // Use taosReadCfgFile for automatic decryption support (returns null-terminated string)
  code = taosReadCfgFile(file, &content, &contentLen);
  if (code != 0) {
    dError("failed to read dnode file:%s since %s", file, terrstr());
    goto _OVER;
  }

  pJson = tjsonParse(content);
  if (pJson == NULL) {
    code = TSDB_CODE_INVALID_JSON_FORMAT;
    goto _OVER;
  }

  if ((code = dmDecodeEps(pJson, pData)) < 0) {
    goto _OVER;
  }

  code = 0;
  dInfo("succceed to read dnode file %s", file);

_OVER:
  if (content != NULL) taosMemoryFree(content);
  if (pJson != NULL) cJSON_Delete(pJson);

  if (code != 0) {
    dError("failed to read dnode file:%s since %s", file, terrstr());
    return terrno = code;
  }

  if (taosArrayGetSize(pData->dnodeEps) == 0) {
    // Nothing on disk yet: start from the configured first endpoint.
    SDnodeEp dnodeEp = {0};
    dnodeEp.isMnode = 1;
    if (taosGetFqdnPortFromEp(tsFirst, &dnodeEp.ep) != 0) {
      dError("failed to get fqdn and port from ep:%s", tsFirst);
    }
    if (taosArrayPush(pData->dnodeEps, &dnodeEp) == NULL) {
      return terrno;
    }
  }

  if ((code = dmReadDnodePairs(pData)) != 0) {
    return terrno = code;
  }

  dDebug("reset dnode list on startup");
  dmResetEps(pData, pData->dnodeEps);

  if (pData->oldDnodeEps == NULL && dmIsEpChanged(pData, pData->dnodeId, tsLocalEp)) {
    dError("localEp %s different with %s and need to be reconfigured", tsLocalEp, file);
    code = TSDB_CODE_INVALID_CFG;
    return terrno = code;
  }

  return code;
}
264

265
/*
 * Serialise pData into the json object pJson using the dnode.json layout:
 * the scalar fields followed by a "dnodes" array with one object
 * (id, fqdn, port, isMnode) per entry of pData->dnodeEps.
 *
 * Returns 0 on success and -1 on the first failure. Json nodes that were
 * created here but could not be attached to their parent are deleted before
 * returning; nodes already attached stay owned by pJson.
 */
static int32_t dmEncodeEps(SJson *pJson, SDnodeData *pData) {
  if (tjsonAddDoubleToObject(pJson, "dnodeId", pData->dnodeId) < 0) return -1;
  if (tjsonAddIntegerToObject(pJson, "dnodeVer", pData->dnodeVer) < 0) return -1;
  if (tjsonAddIntegerToObject(pJson, "engineVer", pData->engineVer) < 0) return -1;
  if (tjsonAddIntegerToObject(pJson, "clusterId", pData->clusterId) < 0) return -1;
  if (tjsonAddDoubleToObject(pJson, "dropped", pData->dropped) < 0) return -1;
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  if (tjsonAddDoubleToObject(pJson, "encryptAlgor", pData->encryptAlgorigthm) < 0) return -1;
  if (tjsonAddDoubleToObject(pJson, "encryptScope", pData->encryptScope) < 0) return -1;
#endif

  SJson *dnodes = tjsonCreateArray();
  if (dnodes == NULL) return -1;
  if (tjsonAddItemToObject(pJson, "dnodes", dnodes) < 0) {
    // Not attached to pJson, so nobody else will free it.
    tjsonDelete(dnodes);
    return -1;
  }

  int32_t numOfEps = (int32_t)taosArrayGetSize(pData->dnodeEps);
  for (int32_t i = 0; i < numOfEps; ++i) {
    SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, i);
    SJson    *dnode = tjsonCreateObject();
    if (dnode == NULL) return -1;

    // The object is only owned by the array once the final add succeeds.
    if (tjsonAddDoubleToObject(dnode, "id", pDnodeEp->id) < 0 ||
        tjsonAddStringToObject(dnode, "fqdn", pDnodeEp->ep.fqdn) < 0 ||
        tjsonAddDoubleToObject(dnode, "port", pDnodeEp->ep.port) < 0 ||
        tjsonAddDoubleToObject(dnode, "isMnode", pDnodeEp->isMnode) < 0 ||
        tjsonAddItemToArray(dnodes, dnode) < 0) {
      tjsonDelete(dnode);
      return -1;
    }
  }

  return 0;
}
294

295
/*
 * Persist the current dnode information to <dataDir>/dnode/dnode.json.
 *
 * The engine version stored in pData is refreshed from tsVersion before
 * encoding, and pData->updateTime is set after a successful write.
 *
 * Returns 0 on success, otherwise the failing step's error code (also
 * logged). The caller is expected to hold whatever lock protects pData;
 * no lock is taken here.
 */
int32_t dmWriteEps(SDnodeData *pData) {
  int32_t code = 0;
  SJson  *pRoot = NULL;
  char   *pText = NULL;
  char    realfile[PATH_MAX] = {0};

  snprintf(realfile, sizeof(realfile), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);

  TAOS_CHECK_GOTO(dmInitDndInfo(pData), NULL, _OVER);

  pRoot = tjsonCreateObject();
  if (pRoot == NULL) {
    TAOS_CHECK_GOTO(terrno, NULL, _OVER);
  }

  pData->engineVer = tsVersion;

  TAOS_CHECK_GOTO(dmEncodeEps(pRoot, pData), NULL, _OVER);

  pText = tjsonToString(pRoot);
  if (pText == NULL) {
    TAOS_CHECK_GOTO(terrno, NULL, _OVER);
  }

  int32_t textLen = strlen(pText);

  // taosWriteCfgFile writes encrypted output when tsCfgKey is enabled
  code = taosWriteCfgFile(realfile, pText, textLen);
  if (code != 0) {
    TAOS_CHECK_GOTO(code, NULL, _OVER);
  }

  pData->updateTime = taosGetTimestampMs();
  dInfo("succeed to write dnode file:%s, num:%d ver:%" PRId64, realfile, (int32_t)taosArrayGetSize(pData->dnodeEps),
        pData->dnodeVer);

_OVER:
  if (pRoot != NULL) {
    tjsonDelete(pRoot);
  }
  if (pText != NULL) {
    taosMemoryFree(pText);
  }

  if (code != 0) {
    dError("failed to write dnode file:%s since %s, dnodeVer:%" PRId64, realfile, tstrerror(code), pData->dnodeVer);
  }
  return code;
}
338

339
/*
 * Return the number of entries in pData->dnodeEps, read under the read lock.
 */
int32_t dmGetDnodeSize(SDnodeData *pData) {
  (void)taosThreadRwlockRdlock(&pData->lock);
  int32_t numOfDnodes = taosArrayGetSize(pData->dnodeEps);
  (void)taosThreadRwlockUnlock(&pData->lock);

  return numOfDnodes;
}
346

347
/*
 * Replace the dnode list with eps and write the result to disk, all while
 * holding the write lock.
 *
 * Lock and write failures are only logged; the function continues and never
 * reports an error to the caller.
 */
void dmUpdateEps(SDnodeData *pData, SArray *eps) {
  int32_t lockRet = taosThreadRwlockWrlock(&pData->lock);
  if (lockRet != 0) {
    dError("failed to lock dnode lock");
  }

  dDebug("new dnode list get from mnode, dnodeVer:%" PRId64, pData->dnodeVer);
  dmResetEps(pData, eps);

  int32_t writeRet = dmWriteEps(pData);
  if (writeRet != 0) {
    dError("failed to write dnode file");
  }

  int32_t unlockRet = taosThreadRwlockUnlock(&pData->lock);
  if (unlockRet != 0) {
    dError("failed to unlock dnode lock");
  }
}
1,702,629✔
362

363
/*
 * Install dnodeEps as the current dnode list and rebuild everything derived
 * from it: the mnode ep set (at most TSDB_MAX_REPLICA entries, sorted with
 * epsetSort) and the id -> SDnodeEp hash.
 *
 * When dnodeEps is not already pData->dnodeEps it is duplicated and the
 * previous list is destroyed. If the duplication fails the function logs the
 * error and returns with pData untouched, instead of dropping the existing
 * list and leaving pData->dnodeEps NULL.
 *
 * Takes no lock itself; entries of the hash that are absent from the new list
 * are not removed here.
 */
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) {
  if (pData->dnodeEps != dnodeEps) {
    SArray *pCopy = taosArrayDup(dnodeEps, NULL);
    if (pCopy == NULL && dnodeEps != NULL) {
      dError("failed to dup dnode list since %s", terrstr());
      return;
    }
    taosArrayDestroy(pData->dnodeEps);
    pData->dnodeEps = pCopy;
  }

  pData->mnodeEps.inUse = 0;
  pData->mnodeEps.numOfEps = 0;

  int32_t mIndex = 0;
  int32_t numOfEps = (int32_t)taosArrayGetSize(dnodeEps);

  // Collect the mnode endpoints; anything beyond TSDB_MAX_REPLICA is ignored.
  for (int32_t i = 0; i < numOfEps; i++) {
    SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
    if (!pDnodeEp->isMnode) continue;
    if (mIndex >= TSDB_MAX_REPLICA) continue;
    pData->mnodeEps.numOfEps++;

    pData->mnodeEps.eps[mIndex] = pDnodeEp->ep;
    mIndex++;
  }
  epsetSort(&pData->mnodeEps);

  // Refresh the id lookup table; a failed insert is logged and skipped.
  for (int32_t i = 0; i < numOfEps; i++) {
    SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
    int32_t   code = taosHashPut(pData->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp));
    if (code) {
      dError("dnode:%d, fqdn:%s port:%u isMnode:%d failed to put into hash, reason:%s", pDnodeEp->id, pDnodeEp->ep.fqdn,
             pDnodeEp->ep.port, pDnodeEp->isMnode, tstrerror(code));
    }
  }

  pData->validMnodeEps = true;

  dmPrintEps(pData);
}
2,246,903✔
400

401
/*
 * Emit the current dnode list at debug level: one summary line with the entry
 * count followed by one line per dnode.
 */
static void dmPrintEps(SDnodeData *pData) {
  int32_t total = (int32_t)taosArrayGetSize(pData->dnodeEps);
  dDebug("print dnode list, num:%d", total);

  int32_t idx = 0;
  while (idx < total) {
    SDnodeEp *pEntry = taosArrayGet(pData->dnodeEps, idx);
    dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEntry->id, pEntry->ep.fqdn, pEntry->ep.port, pEntry->isMnode);
    idx++;
  }
}
2,246,903✔
409

410
/*
 * Tell whether ep differs from the "fqdn:port" recorded for dnodeId.
 *
 * Returns false without taking the lock when dnodeId is 0, and false when the
 * id is not present in the hash. A mismatch is logged as an error.
 */
static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) {
  if (dnodeId == 0) {
    return false;
  }

  bool changed = false;
  (void)taosThreadRwlockRdlock(&pData->lock);

  SDnodeEp *pStored = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
  if (pStored != NULL) {
    char storedEp[TSDB_EP_LEN + 1] = {0};
    snprintf(storedEp, TSDB_EP_LEN, "%s:%u", pStored->ep.fqdn, pStored->ep.port);

    if (strcmp(ep, storedEp) != 0) {
      changed = true;
      dError("dnode:%d, localEp %s different from %s", dnodeId, ep, storedEp);
    }
  }

  (void)taosThreadRwlockUnlock(&pData->lock);
  return changed;
}
428

429
/*
 * Copy the current mnode ep set into *pEpSet while holding the read lock.
 * data is the SDnodeData instance passed as void *.
 */
void dmGetMnodeEpSet(void* data, SEpSet *pEpSet) {
  SDnodeData *pDnodeData = (SDnodeData *)data;

  (void)taosThreadRwlockRdlock(&pDnodeData->lock);
  *pEpSet = pDnodeData->mnodeEps;
  (void)taosThreadRwlockUnlock(&pDnodeData->lock);
}
53,588,860✔
435

436
/*
 * Render epSet into buf (capacity len) as "{fqdn:port, fqdn:port}".
 * Each piece is appended with tsnprintf at the running offset.
 */
void dmEpSetToStr(char *buf, int32_t len, SEpSet *epSet) {
  int32_t used = 0;

  used += tsnprintf(buf + used, len - used, "%s", "{");

  for (int idx = 0; idx < epSet->numOfEps; idx++) {
    // Every entry but the last is followed by a separator.
    const char *sep = (idx + 1 < epSet->numOfEps) ? ", " : "";
    used += tsnprintf(buf + used, len - used, "%s:%d%s", epSet->eps[idx].fqdn, epSet->eps[idx].port, sep);
  }

  used += tsnprintf(buf + used, len - used, "%s", "}");
}
1,583,707✔
445

446
/*
 * Exchange the fqdn and port of two endpoints through a temporary SEp.
 * The fqdn is moved with tstrncpy, bounded by the destination array size.
 */
static FORCE_INLINE void dmSwapEps(SEp *epLhs, SEp *epRhs) {
  SEp saved;

  // saved <- lhs
  tstrncpy(saved.fqdn, epLhs->fqdn, tListLen(saved.fqdn));
  saved.port = epLhs->port;

  // lhs <- rhs
  tstrncpy(epLhs->fqdn, epRhs->fqdn, tListLen(epLhs->fqdn));
  epLhs->port = epRhs->port;

  // rhs <- saved
  tstrncpy(epRhs->fqdn, saved.fqdn, tListLen(epRhs->fqdn));
  epRhs->port = saved.port;
}
21,083✔
458

459
/*
 * Rotate the mnode ep set left by one position: swapping each adjacent pair
 * in order moves the first endpoint to the end and shifts the others forward.
 * inUse is not modified.
 *
 * The rotation rewrites pData->mnodeEps, so it runs under the write lock;
 * a read lock would let concurrent readers (e.g. dmGetMnodeEpSet) copy a
 * half-swapped set.
 */
void dmRotateMnodeEpSet(SDnodeData *pData) {
  (void)taosThreadRwlockWrlock(&pData->lock);
  SEpSet *pEpSet = &pData->mnodeEps;
  for (int i = 1; i < pEpSet->numOfEps; i++) {
    dmSwapEps(&pEpSet->eps[i - 1], &pEpSet->eps[i]);
  }
  (void)taosThreadRwlockUnlock(&pData->lock);
}
491,839✔
467

468
/*
 * Fill *pEpSet with the mnode ep set to hand back in a redirect.
 *
 * Does nothing while no valid mnode ep set is available. Otherwise the set is
 * copied and, for every entry equal to this node (tsLocalFqdn:tsServerPort),
 * inUse is pointed at the entry that follows it (wrapping around).
 */
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet) {
  if (!pData->validMnodeEps) {
    return;
  }

  dmGetMnodeEpSet(pData, pEpSet);
  dTrace("msg is redirected, handle:%p num:%d use:%d", pMsg->info.handle, pEpSet->numOfEps, pEpSet->inUse);

  for (int32_t idx = 0; idx < pEpSet->numOfEps; ++idx) {
    dTrace("mnode index:%d %s:%u", idx, pEpSet->eps[idx].fqdn, pEpSet->eps[idx].port);

    if (strcmp(pEpSet->eps[idx].fqdn, tsLocalFqdn) != 0) {
      continue;
    }
    if (pEpSet->eps[idx].port != tsServerPort) {
      continue;
    }
    pEpSet->inUse = (idx + 1) % pEpSet->numOfEps;
  }
}
479

480
/*
 * Replace the stored mnode ep set with *pEpSet when the two differ
 * (byte-wise comparison) and log the new set.
 *
 * Both the comparison and the assignment happen under the write lock, so the
 * stored set is never read while another thread is rewriting it. Nothing is
 * logged when the set is unchanged.
 */
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
  (void)taosThreadRwlockWrlock(&pData->lock);
  bool changed = (memcmp(pEpSet, &pData->mnodeEps, sizeof(SEpSet)) != 0);
  if (changed) {
    pData->mnodeEps = *pEpSet;
  }
  (void)taosThreadRwlockUnlock(&pData->lock);

  if (!changed) return;

  dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
  for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
    dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
  }
}
491

492
bool dmUpdateDnodeInfo(void *data, int32_t *did, int64_t *clusterId, char *fqdn, uint16_t *port) {
278,668,568✔
493
  bool        updated = false;
278,668,568✔
494
  SDnodeData *pData = data;
278,668,568✔
495
  int32_t     dnodeId = -1;
278,668,568✔
496
  if (did != NULL) dnodeId = *did;
278,673,885✔
497

498
  (void)taosThreadRwlockRdlock(&pData->lock);
278,674,080✔
499

500
  if (pData->oldDnodeEps != NULL) {
278,675,156✔
501
    int32_t size = (int32_t)taosArrayGetSize(pData->oldDnodeEps);
×
502
    for (int32_t i = 0; i < size; ++i) {
×
503
      SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i);
×
504
      if (strcmp(pair->oldFqdn, fqdn) == 0 && pair->oldPort == *port) {
×
505
        dInfo("dnode:%d, update ep:%s:%u to %s:%u", dnodeId, fqdn, *port, pair->newFqdn, pair->newPort);
×
506
        tstrncpy(fqdn, pair->newFqdn, TSDB_FQDN_LEN);
×
507
        *port = pair->newPort;
×
508
        updated = true;
×
509
      }
510
    }
511
  }
512

513
  if (did != NULL && dnodeId <= 0) {
278,673,419✔
514
    int32_t size = (int32_t)taosArrayGetSize(pData->dnodeEps);
23,723✔
515
    for (int32_t i = 0; i < size; ++i) {
95,067✔
516
      SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, i);
71,344✔
517
      if (strcmp(pDnodeEp->ep.fqdn, fqdn) == 0 && pDnodeEp->ep.port == *port) {
71,344✔
518
        dInfo("dnode:%s:%u, update dnodeId to dnode:%d", fqdn, *port, pDnodeEp->id);
23,723✔
519
        *did = pDnodeEp->id;
23,723✔
520
        if (clusterId != NULL) *clusterId = pData->clusterId;
23,723✔
521
      }
522
    }
523
  }
524

525
  if (dnodeId > 0) {
278,673,419✔
526
    SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
21,053,645✔
527
    if (pDnodeEp) {
21,057,149✔
528
      if (strcmp(pDnodeEp->ep.fqdn, fqdn) != 0 || pDnodeEp->ep.port != *port) {
19,675,717✔
529
        dInfo("dnode:%d, update ep:%s:%u to %s:%u", dnodeId, fqdn, *port, pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
×
530
        tstrncpy(fqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
×
531
        *port = pDnodeEp->ep.port;
×
532
        updated = true;
×
533
      }
534
      if (clusterId != NULL) *clusterId = pData->clusterId;
19,673,124✔
535
    }
536
  }
537

538
  (void)taosThreadRwlockUnlock(&pData->lock);
278,673,902✔
539
  return updated;
278,675,646✔
540
}
541

542
/*
 * Decode the "dnodes" array of a parsed ep.json document and append one
 * SDnodeEpPair per element to pData->oldDnodeEps.
 *
 * Each element must provide id, fqdn, port, new_fqdn and new_port.
 *
 * Returns TSDB_CODE_INVALID_CFG_VALUE when the array or any field is missing
 * or unreadable, terrno when an entry cannot be appended, and otherwise the
 * status left by the last field read (0 for an empty array).
 */
static int32_t dmDecodeEpPairs(SJson *pJson, SDnodeData *pData) {
  int32_t code = 0;

  SJson *pList = tjsonGetObjectItem(pJson, "dnodes");
  if (pList == NULL) {
    return TSDB_CODE_INVALID_CFG_VALUE;
  }

  int32_t total = tjsonGetArraySize(pList);
  for (int32_t idx = 0; idx < total; ++idx) {
    SJson *pItem = tjsonGetArrayItem(pList, idx);
    if (pItem == NULL) {
      return TSDB_CODE_INVALID_CFG_VALUE;
    }

    SDnodeEpPair entry = {0};
    tjsonGetInt32ValueFromDouble(pItem, "id", entry.id, code);
    if (code < 0) {
      return TSDB_CODE_INVALID_CFG_VALUE;
    }
    code = tjsonGetStringValue(pItem, "fqdn", entry.oldFqdn);
    if (code < 0) {
      return TSDB_CODE_INVALID_CFG_VALUE;
    }
    tjsonGetUInt16ValueFromDouble(pItem, "port", entry.oldPort, code);
    if (code < 0) {
      return TSDB_CODE_INVALID_CFG_VALUE;
    }
    code = tjsonGetStringValue(pItem, "new_fqdn", entry.newFqdn);
    if (code < 0) {
      return TSDB_CODE_INVALID_CFG_VALUE;
    }
    tjsonGetUInt16ValueFromDouble(pItem, "new_port", entry.newPort, code);
    if (code < 0) {
      return TSDB_CODE_INVALID_CFG_VALUE;
    }

    if (taosArrayPush(pData->oldDnodeEps, &entry) == NULL) {
      return terrno;
    }
  }

  return code;
}
570

571
/*
 * Retire the endpoint remapping file by renaming <dataDir>/dnode/ep.json to
 * ep.json.bak. A failed rename is logged and otherwise ignored. pData is not
 * used.
 */
void dmRemoveDnodePairs(SDnodeData *pData) {
  char srcPath[PATH_MAX] = {0};
  char bakPath[PATH_MAX] = {0};

  snprintf(srcPath, sizeof(srcPath), "%s%sdnode%sep.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
  snprintf(bakPath, sizeof(bakPath), "%s%sdnode%sep.json.bak", tsDataDir, TD_DIRSEP, TD_DIRSEP);

  dInfo("dnode file:%s is rename to bak file", srcPath);

  int32_t renameRet = taosRenameFile(srcPath, bakPath);
  if (renameRet != 0) {
    dError("failed to rename dnode file:%s to bak file:%s since %s", srcPath, bakPath, tstrerror(terrno));
  }
}
×
581

582
/*
 * Load the optional endpoint remapping file <dataDir>/dnode/ep.json and apply
 * it to pData->dnodeEps.
 *
 * A missing file is not an error. When the file is present its pairs are
 * stored in pData->oldDnodeEps, then:
 *  1. each pair's old endpoint is refreshed from the list entry with the
 *     same dnode id;
 *  2. the remapping is rejected (TSDB_CODE_INVALID_CFG, pairs discarded) if a
 *     new endpoint is already used by a different dnode;
 *  3. every list entry whose endpoint equals a pair's old endpoint is
 *     rewritten to the new endpoint.
 *
 * On success pData->dnodeVer is reset to 0 in all cases, with or without a
 * remapping file.
 *
 * Returns 0 on success, otherwise a non-zero error code.
 */
static int32_t dmReadDnodePairs(SDnodeData *pData) {
  int32_t   code = -1;
  char     *content = NULL;
  int32_t   contentLen = 0;
  SJson    *pJson = NULL;
  char      file[PATH_MAX] = {0};
  snprintf(file, sizeof(file), "%s%sdnode%sep.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);

  if (taosStatFile(file, NULL, NULL, NULL) < 0) {
    code = terrno;
    dDebug("dnode file:%s not exist, reason:%s", file, tstrerror(code));
    code = 0;
    goto _OVER;
  }

  // Use taosReadCfgFile for automatic decryption support (returns null-terminated string)
  code = taosReadCfgFile(file, &content, &contentLen);
  if (code != 0) {
    dError("failed to read dnode file:%s since %s", file, terrstr());
    goto _OVER;
  }

  pJson = tjsonParse(content);
  if (pJson == NULL) {
    code = TSDB_CODE_INVALID_JSON_FORMAT;
    goto _OVER;
  }

  pData->oldDnodeEps = taosArrayInit(1, sizeof(SDnodeEpPair));
  if (pData->oldDnodeEps == NULL) {
    code = terrno;
    // The failure is reported through terrno, so log that rather than errno.
    dError("failed to calloc dnodeEp array since %s", terrstr());
    goto _OVER;
  }

  if (dmDecodeEpPairs(pJson, pData) < 0) {
    taosArrayDestroy(pData->oldDnodeEps);
    pData->oldDnodeEps = NULL;

    code = TSDB_CODE_INVALID_JSON_FORMAT;
    goto _OVER;
  }

  code = 0;
  dInfo("succceed to read dnode file %s", file);

_OVER:
  if (content != NULL) taosMemoryFree(content);
  if (pJson != NULL) cJSON_Delete(pJson);

  if (code != 0) {
    dError("failed to read dnode file:%s since %s", file, tstrerror(code));
    return code;
  }

  // Neither array changes size below, so the counts are taken once.
  int32_t numOfPairs = (int32_t)taosArrayGetSize(pData->oldDnodeEps);
  int32_t numOfEps = (int32_t)taosArrayGetSize(pData->dnodeEps);

  // update old fqdn and port
  for (int32_t i = 0; i < numOfPairs; ++i) {
    SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i);
    for (int32_t j = 0; j < numOfEps; ++j) {
      SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, j);
      if (pDnodeEp->id == pair->id) {
        tstrncpy(pair->oldFqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
        pair->oldPort = pDnodeEp->ep.port;
      }
    }
  }

  // check new fqdn and port
  for (int32_t i = 0; i < numOfPairs; ++i) {
    SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i);
    for (int32_t j = 0; j < numOfEps; ++j) {
      SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, j);
      if (pDnodeEp->id != pair->id &&
          (strcmp(pDnodeEp->ep.fqdn, pair->newFqdn) == 0 && pDnodeEp->ep.port == pair->newPort)) {
        dError("dnode:%d, can't update ep:%s:%u to %s:%u since already exists as dnode:%d", pair->id, pair->oldFqdn,
               pair->oldPort, pair->newFqdn, pair->newPort, pDnodeEp->id);
        taosArrayDestroy(pData->oldDnodeEps);
        pData->oldDnodeEps = NULL;
        code = TSDB_CODE_INVALID_CFG;
        return code;
      }
    }
  }

  // rewrite every entry that still carries an old endpoint
  for (int32_t i = 0; i < numOfPairs; ++i) {
    SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i);
    for (int32_t j = 0; j < numOfEps; ++j) {
      SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, j);
      if (strcmp(pDnodeEp->ep.fqdn, pair->oldFqdn) == 0 && pDnodeEp->ep.port == pair->oldPort) {
        dInfo("dnode:%d, will update ep:%s:%u to %s:%u", pDnodeEp->id, pDnodeEp->ep.fqdn, pDnodeEp->ep.port,
              pair->newFqdn, pair->newPort);
        tstrncpy(pDnodeEp->ep.fqdn, pair->newFqdn, TSDB_FQDN_LEN);
        pDnodeEp->ep.port = pair->newPort;
      }
    }
  }

  pData->dnodeVer = 0;
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc