• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4829

30 Oct 2025 09:25AM UTC coverage: 49.734% (-11.3%) from 61.071%
#4829

push

travis-ci

web-flow
Merge pull request #33435 from taosdata/3.0

merge 3.0

123072 of 323930 branches covered (37.99%)

Branch coverage included in aggregate %.

7 of 25 new or added lines in 3 files covered. (28.0%)

35232 existing lines in 327 files now uncovered.

172062 of 269495 relevant lines covered (63.85%)

70709785.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

5.38
/source/dnode/mgmt/mgmt_bnode/src/bmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "bmInt.h"
18
#include "tjson.h"
19

20
static int32_t bmRequire(const SMgmtInputOpt *pInput, bool *required) {
141,269✔
21
  return dmReadFile(pInput->path, pInput->name, required);
141,269✔
22
}
23

UNCOV
24
/*
 * Seeds a bnode option struct from the management object:
 * copies the dnode id out of pMgmt->pData and the message callbacks out of
 * pMgmt->msgCb. No other field of *pOption is touched.
 */
static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
  pOption->dnodeId = pMgmt->pData->dnodeId;
  pOption->msgCb = pMgmt->msgCb;
}
×
28

UNCOV
29
/*
 * Tears down a bnode management object.
 *
 * If a bnode instance is attached it is closed with bndClose() and the slot is
 * cleared; pMgmt itself is then released with taosMemoryFree(). pMgmt must not
 * be used after this call.
 */
static void bmClose(SBnodeMgmt *pMgmt) {
  SBnode *pBnode = pMgmt->pBnode;

  if (pBnode != NULL) {
    // bmStopWorker(pMgmt);
    bndClose(pBnode);
    pMgmt->pBnode = NULL;
  }

  taosMemoryFree(pMgmt);
}
×
38

UNCOV
39
/*
 * Thin pass-through to bndOpen(): same arguments, same return value.
 */
static int32_t bndOpenWrapper(SBnodeOpt *pOption, SBnode **pBnode) {
  return bndOpen(pOption, pBnode);
}
43

44
/*
 * Copies an incoming RPC message into a newly allocated queue item and
 * dispatches it by queue type.
 *
 * No queue type is handled yet: the WRITE_QUEUE case body is commented out and
 * falls through to default, so every call that reaches the switch frees the
 * message and returns TSDB_CODE_INVALID_PARA.
 *
 * On every return path the content buffer that arrived in pRpc->pCont has been
 * released with rpcFreeCont() and pRpc->pCont is NULL.
 *
 * @param pMgmt  bnode management object; pMgmt->pBnode may be NULL.
 * @param qtype  target queue type.
 * @param pRpc   incoming message; its pCont is consumed by this call.
 * @return a non-zero error code (see above: no path currently succeeds).
 */
int32_t bmPutMsgToQueue(SBnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  SRpcMsg *pMsg = NULL;

  int32_t code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg);
  if (code != 0) {
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
    // Report the allocator's own code; terrno may be 0 or overwritten by now.
    return code;
  }

  if (pMgmt->pBnode == NULL) {
    // Nothing in this function sets terrno, so it can be 0 here. Never report
    // success for a message that is about to be dropped.
    code = (terrno != 0) ? terrno : TSDB_CODE_INVALID_PARA;
    // pMsg has not been filled yet; take the message type from the request.
    dError("msg:%p failed to put into bnode queue since %s, type:%s qtype:%d len:%d", pMsg, tstrerror(code),
           TMSG_INFO(pRpc->msgType), qtype, pRpc->contLen);
    taosFreeQitem(pMsg);
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
    return code;
  }

  SMsgHead *pHead = pRpc->pCont;
  pHead->contLen = htonl(pHead->contLen);
  // NOTE(review): SNODE_HANDLE in bnode code looks like a carry-over from the
  // snode implementation - confirm whether a bnode-specific handle is intended.
  pHead->vgId = SNODE_HANDLE;

  // The queue item takes over the content pointer from the request.
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  pRpc->pCont = NULL;

  switch (qtype) {
    case WRITE_QUEUE:
      // code = bmPutNodeMsgToWriteQueue(pMgmt, pMsg);
      // break;
    default:
      code = TSDB_CODE_INVALID_PARA;
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      break;
  }

  return code;
}
84

UNCOV
85
/*
 * Extracts the "proto" integer field from a parsed bnode JSON object into
 * *proto. Returns the result of tjsonGetIntValue() unchanged.
 */
static int32_t bmDecodeFile(SJson *pJson, int32_t *proto) {
  return tjsonGetIntValue(pJson, "proto", proto);
}
91

UNCOV
92
static int32_t bmReadFile(const char *path, const char *name, int32_t *proto) {
×
UNCOV
93
  int32_t   code = -1;
×
UNCOV
94
  TdFilePtr pFile = NULL;
×
UNCOV
95
  char     *content = NULL;
×
UNCOV
96
  SJson    *pJson = NULL;
×
UNCOV
97
  char      file[PATH_MAX] = {0};
×
UNCOV
98
  int32_t   nBytes = snprintf(file, sizeof(file), "%s%s%s.json", path, TD_DIRSEP, name);
×
UNCOV
99
  if (nBytes <= 0 || nBytes >= PATH_MAX) {
×
100
    code = TSDB_CODE_OUT_OF_BUFFER;
×
101
    goto _OVER;
×
102
  }
103

UNCOV
104
  if (taosStatFile(file, NULL, NULL, NULL) < 0) {
×
105
    dInfo("file:%s not exist", file);
×
106
    code = 0;
×
107
    goto _OVER;
×
108
  }
109

UNCOV
110
  pFile = taosOpenFile(file, TD_FILE_READ);
×
UNCOV
111
  if (pFile == NULL) {
×
112
    code = terrno;
×
113
    dError("failed to open file:%s since %s", file, tstrerror(code));
×
114
    goto _OVER;
×
115
  }
116

UNCOV
117
  int64_t size = 0;
×
UNCOV
118
  code = taosFStatFile(pFile, &size, NULL);
×
UNCOV
119
  if (code != 0) {
×
120
    dError("failed to fstat file:%s since %s", file, tstrerror(code));
×
121
    goto _OVER;
×
122
  }
123

UNCOV
124
  content = taosMemoryMalloc(size + 1);
×
UNCOV
125
  if (content == NULL) {
×
126
    code = terrno;
×
127
    goto _OVER;
×
128
  }
129

UNCOV
130
  if (taosReadFile(pFile, content, size) != size) {
×
131
    code = terrno;
×
132
    dError("failed to read file:%s since %s", file, tstrerror(code));
×
133
    goto _OVER;
×
134
  }
135

UNCOV
136
  content[size] = '\0';
×
137

UNCOV
138
  pJson = tjsonParse(content);
×
UNCOV
139
  if (pJson == NULL) {
×
140
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
141
    goto _OVER;
×
142
  }
143

UNCOV
144
  if (bmDecodeFile(pJson, proto) < 0) {
×
145
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
146
    goto _OVER;
×
147
  }
148

UNCOV
149
  code = 0;
×
UNCOV
150
  dInfo("succceed to read bnode file %s", file);
×
151

UNCOV
152
_OVER:
×
UNCOV
153
  if (content != NULL) taosMemoryFree(content);
×
UNCOV
154
  if (pJson != NULL) cJSON_Delete(pJson);
×
UNCOV
155
  if (pFile != NULL) taosCloseFile(&pFile);
×
156

UNCOV
157
  if (code != 0) {
×
158
    dError("failed to read bnode file:%s since %s", file, tstrerror(code));
×
159
  }
UNCOV
160
  return code;
×
161
}
162

UNCOV
163
/*
 * "open" callback installed by bmGetMgmtFunc().
 *
 * Allocates the management object, wires it to the dnode input (data, path,
 * name, message callbacks), overrides the put-to-queue callback with
 * bmPutMsgToQueue, reads the persisted "proto" value and opens the bnode.
 *
 * On success the new object is stored in pOutput->pMgmt and 0 is returned.
 * On failure the object is destroyed with bmClose() and the error code of the
 * failing step is returned (terrno if the allocation itself fails).
 */
static int32_t bmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  SBnodeOpt   option = {0};
  int32_t     code = 0;
  SBnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SBnodeMgmt));
  if (pMgmt == NULL) {
    return terrno;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;

  // Start from the dnode's callbacks, then route queue puts through this module.
  pMgmt->msgCb = pInput->msgCb;
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)bmPutMsgToQueue;
  pMgmt->msgCb.mgmt = pMgmt;

  // Must run after msgCb is final: the option takes a copy of it.
  bmInitOption(pMgmt, &option);

  code = bmReadFile(pInput->path, pInput->name, &option.proto);
  if (code != 0) {
    dError("failed to read bnode since %s", tstrerror(code));
    goto _FAILED;
  }

  code = bndOpenWrapper(&option, &pMgmt->pBnode);
  if (code != 0) {
    dError("failed to open bnode since %s", tstrerror(code));
    goto _FAILED;
  }
  tmsgReportStartup("bnode-impl", "initialized");

  /*
  if ((code = bmStartWorker(pMgmt)) != 0) {
    dError("failed to start bnode worker since %s", tstrerror(code));
    bmClose(pMgmt);
    return code;
  }
  tmsgReportStartup("bnode-worker", "initialized");
  */
  pOutput->pMgmt = pMgmt;
  return code;

_FAILED:
  bmClose(pMgmt);
  return code;
}
206

UNCOV
207
/*
 * Writes the bnode state into a JSON object: "deployed" (added through the
 * double-valued setter) followed by "proto" (integer).
 *
 * Returns 0 when both fields were added, TSDB_CODE_INVALID_JSON_FORMAT as soon
 * as either add fails; "proto" is not attempted if "deployed" fails.
 */
static int32_t bmEncodeFile(SJson *pJson, bool deployed, int32_t proto) {
  if (tjsonAddDoubleToObject(pJson, "deployed", deployed) < 0 ||
      tjsonAddIntegerToObject(pJson, "proto", proto) < 0) {
    return TSDB_CODE_INVALID_JSON_FORMAT;
  }

  return 0;
}
218

UNCOV
219
static int32_t bmWriteFile(const char *path, const char *name, bool deployed, int32_t proto) {
×
UNCOV
220
  int32_t   code = -1;
×
UNCOV
221
  char     *buffer = NULL;
×
UNCOV
222
  SJson    *pJson = NULL;
×
UNCOV
223
  TdFilePtr pFile = NULL;
×
UNCOV
224
  char      file[PATH_MAX] = {0};
×
UNCOV
225
  char      realfile[PATH_MAX] = {0};
×
226

UNCOV
227
  int32_t nBytes = snprintf(file, sizeof(file), "%s%s%s.json", path, TD_DIRSEP, name);
×
UNCOV
228
  if (nBytes <= 0 || nBytes >= PATH_MAX) {
×
229
    code = TSDB_CODE_OUT_OF_BUFFER;
×
230
    goto _OVER;
×
231
  }
232

UNCOV
233
  nBytes = snprintf(realfile, sizeof(realfile), "%s%s%s.json", path, TD_DIRSEP, name);
×
UNCOV
234
  if (nBytes <= 0 || nBytes >= PATH_MAX) {
×
235
    code = TSDB_CODE_OUT_OF_BUFFER;
×
236
    goto _OVER;
×
237
  }
238

UNCOV
239
  pJson = tjsonCreateObject();
×
UNCOV
240
  if (pJson == NULL) {
×
241
    code = terrno;
×
242
    goto _OVER;
×
243
  }
244

UNCOV
245
  if ((code = bmEncodeFile(pJson, deployed, proto)) != 0) goto _OVER;
×
246

UNCOV
247
  buffer = tjsonToString(pJson);
×
UNCOV
248
  if (buffer == NULL) {
×
249
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
250
    goto _OVER;
×
251
  }
252

UNCOV
253
  pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
×
UNCOV
254
  if (pFile == NULL) {
×
255
    code = terrno;
×
256
    goto _OVER;
×
257
  }
258

UNCOV
259
  int32_t len = strlen(buffer);
×
UNCOV
260
  if (taosWriteFile(pFile, buffer, len) <= 0) {
×
261
    code = terrno;
×
262
    goto _OVER;
×
263
  }
UNCOV
264
  if (taosFsyncFile(pFile) < 0) {
×
265
    code = terrno;
×
266
    goto _OVER;
×
267
  }
268

UNCOV
269
  if (taosCloseFile(&pFile) != 0) {
×
270
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
271
    goto _OVER;
×
272
  }
UNCOV
273
  TAOS_CHECK_GOTO(taosRenameFile(file, realfile), NULL, _OVER);
×
274

UNCOV
275
  dInfo("succeed to write file:%s, deloyed:%d", realfile, deployed);
×
276

UNCOV
277
_OVER:
×
UNCOV
278
  if (pJson != NULL) tjsonDelete(pJson);
×
UNCOV
279
  if (buffer != NULL) taosMemoryFree(buffer);
×
UNCOV
280
  if (pFile != NULL) taosCloseFile(&pFile);
×
281

UNCOV
282
  if (code != 0) {
×
283
    dError("failed to write file:%s since %s, deloyed:%d", realfile, tstrerror(code), deployed);
×
284
  }
UNCOV
285
  return code;
×
286
}
287

UNCOV
288
/*
 * Handles a "create bnode" request.
 *
 * Decodes the request from pMsg, rejects it when this dnode already has a
 * non-zero id that differs from the one in the request, and otherwise
 * persists deployed=true together with the requested protocol via
 * bmWriteFile().
 *
 * @return 0 on success;
 *         TSDB_CODE_INVALID_MSG if the request cannot be deserialised;
 *         TSDB_CODE_INVALID_OPTION on dnode id mismatch;
 *         otherwise the code returned by bmWriteFile().
 */
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
  SDCreateBnodeReq createReq = {0};
  if (tDeserializeSMCreateBnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
    // Nothing is freed on this path, as the request was never decoded.
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t code = 0;
  if (pInput->pData->dnodeId != 0 && createReq.dnodeId != pInput->pData->dnodeId) {
    code = TSDB_CODE_INVALID_OPTION;
    dError("failed to create bnode since %s", tstrerror(code));
  } else {
    code = bmWriteFile(pInput->path, pInput->name, true, createReq.bnodeProto);
    if (code != 0) {
      dError("failed to write bnode file since %s", tstrerror(code));
    }
  }

  // Single release point for every path that decoded the request.
  tFreeSMCreateBnodeReq(&createReq);
  return code;
}
316

UNCOV
317
/*
 * Handles a "drop bnode" request.
 *
 * Decodes the request from pMsg, rejects it when this dnode already has a
 * non-zero id that differs from the one in the request, and otherwise
 * persists deployed=false via dmWriteFile().
 *
 * @return 0 on success;
 *         TSDB_CODE_INVALID_MSG if the request cannot be deserialised;
 *         TSDB_CODE_INVALID_OPTION on dnode id mismatch;
 *         otherwise the code returned by dmWriteFile().
 */
int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
  SDDropBnodeReq dropReq = {0};
  if (tDeserializeSMDropBnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
    // Nothing is freed on this path, as the request was never decoded.
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t code = 0;
  if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
    code = TSDB_CODE_INVALID_OPTION;
    dError("failed to drop bnode since %s", tstrerror(code));
  } else {
    code = dmWriteFile(pInput->path, pInput->name, false);
    if (code != 0) {
      dError("failed to write bnode file since %s", tstrerror(code));
    }
  }

  // Single release point for every path that decoded the request.
  tFreeSMDropBnodeReq(&dropReq);
  return code;
}
346

347
SArray *bmGetMsgHandles() {
141,242✔
348
  int32_t code = -1;
141,242✔
349
  SArray *pArray = taosArrayInit(4, sizeof(SMgmtHandle));
141,242✔
350
  if (pArray == NULL) goto _OVER;
141,242!
351

352
  code = 0;
141,242✔
353
_OVER:
141,242✔
354
  if (code != 0) {
141,242!
355
    taosArrayDestroy(pArray);
×
356
    return NULL;
×
357
  } else {
358
    return pArray;
141,242✔
359
  }
360
}
361

362
SMgmtFunc bmGetMgmtFunc() {
141,269✔
363
  SMgmtFunc mgmtFunc = {0};
141,269✔
364
  mgmtFunc.openFp = bmOpen;
141,269✔
365
  mgmtFunc.closeFp = (NodeCloseFp)bmClose;
141,269✔
366
  mgmtFunc.createFp = (NodeCreateFp)bmProcessCreateReq;
141,269✔
367
  mgmtFunc.dropFp = (NodeDropFp)bmProcessDropReq;
141,269✔
368
  mgmtFunc.requiredFp = bmRequire;
141,269✔
369
  mgmtFunc.getHandlesFp = bmGetMsgHandles;
141,269✔
370

371
  return mgmtFunc;
141,269✔
372
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc