• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4911

04 Jan 2026 09:05AM UTC coverage: 65.028% (-0.8%) from 65.864%
#4911

push

travis-ci

web-flow
merge: from main to 3.0 branch #34156

1206 of 4524 new or added lines in 22 files covered. (26.66%)

1517 existing lines in 134 files now uncovered.

195276 of 300296 relevant lines covered (65.03%)

116931714.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.5
/source/dnode/mgmt/mgmt_xnode/src/xmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tjson.h"
18
#include "xmInt.h"
19

20
static int32_t xmRequire(const SMgmtInputOpt *pInput, bool *required) {
539,436✔
21
  xndInfo("xnode require call path:%s, name:%s", pInput->path, pInput->name);
539,436✔
22
  *required = true;
539,436✔
23
  return TSDB_CODE_SUCCESS;
539,436✔
24
}
25

26
static void xmInitOption(SXnodeMgmt *pMgmt, SXnodeOpt *pOption) {
534,124✔
27
  pOption->msgCb = pMgmt->msgCb;
534,124✔
28
  pOption->dnodeId = pMgmt->pData->dnodeId;
534,124✔
29
  pOption->clusterId = pMgmt->pData->clusterId;
534,124✔
30
  (void)memmove(pOption->machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
534,124✔
31
}
534,124✔
32

33
static void xmClose(SXnodeMgmt *pMgmt) {
534,124✔
34
  if (pMgmt->pXnode != NULL) {
534,124✔
35
    xndClose(pMgmt->pXnode);
534,124✔
36
    pMgmt->pXnode = NULL;
534,124✔
37
  }
38
  taosMemoryFree(pMgmt);
534,124✔
39
}
534,124✔
40

41
static int32_t xndOpenWrapper(SXnodeOpt *pOption, SXnode **pXnode) {
534,124✔
42
  int32_t code = xndOpen(pOption, pXnode);
534,124✔
43
  return code;
534,124✔
44
}
45

46
// int32_t xmPutMsgToQueue(SXnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
47
//   int32_t  code;
48
//   SRpcMsg *pMsg;
49

50
//   code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg);
51
//   if (code) {
52
//     rpcFreeCont(pRpc->pCont);
53
//     pRpc->pCont = NULL;
54
//     return code = terrno;
55
//   }
56

57
//   SBnode *pBnode = pMgmt->pBnode;
58
//   if (pBnode == NULL) {
59
//     code = terrno;
60
//     dError("msg:%p failed to put into bnode queue since %s, type:%s qtype:%d len:%d", pMsg, tstrerror(code),
61
//            TMSG_INFO(pMsg->msgType), qtype, pRpc->contLen);
62
//     taosFreeQitem(pMsg);
63
//     rpcFreeCont(pRpc->pCont);
64
//     pRpc->pCont = NULL;
65
//     return code;
66
//   }
67

68
//   SMsgHead *pHead = pRpc->pCont;
69
//   pHead->contLen = htonl(pHead->contLen);
70
//   pHead->vgId = SNODE_HANDLE;
71
//   memcpy(pMsg, pRpc, sizeof(SRpcMsg));
72
//   pRpc->pCont = NULL;
73

74
//   switch (qtype) {
75
//     case WRITE_QUEUE:
76
//       // code = bmPutNodeMsgToWriteQueue(pMgmt, pMsg);
77
//       // break;
78
//     default:
79
//       code = TSDB_CODE_INVALID_PARA;
80
//       rpcFreeCont(pMsg->pCont);
81
//       taosFreeQitem(pMsg);
82
//       return code;
83
//   }
84
//   return code;
85
// }
86

87
static int32_t xmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
534,124✔
88
  int32_t     code = 0;
534,124✔
89
  SXnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SXnodeMgmt));
534,124✔
90
  if (pMgmt == NULL) {
534,124✔
NEW
91
    return terrno;
×
92
  }
93
  pMgmt->pData = pInput->pData;
534,124✔
94
  pMgmt->path = pInput->path;
534,124✔
95
  pMgmt->name = pInput->name;
534,124✔
96
  pMgmt->msgCb = pInput->msgCb;
534,124✔
97
  // pMgmt->msgCb.putToQueueFp = (PutToQueueFp)xmPutMsgToQueue;
98
  pMgmt->msgCb.mgmt = pMgmt;
534,124✔
99

100
  SXnodeOpt option = {0};
534,124✔
101
  xmInitOption(pMgmt, &option);
534,124✔
102

103
  code = xndOpenWrapper(&option, &pMgmt->pXnode);
534,124✔
104
  if (code != 0) {
534,124✔
NEW
105
    dError("failed to open xnode since %s", tstrerror(code));
×
NEW
106
    xmClose(pMgmt);
×
NEW
107
    return code;
×
108
  }
109

110
  pOutput->pMgmt = pMgmt;
534,124✔
111
  return code;
534,124✔
112
}
113

NEW
114
static int32_t xmEncodeFile(SJson *pJson, bool deployed, int32_t proto) {
×
NEW
115
  if (tjsonAddDoubleToObject(pJson, "deployed", deployed) < 0) {
×
NEW
116
    return TSDB_CODE_INVALID_JSON_FORMAT;
×
117
  }
118

NEW
119
  if (tjsonAddIntegerToObject(pJson, "proto", proto) < 0) {
×
NEW
120
    return TSDB_CODE_INVALID_JSON_FORMAT;
×
121
  }
122

NEW
123
  return 0;
×
124
}
125

NEW
126
static int32_t xmWriteFile(const char *path, const char *name, bool deployed, int32_t proto) {
×
NEW
127
  int32_t   code = -1;
×
NEW
128
  char     *buffer = NULL;
×
NEW
129
  SJson    *pJson = NULL;
×
NEW
130
  TdFilePtr pFile = NULL;
×
NEW
131
  char      file[PATH_MAX] = {0};
×
NEW
132
  char      realfile[PATH_MAX] = {0};
×
133

NEW
134
  int32_t nBytes = snprintf(file, sizeof(file), "%s%s%s.json", path, TD_DIRSEP, name);
×
NEW
135
  if (nBytes <= 0 || nBytes >= PATH_MAX) {
×
NEW
136
    code = TSDB_CODE_OUT_OF_BUFFER;
×
NEW
137
    goto _OVER;
×
138
  }
139

NEW
140
  nBytes = snprintf(realfile, sizeof(realfile), "%s%s%s.json", path, TD_DIRSEP, name);
×
NEW
141
  if (nBytes <= 0 || nBytes >= PATH_MAX) {
×
NEW
142
    code = TSDB_CODE_OUT_OF_BUFFER;
×
NEW
143
    goto _OVER;
×
144
  }
145

NEW
146
  pJson = tjsonCreateObject();
×
NEW
147
  if (pJson == NULL) {
×
NEW
148
    code = terrno;
×
NEW
149
    goto _OVER;
×
150
  }
151

NEW
152
  if ((code = xmEncodeFile(pJson, deployed, proto)) != 0) goto _OVER;
×
153

NEW
154
  buffer = tjsonToString(pJson);
×
NEW
155
  if (buffer == NULL) {
×
NEW
156
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
NEW
157
    goto _OVER;
×
158
  }
159

NEW
160
  pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
×
NEW
161
  if (pFile == NULL) {
×
NEW
162
    code = terrno;
×
NEW
163
    goto _OVER;
×
164
  }
165

NEW
166
  int32_t len = strlen(buffer);
×
NEW
167
  if (taosWriteFile(pFile, buffer, len) <= 0) {
×
NEW
168
    code = terrno;
×
NEW
169
    goto _OVER;
×
170
  }
NEW
171
  if (taosFsyncFile(pFile) < 0) {
×
NEW
172
    code = terrno;
×
NEW
173
    goto _OVER;
×
174
  }
175

NEW
176
  if (taosCloseFile(&pFile) != 0) {
×
NEW
177
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
NEW
178
    goto _OVER;
×
179
  }
NEW
180
  TAOS_CHECK_GOTO(taosRenameFile(file, realfile), NULL, _OVER);
×
181

NEW
182
  dInfo("succeed to write file:%s, deloyed:%d", realfile, deployed);
×
183

NEW
184
_OVER:
×
NEW
185
  if (pJson != NULL) tjsonDelete(pJson);
×
NEW
186
  if (buffer != NULL) taosMemoryFree(buffer);
×
NEW
187
  if (pFile != NULL) taosCloseFile(&pFile);
×
188

NEW
189
  if (code != 0) {
×
NEW
190
    dError("failed to write file:%s since %s, deloyed:%d", realfile, tstrerror(code), deployed);
×
191
  }
NEW
192
  return code;
×
193
}
194

NEW
195
int32_t xmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
×
NEW
196
  int32_t          code = 0;
×
NEW
197
  SDCreateXnodeReq createReq = {0};
×
NEW
198
  if (tDeserializeSMCreateXnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
×
NEW
199
    code = TSDB_CODE_INVALID_MSG;
×
NEW
200
    return code;
×
201
  }
202

NEW
203
  bool deployed = true;
×
NEW
204
  if ((code = xmWriteFile(pInput->path, pInput->name, deployed, 1)) != 0) {
×
NEW
205
    dError("failed to write xnode file since %s", tstrerror(code));
×
206

NEW
207
    tFreeSMCreateXnodeReq(&createReq);
×
NEW
208
    return code;
×
209
  }
210

NEW
211
  tFreeSMCreateXnodeReq(&createReq);
×
212

NEW
213
  return 0;
×
214
}
215

NEW
216
int32_t xmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
×
NEW
217
  int32_t        code = 0;
×
NEW
218
  SDDropXnodeReq dropReq = {0};
×
NEW
219
  if (tDeserializeSMDropXnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
×
NEW
220
    code = TSDB_CODE_INVALID_MSG;
×
221

NEW
222
    return code;
×
223
  }
224

225
  // if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
226
  //   code = TSDB_CODE_INVALID_OPTION;
227
  //   dError("failed to drop bnode since %s", tstrerror(code));
228

229
  //   tFreeSMDropBnodeReq(&dropReq);
230
  //   return code;
231
  // }
232

NEW
233
  bool deployed = false;
×
NEW
234
  if ((code = dmWriteFile(pInput->path, pInput->name, deployed)) != 0) {
×
NEW
235
    dError("failed to write bnode file since %s", tstrerror(code));
×
236

NEW
237
    tFreeSMDropXnodeReq(&dropReq);
×
NEW
238
    return code;
×
239
  }
240

NEW
241
  tFreeSMDropXnodeReq(&dropReq);
×
242

NEW
243
  return 0;
×
244
}
245

246
SArray *xmGetMsgHandles() {
534,152✔
247
  int32_t code = -1;
534,152✔
248
  SArray *pArray = taosArrayInit(4, sizeof(SMgmtHandle));
534,152✔
249
  if (pArray == NULL) goto _OVER;
534,152✔
250

251
  code = 0;
534,152✔
252
_OVER:
534,152✔
253
  if (code != 0) {
534,152✔
NEW
254
    taosArrayDestroy(pArray);
×
NEW
255
    return NULL;
×
256
  } else {
257
    return pArray;
534,152✔
258
  }
259
}
260

261
SMgmtFunc xmGetMgmtFunc() {
539,436✔
262
  SMgmtFunc mgmtFunc = {0};
539,436✔
263
  mgmtFunc.openFp = xmOpen;
539,436✔
264
  mgmtFunc.closeFp = (NodeCloseFp)xmClose;
539,436✔
265
  mgmtFunc.createFp = (NodeCreateFp)xmProcessCreateReq;
539,436✔
266
  mgmtFunc.dropFp = (NodeDropFp)xmProcessDropReq;
539,436✔
267
  mgmtFunc.requiredFp = xmRequire;
539,436✔
268
  mgmtFunc.getHandlesFp = xmGetMsgHandles;
539,436✔
269

270
  return mgmtFunc;
539,436✔
271
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc