• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4931

16 Jan 2026 02:32AM UTC coverage: 66.749% (+0.03%) from 66.716%
#4931

push

travis-ci

web-flow
enh: interp supports using non-null prev/next values to fill (#34236)

281 of 327 new or added lines in 11 files covered. (85.93%)

1890 existing lines in 121 files now uncovered.

203303 of 304580 relevant lines covered (66.75%)

129941648.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.79
/source/libs/txnode/src/txnodeMgmt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
// clang-format off
17
#include "uv.h"
18
#include "os.h"
19
#include "tarray.h"
20
#include "tglobal.h"
21
#include "txnode.h"
22
#include "txnodeInt.h"
23
#include "osString.h"
24

25
// clang-format on
26

27
extern char **environ;
28

29
#ifdef WINDOWS
30
#define XNODED_DEFAULT_PATH "C:\\TDengine"
31
#define XNODED_DEFAULT_EXEC "\\xnoded.exe"
32
#else
33
#define XNODED_DEFAULT_PATH "/usr/bin"
34
#define XNODED_DEFAULT_EXEC "/xnoded"
35
#endif
36

37
#define XNODED_XNODED_PID_NAME ".xnoded.pid"
38

39
typedef struct {
40
  bool         isStarted;
41
  bool         needCleanUp;
42
  uv_loop_t    loop;
43
  uv_thread_t  thread;
44
  uv_barrier_t barrier;
45
  uv_process_t process;
46
  int32_t      spawnErr;
47
  uv_pipe_t    ctrlPipe;
48
  uv_async_t   stopAsync;
49
  int32_t      isStopped;
50
  int32_t      dnodeId;
51
  int64_t      clusterId;
52
  char         userPass[XNODE_USER_PASS_LEN];
53
  SEp          leaderEp;
54
} SXnodedData;
55

56
SXnodedData xnodedGlobal = {0};
57

58
static int32_t xnodeMgmtSpawnXnoded(SXnodedData *pData);
59

60
static void getXnodedPidPath(char *pipeName, int32_t size) {
468✔
61
#ifdef _WIN32
62
  snprintf(pipeName, size, "%s", XNODED_XNODED_PID_NAME);
63
#else
64
  snprintf(pipeName, size, "%s%s", tsDataDir, XNODED_XNODED_PID_NAME);
468✔
65
#endif
66
  xndDebug("xnode get xnoded pid path:%s", pipeName);
468✔
67
}
468✔
68

69
static void    xnodeMgmtXnodedExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal) {
×
70
  TAOS_XNODED_MGMT_CHECK_PTR_RVOID(process);
×
71
  xndDebug("xnoded process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
×
72
  SXnodedData *pData = process->data;
×
73
  if (pData == NULL) {
×
74
    xndError("xnoded process data is NULL");
×
75
    return;
×
76
  }
77
  if ((exitStatus == 0 && termSignal == 0) || atomic_load_32(&pData->isStopped)) {
×
78
    xndInfo("xnoded process exit due to exit status 0 or dnode-mgmt called stop");
×
79
    if (uv_async_send(&pData->stopAsync) != 0) {
×
80
      xndError("stop xnoded: failed to send stop async");
×
81
    }
82
    char xnodedPipeSocket[PATH_MAX] = {0};
×
83
    getXnodedPipeName(xnodedPipeSocket, PATH_MAX);
×
84
    if (0 != unlink(xnodedPipeSocket)) {
×
85
      xndWarn("txnode failed to unlink, socket:%s, err:%s", xnodedPipeSocket, terrstr());
×
86
    }
87

88
    char *pidPath = xnodedPipeSocket;
×
89
    memset(pidPath, 0, PATH_MAX);
×
90
    getXnodedPidPath(pidPath, PATH_MAX);
×
91
    (void)taosRemoveFile(pidPath);
×
92
  } else {
93
    xndInfo("xnoded process restart, exit status %" PRId64 ", signal %d", exitStatus, termSignal);
×
94
    uv_sleep(2000);
×
95
    int32_t code = xnodeMgmtSpawnXnoded(pData);
×
96
    if (code != 0) {
×
97
      xndError("xnoded process restart failed with code:%d", code);
×
98
    }
99
  }
100
}
101
void killPreXnoded() {
468✔
102
  char buf[PATH_MAX] = {0};
468✔
103
  getXnodedPidPath(buf, sizeof(buf));
468✔
104

105
  TdFilePtr pFile = NULL;
468✔
106
  pFile = taosOpenFile(buf, TD_FILE_READ);
468✔
107
  if (pFile == NULL) {
468✔
108
    xndWarn("xnode failed to open xnoded pid file:%s, file may not exist", buf);
468✔
109
    return;
468✔
110
  }
111
  int64_t readSize = taosReadFile(pFile, buf, sizeof(buf));
×
112
  if (readSize <= 0) {
×
113
    if (readSize < 0) {
×
114
      xndError("xnode failed to read len from file:%p since %s", pFile, terrstr());
×
115
    }
116
    (void)taosCloseFile(&pFile);
×
117
    return;
×
118
  }
119
  (void)taosCloseFile(&pFile);
×
120

121
  int32_t pid = taosStr2Int32(buf, NULL, 10);
×
122
  int result = uv_kill((uv_pid_t)pid, SIGTERM);
×
123
  if (result != 0) {
×
124
    if (result != UV_ESRCH) {
×
125
      xndError("xnode failed to kill process %d: %s", pid, uv_strerror(result));
×
126
    }
127
    return;
×
128
  }
129
}
130

131
void saveXnodedPid(int32_t pid) {
×
132
  char buf[PATH_MAX] = {0};
×
133
  getXnodedPidPath(buf, sizeof(buf));
×
134
  TdFilePtr testFilePtr = taosCreateFile(buf, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
×
135
  snprintf(buf, PATH_MAX, "%d", pid);
×
136
  (void)taosWriteFile(testFilePtr, buf, strlen(buf));
×
137
  (void)taosCloseFile(&testFilePtr);
×
138
}
×
139

140
static int32_t xnodeMgmtSpawnXnoded(SXnodedData *pData) {
468✔
141
  xndDebug("start to init xnoded");
468✔
142
  TAOS_XNODED_MGMT_CHECK_PTR_RCODE(pData);
936✔
143

144
  int32_t              err = 0;
468✔
145
  uv_process_options_t options = {0};
468✔
146

147
  char path[PATH_MAX] = {0};
468✔
148
  if (tsProcPath == NULL) {
468✔
149
    path[0] = '.';
×
150
#ifdef WINDOWS
151
    GetModuleFileName(NULL, path, PATH_MAX);
152
#elif defined(_TD_DARWIN_64)
153
    uint32_t pathSize = sizeof(path);
154
    _NSGetExecutablePath(path, &pathSize);
155
#endif
156
  } else {
157
    TAOS_STRNCPY(path, tsProcPath, PATH_MAX);
468✔
158
  }
159

160
  TAOS_DIRNAME(path);
468✔
161

162
  if (strlen(path) == 0) {
468✔
163
    TAOS_STRCAT(path, XNODED_DEFAULT_PATH);
×
164
  }
165
  TAOS_STRCAT(path, XNODED_DEFAULT_EXEC);
468✔
166

167
  xndInfo("xnode mgmt spawn xnoded path: %s", path);
468✔
168
  // char *argsXnoded[] = {path, "-c", configDir, "-d", dnodeId, NULL};
169
  char *argsXnoded[] = {path, NULL};
468✔
170
  options.args = argsXnoded;
468✔
171
  options.file = path;
468✔
172

173
  options.exit_cb = xnodeMgmtXnodedExit;
468✔
174

175
  killPreXnoded();
468✔
176

177
  char xnodedPipeSocket[PATH_MAX] = {0};
468✔
178
  getXnodedPipeName(xnodedPipeSocket, PATH_MAX);
468✔
179
  if (0 != unlink(xnodedPipeSocket)) {
468✔
180
    xndWarn("txnode failed to unlink, ignore if first time, socket:%s, err:%s", xnodedPipeSocket, terrstr());
468✔
181
  }
182

183
  TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1));
468✔
184

185
  uv_stdio_container_t child_stdio[3];
468✔
186
  child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
468✔
187
  child_stdio[0].data.stream = (uv_stream_t *)&pData->ctrlPipe;
468✔
188
  child_stdio[1].flags = UV_IGNORE;
468✔
189
  child_stdio[2].flags = UV_INHERIT_FD;
468✔
190
  child_stdio[2].data.fd = 2;
468✔
191
  options.stdio_count = 3;
468✔
192
  options.stdio = child_stdio;
468✔
193

194
  options.flags = UV_PROCESS_DETACHED;
468✔
195

196
  char xnodedCfgDir[PATH_MAX] = {0};
468✔
197
  snprintf(xnodedCfgDir, PATH_MAX, "%s=%s", "XNODED_CFG_DIR", configDir);
468✔
198
  char xnodedLogDir[PATH_MAX] = {0};
468✔
199
  snprintf(xnodedLogDir, PATH_MAX, "%s=%s", "XNODED_LOG_DIR", tsLogDir);
468✔
200
  char dnodeIdEnvItem[64] = {0};
468✔
201
  snprintf(dnodeIdEnvItem, 64, "%s=%s:%d", "XNODED_LEADER_EP", pData->leaderEp.fqdn, pData->leaderEp.port);
468✔
202
  char xnodedUserPass[XNODE_USER_PASS_LEN] = {0};
468✔
203
  snprintf(xnodedUserPass, XNODE_USER_PASS_LEN, "%s=%s", "XNODED_USER_PASS", pData->userPass);
468✔
204
  char xnodeClusterId[32] = {0};
468✔
205
  snprintf(xnodeClusterId, 32, "%s=%" PRIu64, "XNODED_CLUSTER_ID", pData->clusterId);
468✔
206

207
  char xnodePipeSocket[PATH_MAX + 64] = {0};
468✔
208
  snprintf(xnodePipeSocket, PATH_MAX + 64, "%s=%s", "XNODED_LISTEN", xnodedPipeSocket);
468✔
209

210
  xndDebug("txnode env: leader ep: %s, user pass:%s, pipe socket:%s", dnodeIdEnvItem, xnodedUserPass, xnodePipeSocket);
468✔
211

212
  char *envXnoded[] = {xnodedCfgDir,    xnodedLogDir, dnodeIdEnvItem, xnodedUserPass, xnodeClusterId,
468✔
213
                       xnodePipeSocket, NULL};
214

215
  char **envXnodedWithPEnv = NULL;
468✔
216
  if (environ != NULL) {
468✔
217
    int32_t lenEnvXnoded = ARRAY_SIZE(envXnoded);
468✔
218
    int32_t numEnviron = 0;
468✔
219
    while (environ[numEnviron] != NULL) {
14,508✔
220
      numEnviron++;
14,040✔
221
    }
222

223
    envXnodedWithPEnv = (char **)taosMemoryCalloc(numEnviron + lenEnvXnoded, sizeof(char *));
468✔
224
    if (envXnodedWithPEnv == NULL) {
468✔
225
      err = TSDB_CODE_OUT_OF_MEMORY;
×
226
      goto _OVER;
×
227
    }
228

229
    for (int32_t i = 0; i < numEnviron; i++) {
14,508✔
230
      int32_t len = strlen(environ[i]) + 1;
14,040✔
231
      envXnodedWithPEnv[i] = (char *)taosMemoryCalloc(len, 1);
14,040✔
232
      if (envXnodedWithPEnv[i] == NULL) {
14,040✔
233
        err = TSDB_CODE_OUT_OF_MEMORY;
×
234
        goto _OVER;
×
235
      }
236

237
      tstrncpy(envXnodedWithPEnv[i], environ[i], len);
14,040✔
238
    }
239

240
    for (int32_t i = 0; i < lenEnvXnoded; i++) {
3,744✔
241
      if (envXnoded[i] != NULL) {
3,276✔
242
        int32_t len = strlen(envXnoded[i]) + 1;
2,808✔
243
        envXnodedWithPEnv[numEnviron + i] = (char *)taosMemoryCalloc(len, 1);
2,808✔
244
        if (envXnodedWithPEnv[numEnviron + i] == NULL) {
2,808✔
245
          err = TSDB_CODE_OUT_OF_MEMORY;
×
246
          goto _OVER;
×
247
        }
248

249
        tstrncpy(envXnodedWithPEnv[numEnviron + i], envXnoded[i], len);
2,808✔
250
      }
251
    }
252
    envXnodedWithPEnv[numEnviron + lenEnvXnoded - 1] = NULL;
468✔
253

254
    options.env = envXnodedWithPEnv;
468✔
255
  } else {
256
    options.env = envXnoded;
×
257
  }
258

259
  err = uv_spawn(&pData->loop, &pData->process, &options);
468✔
260
  pData->process.data = (void *)pData;
468✔
261
  if (err != 0) {
468✔
262
    xndError("can not spawn xnoded. path: %s, error: %s", path, uv_strerror(err));
468✔
263
  } else {
264
    xndInfo("xnoded is initialized, xnoded pid: %d", pData->process.pid);
×
265
    saveXnodedPid(pData->process.pid);
×
266
  }
267

268
_OVER:
468✔
269
  // if (taosFqdnEnvItem) {
270
  //   taosMemoryFree(taosFqdnEnvItem);
271
  // }
272

273
  if (envXnodedWithPEnv != NULL) {
468✔
274
    int32_t i = 0;
468✔
275
    while (envXnodedWithPEnv[i] != NULL) {
17,316✔
276
      taosMemoryFree(envXnodedWithPEnv[i]);
16,848✔
277
      i++;
16,848✔
278
    }
279
    taosMemoryFree(envXnodedWithPEnv);
468✔
280
  }
281

282
  return err;
468✔
283
}
284

285
static void xnodeMgmtXnodedCloseWalkCb(uv_handle_t *handle, void *arg) {
×
286
  TAOS_XNODED_MGMT_CHECK_PTR_RVOID(handle);
×
287
  if (!uv_is_closing(handle)) {
×
288
    uv_close(handle, NULL);
×
289
  }
290
}
291

292
static void xnodeMgmtXnodedStopAsyncCb(uv_async_t *async) {
×
293
  TAOS_XNODED_MGMT_CHECK_PTR_RVOID(async);
×
294
  SXnodedData *pData = async->data;
×
295
  uv_stop(&pData->loop);
×
296
}
297

298
static void xnodeMgmtWatchXnoded(void *args) {
234✔
299
  TAOS_XNODED_MGMT_CHECK_PTR_RVOID(args);
468✔
300
  SXnodedData *pData = args;
234✔
301
  TAOS_UV_CHECK_ERRNO(uv_loop_init(&pData->loop));
234✔
302
  TAOS_UV_CHECK_ERRNO(uv_async_init(&pData->loop, &pData->stopAsync, xnodeMgmtXnodedStopAsyncCb));
234✔
303
  pData->stopAsync.data = pData;
234✔
304
  TAOS_UV_CHECK_ERRNO(xnodeMgmtSpawnXnoded(pData));
234✔
305
  atomic_store_32(&pData->spawnErr, 0);
×
306
  (void)uv_barrier_wait(&pData->barrier);
×
307
  int32_t num = uv_run(&pData->loop, UV_RUN_DEFAULT);
×
308
  xndInfo("xnoded loop exit with %d active handles, line:%d", num, __LINE__);
×
309

310
  uv_walk(&pData->loop, xnodeMgmtXnodedCloseWalkCb, NULL);
×
311
  num = uv_run(&pData->loop, UV_RUN_DEFAULT);
×
312
  xndInfo("xnoded loop exit with %d active handles, line:%d", num, __LINE__);
×
313
  if (uv_loop_close(&pData->loop) != 0) {
×
314
    xndError("xnoded loop close failed, lino:%d", __LINE__);
×
315
  }
316
  return;
×
317

318
_exit:
234✔
319
  if (terrno != 0) {
234✔
320
    (void)uv_barrier_wait(&pData->barrier);
234✔
321
    atomic_store_32(&pData->spawnErr, terrno);
234✔
322
    if (uv_loop_close(&pData->loop) != 0) {
234✔
323
      xndError("xnoded loop close failed, lino:%d", __LINE__);
234✔
324
    }
325

326
    xndError("xnoded thread exit with code:%d lino:%d", terrno, __LINE__);
234✔
327
    terrno = TSDB_CODE_XNODE_UV_EXEC_FAILURE;
234✔
328
  }
329
}
330

331
/**
332
 * start xnoded that serves xnode function invocation under dnode startDnodeId
333
 * @param startDnodeId
334
 * @return
335
 */
336
int32_t xnodeMgmtStartXnoded(SXnode *pXnode) {
234✔
337
  int32_t code = 0, lino = 0;
234✔
338

339
  SXnodedData *pData = &xnodedGlobal;
234✔
340
  pData->leaderEp = pXnode->ep;
234✔
341
  if (pData->isStarted) {
234✔
342
    xndInfo("dnode start xnoded already called");
×
343
    return 0;
×
344
  }
345
  pData->isStarted = true;
234✔
346
  char dnodeId[8] = {0};
234✔
347
  snprintf(dnodeId, sizeof(dnodeId), "%d", pXnode->dnodeId);
234✔
348
  TAOS_CHECK_GOTO(uv_os_setenv("DNODE_ID", dnodeId), &lino, _exit);
234✔
349
  pData->dnodeId = pXnode->dnodeId;
234✔
350
  pData->clusterId = pXnode->clusterId;
234✔
351
  memset(pData->userPass, 0, sizeof(pData->userPass));
234✔
352
  memcpy(pData->userPass, pXnode->userPass, pXnode->upLen);
234✔
353

354
  TAOS_CHECK_GOTO(uv_barrier_init(&pData->barrier, 2), &lino, _exit);
234✔
355
  TAOS_CHECK_GOTO(uv_thread_create(&pData->thread, xnodeMgmtWatchXnoded, pData), &lino, _exit);
234✔
356
  (void)uv_barrier_wait(&pData->barrier);
234✔
357
  int32_t err = atomic_load_32(&pData->spawnErr);
234✔
358
  if (err != 0) {
234✔
359
    uv_barrier_destroy(&pData->barrier);
234✔
360
    if (uv_async_send(&pData->stopAsync) != 0) {
234✔
361
      xndError("start xnoded: failed to send stop async");
×
362
    }
363
    if (uv_thread_join(&pData->thread) != 0) {
234✔
364
      xndError("start xnoded: failed to join xnoded thread");
×
365
    }
366
    pData->needCleanUp = false;
234✔
367
    xndInfo("xnoded is cleaned up after spawn err");
234✔
368
    TAOS_CHECK_GOTO(err, &lino, _exit);
234✔
369
  } else {
UNCOV
370
    pData->needCleanUp = true;
×
UNCOV
371
    atomic_store_32(&pData->isStopped, 0);
×
372
  }
373
_exit:
234✔
374
  if (code != 0) {
234✔
375
    xndError("xnoded start failed with lino:%d, code:%d, error: %s", code, lino, uv_strerror(code));
234✔
376
  }
377
  return code;
234✔
378
}
379
/**
380
 * stop xnoded
381
 * @return
382
 */
383
void xnodeMgmtStopXnoded(void) {
627,801✔
384
  SXnodedData *pData = &xnodedGlobal;
627,801✔
385
  xndInfo("stopping xnoded, need cleanup:%d, spawn err:%d", pData->needCleanUp, pData->spawnErr);
627,801✔
386
  if (!pData->needCleanUp || atomic_load_32(&pData->isStopped)) {
627,801✔
387
    return;
627,801✔
388
  }
UNCOV
389
  atomic_store_32(&pData->isStopped, 1);
×
UNCOV
390
  pData->needCleanUp = false;
×
UNCOV
391
  (void)uv_process_kill(&pData->process, SIGTERM);
×
UNCOV
392
  uv_barrier_destroy(&pData->barrier);
×
393

UNCOV
394
  if (uv_thread_join(&pData->thread) != 0) {
×
395
    xndError("stop xnoded: failed to join xnoded thread");
×
396
  }
UNCOV
397
  xndInfo("xnoded is cleaned up");
×
398

UNCOV
399
  pData->isStarted = false;
×
400

UNCOV
401
  return;
×
402
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc