• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3529

14 Nov 2024 01:56PM UTC coverage: 60.888% (-0.02%) from 60.905%
#3529

push

travis-ci

web-flow
Merge pull request #28764 from taosdata/docs/TS-4937

doc(arch/last): new section for last/last_row cache

119990 of 252020 branches covered (47.61%)

Branch coverage included in aggregate %.

200800 of 274829 relevant lines covered (73.06%)

15624555.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

26.41
/source/common/src/rsync.c
1
//
2
// Created by mingming wanng on 2023/11/2.
3
//
4
#include "rsync.h"
5
#include <stdlib.h>
6
#include "tglobal.h"
7

8
#define ERRNO_ERR_FORMAT "errno:%d,msg:%s"
9
#define ERRNO_ERR_DATA   errno, strerror(errno)
10

11
// deleteRsync leaves empty directories behind; traverse the base directory to remove them
12
static void removeEmptyDir() {
35✔
13
  TdDirPtr pDir = taosOpenDir(tsCheckpointBackupDir);
35✔
14
  if (pDir == NULL) return;
35!
15

16
  TdDirEntryPtr de = NULL;
35✔
17
  while ((de = taosReadDir(pDir)) != NULL) {
171✔
18
    if (!taosDirEntryIsDir(de)) {
136✔
19
      continue;
136✔
20
    }
21

22
    if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue;
70!
23

24
    char filename[PATH_MAX] = {0};
×
25
    snprintf(filename, sizeof(filename), "%s%s", tsCheckpointBackupDir, taosGetDirEntryName(de));
×
26

27
    TdDirPtr      pDirTmp = taosOpenDir(filename);
×
28
    TdDirEntryPtr deTmp = NULL;
×
29
    bool          empty = true;
×
30
    while ((deTmp = taosReadDir(pDirTmp)) != NULL) {
×
31
      if (strcmp(taosGetDirEntryName(deTmp), ".") == 0 || strcmp(taosGetDirEntryName(deTmp), "..") == 0) continue;
×
32
      empty = false;
×
33
    }
34
    if (empty) taosRemoveDir(filename);
×
35
    if (taosCloseDir(&pDirTmp) != 0) {
×
36
      uError("[rsync] close dir error," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
×
37
    }
38
  }
39

40
  if (taosCloseDir(&pDir) != 0) {
35!
41
    uError("[rsync] close dir error," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
×
42
  }
43
}
44

45
#ifdef WINDOWS
46
// C:\TDengine\data\backup\checkpoint\ -> /C/TDengine/data/backup/checkpoint/ (drive letter case is kept)
47
// Rewrite a Windows path into a '/'-separated form:
//   to[0] = '/', to[1] = drive letter (copied unchanged), and from index 2 on
//   every '\\' becomes '/' while all other characters are copied as-is.
// The character at index 1 of `from` (normally ':') is dropped.
//
// from: NUL-terminated input path; not modified.
// to:   output buffer; must hold at least max(strlen(from), 2) + 1 bytes.
//       The result is always NUL-terminated, even if `to` was not zeroed.
static void changeDirFromWindowsToLinux(const char* from, char* to) {
  size_t len = strlen(from);  // computed once instead of on every iteration

  to[0] = '/';
  to[1] = from[0];
  if (len == 0) {
    return;  // from[0] was the terminator, so `to` is already "/"
  }

  for (size_t i = 2; i < len; i++) {
    to[i] = (from[i] == '\\') ? '/' : from[i];
  }

  // A one-character input fills to[0..1] only, so the terminator goes at 2.
  to[len < 2 ? 2 : len] = '\0';
}
58
#endif
59

60
static int32_t generateConfigFile(char* confDir) {
35✔
61
  int32_t   code = 0;
35✔
62
  TdFilePtr pFile = taosOpenFile(confDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
35✔
63
  if (pFile == NULL) {
35!
64
    uError("[rsync] open conf file error, dir:%s," ERRNO_ERR_FORMAT, confDir, ERRNO_ERR_DATA);
×
65
    return terrno;
×
66
  }
67

68
#ifdef WINDOWS
69
  char path[PATH_MAX] = {0};
70
  changeDirFromWindowsToLinux(tsCheckpointBackupDir, path);
71
#endif
72

73
  char confContent[PATH_MAX * 4] = {0};
35✔
74
  snprintf(confContent, PATH_MAX * 4,
35✔
75
#ifndef WINDOWS
76
           "uid = root\n"
77
           "gid = root\n"
78
#endif
79
           "use chroot = false\n"
80
           "max connections = 200\n"
81
           "timeout = 100\n"
82
           "lock file = %srsync.lock\n"
83
           "log file = %srsync.log\n"
84
           "ignore errors = true\n"
85
           "read only = false\n"
86
           "list = false\n"
87
           "[checkpoint]\n"
88
           "path = %s",
89
           tsCheckpointBackupDir, tsCheckpointBackupDir,
90
#ifdef WINDOWS
91
           path
92
#else
93
           tsCheckpointBackupDir
94
#endif
95
  );
96
  uDebug("[rsync] conf:%s", confContent);
35!
97
  if (taosWriteFile(pFile, confContent, strlen(confContent)) <= 0) {
35!
98
    uError("[rsync] write conf file error," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
×
99
    (void)taosCloseFile(&pFile);
×
100
    code = terrno;
×
101
    return code;
×
102
  }
103

104
  (void)taosCloseFile(&pFile);
35✔
105
  return 0;
35✔
106
}
107

108
// Run `command` through system(), making at most three attempts.
// Stops at the first attempt that returns 0; after every failed attempt
// (the last one included) it sleeps 10 ms via taosMsleep.
// Returns the value system() gave on the final attempt.
static int32_t execCommand(char* command) {
  int32_t status = 0;

  for (int32_t attempt = 0; attempt < 3; ++attempt) {
    status = system(command);
    if (status == 0) {
      return status;
    }
    taosMsleep(10);
  }

  return status;
}
120

121
void stopRsync() {
70✔
122
  int32_t pid = 0;
70✔
123
  int32_t code = 0;
70✔
124
  char    buf[128] = {0};
70✔
125

126
#ifdef WINDOWS
127
  code = system("taskkill /f /im rsync.exe");
128
#else
129
  code = taosGetPIdByName("rsync", &pid);
70✔
130
  if (code == 0) {
70✔
131
    int32_t ret = tsnprintf(buf, tListLen(buf), "kill -9 %d", pid);
36✔
132
    if (ret > 0) {
36!
133
      uInfo("kill rsync program pid:%d", pid);
36!
134
      code = system(buf);
36✔
135
    }
136
  }
137
#endif
138

139
  if (code != 0) {
70✔
140
    uError("[rsync] stop rsync server failed," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
34!
141
  } else {
142
    uDebug("[rsync] stop rsync server successful");
36!
143
  }
144

145
  taosMsleep(500);  // sleep 500 ms to wait for the completion of kill operation.
70✔
146
}
70✔
147

148
int32_t startRsync() {
35✔
149
  int32_t code = 0;
35✔
150
  if (taosMulMkDir(tsCheckpointBackupDir) != 0) {
35!
151
    uError("[rsync] build checkpoint backup dir failed, path:%s," ERRNO_ERR_FORMAT, tsCheckpointBackupDir,
×
152
           ERRNO_ERR_DATA);
153
    code = TAOS_SYSTEM_ERROR(errno);
×
154
    return code;
×
155
  }
156

157
  removeEmptyDir();
35✔
158

159
  char confDir[PATH_MAX] = {0};
35✔
160
  snprintf(confDir, PATH_MAX, "%srsync.conf", tsCheckpointBackupDir);
35✔
161

162
  code = generateConfigFile(confDir);
35✔
163
  if (code != 0) {
35!
164
    return code;
×
165
  }
166

167
  char cmd[PATH_MAX] = {0};
35✔
168
  snprintf(cmd, PATH_MAX, "rsync --daemon --port=%d --config=%s", tsRsyncPort, confDir);
35✔
169
  // start rsync service to backup checkpoint
170
  code = system(cmd);
35✔
171
  if (code != 0) {
35!
172
    uError("[rsync] cmd:%s start server failed, code:%d," ERRNO_ERR_FORMAT, cmd, code, ERRNO_ERR_DATA);
×
173
    if (errno == 0) {
×
174
      return 0;
×
175
    } else {
176
      code = TAOS_SYSTEM_ERROR(errno);
×
177
    }
178
  } else {
179
    uInfo("[rsync] cmd:%s start server successful", cmd);
35!
180
  }
181
  return code;
35✔
182
}
183

184
int32_t uploadByRsync(const char* id, const char* path, int64_t checkpointId) {
×
185
  int64_t st = taosGetTimestampMs();
×
186
  char    command[PATH_MAX] = {0};
×
187

188
#ifdef WINDOWS
189
  char pathTransform[PATH_MAX] = {0};
190
  changeDirFromWindowsToLinux(path, pathTransform);
191

192
  if (pathTransform[strlen(pathTransform) - 1] != '/') {
193
#else
194
  if (path[strlen(path) - 1] != '/') {
×
195
#endif
196
    snprintf(command, PATH_MAX,
×
197
             "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 --exclude=\"*\" %s/ "
198
             "rsync://%s/checkpoint/%s/",
199
             tsLogDir,
200
#ifdef WINDOWS
201
             pathTransform
202
#else
203
             path
204
#endif
205
             ,
206
             tsSnodeAddress, id);
207
  } else {
208
    snprintf(command, PATH_MAX,
×
209
             "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 --exclude=\"*\" %s "
210
             "rsync://%s/checkpoint/%s/",
211
             tsLogDir,
212
#ifdef WINDOWS
213
             pathTransform
214
#else
215
             path
216
#endif
217
             ,
218
             tsSnodeAddress, id);
219
  }
220

221
  // prepare the data directory
222
  int32_t code = execCommand(command);
×
223
  if (code != 0) {
×
224
    uError("[rsync] s-task:%s prepare checkpoint dir in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path,
×
225
           tsSnodeAddress, code, ERRNO_ERR_DATA);
226
    code = TAOS_SYSTEM_ERROR(errno);
×
227
  } else {
228
    int64_t el = (taosGetTimestampMs() - st);
×
229
    uDebug("[rsync] s-task:%s prepare checkpoint dir in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path,
×
230
           tsSnodeAddress, el);
231
  }
232

233
#ifdef WINDOWS
234
  memset(pathTransform, 0, PATH_MAX);
235
  changeDirFromWindowsToLinux(path, pathTransform);
236

237
  if (pathTransform[strlen(pathTransform) - 1] != '/') {
238
#else
239
  if (path[strlen(path) - 1] != '/') {
×
240
#endif
241
    snprintf(command, PATH_MAX,
×
242
             "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s/ "
243
             "rsync://%s/checkpoint/%s/%" PRId64 "/",
244
             tsLogDir,
245
#ifdef WINDOWS
246
             pathTransform
247
#else
248
             path
249
#endif
250
             ,
251
             tsSnodeAddress, id, checkpointId);
252
  } else {
253
    snprintf(command, PATH_MAX,
×
254
             "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s "
255
             "rsync://%s/checkpoint/%s/%" PRId64 "/",
256
             tsLogDir,
257
#ifdef WINDOWS
258
             pathTransform
259
#else
260
             path
261
#endif
262
             ,
263
             tsSnodeAddress, id, checkpointId);
264
  }
265

266
  code = execCommand(command);
×
267
  if (code != 0) {
×
268
    uError("[rsync] s-task:%s upload checkpoint data in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path,
×
269
           tsSnodeAddress, code, ERRNO_ERR_DATA);
270
    code = TAOS_SYSTEM_ERROR(errno);
×
271
  } else {
272
    int64_t el = (taosGetTimestampMs() - st);
×
273
    uDebug("[rsync] s-task:%s upload checkpoint data in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path,
×
274
           tsSnodeAddress, el);
275
  }
276

277
  return code;
×
278
}
279

280
// abort from retry if quit
281
int32_t downloadByRsync(const char* id, const char* path, int64_t checkpointId) {
×
282
  int64_t st = taosGetTimestampMs();
×
283
  int32_t MAX_RETRY = 10;
×
284
  int32_t times = 0;
×
285
  int32_t code = 0;
×
286

287
#ifdef WINDOWS
288
  char pathTransform[PATH_MAX] = {0};
289
  changeDirFromWindowsToLinux(path, pathTransform);
290
#endif
291

292
  char command[PATH_MAX] = {0};
×
293
  snprintf(
×
294
      command, PATH_MAX,
295
      "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/%" PRId64
296
      "/ %s",
297
      tsLogDir, tsSnodeAddress, id, checkpointId,
298
#ifdef WINDOWS
299
      pathTransform
300
#else
301
      path
302
#endif
303
  );
304

305
  uDebug("[rsync] %s start to sync data from remote to:%s, cmd:%s", id, path, command);
×
306

307
  code = execCommand(command);
×
308
  if (code != TSDB_CODE_SUCCESS) {
×
309
    uError("[rsync] %s download checkpointId:%" PRId64
×
310
           " data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT,
311
           id, checkpointId, path, times, code, ERRNO_ERR_DATA);
312
  } else {
313
    int32_t el = taosGetTimestampMs() - st;
×
314
    uDebug("[rsync] %s download checkpointId:%" PRId64 " data:%s successfully, elapsed time:%dms", id, checkpointId,
×
315
           path, el);
316
  }
317

318
  if (code != TSDB_CODE_SUCCESS) {  // if failed, try to load it from data directory
×
319
#ifdef WINDOWS
320
    memset(pathTransform, 0, PATH_MAX);
321
    changeDirFromWindowsToLinux(path, pathTransform);
322
#endif
323

324
    memset(command, 0, PATH_MAX);
×
325
    snprintf(
×
326
        command, PATH_MAX,
327
        "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/data/ %s",
328
        tsLogDir, tsSnodeAddress, id,
329
#ifdef WINDOWS
330
        pathTransform
331
#else
332
        path
333
#endif
334
    );
335

336
    uDebug("[rsync] %s start to sync data from remote data dir to:%s, cmd:%s", id, path, command);
×
337

338
    code = execCommand(command);
×
339
    if (code != TSDB_CODE_SUCCESS) {
×
340
      uError("[rsync] %s download checkpointId:%" PRId64
×
341
             " data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT,
342
             id, checkpointId, path, times, code, ERRNO_ERR_DATA);
343
      code = TAOS_SYSTEM_ERROR(code);
×
344
    } else {
345
      int32_t el = taosGetTimestampMs() - st;
×
346
      uDebug("[rsync] %s download checkpointId:%" PRId64 " data:%s successfully, elapsed time:%dms", id, checkpointId,
×
347
             path, el);
348
    }
349
  }
350
  return code;
×
351
}
352

353
int32_t deleteRsync(const char* id) {
×
354
  char*   tmp = "./tmp_empty/";
×
355
  int32_t code = taosMkDir(tmp);
×
356
  if (code != 0) {
×
357
    uError("[rsync] make tmp dir failed. code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
×
358
    return TAOS_SYSTEM_ERROR(errno);
×
359
  }
360

361
  char command[PATH_MAX] = {0};
×
362
  snprintf(command, PATH_MAX,
×
363
           "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 %s rsync://%s/checkpoint/%s/data/",
364
           tsLogDir, tmp, tsSnodeAddress, id);
365

366
  code = execCommand(command);
×
367
  taosRemoveDir(tmp);
×
368
  if (code != 0) {
×
369
    uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
×
370
    return TAOS_SYSTEM_ERROR(errno);
×
371
  }
372

373
  uDebug("[rsync] delete data:%s successful", id);
×
374
  return 0;
×
375
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc