• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4925

12 Jan 2026 09:34AM UTC coverage: 66.107% (+0.8%) from 65.354%
#4925

push

travis-ci

web-flow
merge: from main to 3.0 branch #34248

103 of 129 new or added lines in 9 files covered. (79.84%)

891 existing lines in 139 files now uncovered.

200488 of 303278 relevant lines covered (66.11%)

129810096.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.22
/source/libs/txnode/src/txnodeMgmt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
// clang-format off
17
#include "uv.h"
18
#include "os.h"
19
#include "tarray.h"
20
#include "tglobal.h"
21
#include "txnode.h"
22
#include "txnodeInt.h"
23
#include "osString.h"
24

25
// clang-format on
26

27
extern char **environ;
28

29
#ifdef WINDOWS
30
#define XNODED_DEFAULT_PATH "C:\\TDengine"
31
#define XNODED_DEFAULT_EXEC "\\xnoded.exe"
32
#else
33
#define XNODED_DEFAULT_PATH "/usr/bin"
34
#define XNODED_DEFAULT_EXEC "/xnoded"
35
#endif
36

37
#define XNODED_XNODED_PID_NAME ".xnoded.pid"
38

39
typedef struct {
40
  bool         isStarted;
41
  bool         needCleanUp;
42
  uv_loop_t    loop;
43
  uv_thread_t  thread;
44
  uv_barrier_t barrier;
45
  uv_process_t process;
46
  int32_t      spawnErr;
47
  uv_pipe_t    ctrlPipe;
48
  uv_async_t   stopAsync;
49
  int32_t      isStopped;
50
  int32_t      dnodeId;
51
  int64_t      clusterId;
52
  char         userPass[XNODE_USER_PASS_LEN];
53
  SEp          leaderEp;
54
} SXnodedData;
55

56
SXnodedData xnodedGlobal = {0};
57

58
static int32_t xnodeMgmtSpawnXnoded(SXnodedData *pData);
59

60
static void getXnodedPidPath(char *pipeName, int32_t size) {
258✔
61
#ifdef _WIN32
62
  snprintf(pipeName, size, "%s", XNODED_XNODED_PID_NAME);
63
#else
64
  snprintf(pipeName, size, "%s%s", tsDataDir, XNODED_XNODED_PID_NAME);
258✔
65
#endif
66
  xndDebug("xnode get xnoded pid path:%s", pipeName);
258✔
67
}
258✔
68

69
static void    xnodeMgmtXnodedExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal) {
×
70
  TAOS_XNODED_MGMT_CHECK_PTR_RVOID(process);
×
71
  xndDebug("xnoded process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
×
72
  SXnodedData *pData = process->data;
×
73
  if (pData == NULL) {
×
74
    xndError("xnoded process data is NULL");
×
75
    return;
×
76
  }
77
  if ((exitStatus == 0 && termSignal == 0) || atomic_load_32(&pData->isStopped)) {
×
78
    xndInfo("xnoded process exit due to exit status 0 or dnode-mgmt called stop");
×
79
    if (uv_async_send(&pData->stopAsync) != 0) {
×
80
      xndError("stop xnoded: failed to send stop async");
×
81
    }
82
    char xnodedPipeSocket[PATH_MAX] = {0};
×
83
    getXnodedPipeName(xnodedPipeSocket, PATH_MAX);
×
84
    if (0 != unlink(xnodedPipeSocket)) {
×
85
      xndWarn("txnode failed to unlink, socket:%s, err:%s", xnodedPipeSocket, terrstr());
×
86
    }
87

88
    char *pidPath = xnodedPipeSocket;
×
89
    memset(pidPath, 0, PATH_MAX);
×
90
    getXnodedPidPath(pidPath, PATH_MAX);
×
91
    (void)taosRemoveFile(pidPath);
×
92
  } else {
93
    xndInfo("xnoded process restart, exit status %" PRId64 ", signal %d", exitStatus, termSignal);
×
94
    uv_sleep(2000);
×
95
    int32_t code = xnodeMgmtSpawnXnoded(pData);
×
96
    if (code != 0) {
×
97
      xndError("xnoded process restart failed with code:%d", code);
×
98
    }
99
  }
100
}
101
void killPreXnoded() {
258✔
102
  char buf[PATH_MAX] = {0};
258✔
103
  getXnodedPidPath(buf, sizeof(buf));
258✔
104

105
  TdFilePtr pFile = NULL;
258✔
106
  pFile = taosOpenFile(buf, TD_FILE_READ);
258✔
107
  if (pFile == NULL) {
258✔
108
    xndWarn("xnode failed to open xnoded pid file:%s, file may not exist", buf);
258✔
109
    return;
258✔
110
  }
111
  int64_t readSize = taosReadFile(pFile, buf, sizeof(buf));
×
112
  if (readSize <= 0) {
×
113
    if (readSize < 0) {
×
114
      xndError("xnode failed to read len from file:%p since %s", pFile, terrstr());
×
115
    }
116
    (void)taosCloseFile(&pFile);
×
117
    return;
×
118
  }
119
  int32_t pid = taosStr2Int32(buf, NULL, 10);
×
120
  int result = uv_kill((uv_pid_t)pid, SIGTERM);
×
121
  if (result != 0) {
×
122
    if (result != UV_ESRCH) {
×
123
      xndError("xnode failed to kill process %d: %s", pid, uv_strerror(result));
×
124
    }
125
    return;
×
126
  }
127
}
128

129
void saveXnodedPid(int32_t pid) {
×
130
  char buf[PATH_MAX] = {0};
×
131
  getXnodedPidPath(buf, sizeof(buf));
×
132
  TdFilePtr testFilePtr = taosCreateFile(buf, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
×
133
  snprintf(buf, PATH_MAX, "%d", pid);
×
134
  (void)taosWriteFile(testFilePtr, buf, strlen(buf));
×
135
  (void)taosCloseFile(&testFilePtr);
×
136
}
×
137

138
static int32_t xnodeMgmtSpawnXnoded(SXnodedData *pData) {
258✔
139
  xndDebug("start to init xnoded");
258✔
140
  TAOS_XNODED_MGMT_CHECK_PTR_RCODE(pData);
516✔
141

142
  int32_t              err = 0;
258✔
143
  uv_process_options_t options = {0};
258✔
144

145
  char path[PATH_MAX] = {0};
258✔
146
  if (tsProcPath == NULL) {
258✔
147
    path[0] = '.';
×
148
#ifdef WINDOWS
149
    GetModuleFileName(NULL, path, PATH_MAX);
150
#elif defined(_TD_DARWIN_64)
151
    uint32_t pathSize = sizeof(path);
152
    _NSGetExecutablePath(path, &pathSize);
153
#endif
154
  } else {
155
    TAOS_STRNCPY(path, tsProcPath, PATH_MAX);
258✔
156
  }
157

158
  TAOS_DIRNAME(path);
258✔
159

160
  if (strlen(path) == 0) {
258✔
161
    TAOS_STRCAT(path, XNODED_DEFAULT_PATH);
×
162
  }
163
  TAOS_STRCAT(path, XNODED_DEFAULT_EXEC);
258✔
164

165
  xndInfo("xnode mgmt spawn xnoded path: %s", path);
258✔
166
  // char *argsXnoded[] = {path, "-c", configDir, "-d", dnodeId, NULL};
167
  char *argsXnoded[] = {path, NULL};
258✔
168
  options.args = argsXnoded;
258✔
169
  options.file = path;
258✔
170

171
  options.exit_cb = xnodeMgmtXnodedExit;
258✔
172

173
  killPreXnoded();
258✔
174

175
  char xnodedPipeSocket[PATH_MAX] = {0};
258✔
176
  getXnodedPipeName(xnodedPipeSocket, PATH_MAX);
258✔
177
  if (0 != unlink(xnodedPipeSocket)) {
258✔
178
    xndWarn("txnode failed to unlink, ignore if first time, socket:%s, err:%s", xnodedPipeSocket, terrstr());
258✔
179
  }
180

181
  TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1));
258✔
182

183
  uv_stdio_container_t child_stdio[3];
258✔
184
  child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
258✔
185
  child_stdio[0].data.stream = (uv_stream_t *)&pData->ctrlPipe;
258✔
186
  child_stdio[1].flags = UV_IGNORE;
258✔
187
  child_stdio[2].flags = UV_INHERIT_FD;
258✔
188
  child_stdio[2].data.fd = 2;
258✔
189
  options.stdio_count = 3;
258✔
190
  options.stdio = child_stdio;
258✔
191

192
  options.flags = UV_PROCESS_DETACHED;
258✔
193

194
  char xnodedCfgDir[PATH_MAX] = {0};
258✔
195
  snprintf(xnodedCfgDir, PATH_MAX, "%s=%s", "XNODED_CFG_DIR", configDir);
258✔
196
  char xnodedLogDir[PATH_MAX] = {0};
258✔
197
  snprintf(xnodedLogDir, PATH_MAX, "%s=%s", "XNODED_LOG_DIR", tsLogDir);
258✔
198
  char dnodeIdEnvItem[64] = {0};
258✔
199
  snprintf(dnodeIdEnvItem, 64, "%s=%s:%d", "XNODED_LEADER_EP", pData->leaderEp.fqdn, pData->leaderEp.port);
258✔
200
  char xnodedUserPass[XNODE_USER_PASS_LEN] = {0};
258✔
201
  snprintf(xnodedUserPass, XNODE_USER_PASS_LEN, "%s=%s", "XNODED_USER_PASS", pData->userPass);
258✔
202
  char xnodeClusterId[32] = {0};
258✔
203
  snprintf(xnodeClusterId, 32, "%s=%" PRIu64, "XNODED_CLUSTER_ID", pData->clusterId);
258✔
204

205
  char xnodePipeSocket[PATH_MAX + 64] = {0};
258✔
206
  snprintf(xnodePipeSocket, PATH_MAX + 64, "%s=%s", "XNODED_LISTEN", xnodedPipeSocket);
258✔
207

208
  char *envXnoded[] = {xnodedCfgDir,    xnodedLogDir, dnodeIdEnvItem, xnodedUserPass, xnodeClusterId,
258✔
209
                       xnodePipeSocket, NULL};
210

211
  char **envXnodedWithPEnv = NULL;
258✔
212
  if (environ != NULL) {
258✔
213
    int32_t lenEnvXnoded = ARRAY_SIZE(envXnoded);
258✔
214
    int32_t numEnviron = 0;
258✔
215
    while (environ[numEnviron] != NULL) {
7,998✔
216
      numEnviron++;
7,740✔
217
    }
218

219
    envXnodedWithPEnv = (char **)taosMemoryCalloc(numEnviron + lenEnvXnoded, sizeof(char *));
258✔
220
    if (envXnodedWithPEnv == NULL) {
258✔
221
      err = TSDB_CODE_OUT_OF_MEMORY;
×
222
      goto _OVER;
×
223
    }
224

225
    for (int32_t i = 0; i < numEnviron; i++) {
7,998✔
226
      int32_t len = strlen(environ[i]) + 1;
7,740✔
227
      envXnodedWithPEnv[i] = (char *)taosMemoryCalloc(len, 1);
7,740✔
228
      if (envXnodedWithPEnv[i] == NULL) {
7,740✔
229
        err = TSDB_CODE_OUT_OF_MEMORY;
×
230
        goto _OVER;
×
231
      }
232

233
      tstrncpy(envXnodedWithPEnv[i], environ[i], len);
7,740✔
234
    }
235

236
    for (int32_t i = 0; i < lenEnvXnoded; i++) {
2,064✔
237
      if (envXnoded[i] != NULL) {
1,806✔
238
        int32_t len = strlen(envXnoded[i]) + 1;
1,548✔
239
        envXnodedWithPEnv[numEnviron + i] = (char *)taosMemoryCalloc(len, 1);
1,548✔
240
        if (envXnodedWithPEnv[numEnviron + i] == NULL) {
1,548✔
241
          err = TSDB_CODE_OUT_OF_MEMORY;
×
242
          goto _OVER;
×
243
        }
244

245
        tstrncpy(envXnodedWithPEnv[numEnviron + i], envXnoded[i], len);
1,548✔
246
      }
247
    }
248
    envXnodedWithPEnv[numEnviron + lenEnvXnoded - 1] = NULL;
258✔
249

250
    options.env = envXnodedWithPEnv;
258✔
251
  } else {
252
    options.env = envXnoded;
×
253
  }
254

255
  err = uv_spawn(&pData->loop, &pData->process, &options);
258✔
256
  pData->process.data = (void *)pData;
258✔
257
  if (err != 0) {
258✔
258
    xndError("can not spawn xnoded. path: %s, error: %s", path, uv_strerror(err));
258✔
259
  } else {
260
    xndInfo("xnoded is initialized, xnoded pid: %d", pData->process.pid);
×
261
    saveXnodedPid(pData->process.pid);
×
262
  }
263

264
_OVER:
258✔
265
  // if (taosFqdnEnvItem) {
266
  //   taosMemoryFree(taosFqdnEnvItem);
267
  // }
268

269
  if (envXnodedWithPEnv != NULL) {
258✔
270
    int32_t i = 0;
258✔
271
    while (envXnodedWithPEnv[i] != NULL) {
9,546✔
272
      taosMemoryFree(envXnodedWithPEnv[i]);
9,288✔
273
      i++;
9,288✔
274
    }
275
    taosMemoryFree(envXnodedWithPEnv);
258✔
276
  }
277

278
  return err;
258✔
279
}
280

281
static void xnodeMgmtXnodedCloseWalkCb(uv_handle_t *handle, void *arg) {
×
282
  TAOS_XNODED_MGMT_CHECK_PTR_RVOID(handle);
×
283
  if (!uv_is_closing(handle)) {
×
284
    uv_close(handle, NULL);
×
285
  }
286
}
287

288
static void xnodeMgmtXnodedStopAsyncCb(uv_async_t *async) {
×
289
  TAOS_XNODED_MGMT_CHECK_PTR_RVOID(async);
×
290
  SXnodedData *pData = async->data;
×
291
  uv_stop(&pData->loop);
×
292
}
293

294
static void xnodeMgmtWatchXnoded(void *args) {
129✔
295
  TAOS_XNODED_MGMT_CHECK_PTR_RVOID(args);
258✔
296
  SXnodedData *pData = args;
129✔
297
  TAOS_UV_CHECK_ERRNO(uv_loop_init(&pData->loop));
129✔
298
  TAOS_UV_CHECK_ERRNO(uv_async_init(&pData->loop, &pData->stopAsync, xnodeMgmtXnodedStopAsyncCb));
129✔
299
  pData->stopAsync.data = pData;
129✔
300
  TAOS_UV_CHECK_ERRNO(xnodeMgmtSpawnXnoded(pData));
129✔
301
  atomic_store_32(&pData->spawnErr, 0);
×
302
  (void)uv_barrier_wait(&pData->barrier);
×
303
  int32_t num = uv_run(&pData->loop, UV_RUN_DEFAULT);
×
304
  xndInfo("xnoded loop exit with %d active handles, line:%d", num, __LINE__);
×
305

306
  uv_walk(&pData->loop, xnodeMgmtXnodedCloseWalkCb, NULL);
×
307
  num = uv_run(&pData->loop, UV_RUN_DEFAULT);
×
308
  xndInfo("xnoded loop exit with %d active handles, line:%d", num, __LINE__);
×
309
  if (uv_loop_close(&pData->loop) != 0) {
×
310
    xndError("xnoded loop close failed, lino:%d", __LINE__);
×
311
  }
312
  return;
×
313

314
_exit:
129✔
315
  if (terrno != 0) {
129✔
316
    (void)uv_barrier_wait(&pData->barrier);
129✔
317
    atomic_store_32(&pData->spawnErr, terrno);
129✔
318
    if (uv_loop_close(&pData->loop) != 0) {
129✔
319
      xndError("xnoded loop close failed, lino:%d", __LINE__);
129✔
320
    }
321

322
    xndError("xnoded thread exit with code:%d lino:%d", terrno, __LINE__);
129✔
323
    terrno = TSDB_CODE_XNODE_UV_EXEC_FAILURE;
129✔
324
  }
325
}
326

327
/**
328
 * start xnoded that serves xnode function invocation under dnode startDnodeId
329
 * @param startDnodeId
330
 * @return
331
 */
332
int32_t xnodeMgmtStartXnoded(SXnode *pXnode) {
129✔
333
  int32_t code = 0, lino = 0;
129✔
334

335
  SXnodedData *pData = &xnodedGlobal;
129✔
336
  pData->leaderEp = pXnode->ep;
129✔
337
  if (pData->isStarted) {
129✔
338
    xndInfo("dnode start xnoded already called");
×
339
    return 0;
×
340
  }
341
  pData->isStarted = true;
129✔
342
  char dnodeId[8] = {0};
129✔
343
  snprintf(dnodeId, sizeof(dnodeId), "%d", pXnode->dnodeId);
129✔
344
  TAOS_CHECK_GOTO(uv_os_setenv("DNODE_ID", dnodeId), &lino, _exit);
129✔
345
  pData->dnodeId = pXnode->dnodeId;
129✔
346
  pData->clusterId = pXnode->clusterId;
129✔
347
  memset(pData->userPass, 0, sizeof(pData->userPass));
129✔
348
  memcpy(pData->userPass, pXnode->userPass, pXnode->upLen);
129✔
349

350
  TAOS_CHECK_GOTO(uv_barrier_init(&pData->barrier, 2), &lino, _exit);
129✔
351
  TAOS_CHECK_GOTO(uv_thread_create(&pData->thread, xnodeMgmtWatchXnoded, pData), &lino, _exit);
129✔
352
  (void)uv_barrier_wait(&pData->barrier);
129✔
353
  int32_t err = atomic_load_32(&pData->spawnErr);
129✔
354
  if (err != 0) {
129✔
UNCOV
355
    uv_barrier_destroy(&pData->barrier);
×
UNCOV
356
    if (uv_async_send(&pData->stopAsync) != 0) {
×
357
      xndError("start xnoded: failed to send stop async");
×
358
    }
UNCOV
359
    if (uv_thread_join(&pData->thread) != 0) {
×
360
      xndError("start xnoded: failed to join xnoded thread");
×
361
    }
UNCOV
362
    pData->needCleanUp = false;
×
UNCOV
363
    xndInfo("xnoded is cleaned up after spawn err");
×
UNCOV
364
    TAOS_CHECK_GOTO(err, &lino, _exit);
×
365
  } else {
366
    pData->needCleanUp = true;
129✔
367
    atomic_store_32(&pData->isStopped, 0);
129✔
368
  }
369
_exit:
129✔
370
  if (code != 0) {
129✔
UNCOV
371
    xndError("xnoded start failed with lino:%d, code:%d, error: %s", code, lino, uv_strerror(code));
×
372
  }
373
  return code;
129✔
374
}
375
/**
376
 * stop xnoded
377
 * @return
378
 */
379
void xnodeMgmtStopXnoded(void) {
622,603✔
380
  SXnodedData *pData = &xnodedGlobal;
622,603✔
381
  xndInfo("stopping xnoded, need cleanup:%d, spawn err:%d", pData->needCleanUp, pData->spawnErr);
622,603✔
382
  if (!pData->needCleanUp || atomic_load_32(&pData->isStopped)) {
622,603✔
383
    return;
622,474✔
384
  }
385
  atomic_store_32(&pData->isStopped, 1);
129✔
386
  pData->needCleanUp = false;
129✔
387
  (void)uv_process_kill(&pData->process, SIGTERM);
129✔
388
  uv_barrier_destroy(&pData->barrier);
129✔
389

390
  if (uv_thread_join(&pData->thread) != 0) {
129✔
391
    xndError("stop xnoded: failed to join xnoded thread");
×
392
  }
393
  xndInfo("xnoded is cleaned up");
129✔
394

395
  pData->isStarted = false;
129✔
396

397
  return;
129✔
398
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc