• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4912

04 Jan 2026 09:05AM UTC coverage: 64.888% (-0.1%) from 65.028%
#4912

push

travis-ci

web-flow
merge: from main to 3.0 branch #34156

1206 of 4524 new or added lines in 22 files covered. (26.66%)

5351 existing lines in 123 files now uncovered.

194856 of 300296 relevant lines covered (64.89%)

118198896.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.89
/source/libs/txnode/src/txnodeMgmt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
// clang-format off
17
#include "uv.h"
18
#include "os.h"
19
#include "tarray.h"
20
#include "tglobal.h"
21
#include "txnode.h"
22
#include "txnodeInt.h"
23
#include "osString.h"
24

25
// clang-format on
26

27
extern char **environ;
28

29
#ifdef WINDOWS
30
#define XNODED_DEFAULT_PATH "C:\\TDengine"
31
#define XNODED_DEFAULT_EXEC "\\xnoded.exe"
32
#else
33
#define XNODED_DEFAULT_PATH "/usr/bin"
34
#define XNODED_DEFAULT_EXEC "/xnoded"
35
#endif
36

37
#define XNODED_XNODED_PID_NAME ".xnoded.pid"
38

39
typedef struct {
40
  bool         isStarted;
41
  bool         needCleanUp;
42
  uv_loop_t    loop;
43
  uv_thread_t  thread;
44
  uv_barrier_t barrier;
45
  uv_process_t process;
46
  int32_t      spawnErr;
47
  uv_pipe_t    ctrlPipe;
48
  uv_async_t   stopAsync;
49
  int32_t      isStopped;
50
  int32_t      dnodeId;
51
  int64_t      clusterId;
52
  char         userPass[XNODE_USER_PASS_LEN];
53
  SEp          leaderEp;
54
} SXnodedData;
55

56
SXnodedData xnodedGlobal = {0};
57

58
static int32_t xnodeMgmtSpawnXnoded(SXnodedData *pData);
59

60
static void getXnodedPidPath(char *pipeName, int32_t size) {
264✔
61
#ifdef _WIN32
62
  snprintf(pipeName, size, "%s", XNODED_XNODED_PID_NAME);
63
#else
64
  snprintf(pipeName, size, "%s%s", tsDataDir, XNODED_XNODED_PID_NAME);
264✔
65
#endif
66
  xndDebug("xnode get xnoded pid path:%s", pipeName);
264✔
67
}
264✔
68

NEW
69
static void    xnodeMgmtXnodedExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal) {
×
NEW
70
  TAOS_XNODED_MGMT_CHECK_PTR_RVOID(process);
×
NEW
71
  xndDebug("xnoded process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
×
NEW
72
  SXnodedData *pData = process->data;
×
NEW
73
  if (pData == NULL) {
×
NEW
74
    xndError("xnoded process data is NULL");
×
NEW
75
    return;
×
76
  }
NEW
77
  if ((exitStatus == 0 && termSignal == 0) || atomic_load_32(&pData->isStopped)) {
×
NEW
78
    xndInfo("xnoded process exit due to exit status 0 or dnode-mgmt called stop");
×
NEW
79
    if (uv_async_send(&pData->stopAsync) != 0) {
×
NEW
80
      xndError("stop xnoded: failed to send stop async");
×
81
    }
NEW
82
    char xnodedPipeSocket[PATH_MAX] = {0};
×
NEW
83
    getXnodedPipeName(xnodedPipeSocket, PATH_MAX);
×
NEW
84
    if (0 != unlink(xnodedPipeSocket)) {
×
NEW
85
      xndWarn("txnode failed to unlink, socket:%s, err:%s", xnodedPipeSocket, terrstr());
×
86
    }
87

NEW
88
    char *pidPath = xnodedPipeSocket;
×
NEW
89
    memset(pidPath, 0, PATH_MAX);
×
NEW
90
    getXnodedPidPath(pidPath, PATH_MAX);
×
NEW
91
    (void)taosRemoveFile(pidPath);
×
92
  } else {
NEW
93
    xndInfo("xnoded process restart, exit status %" PRId64 ", signal %d", exitStatus, termSignal);
×
NEW
94
    uv_sleep(2000);
×
NEW
95
    int32_t code = xnodeMgmtSpawnXnoded(pData);
×
NEW
96
    if (code != 0) {
×
NEW
97
      xndError("xnoded process restart failed with code:%d", code);
×
98
    }
99
  }
100
}
101
void killPreXnoded() {
264✔
102
  char buf[PATH_MAX] = {0};
264✔
103
  getXnodedPidPath(buf, sizeof(buf));
264✔
104

105
  TdFilePtr pFile = NULL;
264✔
106
  pFile = taosOpenFile(buf, TD_FILE_READ);
264✔
107
  if (pFile == NULL) {
264✔
108
    xndWarn("xnode failed to open xnoded pid file:%s, file may not exist", buf);
264✔
109
    return;
264✔
110
  }
NEW
111
  int64_t readSize = taosReadFile(pFile, buf, sizeof(buf));
×
NEW
112
  if (readSize <= 0) {
×
NEW
113
    if (readSize < 0) {
×
NEW
114
      xndError("xnode failed to read len from file:%p since %s", pFile, terrstr());
×
115
    }
NEW
116
    (void)taosCloseFile(&pFile);
×
NEW
117
    return;
×
118
  }
NEW
119
  int32_t pid = taosStr2Int32(buf, NULL, 10);
×
NEW
120
  int result = uv_kill((uv_pid_t)pid, SIGTERM);
×
NEW
121
  if (result != 0) {
×
NEW
122
    if (result != UV_ESRCH) {
×
NEW
123
      xndError("xnode failed to kill process %d: %s", pid, uv_strerror(result));
×
124
    }
NEW
125
    return;
×
126
  }
127
}
128

NEW
129
void saveXnodedPid(int32_t pid) {
×
NEW
130
  char buf[PATH_MAX] = {0};
×
NEW
131
  getXnodedPidPath(buf, sizeof(buf));
×
NEW
132
  TdFilePtr testFilePtr = taosCreateFile(buf, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
×
NEW
133
  snprintf(buf, PATH_MAX, "%d", pid);
×
NEW
134
  (void)taosWriteFile(testFilePtr, buf, strlen(buf));
×
NEW
135
  (void)taosCloseFile(&testFilePtr);
×
NEW
136
}
×
137

138
static int32_t xnodeMgmtSpawnXnoded(SXnodedData *pData) {
264✔
139
  xndDebug("start to init xnoded");
264✔
140
  TAOS_XNODED_MGMT_CHECK_PTR_RCODE(pData);
528✔
141

142
  int32_t              err = 0;
264✔
143
  uv_process_options_t options = {0};
264✔
144

145
  char path[PATH_MAX] = {0};
264✔
146
  if (tsProcPath == NULL) {
264✔
NEW
147
    path[0] = '.';
×
148
#ifdef WINDOWS
149
    GetModuleFileName(NULL, path, PATH_MAX);
150
#elif defined(_TD_DARWIN_64)
151
    uint32_t pathSize = sizeof(path);
152
    _NSGetExecutablePath(path, &pathSize);
153
#endif
154
  } else {
155
    TAOS_STRNCPY(path, tsProcPath, PATH_MAX);
264✔
156
  }
157

158
  TAOS_DIRNAME(path);
264✔
159

160
  if (strlen(path) == 0) {
264✔
NEW
161
    TAOS_STRCAT(path, XNODED_DEFAULT_PATH);
×
162
  }
163
  TAOS_STRCAT(path, XNODED_DEFAULT_EXEC);
264✔
164

165
  xndInfo("xnode mgmt spawn xnoded path: %s", path);
264✔
166
  // char *argsXnoded[] = {path, "-c", configDir, "-d", dnodeId, NULL};
167
  char *argsXnoded[] = {path, NULL};
264✔
168
  options.args = argsXnoded;
264✔
169
  options.file = path;
264✔
170

171
  options.exit_cb = xnodeMgmtXnodedExit;
264✔
172

173
  killPreXnoded();
264✔
174

175
  char xnodedPipeSocket[PATH_MAX] = {0};
264✔
176
  getXnodedPipeName(xnodedPipeSocket, PATH_MAX);
264✔
177
  if (0 != unlink(xnodedPipeSocket)) {
264✔
178
    xndWarn("txnode failed to unlink, ignore if first time, socket:%s, err:%s", xnodedPipeSocket, terrstr());
264✔
179
  }
180

181
  TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1));
264✔
182

183
  uv_stdio_container_t child_stdio[3];
264✔
184
  child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
264✔
185
  child_stdio[0].data.stream = (uv_stream_t *)&pData->ctrlPipe;
264✔
186
  child_stdio[1].flags = UV_IGNORE;
264✔
187
  child_stdio[2].flags = UV_INHERIT_FD;
264✔
188
  child_stdio[2].data.fd = 2;
264✔
189
  options.stdio_count = 3;
264✔
190
  options.stdio = child_stdio;
264✔
191

192
  options.flags = UV_PROCESS_DETACHED;
264✔
193

194
  char xnodedCfgDir[PATH_MAX] = {0};
264✔
195
  snprintf(xnodedCfgDir, PATH_MAX, "%s=%s", "XNODED_CFG_DIR", configDir);
264✔
196
  char xnodedLogDir[PATH_MAX] = {0};
264✔
197
  snprintf(xnodedLogDir, PATH_MAX, "%s=%s", "XNODED_LOG_DIR", tsLogDir);
264✔
198
  char dnodeIdEnvItem[64] = {0};
264✔
199
  snprintf(dnodeIdEnvItem, 64, "%s=%s:%d", "XNODED_LEADER_EP", pData->leaderEp.fqdn, pData->leaderEp.port);
264✔
200
  char xnodedUserPass[XNODE_USER_PASS_LEN] = {0};
264✔
201
  snprintf(xnodedUserPass, XNODE_USER_PASS_LEN, "%s=%s", "XNODED_USER_PASS", pData->userPass);
264✔
202
  char xnodeClusterId[32] = {0};
264✔
203
  snprintf(xnodeClusterId, 32, "%s=%" PRIu64, "XNODED_CLUSTER_ID", pData->clusterId);
264✔
204

205
  char xnodePipeSocket[PATH_MAX + 64] = {0};
264✔
206
  snprintf(xnodePipeSocket, PATH_MAX + 64, "%s=%s", "XNODED_LISTEN", xnodedPipeSocket);
264✔
207

208
  char *envXnoded[] = {xnodedCfgDir,    xnodedLogDir, dnodeIdEnvItem, xnodedUserPass, xnodeClusterId,
264✔
209
                       xnodePipeSocket, NULL};
210

211
  char **envXnodedWithPEnv = NULL;
264✔
212
  if (environ != NULL) {
264✔
213
    int32_t lenEnvXnoded = ARRAY_SIZE(envXnoded);
264✔
214
    int32_t numEnviron = 0;
264✔
215
    while (environ[numEnviron] != NULL) {
8,184✔
216
      numEnviron++;
7,920✔
217
    }
218

219
    envXnodedWithPEnv = (char **)taosMemoryCalloc(numEnviron + lenEnvXnoded, sizeof(char *));
264✔
220
    if (envXnodedWithPEnv == NULL) {
264✔
NEW
221
      err = TSDB_CODE_OUT_OF_MEMORY;
×
NEW
222
      goto _OVER;
×
223
    }
224

225
    for (int32_t i = 0; i < numEnviron; i++) {
8,184✔
226
      int32_t len = strlen(environ[i]) + 1;
7,920✔
227
      envXnodedWithPEnv[i] = (char *)taosMemoryCalloc(len, 1);
7,920✔
228
      if (envXnodedWithPEnv[i] == NULL) {
7,920✔
NEW
229
        err = TSDB_CODE_OUT_OF_MEMORY;
×
NEW
230
        goto _OVER;
×
231
      }
232

233
      tstrncpy(envXnodedWithPEnv[i], environ[i], len);
7,920✔
234
    }
235

236
    for (int32_t i = 0; i < lenEnvXnoded; i++) {
2,112✔
237
      if (envXnoded[i] != NULL) {
1,848✔
238
        int32_t len = strlen(envXnoded[i]) + 1;
1,584✔
239
        envXnodedWithPEnv[numEnviron + i] = (char *)taosMemoryCalloc(len, 1);
1,584✔
240
        if (envXnodedWithPEnv[numEnviron + i] == NULL) {
1,584✔
NEW
241
          err = TSDB_CODE_OUT_OF_MEMORY;
×
NEW
242
          goto _OVER;
×
243
        }
244

245
        tstrncpy(envXnodedWithPEnv[numEnviron + i], envXnoded[i], len);
1,584✔
246
      }
247
    }
248
    envXnodedWithPEnv[numEnviron + lenEnvXnoded - 1] = NULL;
264✔
249

250
    options.env = envXnodedWithPEnv;
264✔
251
  } else {
NEW
252
    options.env = envXnoded;
×
253
  }
254

255
  err = uv_spawn(&pData->loop, &pData->process, &options);
264✔
256
  pData->process.data = (void *)pData;
264✔
257
  if (err != 0) {
264✔
258
    xndError("can not spawn xnoded. path: %s, error: %s", path, uv_strerror(err));
264✔
259
  } else {
NEW
260
    xndInfo("xnoded is initialized, xnoded pid: %d", pData->process.pid);
×
NEW
261
    saveXnodedPid(pData->process.pid);
×
262
  }
263

264
_OVER:
264✔
265
  // if (taosFqdnEnvItem) {
266
  //   taosMemoryFree(taosFqdnEnvItem);
267
  // }
268

269
  if (envXnodedWithPEnv != NULL) {
264✔
270
    int32_t i = 0;
264✔
271
    while (envXnodedWithPEnv[i] != NULL) {
9,768✔
272
      taosMemoryFree(envXnodedWithPEnv[i]);
9,504✔
273
      i++;
9,504✔
274
    }
275
    taosMemoryFree(envXnodedWithPEnv);
264✔
276
  }
277

278
  return err;
264✔
279
}
280

NEW
281
static void xnodeMgmtXnodedCloseWalkCb(uv_handle_t *handle, void *arg) {
×
NEW
282
  TAOS_XNODED_MGMT_CHECK_PTR_RVOID(handle);
×
NEW
283
  if (!uv_is_closing(handle)) {
×
NEW
284
    uv_close(handle, NULL);
×
285
  }
286
}
287

NEW
288
static void xnodeMgmtXnodedStopAsyncCb(uv_async_t *async) {
×
NEW
289
  TAOS_XNODED_MGMT_CHECK_PTR_RVOID(async);
×
NEW
290
  SXnodedData *pData = async->data;
×
NEW
291
  uv_stop(&pData->loop);
×
292
}
293

294
static void xnodeMgmtWatchXnoded(void *args) {
132✔
295
  TAOS_XNODED_MGMT_CHECK_PTR_RVOID(args);
264✔
296
  SXnodedData *pData = args;
132✔
297
  TAOS_UV_CHECK_ERRNO(uv_loop_init(&pData->loop));
132✔
298
  TAOS_UV_CHECK_ERRNO(uv_async_init(&pData->loop, &pData->stopAsync, xnodeMgmtXnodedStopAsyncCb));
132✔
299
  pData->stopAsync.data = pData;
132✔
300
  TAOS_UV_CHECK_ERRNO(xnodeMgmtSpawnXnoded(pData));
132✔
NEW
301
  atomic_store_32(&pData->spawnErr, 0);
×
NEW
302
  (void)uv_barrier_wait(&pData->barrier);
×
NEW
303
  int32_t num = uv_run(&pData->loop, UV_RUN_DEFAULT);
×
NEW
304
  xndInfo("xnoded loop exit with %d active handles, line:%d", num, __LINE__);
×
305

NEW
306
  uv_walk(&pData->loop, xnodeMgmtXnodedCloseWalkCb, NULL);
×
NEW
307
  num = uv_run(&pData->loop, UV_RUN_DEFAULT);
×
NEW
308
  xndInfo("xnoded loop exit with %d active handles, line:%d", num, __LINE__);
×
NEW
309
  if (uv_loop_close(&pData->loop) != 0) {
×
NEW
310
    xndError("xnoded loop close failed, lino:%d", __LINE__);
×
311
  }
NEW
312
  return;
×
313

314
_exit:
132✔
315
  if (terrno != 0) {
132✔
316
    (void)uv_barrier_wait(&pData->barrier);
132✔
317
    atomic_store_32(&pData->spawnErr, terrno);
132✔
318
    if (uv_loop_close(&pData->loop) != 0) {
132✔
319
      xndError("xnoded loop close failed, lino:%d", __LINE__);
132✔
320
    }
321

322
    xndError("xnoded thread exit with code:%d lino:%d", terrno, __LINE__);
132✔
323
    terrno = TSDB_CODE_XNODE_UV_EXEC_FAILURE;
132✔
324
  }
325
}
326

327
/**
328
 * start xnoded that serves xnode function invocation under dnode startDnodeId
329
 * @param startDnodeId
330
 * @return
331
 */
332
int32_t xnodeMgmtStartXnoded(SXnode *pXnode) {
132✔
333
  int32_t code = 0, lino = 0;
132✔
334

335
  SXnodedData *pData = &xnodedGlobal;
132✔
336
  pData->leaderEp = pXnode->ep;
132✔
337
  if (pData->isStarted) {
132✔
NEW
338
    xndInfo("dnode start xnoded already called");
×
NEW
339
    return 0;
×
340
  }
341
  pData->isStarted = true;
132✔
342
  char dnodeId[8] = {0};
132✔
343
  snprintf(dnodeId, sizeof(dnodeId), "%d", pXnode->dnodeId);
132✔
344
  TAOS_CHECK_GOTO(uv_os_setenv("DNODE_ID", dnodeId), &lino, _exit);
132✔
345
  pData->dnodeId = pXnode->dnodeId;
132✔
346
  pData->clusterId = pXnode->clusterId;
132✔
347
  memset(pData->userPass, 0, sizeof(pData->userPass));
132✔
348
  memcpy(pData->userPass, pXnode->userPass, pXnode->upLen);
132✔
349

350
  TAOS_CHECK_GOTO(uv_barrier_init(&pData->barrier, 2), &lino, _exit);
132✔
351
  TAOS_CHECK_GOTO(uv_thread_create(&pData->thread, xnodeMgmtWatchXnoded, pData), &lino, _exit);
132✔
352
  (void)uv_barrier_wait(&pData->barrier);
132✔
353
  int32_t err = atomic_load_32(&pData->spawnErr);
132✔
354
  if (err != 0) {
132✔
355
    uv_barrier_destroy(&pData->barrier);
132✔
356
    if (uv_async_send(&pData->stopAsync) != 0) {
132✔
NEW
357
      xndError("start xnoded: failed to send stop async");
×
358
    }
359
    if (uv_thread_join(&pData->thread) != 0) {
132✔
NEW
360
      xndError("start xnoded: failed to join xnoded thread");
×
361
    }
362
    pData->needCleanUp = false;
132✔
363
    xndInfo("xnoded is cleaned up after spawn err");
132✔
364
    TAOS_CHECK_GOTO(err, &lino, _exit);
132✔
365
  } else {
NEW
366
    pData->needCleanUp = true;
×
NEW
367
    atomic_store_32(&pData->isStopped, 0);
×
368
  }
369
_exit:
132✔
370
  if (code != 0) {
132✔
371
    xndError("xnoded start failed with lino:%d, code:%d, error: %s", code, lino, uv_strerror(code));
132✔
372
  }
373
  return code;
132✔
374
}
375
/**
376
 * stop xnoded
377
 * @return
378
 */
379
void xnodeMgmtStopXnoded(void) {
621,145✔
380
  SXnodedData *pData = &xnodedGlobal;
621,145✔
381
  xndInfo("stopping xnoded, need cleanup:%d, spawn err:%d", pData->needCleanUp, pData->spawnErr);
621,145✔
382
  if (!pData->needCleanUp || atomic_load_32(&pData->isStopped)) {
621,145✔
383
    return;
621,145✔
384
  }
NEW
385
  atomic_store_32(&pData->isStopped, 1);
×
NEW
386
  pData->needCleanUp = false;
×
NEW
387
  (void)uv_process_kill(&pData->process, SIGTERM);
×
NEW
388
  uv_barrier_destroy(&pData->barrier);
×
389

NEW
390
  if (uv_thread_join(&pData->thread) != 0) {
×
NEW
391
    xndError("stop xnoded: failed to join xnoded thread");
×
392
  }
NEW
393
  xndInfo("xnoded is cleaned up");
×
394

NEW
395
  pData->isStarted = false;
×
396

NEW
397
  return;
×
398
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc