• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3809

01 Apr 2025 03:03AM UTC coverage: 34.048% (+0.02%) from 34.033%
#3809

push

travis-ci

happyguoxy
test:alter gcda dir

148452 of 599532 branches covered (24.76%)

Branch coverage included in aggregate %.

222312 of 489411 relevant lines covered (45.42%)

761122.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.0
/source/common/src/rsync.c
1
//
2
// Created by mingming wanng on 2023/11/2.
3
//
4
#include "rsync.h"
#include <errno.h>
#include <stdlib.h>
#include "tglobal.h"
7

8
#define ERRNO_ERR_FORMAT "errno:%d,msg:%s"
9
#define ERRNO_ERR_DATA   ERRNO, strerror(ERRNO)
10

11
// deleteRsync function produce empty directories, traverse base directory to remove them
12
static void removeEmptyDir() {
2✔
13
  TdDirPtr pDir = taosOpenDir(tsCheckpointBackupDir);
2✔
14
  if (pDir == NULL) return;
2!
15

16
  TdDirEntryPtr de = NULL;
2✔
17
  while ((de = taosReadDir(pDir)) != NULL) {
8✔
18
    if (!taosDirEntryIsDir(de)) {
6✔
19
      continue;
6✔
20
    }
21

22
    if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue;
4!
23

24
    char filename[PATH_MAX] = {0};
×
25
    snprintf(filename, sizeof(filename), "%s%s", tsCheckpointBackupDir, taosGetDirEntryName(de));
×
26

27
    TdDirPtr      pDirTmp = taosOpenDir(filename);
×
28
    TdDirEntryPtr deTmp = NULL;
×
29
    bool          empty = true;
×
30
    while ((deTmp = taosReadDir(pDirTmp)) != NULL) {
×
31
      if (strcmp(taosGetDirEntryName(deTmp), ".") == 0 || strcmp(taosGetDirEntryName(deTmp), "..") == 0) continue;
×
32
      empty = false;
×
33
    }
34
    if (empty) taosRemoveDir(filename);
×
35
    if (taosCloseDir(&pDirTmp) != 0) {
×
36
      uError("[rsync] close dir error," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
×
37
    }
38
  }
39

40
  if (taosCloseDir(&pDir) != 0) {
2!
41
    uError("[rsync] close dir error," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
×
42
  }
43
}
44

45
#ifdef WINDOWS
46
// Convert a Windows path into the POSIX-style form handed to rsync:
//   C:\TDengine\data\backup\checkpoint\ -> /C/TDengine/data/backup/checkpoint/
//
// The drive letter is copied as-is (its case is kept), the ':' after it is
// replaced by the separator that follows, and every '\' becomes '/'.
//
// from: NUL-terminated source path; it is only read.
// to:   output buffer; must hold at least max(strlen(from), 2) + 1 bytes.
//       The result is always NUL-terminated, so the buffer does not have to be
//       zero-filled by the caller.
static void changeDirFromWindowsToLinux(const char* from, char* to) {
  size_t len = strlen(from);

  to[0] = '/';
  to[1] = from[0];
  if (len == 0) {
    // from[0] was the terminator, so the result is already the string "/"
    return;
  }

  for (size_t i = 2; i < len; i++) {
    to[i] = (from[i] == '\\') ? '/' : from[i];
  }

  // a one-character input still produces the two characters "/X"
  to[len < 2 ? 2 : len] = '\0';
}
58
#endif
59

60
static int32_t generateConfigFile(char* confDir) {
2✔
61
  int32_t   code = 0;
2✔
62
  TdFilePtr pFile = taosOpenFile(confDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
2✔
63
  if (pFile == NULL) {
2!
64
    uError("[rsync] open conf file error, dir:%s," ERRNO_ERR_FORMAT, confDir, ERRNO_ERR_DATA);
×
65
    return terrno;
×
66
  }
67

68
#ifdef WINDOWS
69
  char path[PATH_MAX] = {0};
70
  changeDirFromWindowsToLinux(tsCheckpointBackupDir, path);
71
#endif
72

73
  char confContent[PATH_MAX * 4] = {0};
2✔
74
  snprintf(confContent, PATH_MAX * 4,
2✔
75
#ifndef WINDOWS
76
           "uid = root\n"
77
           "gid = root\n"
78
#endif
79
           "use chroot = false\n"
80
           "max connections = 200\n"
81
           "timeout = 100\n"
82
           "lock file = %srsync.lock\n"
83
           "log file = %srsync.log\n"
84
           "ignore errors = true\n"
85
           "read only = false\n"
86
           "list = false\n"
87
           "[checkpoint]\n"
88
           "path = %s",
89
           tsCheckpointBackupDir, tsCheckpointBackupDir,
90
#ifdef WINDOWS
91
           path
92
#else
93
           tsCheckpointBackupDir
94
#endif
95
  );
96
  uDebug("[rsync] conf:%s", confContent);
2!
97
  if (taosWriteFile(pFile, confContent, strlen(confContent)) <= 0) {
2!
98
    uError("[rsync] write conf file error," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
×
99
    (void)taosCloseFile(&pFile);
×
100
    code = terrno;
×
101
    return code;
×
102
  }
103

104
  (void)taosCloseFile(&pFile);
2✔
105
  return 0;
2✔
106
}
107

108
// Run `command` through system(), making up to three attempts.
// After every failed attempt (including the last one) the caller is paused
// for 10 ms. Returns 0 as soon as one attempt succeeds, otherwise the status
// reported by the final attempt.
static int32_t execCommand(char* command) {
  int32_t status = 0;

  for (int32_t attempt = 0; attempt < 3; ++attempt) {
    status = system(command);
    if (status == 0) {
      return 0;
    }
    taosMsleep(10);
  }

  return status;
}
120

121
void stopRsync() {
4✔
122
  int32_t pid = 0;
4✔
123
  int32_t code = 0;
4✔
124
  char    buf[128] = {0};
4✔
125

126
#ifdef WINDOWS
127
  code = system("taskkill /f /im rsync.exe");
128
#else
129
  code = taosGetPIdByName("rsync", &pid);
4✔
130
  if (code == 0) {
4✔
131
    int32_t ret = tsnprintf(buf, tListLen(buf), "kill -9 %d", pid);
2✔
132
    if (ret > 0) {
2!
133
      uInfo("kill rsync program pid:%d", pid);
2!
134
      code = system(buf);
2✔
135
    }
136
  }
137
#endif
138

139
  if (code != 0) {
4✔
140
    uError("[rsync] stop rsync server failed," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
2!
141
  } else {
142
    uDebug("[rsync] stop rsync server successful");
2!
143
  }
144

145
  taosMsleep(500);  // sleep 500 ms to wait for the completion of kill operation.
4✔
146
}
4✔
147

148
int32_t startRsync() {
2✔
149
  int32_t code = 0;
2✔
150
  if (taosMulMkDir(tsCheckpointBackupDir) != 0) {
2!
151
    uError("[rsync] build checkpoint backup dir failed, path:%s," ERRNO_ERR_FORMAT, tsCheckpointBackupDir,
×
152
           ERRNO_ERR_DATA);
153
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
154
    return code;
×
155
  }
156

157
  removeEmptyDir();
2✔
158

159
  char confDir[PATH_MAX] = {0};
2✔
160
  snprintf(confDir, PATH_MAX, "%srsync.conf", tsCheckpointBackupDir);
2✔
161

162
  code = generateConfigFile(confDir);
2✔
163
  if (code != 0) {
2!
164
    return code;
×
165
  }
166

167
  char cmd[PATH_MAX] = {0};
2✔
168
  snprintf(cmd, PATH_MAX, "rsync --daemon --port=%d --config=%s", tsRsyncPort, confDir);
2✔
169
  // start rsync service to backup checkpoint
170
  code = system(cmd);
2✔
171
  if (code != 0) {
2!
172
    uError("[rsync] cmd:%s start server failed, code:%d," ERRNO_ERR_FORMAT, cmd, code, ERRNO_ERR_DATA);
×
173
    if (ERRNO == 0) {
×
174
      return 0;
×
175
    } else {
176
      code = TAOS_SYSTEM_ERROR(ERRNO);
×
177
    }
178
  } else {
179
    uInfo("[rsync] cmd:%s start server successful", cmd);
2!
180
  }
181
  return code;
2✔
182
}
183

184
int32_t uploadByRsync(const char* id, const char* path, int64_t checkpointId) {
×
185
  int64_t st = taosGetTimestampMs();
×
186
  char    command[PATH_MAX] = {0};
×
187

188
#ifdef WINDOWS
189
  char pathTransform[PATH_MAX] = {0};
190
  changeDirFromWindowsToLinux(path, pathTransform);
191

192
  if (pathTransform[strlen(pathTransform) - 1] != '/') {
193
#else
194
  if (path[strlen(path) - 1] != '/') {
×
195
#endif
196
    snprintf(command, PATH_MAX,
×
197
             "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 --exclude=\"*\" %s/ "
198
             "rsync://%s/checkpoint/%s/",
199
             tsLogDir,
200
#ifdef WINDOWS
201
             pathTransform
202
#else
203
             path
204
#endif
205
             ,
206
             tsSnodeAddress, id);
207
  } else {
208
    snprintf(command, PATH_MAX,
×
209
             "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 --exclude=\"*\" %s "
210
             "rsync://%s/checkpoint/%s/",
211
             tsLogDir,
212
#ifdef WINDOWS
213
             pathTransform
214
#else
215
             path
216
#endif
217
             ,
218
             tsSnodeAddress, id);
219
  }
220

221
  // prepare the data directory
222
  int32_t code = execCommand(command);
×
223
  if (code != 0) {
×
224
    uError("[rsync] s-task:%s prepare checkpoint dir in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path,
×
225
           tsSnodeAddress, code, ERRNO_ERR_DATA);
226
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
227
  } else {
228
    int64_t el = (taosGetTimestampMs() - st);
×
229
    uDebug("[rsync] s-task:%s prepare checkpoint dir in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path,
×
230
           tsSnodeAddress, el);
231
  }
232

233
#ifdef WINDOWS
234
  memset(pathTransform, 0, PATH_MAX);
235
  changeDirFromWindowsToLinux(path, pathTransform);
236

237
  if (pathTransform[strlen(pathTransform) - 1] != '/') {
238
#else
239
  if (path[strlen(path) - 1] != '/') {
×
240
#endif
241
    snprintf(command, PATH_MAX,
×
242
             "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s/ "
243
             "rsync://%s/checkpoint/%s/%" PRId64 "/",
244
             tsLogDir,
245
#ifdef WINDOWS
246
             pathTransform
247
#else
248
             path
249
#endif
250
             ,
251
             tsSnodeAddress, id, checkpointId);
252
  } else {
253
    snprintf(command, PATH_MAX,
×
254
             "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s "
255
             "rsync://%s/checkpoint/%s/%" PRId64 "/",
256
             tsLogDir,
257
#ifdef WINDOWS
258
             pathTransform
259
#else
260
             path
261
#endif
262
             ,
263
             tsSnodeAddress, id, checkpointId);
264
  }
265

266
  code = execCommand(command);
×
267
  if (code != 0) {
×
268
    uError("[rsync] s-task:%s upload checkpoint data in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path,
×
269
           tsSnodeAddress, code, ERRNO_ERR_DATA);
270
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
271
  } else {
272
    int64_t el = (taosGetTimestampMs() - st);
×
273
    uDebug("[rsync] s-task:%s upload checkpoint data in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path,
×
274
           tsSnodeAddress, el);
275
  }
276

277
  return code;
×
278
}
279

280
// abort from retry if quit
281
int32_t downloadByRsync(const char* id, const char* path, int64_t checkpointId) {
1✔
282
  int64_t st = taosGetTimestampMs();
1✔
283
  int32_t times = 0;
1✔
284
  int32_t code = 0;
1✔
285

286
#ifdef WINDOWS
287
  char pathTransform[PATH_MAX] = {0};
288
  changeDirFromWindowsToLinux(path, pathTransform);
289
#endif
290

291
  char command[PATH_MAX] = {0};
1✔
292
  snprintf(
1✔
293
      command, PATH_MAX,
294
      "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/%" PRId64
295
      "/ %s",
296
      tsLogDir, tsSnodeAddress, id, checkpointId,
297
#ifdef WINDOWS
298
      pathTransform
299
#else
300
      path
301
#endif
302
  );
303

304
  uDebug("[rsync] %s start to sync data from remote to:%s, cmd:%s", id, path, command);
1!
305

306
  code = execCommand(command);
1✔
307
  if (code != TSDB_CODE_SUCCESS) {
1!
308
    uError("[rsync] %s download checkpointId:%" PRId64
1!
309
           " data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT,
310
           id, checkpointId, path, times, code, ERRNO_ERR_DATA);
311
  } else {
312
    int32_t el = taosGetTimestampMs() - st;
×
313
    uDebug("[rsync] %s download checkpointId:%" PRId64 " data:%s successfully, elapsed time:%dms", id, checkpointId,
×
314
           path, el);
315
  }
316

317
  if (code != TSDB_CODE_SUCCESS) {  // if failed, try to load it from data directory
1!
318
#ifdef WINDOWS
319
    memset(pathTransform, 0, PATH_MAX);
320
    changeDirFromWindowsToLinux(path, pathTransform);
321
#endif
322

323
    memset(command, 0, PATH_MAX);
1✔
324
    snprintf(
1✔
325
        command, PATH_MAX,
326
        "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/data/ %s",
327
        tsLogDir, tsSnodeAddress, id,
328
#ifdef WINDOWS
329
        pathTransform
330
#else
331
        path
332
#endif
333
    );
334

335
    uDebug("[rsync] %s start to sync data from remote data dir to:%s, cmd:%s", id, path, command);
1!
336

337
    code = execCommand(command);
1✔
338
    if (code != TSDB_CODE_SUCCESS) {
1!
339
      uError("[rsync] %s download checkpointId:%" PRId64
1!
340
             " data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT,
341
             id, checkpointId, path, times, code, ERRNO_ERR_DATA);
342
      code = TAOS_SYSTEM_ERROR(code);
1✔
343
    } else {
344
      int32_t el = taosGetTimestampMs() - st;
×
345
      uDebug("[rsync] %s download checkpointId:%" PRId64 " data:%s successfully, elapsed time:%dms", id, checkpointId,
×
346
             path, el);
347
    }
348
  }
349
  return code;
1✔
350
}
351

352
int32_t deleteRsync(const char* pTaskId, int64_t checkpointId) {
×
353
  char*   tmp = "./tmp_empty/";
×
354
  int32_t code = taosMkDir(tmp);
×
355
  if (code != 0) {
×
356
    uError("[rsync] make tmp dir failed. code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
×
357
    return TAOS_SYSTEM_ERROR(ERRNO);
×
358
  }
359

360
  char command[PATH_MAX] = {0};
×
361
  snprintf(command, PATH_MAX,
×
362
           "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 %s rsync://%s/checkpoint/%s/%" PRId64
363
           "/",
364
           tsLogDir, tmp, tsSnodeAddress, pTaskId, checkpointId);
365

366
  code = execCommand(command);
×
367
  taosRemoveDir(tmp);
×
368
  if (code != 0) {
×
369
    uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
×
370
    return TAOS_SYSTEM_ERROR(ERRNO);
×
371
  }
372

373
  uInfo("[rsync] cmd:%s delete checkpoint remote backup data for task:%s, checkpointId:%" PRId64 " successful",
×
374
         command, pTaskId, checkpointId);
375
  return 0;
×
376
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc