• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4066

12 May 2025 05:35AM UTC coverage: 62.547% (+0.04%) from 62.508%
#4066

push

travis-ci

web-flow
Merge pull request #31053 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

155777 of 317858 branches covered (49.01%)

Branch coverage included in aggregate %.

382 of 573 new or added lines in 31 files covered. (66.67%)

648 existing lines in 129 files now uncovered.

241270 of 316936 relevant lines covered (76.13%)

6462449.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.61
/source/dnode/mgmt/mgmt_vnode/src/vmFile.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tjson.h"
18
#include "vmInt.h"
19

20
// 2 * 1024 * 1024 (2 MiB). The expansion is parenthesized so the macro keeps its
// value when used inside a larger expression such as `x / MAX_CONTENT_LEN`.
#define MAX_CONTENT_LEN (2 * 1024 * 1024)
21

22
/*
 * Collect every vnode stored in pMgmt->runngingHash and pMgmt->closedHash.
 *
 * The whole scan runs under a read lock on pMgmt->hashLock. The refCount of
 * each collected vnode is atomically incremented before it is stored, so the
 * caller owns one reference per returned entry and owns the returned array
 * (vmWriteVnodeListToFile releases them with vmReleaseVnode and frees the
 * array with taosMemoryFree).
 *
 * @param pMgmt        vnode management object holding the hashes and the lock
 * @param numOfVnodes  out: number of entries stored in *ppVnodes
 * @param ppVnodes     out: newly allocated array of vnode pointers
 * @return 0 on success; terrno when the array allocation fails, in which case
 *         the output parameters are left untouched.
 */
int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  int32_t num = 0;
  int32_t size = taosHashGetSize(pMgmt->runngingHash);
  int32_t closedSize = taosHashGetSize(pMgmt->closedHash);
  size += closedSize;
  SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
  if (pVnodes == NULL) {
    (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
    return terrno;
  }

  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
  while (pIter) {
    SVnodeObj **ppVnode = pIter;
    SVnodeObj  *pVnode = *ppVnode;
    if (pVnode == NULL || num >= size) {
      // pIter is passed by value, so cancelling cannot reset it; leave the loop
      // explicitly instead of re-testing a cancelled iterator forever.
      taosHashCancelIterate(pMgmt->runngingHash, pIter);
      break;
    }

    int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d,acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
    pVnodes[num++] = (*ppVnode);
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
  }

  pIter = taosHashIterate(pMgmt->closedHash, NULL);
  while (pIter) {
    SVnodeObj **ppVnode = pIter;
    SVnodeObj  *pVnode = *ppVnode;
    if (pVnode == NULL || num >= size) {
      // Same early-exit rule as above: cancel, then stop iterating.
      taosHashCancelIterate(pMgmt->closedHash, pIter);
      break;
    }

    int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
    pVnodes[num++] = (*ppVnode);
    pIter = taosHashIterate(pMgmt->closedHash, pIter);
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
  *numOfVnodes = num;
  *ppVnodes = pVnodes;

  return 0;
}
69

70
/*
 * Collect every vnode stored in pMgmt->runngingHash and pMgmt->creatingHash.
 *
 * The whole scan runs under a read lock on pMgmt->hashLock. The refCount of
 * each collected vnode is atomically incremented before it is stored, so the
 * caller receives one reference per returned entry together with a newly
 * allocated array of pointers.
 *
 * @param pMgmt        vnode management object holding the hashes and the lock
 * @param numOfVnodes  out: number of entries stored in *ppVnodes
 * @param ppVnodes     out: newly allocated array of vnode pointers
 * @return 0 on success; terrno when the array allocation fails, in which case
 *         the output parameters are left untouched.
 */
int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  int32_t num = 0;
  int32_t size = taosHashGetSize(pMgmt->runngingHash);
  int32_t creatingSize = taosHashGetSize(pMgmt->creatingHash);
  size += creatingSize;
  SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
  if (pVnodes == NULL) {
    (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
    return terrno;
  }

  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
  while (pIter) {
    SVnodeObj **ppVnode = pIter;
    SVnodeObj  *pVnode = *ppVnode;
    if (pVnode == NULL || num >= size) {
      // pIter is passed by value, so cancelling cannot reset it; leave the loop
      // explicitly instead of re-testing a cancelled iterator forever.
      taosHashCancelIterate(pMgmt->runngingHash, pIter);
      break;
    }

    int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d,acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
    pVnodes[num++] = (*ppVnode);
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
  }

  pIter = taosHashIterate(pMgmt->creatingHash, NULL);
  while (pIter) {
    SVnodeObj **ppVnode = pIter;
    SVnodeObj  *pVnode = *ppVnode;
    if (pVnode == NULL || num >= size) {
      // Same early-exit rule as above: cancel, then stop iterating.
      taosHashCancelIterate(pMgmt->creatingHash, pIter);
      break;
    }

    int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
    pVnodes[num++] = (*ppVnode);
    pIter = taosHashIterate(pMgmt->creatingHash, pIter);
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  *numOfVnodes = num;
  *ppVnodes = pVnodes;

  return 0;
}
117

118
/*
 * Collect every vnode stored in pMgmt->runngingHash.
 *
 * The scan runs under a read lock on pMgmt->hashLock. The refCount of each
 * collected vnode is atomically incremented before it is stored, so the caller
 * receives one reference per returned entry together with a newly allocated
 * array of pointers.
 *
 * @param pMgmt        vnode management object holding the hash and the lock
 * @param numOfVnodes  out: number of entries stored in *ppVnodes
 * @param ppVnodes     out: newly allocated array of vnode pointers
 * @return 0 on success; terrno when the array allocation fails, in which case
 *         the output parameters are left untouched.
 */
int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  int32_t     num = 0;
  int32_t     size = taosHashGetSize(pMgmt->runngingHash);
  SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
  if (pVnodes == NULL) {
    (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
    return terrno;
  }

  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
  while (pIter) {
    SVnodeObj **ppVnode = pIter;
    SVnodeObj  *pVnode = *ppVnode;
    if (pVnode == NULL || num >= size) {
      // pIter is passed by value, so cancelling cannot reset it; leave the loop
      // explicitly instead of re-testing a cancelled iterator forever.
      taosHashCancelIterate(pMgmt->runngingHash, pIter);
      break;
    }

    int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
    pVnodes[num++] = (*ppVnode);
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
  *numOfVnodes = num;
  *ppVnodes = pVnodes;

  return 0;
}
149

150
/*
 * Decode the "vnodes" array of a parsed vnodes.json document into an array of
 * SWrapperCfg.
 *
 * For every array element the fields vgId, dropped, vgVersion, diskPrimary and
 * toVgId are read, and the vnode path is built as
 * "<pMgmt->path><TD_DIRSEP>vnode<vgId>".
 *
 * @param pJson        parsed JSON document
 * @param pMgmt        vnode management object; only its path is read here
 * @param ppCfgs       out: allocated config array (NULL when the array is
 *                     empty or decoding fails)
 * @param numOfVnodes  out: number of decoded entries; written on success only
 * @return 0 on success; TSDB_CODE_INVALID_JSON_FORMAT when "vnodes" or an
 *         element is missing; terrno on allocation failure; otherwise the code
 *         reported while reading a field.
 */
static int32_t vmDecodeVnodeList(SJson *pJson, SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) {
  int32_t      code = -1;
  SWrapperCfg *cfgArray = NULL;
  *ppCfgs = NULL;

  SJson *vnodeArr = tjsonGetObjectItem(pJson, "vnodes");
  if (vnodeArr == NULL) {
    return TSDB_CODE_INVALID_JSON_FORMAT;
  }

  int32_t count = cJSON_GetArraySize(vnodeArr);
  if (count > 0) {
    cfgArray = taosMemoryCalloc(count, sizeof(SWrapperCfg));
    if (cfgArray == NULL) {
      return terrno;
    }
  }

  int32_t idx = 0;
  while (idx < count) {
    SJson *item = tjsonGetArrayItem(vnodeArr, idx);
    if (item == NULL) {
      code = TSDB_CODE_INVALID_JSON_FORMAT;
      goto _OVER;
    }

    SWrapperCfg *entry = &cfgArray[idx];

    tjsonGetInt32ValueFromDouble(item, "vgId", entry->vgId, code);
    if (code != 0) {
      goto _OVER;
    }
    tjsonGetInt32ValueFromDouble(item, "dropped", entry->dropped, code);
    if (code != 0) {
      goto _OVER;
    }
    tjsonGetInt32ValueFromDouble(item, "vgVersion", entry->vgVersion, code);
    if (code != 0) {
      goto _OVER;
    }
    tjsonGetInt32ValueFromDouble(item, "diskPrimary", entry->diskPrimary, code);
    if (code != 0) {
      goto _OVER;
    }
    tjsonGetInt32ValueFromDouble(item, "toVgId", entry->toVgId, code);
    if (code != 0) {
      goto _OVER;
    }

    snprintf(entry->path, sizeof(entry->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, entry->vgId);
    ++idx;
  }

  // Success: hand the array over to the caller.
  code = 0;
  *ppCfgs = cfgArray;
  *numOfVnodes = count;

_OVER:
  // The array is released only when ownership was not transferred above.
  if (*ppCfgs == NULL) {
    taosMemoryFree(cfgArray);
  }
  return code;
}
194

195
/*
 * Load the persisted vnode list from the vnodes.json file under pMgmt->path.
 *
 * A missing file is not treated as an error: it is logged and 0 is returned
 * without touching *ppCfgs or *numOfVnodes. Otherwise the whole file is read
 * into memory, parsed as JSON and decoded by vmDecodeVnodeList(), which fills
 * the two output parameters.
 *
 * @param pMgmt        vnode management object; its path locates the file
 * @param ppCfgs       out: decoded config array (set by vmDecodeVnodeList)
 * @param numOfVnodes  out: number of decoded entries
 * @return 0 on success or when the file does not exist; otherwise the error
 *         code of the step that failed (open, fstat, allocation, read, JSON
 *         parse or decode).
 */
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) {
  int32_t   code = -1;
  TdFilePtr pFile = NULL;
  char     *pData = NULL;
  SJson    *pJson = NULL;
  char      file[PATH_MAX] = {0};
  snprintf(file, sizeof(file), "%s%svnodes.json", pMgmt->path, TD_DIRSEP);

  if (taosStatFile(file, NULL, NULL, NULL) < 0) {
    code = terrno;
    dInfo("vnode file:%s not exist, reason:%s", file, tstrerror(code));
    code = 0;
    return code;
  }

  pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    code = terrno;
    dError("failed to open vnode file:%s since %s", file, tstrerror(code));
    goto _OVER;
  }

  int64_t size = 0;
  code = taosFStatFile(pFile, &size, NULL);
  if (code != 0) {
    dError("failed to fstat vnode file:%s since %s", file, tstrerror(code));
    goto _OVER;
  }

  // One extra byte so the content can be NUL-terminated for the JSON parser.
  pData = taosMemoryMalloc(size + 1);
  if (pData == NULL) {
    code = terrno;
    goto _OVER;
  }

  if (taosReadFile(pFile, pData, size) != size) {
    code = terrno;
    dError("failed to read vnode file:%s since %s", file, tstrerror(code));
    goto _OVER;
  }

  pData[size] = '\0';

  pJson = tjsonParse(pData);
  if (pJson == NULL) {
    code = TSDB_CODE_INVALID_JSON_FORMAT;
    goto _OVER;
  }

  // Keep the decoder's own error code (e.g. an allocation failure) instead of
  // reporting every decode failure as an invalid JSON format.
  code = vmDecodeVnodeList(pJson, pMgmt, ppCfgs, numOfVnodes);
  if (code < 0) {
    goto _OVER;
  }

  code = 0;
  dInfo("succceed to read vnode file %s", file);

_OVER:
  if (pData != NULL) taosMemoryFree(pData);
  if (pJson != NULL) cJSON_Delete(pJson);
  if (pFile != NULL) taosCloseFile(&pFile);

  if (code != 0) {
    dError("failed to read vnode file:%s since %s", file, tstrerror(code));
  }
  return code;
}
263

264
/*
 * Serialize a vnode list into pJson as an array stored under the key "vnodes".
 *
 * Each non-NULL vnode becomes one object with the fields vgId, dropped,
 * vgVersion and diskPrimary; toVgId is written only when it is non-zero.
 *
 * @param pJson        JSON object that receives the "vnodes" array
 * @param ppVnodes     array of vnode pointers; NULL entries are skipped
 * @param numOfVnodes  number of entries in ppVnodes
 * @return 0 on success; terrno when a JSON node cannot be created; otherwise
 *         the negative code returned by the failing tjsonAdd* call.
 */
static int32_t vmEncodeVnodeList(SJson *pJson, SVnodeObj **ppVnodes, int32_t numOfVnodes) {
  int32_t code = 0;
  SJson  *vnodes = tjsonCreateArray();
  if (vnodes == NULL) {
    return terrno;
  }
  if ((code = tjsonAddItemToObject(pJson, "vnodes", vnodes)) < 0) {
    tjsonDelete(vnodes);
    return code;
  }

  for (int32_t i = 0; i < numOfVnodes; ++i) {
    SVnodeObj *pVnode = ppVnodes[i];
    if (pVnode == NULL) continue;

    SJson *vnode = tjsonCreateObject();
    if (vnode == NULL) return terrno;

    // Each step runs only if the previous one succeeded, so one check below
    // covers every failure.
    code = tjsonAddDoubleToObject(vnode, "vgId", pVnode->vgId);
    if (code >= 0) code = tjsonAddDoubleToObject(vnode, "dropped", pVnode->dropped);
    if (code >= 0) code = tjsonAddDoubleToObject(vnode, "vgVersion", pVnode->vgVersion);
    if (code >= 0) code = tjsonAddDoubleToObject(vnode, "diskPrimary", pVnode->diskPrimary);
    if (code >= 0 && pVnode->toVgId) {
      code = tjsonAddDoubleToObject(vnode, "toVgId", pVnode->toVgId);
    }
    if (code >= 0) code = tjsonAddItemToArray(vnodes, vnode);

    if (code < 0) {
      // On failure the object was never attached to the array, so nothing else
      // will free it; release it here to avoid a leak.
      tjsonDelete(vnode);
      return code;
    }
  }

  return 0;
}
293

294
/*
 * Persist the current vnode list to the vnodes.json file under pMgmt->path.
 *
 * The list returned by vmGetAllVnodeListFromHash() is encoded as JSON, written
 * to vnodes_tmp.json, fsync'ed and closed, and the temporary file is then
 * renamed to vnodes.json. All file operations run while pMgmt->fileLock is
 * held. Every vnode obtained for the snapshot is passed to vmReleaseVnode()
 * before returning.
 *
 * @param pMgmt  vnode management object
 * @return 0 on success; TSDB_CODE_OUT_OF_RANGE when a path does not fit in
 *         PATH_MAX; otherwise the error code of the step that failed.
 */
int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) {
  int32_t     code = -1;
  int32_t     lino = 0;
  int32_t     unlockRet = -1;
  char       *jsonText = NULL;
  SJson      *pJson = NULL;
  TdFilePtr   pFile = NULL;
  SVnodeObj **vnodeList = NULL;
  char        tmpPath[PATH_MAX] = {0};
  char        dstPath[PATH_MAX] = {0};

  // Build both paths first; a truncated path is rejected before any work is done.
  int32_t pathLen = snprintf(tmpPath, sizeof(tmpPath), "%s%svnodes_tmp.json", pMgmt->path, TD_DIRSEP);
  if (pathLen <= 0 || pathLen >= sizeof(tmpPath)) {
    return TSDB_CODE_OUT_OF_RANGE;
  }

  pathLen = snprintf(dstPath, sizeof(dstPath), "%s%svnodes.json", pMgmt->path, TD_DIRSEP);
  if (pathLen <= 0 || pathLen >= sizeof(dstPath)) {
    return TSDB_CODE_OUT_OF_RANGE;
  }

  int32_t vnodeCount = 0;
  TAOS_CHECK_GOTO(vmGetAllVnodeListFromHash(pMgmt, &vnodeCount, &vnodeList), &lino, _OVER);

  pJson = tjsonCreateObject();
  if (pJson == NULL) {
    code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(vmEncodeVnodeList(pJson, vnodeList, vnodeCount), &lino, _OVER);

  jsonText = tjsonToString(pJson);
  if (jsonText == NULL) {
    code = TSDB_CODE_INVALID_JSON_FORMAT;
    lino = __LINE__;
    goto _OVER;
  }

  code = taosThreadMutexLock(&pMgmt->fileLock);
  if (code != 0) {
    lino = __LINE__;
    goto _OVER;
  }

  // From here on the file lock is held: failures must leave through _OVER1.
  pFile = taosOpenFile(tmpPath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
  if (pFile == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _OVER1;
  }

  int32_t len = strlen(jsonText);
  if (taosWriteFile(pFile, jsonText, len) <= 0) {
    code = terrno;
    lino = __LINE__;
    goto _OVER1;
  }
  if (taosFsyncFile(pFile) < 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    lino = __LINE__;
    goto _OVER1;
  }

  code = taosCloseFile(&pFile);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    lino = __LINE__;
    goto _OVER1;
  }
  TAOS_CHECK_GOTO(taosRenameFile(tmpPath, dstPath), &lino, _OVER1);

  dInfo("succeed to write vnodes file:%s, vnodes:%d", dstPath, vnodeCount);

_OVER1:
  unlockRet = taosThreadMutexUnlock(&pMgmt->fileLock);
  if (unlockRet != 0) {
    dError("failed to unlock since %s", tstrerror(unlockRet));
  }

_OVER:
  if (pJson != NULL) {
    tjsonDelete(pJson);
  }
  if (jsonText != NULL) {
    taosMemoryFree(jsonText);
  }
  if (pFile != NULL) {
    taosCloseFile(&pFile);
  }
  if (vnodeList != NULL) {
    // Drop the references taken while building the snapshot, then the array.
    int32_t idx = 0;
    while (idx < vnodeCount) {
      SVnodeObj *pVnode = vnodeList[idx];
      if (pVnode != NULL) {
        vmReleaseVnode(pMgmt, pVnode);
      }
      ++idx;
    }
    taosMemoryFree(vnodeList);
  }

  if (code != 0) {
    dError("failed to write vnodes file:%s at line:%d since %s, vnodes:%d", dstPath, lino, tstrerror(code),
           vnodeCount);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc