• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4881

14 Dec 2025 03:48AM UTC coverage: 60.617% (+0.5%) from 60.092%
#4881

push

travis-ci

web-flow
test: update coverage workflow time (#33918)

156854 of 258761 relevant lines covered (60.62%)

75258957.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.18
/source/dnode/mgmt/mgmt_bnode/src/bmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "bmInt.h"
18
#include "tjson.h"
19

20
static int32_t bmRequire(const SMgmtInputOpt *pInput, bool *required) {
319,309✔
21
  return dmReadFile(pInput->path, pInput->name, required);
319,309✔
22
}
23

24
static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
19,121✔
25
  pOption->msgCb = pMgmt->msgCb;
19,121✔
26
  pOption->dnodeId = pMgmt->pData->dnodeId;
19,121✔
27
}
19,121✔
28

29
static void bmClose(SBnodeMgmt *pMgmt) {
19,121✔
30
  if (pMgmt->pBnode != NULL) {
19,121✔
31
    // bmStopWorker(pMgmt);
32
    bndClose(pMgmt->pBnode);
19,121✔
33
    pMgmt->pBnode = NULL;
19,121✔
34
  }
35

36
  taosMemoryFree(pMgmt);
19,121✔
37
}
19,121✔
38

39
static int32_t bndOpenWrapper(SBnodeOpt *pOption, SBnode **pBnode) {
19,121✔
40
  int32_t code = bndOpen(pOption, pBnode);
19,121✔
41
  return code;
19,121✔
42
}
43

44
int32_t bmPutMsgToQueue(SBnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
×
45
  int32_t  code;
46
  SRpcMsg *pMsg;
×
47

48
  code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg);
×
49
  if (code) {
×
50
    rpcFreeCont(pRpc->pCont);
×
51
    pRpc->pCont = NULL;
×
52
    return code = terrno;
×
53
  }
54

55
  SBnode *pBnode = pMgmt->pBnode;
×
56
  if (pBnode == NULL) {
×
57
    code = terrno;
×
58
    dError("msg:%p failed to put into bnode queue since %s, type:%s qtype:%d len:%d", pMsg, tstrerror(code),
×
59
           TMSG_INFO(pMsg->msgType), qtype, pRpc->contLen);
60
    taosFreeQitem(pMsg);
×
61
    rpcFreeCont(pRpc->pCont);
×
62
    pRpc->pCont = NULL;
×
63
    return code;
×
64
  }
65

66
  SMsgHead *pHead = pRpc->pCont;
×
67
  pHead->contLen = htonl(pHead->contLen);
×
68
  pHead->vgId = SNODE_HANDLE;
×
69
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
×
70
  pRpc->pCont = NULL;
×
71

72
  switch (qtype) {
73
    case WRITE_QUEUE:
74
      // code = bmPutNodeMsgToWriteQueue(pMgmt, pMsg);
75
      // break;
76
    default:
77
      code = TSDB_CODE_INVALID_PARA;
×
78
      rpcFreeCont(pMsg->pCont);
×
79
      taosFreeQitem(pMsg);
×
80
      return code;
×
81
  }
82
  return code;
83
}
84

85
static int32_t bmDecodeFile(SJson *pJson, int32_t *proto) {
19,121✔
86
  int32_t code = 0;
19,121✔
87

88
  code = tjsonGetIntValue(pJson, "proto", proto);
19,121✔
89
  return code;
19,121✔
90
}
91

92
static int32_t bmReadFile(const char *path, const char *name, int32_t *proto) {
19,121✔
93
  int32_t   code = -1;
19,121✔
94
  TdFilePtr pFile = NULL;
19,121✔
95
  char     *content = NULL;
19,121✔
96
  SJson    *pJson = NULL;
19,121✔
97
  char      file[PATH_MAX] = {0};
19,121✔
98
  int32_t   nBytes = snprintf(file, sizeof(file), "%s%s%s.json", path, TD_DIRSEP, name);
19,121✔
99
  if (nBytes <= 0 || nBytes >= PATH_MAX) {
19,121✔
100
    code = TSDB_CODE_OUT_OF_BUFFER;
×
101
    goto _OVER;
×
102
  }
103

104
  if (taosStatFile(file, NULL, NULL, NULL) < 0) {
19,121✔
105
    dInfo("file:%s not exist", file);
×
106
    code = 0;
×
107
    goto _OVER;
×
108
  }
109

110
  pFile = taosOpenFile(file, TD_FILE_READ);
19,121✔
111
  if (pFile == NULL) {
19,121✔
112
    code = terrno;
×
113
    dError("failed to open file:%s since %s", file, tstrerror(code));
×
114
    goto _OVER;
×
115
  }
116

117
  int64_t size = 0;
19,121✔
118
  code = taosFStatFile(pFile, &size, NULL);
19,121✔
119
  if (code != 0) {
19,121✔
120
    dError("failed to fstat file:%s since %s", file, tstrerror(code));
×
121
    goto _OVER;
×
122
  }
123

124
  content = taosMemoryMalloc(size + 1);
19,121✔
125
  if (content == NULL) {
19,121✔
126
    code = terrno;
×
127
    goto _OVER;
×
128
  }
129

130
  if (taosReadFile(pFile, content, size) != size) {
19,121✔
131
    code = terrno;
×
132
    dError("failed to read file:%s since %s", file, tstrerror(code));
×
133
    goto _OVER;
×
134
  }
135

136
  content[size] = '\0';
19,121✔
137

138
  pJson = tjsonParse(content);
19,121✔
139
  if (pJson == NULL) {
19,121✔
140
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
141
    goto _OVER;
×
142
  }
143

144
  if (bmDecodeFile(pJson, proto) < 0) {
19,121✔
145
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
146
    goto _OVER;
×
147
  }
148

149
  code = 0;
19,121✔
150
  dInfo("succceed to read bnode file %s", file);
19,121✔
151

152
_OVER:
19,121✔
153
  if (content != NULL) taosMemoryFree(content);
19,121✔
154
  if (pJson != NULL) cJSON_Delete(pJson);
19,121✔
155
  if (pFile != NULL) taosCloseFile(&pFile);
19,121✔
156

157
  if (code != 0) {
19,121✔
158
    dError("failed to read bnode file:%s since %s", file, tstrerror(code));
×
159
  }
160
  return code;
19,121✔
161
}
162

163
static int32_t bmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
19,121✔
164
  int32_t     code = 0;
19,121✔
165
  SBnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SBnodeMgmt));
19,121✔
166
  if (pMgmt == NULL) {
19,121✔
167
    return terrno;
×
168
  }
169

170
  pMgmt->pData = pInput->pData;
19,121✔
171
  pMgmt->path = pInput->path;
19,121✔
172
  pMgmt->name = pInput->name;
19,121✔
173
  pMgmt->msgCb = pInput->msgCb;
19,121✔
174
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)bmPutMsgToQueue;
19,121✔
175
  pMgmt->msgCb.mgmt = pMgmt;
19,121✔
176

177
  SBnodeOpt option = {0};
19,121✔
178
  bmInitOption(pMgmt, &option);
19,121✔
179

180
  code = bmReadFile(pInput->path, pInput->name, &option.proto);
19,121✔
181
  if (code != 0) {
19,121✔
182
    dError("failed to read bnode since %s", tstrerror(code));
×
183
    bmClose(pMgmt);
×
184
    return code;
×
185
  }
186

187
  code = bndOpenWrapper(&option, &pMgmt->pBnode);
19,121✔
188
  if (code != 0) {
19,121✔
189
    dError("failed to open bnode since %s", tstrerror(code));
×
190
    bmClose(pMgmt);
×
191
    return code;
×
192
  }
193
  tmsgReportStartup("bnode-impl", "initialized");
19,121✔
194

195
  /*
196
  if ((code = bmStartWorker(pMgmt)) != 0) {
197
    dError("failed to start bnode worker since %s", tstrerror(code));
198
    bmClose(pMgmt);
199
    return code;
200
  }
201
  tmsgReportStartup("bnode-worker", "initialized");
202
  */
203
  pOutput->pMgmt = pMgmt;
19,121✔
204
  return code;
19,121✔
205
}
206

207
static int32_t bmEncodeFile(SJson *pJson, bool deployed, int32_t proto) {
17,820✔
208
  if (tjsonAddDoubleToObject(pJson, "deployed", deployed) < 0) {
17,820✔
209
    return TSDB_CODE_INVALID_JSON_FORMAT;
×
210
  }
211

212
  if (tjsonAddIntegerToObject(pJson, "proto", proto) < 0) {
17,820✔
213
    return TSDB_CODE_INVALID_JSON_FORMAT;
×
214
  }
215

216
  return 0;
17,820✔
217
}
218

219
static int32_t bmWriteFile(const char *path, const char *name, bool deployed, int32_t proto) {
17,820✔
220
  int32_t   code = -1;
17,820✔
221
  char     *buffer = NULL;
17,820✔
222
  SJson    *pJson = NULL;
17,820✔
223
  TdFilePtr pFile = NULL;
17,820✔
224
  char      file[PATH_MAX] = {0};
17,820✔
225
  char      realfile[PATH_MAX] = {0};
17,820✔
226

227
  int32_t nBytes = snprintf(file, sizeof(file), "%s%s%s.json", path, TD_DIRSEP, name);
17,820✔
228
  if (nBytes <= 0 || nBytes >= PATH_MAX) {
17,820✔
229
    code = TSDB_CODE_OUT_OF_BUFFER;
×
230
    goto _OVER;
×
231
  }
232

233
  nBytes = snprintf(realfile, sizeof(realfile), "%s%s%s.json", path, TD_DIRSEP, name);
17,820✔
234
  if (nBytes <= 0 || nBytes >= PATH_MAX) {
17,820✔
235
    code = TSDB_CODE_OUT_OF_BUFFER;
×
236
    goto _OVER;
×
237
  }
238

239
  pJson = tjsonCreateObject();
17,820✔
240
  if (pJson == NULL) {
17,820✔
241
    code = terrno;
×
242
    goto _OVER;
×
243
  }
244

245
  if ((code = bmEncodeFile(pJson, deployed, proto)) != 0) goto _OVER;
17,820✔
246

247
  buffer = tjsonToString(pJson);
17,820✔
248
  if (buffer == NULL) {
17,820✔
249
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
250
    goto _OVER;
×
251
  }
252

253
  pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
17,820✔
254
  if (pFile == NULL) {
17,820✔
255
    code = terrno;
×
256
    goto _OVER;
×
257
  }
258

259
  int32_t len = strlen(buffer);
17,820✔
260
  if (taosWriteFile(pFile, buffer, len) <= 0) {
17,820✔
261
    code = terrno;
×
262
    goto _OVER;
×
263
  }
264
  if (taosFsyncFile(pFile) < 0) {
17,820✔
265
    code = terrno;
×
266
    goto _OVER;
×
267
  }
268

269
  if (taosCloseFile(&pFile) != 0) {
17,820✔
270
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
271
    goto _OVER;
×
272
  }
273
  TAOS_CHECK_GOTO(taosRenameFile(file, realfile), NULL, _OVER);
17,820✔
274

275
  dInfo("succeed to write file:%s, deloyed:%d", realfile, deployed);
17,820✔
276

277
_OVER:
17,820✔
278
  if (pJson != NULL) tjsonDelete(pJson);
17,820✔
279
  if (buffer != NULL) taosMemoryFree(buffer);
17,820✔
280
  if (pFile != NULL) taosCloseFile(&pFile);
17,820✔
281

282
  if (code != 0) {
17,820✔
283
    dError("failed to write file:%s since %s, deloyed:%d", realfile, tstrerror(code), deployed);
×
284
  }
285
  return code;
17,820✔
286
}
287

288
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
17,820✔
289
  int32_t          code = 0;
17,820✔
290
  SDCreateBnodeReq createReq = {0};
17,820✔
291
  if (tDeserializeSMCreateBnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
17,820✔
292
    code = TSDB_CODE_INVALID_MSG;
×
293
    return code;
×
294
  }
295

296
  if (pInput->pData->dnodeId != 0 && createReq.dnodeId != pInput->pData->dnodeId) {
17,820✔
297
    code = TSDB_CODE_INVALID_OPTION;
×
298
    dError("failed to create bnode since %s", tstrerror(code));
×
299

300
    tFreeSMCreateBnodeReq(&createReq);
×
301
    return code;
×
302
  }
303

304
  bool deployed = true;
17,820✔
305
  if ((code = bmWriteFile(pInput->path, pInput->name, deployed, createReq.bnodeProto)) != 0) {
17,820✔
306
    dError("failed to write bnode file since %s", tstrerror(code));
×
307

308
    tFreeSMCreateBnodeReq(&createReq);
×
309
    return code;
×
310
  }
311

312
  tFreeSMCreateBnodeReq(&createReq);
17,820✔
313

314
  return 0;
17,820✔
315
}
316

317
int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
17,812✔
318
  int32_t        code = 0;
17,812✔
319
  SDDropBnodeReq dropReq = {0};
17,812✔
320
  if (tDeserializeSMDropBnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
17,812✔
321
    code = TSDB_CODE_INVALID_MSG;
×
322

323
    return code;
×
324
  }
325

326
  if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
17,812✔
327
    code = TSDB_CODE_INVALID_OPTION;
×
328
    dError("failed to drop bnode since %s", tstrerror(code));
×
329

330
    tFreeSMDropBnodeReq(&dropReq);
×
331
    return code;
×
332
  }
333

334
  bool deployed = false;
17,812✔
335
  if ((code = dmWriteFile(pInput->path, pInput->name, deployed)) != 0) {
17,812✔
336
    dError("failed to write bnode file since %s", tstrerror(code));
×
337

338
    tFreeSMDropBnodeReq(&dropReq);
×
339
    return code;
×
340
  }
341

342
  tFreeSMDropBnodeReq(&dropReq);
17,812✔
343

344
  return 0;
17,812✔
345
}
346

347
SArray *bmGetMsgHandles() {
315,745✔
348
  int32_t code = -1;
315,745✔
349
  SArray *pArray = taosArrayInit(4, sizeof(SMgmtHandle));
315,745✔
350
  if (pArray == NULL) goto _OVER;
315,745✔
351

352
  code = 0;
315,745✔
353
_OVER:
315,745✔
354
  if (code != 0) {
315,745✔
355
    taosArrayDestroy(pArray);
×
356
    return NULL;
×
357
  } else {
358
    return pArray;
315,745✔
359
  }
360
}
361

362
SMgmtFunc bmGetMgmtFunc() {
319,309✔
363
  SMgmtFunc mgmtFunc = {0};
319,309✔
364
  mgmtFunc.openFp = bmOpen;
319,309✔
365
  mgmtFunc.closeFp = (NodeCloseFp)bmClose;
319,309✔
366
  mgmtFunc.createFp = (NodeCreateFp)bmProcessCreateReq;
319,309✔
367
  mgmtFunc.dropFp = (NodeDropFp)bmProcessDropReq;
319,309✔
368
  mgmtFunc.requiredFp = bmRequire;
319,309✔
369
  mgmtFunc.getHandlesFp = bmGetMsgHandles;
319,309✔
370

371
  return mgmtFunc;
319,309✔
372
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc