• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3653

14 Mar 2025 08:10AM UTC coverage: 22.565% (-41.0%) from 63.596%
#3653

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add basic test for alter table option.

* Fix(keep): memory leak.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stb keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

49248 of 302527 branches covered (16.28%)

Branch coverage included in aggregate %.

53 of 99 new or added lines in 12 files covered. (53.54%)

155872 existing lines in 443 files now uncovered.

87359 of 302857 relevant lines covered (28.84%)

570004.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

14.08
/source/common/src/rsync.c
1
//
2
// Created by mingming wanng on 2023/11/2.
3
//
4
#include "rsync.h"
5
#include <stdlib.h>
6
#include "tglobal.h"
7

8
#define ERRNO_ERR_FORMAT "errno:%d,msg:%s"
9
#define ERRNO_ERR_DATA   ERRNO, strerror(ERRNO)
10

11
// deleteRsync leaves empty directories behind; traverse the base directory and remove them
UNCOV
12
static void removeEmptyDir() {
×
UNCOV
13
  TdDirPtr pDir = taosOpenDir(tsCheckpointBackupDir);
×
UNCOV
14
  if (pDir == NULL) return;
×
15

UNCOV
16
  TdDirEntryPtr de = NULL;
×
UNCOV
17
  while ((de = taosReadDir(pDir)) != NULL) {
×
UNCOV
18
    if (!taosDirEntryIsDir(de)) {
×
UNCOV
19
      continue;
×
20
    }
21

UNCOV
22
    if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue;
×
23

24
    char filename[PATH_MAX] = {0};
×
25
    snprintf(filename, sizeof(filename), "%s%s", tsCheckpointBackupDir, taosGetDirEntryName(de));
×
26

27
    TdDirPtr      pDirTmp = taosOpenDir(filename);
×
28
    TdDirEntryPtr deTmp = NULL;
×
29
    bool          empty = true;
×
30
    while ((deTmp = taosReadDir(pDirTmp)) != NULL) {
×
31
      if (strcmp(taosGetDirEntryName(deTmp), ".") == 0 || strcmp(taosGetDirEntryName(deTmp), "..") == 0) continue;
×
32
      empty = false;
×
33
    }
34
    if (empty) taosRemoveDir(filename);
×
35
    if (taosCloseDir(&pDirTmp) != 0) {
×
36
      uError("[rsync] close dir error," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
×
37
    }
38
  }
39

UNCOV
40
  if (taosCloseDir(&pDir) != 0) {
×
41
    uError("[rsync] close dir error," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
×
42
  }
43
}
44

45
#ifdef WINDOWS
46
// C:\TDengine\data\backup\checkpoint\ -> /c/TDengine/data/backup/checkpoint/
47
/*
 * Convert a Windows path into the POSIX-style form used on the rsync command line:
 * "X:<rest>" becomes "/X<rest>" with every '\\' in <rest> replaced by '/'.
 *
 * The drive letter is copied unchanged (no case folding) and the character at
 * index 1 (normally ':') is dropped, so the result has the same length as `from`.
 *
 * @param from  NUL-terminated source path; not modified.
 * @param to    destination buffer; must hold at least max(strlen(from), 2) + 1 bytes.
 *              The result is always NUL-terminated, the caller need not pre-zero it.
 */
static void changeDirFromWindowsToLinux(const char* from, char* to) {
  size_t len = strlen(from);

  to[0] = '/';
  to[1] = from[0];
  if (len == 0) {
    return;  // from[0] was the terminator, so `to` already holds "/"
  }

  for (size_t i = 2; i < len; i++) {
    to[i] = (from[i] == '\\') ? '/' : from[i];
  }

  // a one-character input still produces two output characters ("/X")
  to[len < 2 ? 2 : len] = '\0';
}
58
#endif
59

UNCOV
60
static int32_t generateConfigFile(char* confDir) {
×
UNCOV
61
  int32_t   code = 0;
×
UNCOV
62
  TdFilePtr pFile = taosOpenFile(confDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
×
UNCOV
63
  if (pFile == NULL) {
×
64
    uError("[rsync] open conf file error, dir:%s," ERRNO_ERR_FORMAT, confDir, ERRNO_ERR_DATA);
×
65
    return terrno;
×
66
  }
67

68
#ifdef WINDOWS
69
  char path[PATH_MAX] = {0};
70
  changeDirFromWindowsToLinux(tsCheckpointBackupDir, path);
71
#endif
72

UNCOV
73
  char confContent[PATH_MAX * 4] = {0};
×
UNCOV
74
  snprintf(confContent, PATH_MAX * 4,
×
75
#ifndef WINDOWS
76
           "uid = root\n"
77
           "gid = root\n"
78
#endif
79
           "use chroot = false\n"
80
           "max connections = 200\n"
81
           "timeout = 100\n"
82
           "lock file = %srsync.lock\n"
83
           "log file = %srsync.log\n"
84
           "ignore errors = true\n"
85
           "read only = false\n"
86
           "list = false\n"
87
           "[checkpoint]\n"
88
           "path = %s",
89
           tsCheckpointBackupDir, tsCheckpointBackupDir,
90
#ifdef WINDOWS
91
           path
92
#else
93
           tsCheckpointBackupDir
94
#endif
95
  );
UNCOV
96
  uDebug("[rsync] conf:%s", confContent);
×
UNCOV
97
  if (taosWriteFile(pFile, confContent, strlen(confContent)) <= 0) {
×
98
    uError("[rsync] write conf file error," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
×
99
    (void)taosCloseFile(&pFile);
×
100
    code = terrno;
×
101
    return code;
×
102
  }
103

UNCOV
104
  (void)taosCloseFile(&pFile);
×
UNCOV
105
  return 0;
×
106
}
107

108
/*
 * Run a shell command through system(), making at most three attempts.
 *
 * Every failed attempt is followed by a 10 ms pause. Returns 0 as soon as one
 * attempt succeeds, otherwise the raw system() status of the last attempt.
 */
static int32_t execCommand(char* command) {
  int32_t status = 0;

  for (int32_t remaining = 3; remaining > 0; --remaining) {
    status = system(command);
    if (status == 0) {
      return status;
    }
    taosMsleep(10);
  }

  return status;
}
120

UNCOV
121
void stopRsync() {
×
UNCOV
122
  int32_t pid = 0;
×
UNCOV
123
  int32_t code = 0;
×
UNCOV
124
  char    buf[128] = {0};
×
125

126
#ifdef WINDOWS
127
  code = system("taskkill /f /im rsync.exe");
128
#else
UNCOV
129
  code = taosGetPIdByName("rsync", &pid);
×
UNCOV
130
  if (code == 0) {
×
UNCOV
131
    int32_t ret = tsnprintf(buf, tListLen(buf), "kill -9 %d", pid);
×
UNCOV
132
    if (ret > 0) {
×
UNCOV
133
      uInfo("kill rsync program pid:%d", pid);
×
UNCOV
134
      code = system(buf);
×
135
    }
136
  }
137
#endif
138

UNCOV
139
  if (code != 0) {
×
UNCOV
140
    uError("[rsync] stop rsync server failed," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
×
141
  } else {
UNCOV
142
    uDebug("[rsync] stop rsync server successful");
×
143
  }
144

UNCOV
145
  taosMsleep(500);  // sleep 500 ms to wait for the completion of kill operation.
×
UNCOV
146
}
×
147

UNCOV
148
int32_t startRsync() {
×
UNCOV
149
  int32_t code = 0;
×
UNCOV
150
  if (taosMulMkDir(tsCheckpointBackupDir) != 0) {
×
151
    uError("[rsync] build checkpoint backup dir failed, path:%s," ERRNO_ERR_FORMAT, tsCheckpointBackupDir,
×
152
           ERRNO_ERR_DATA);
153
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
154
    return code;
×
155
  }
156

UNCOV
157
  removeEmptyDir();
×
158

UNCOV
159
  char confDir[PATH_MAX] = {0};
×
UNCOV
160
  snprintf(confDir, PATH_MAX, "%srsync.conf", tsCheckpointBackupDir);
×
161

UNCOV
162
  code = generateConfigFile(confDir);
×
UNCOV
163
  if (code != 0) {
×
164
    return code;
×
165
  }
166

UNCOV
167
  char cmd[PATH_MAX] = {0};
×
UNCOV
168
  snprintf(cmd, PATH_MAX, "rsync --daemon --port=%d --config=%s", tsRsyncPort, confDir);
×
169
  // start rsync service to backup checkpoint
UNCOV
170
  code = system(cmd);
×
UNCOV
171
  if (code != 0) {
×
172
    uError("[rsync] cmd:%s start server failed, code:%d," ERRNO_ERR_FORMAT, cmd, code, ERRNO_ERR_DATA);
×
173
    if (ERRNO == 0) {
×
174
      return 0;
×
175
    } else {
176
      code = TAOS_SYSTEM_ERROR(ERRNO);
×
177
    }
178
  } else {
UNCOV
179
    uInfo("[rsync] cmd:%s start server successful", cmd);
×
180
  }
UNCOV
181
  return code;
×
182
}
183

184
int32_t uploadByRsync(const char* id, const char* path, int64_t checkpointId) {
×
185
  int64_t st = taosGetTimestampMs();
×
186
  char    command[PATH_MAX] = {0};
×
187

188
#ifdef WINDOWS
189
  char pathTransform[PATH_MAX] = {0};
190
  changeDirFromWindowsToLinux(path, pathTransform);
191

192
  if (pathTransform[strlen(pathTransform) - 1] != '/') {
193
#else
194
  if (path[strlen(path) - 1] != '/') {
×
195
#endif
196
    snprintf(command, PATH_MAX,
×
197
             "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 --exclude=\"*\" %s/ "
198
             "rsync://%s/checkpoint/%s/",
199
             tsLogDir,
200
#ifdef WINDOWS
201
             pathTransform
202
#else
203
             path
204
#endif
205
             ,
206
             tsSnodeAddress, id);
207
  } else {
208
    snprintf(command, PATH_MAX,
×
209
             "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 --exclude=\"*\" %s "
210
             "rsync://%s/checkpoint/%s/",
211
             tsLogDir,
212
#ifdef WINDOWS
213
             pathTransform
214
#else
215
             path
216
#endif
217
             ,
218
             tsSnodeAddress, id);
219
  }
220

221
  // prepare the data directory
222
  int32_t code = execCommand(command);
×
223
  if (code != 0) {
×
224
    uError("[rsync] s-task:%s prepare checkpoint dir in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path,
×
225
           tsSnodeAddress, code, ERRNO_ERR_DATA);
226
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
227
  } else {
228
    int64_t el = (taosGetTimestampMs() - st);
×
229
    uDebug("[rsync] s-task:%s prepare checkpoint dir in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path,
×
230
           tsSnodeAddress, el);
231
  }
232

233
#ifdef WINDOWS
234
  memset(pathTransform, 0, PATH_MAX);
235
  changeDirFromWindowsToLinux(path, pathTransform);
236

237
  if (pathTransform[strlen(pathTransform) - 1] != '/') {
238
#else
239
  if (path[strlen(path) - 1] != '/') {
×
240
#endif
241
    snprintf(command, PATH_MAX,
×
242
             "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s/ "
243
             "rsync://%s/checkpoint/%s/%" PRId64 "/",
244
             tsLogDir,
245
#ifdef WINDOWS
246
             pathTransform
247
#else
248
             path
249
#endif
250
             ,
251
             tsSnodeAddress, id, checkpointId);
252
  } else {
253
    snprintf(command, PATH_MAX,
×
254
             "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s "
255
             "rsync://%s/checkpoint/%s/%" PRId64 "/",
256
             tsLogDir,
257
#ifdef WINDOWS
258
             pathTransform
259
#else
260
             path
261
#endif
262
             ,
263
             tsSnodeAddress, id, checkpointId);
264
  }
265

266
  code = execCommand(command);
×
267
  if (code != 0) {
×
268
    uError("[rsync] s-task:%s upload checkpoint data in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path,
×
269
           tsSnodeAddress, code, ERRNO_ERR_DATA);
270
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
271
  } else {
272
    int64_t el = (taosGetTimestampMs() - st);
×
273
    uDebug("[rsync] s-task:%s upload checkpoint data in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path,
×
274
           tsSnodeAddress, el);
275
  }
276

277
  return code;
×
278
}
279

280
// abort from retry if quit
281
int32_t downloadByRsync(const char* id, const char* path, int64_t checkpointId) {
1✔
282
  int64_t st = taosGetTimestampMs();
1✔
283
  int32_t MAX_RETRY = 10;
1✔
284
  int32_t times = 0;
1✔
285
  int32_t code = 0;
1✔
286

287
#ifdef WINDOWS
288
  char pathTransform[PATH_MAX] = {0};
289
  changeDirFromWindowsToLinux(path, pathTransform);
290
#endif
291

292
  char command[PATH_MAX] = {0};
1✔
293
  snprintf(
1✔
294
      command, PATH_MAX,
295
      "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/%" PRId64
296
      "/ %s",
297
      tsLogDir, tsSnodeAddress, id, checkpointId,
298
#ifdef WINDOWS
299
      pathTransform
300
#else
301
      path
302
#endif
303
  );
304

305
  uDebug("[rsync] %s start to sync data from remote to:%s, cmd:%s", id, path, command);
1!
306

307
  code = execCommand(command);
1✔
308
  if (code != TSDB_CODE_SUCCESS) {
1!
309
    uError("[rsync] %s download checkpointId:%" PRId64
1!
310
           " data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT,
311
           id, checkpointId, path, times, code, ERRNO_ERR_DATA);
312
  } else {
313
    int32_t el = taosGetTimestampMs() - st;
×
314
    uDebug("[rsync] %s download checkpointId:%" PRId64 " data:%s successfully, elapsed time:%dms", id, checkpointId,
×
315
           path, el);
316
  }
317

318
  if (code != TSDB_CODE_SUCCESS) {  // if failed, try to load it from data directory
1!
319
#ifdef WINDOWS
320
    memset(pathTransform, 0, PATH_MAX);
321
    changeDirFromWindowsToLinux(path, pathTransform);
322
#endif
323

324
    memset(command, 0, PATH_MAX);
1✔
325
    snprintf(
1✔
326
        command, PATH_MAX,
327
        "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/data/ %s",
328
        tsLogDir, tsSnodeAddress, id,
329
#ifdef WINDOWS
330
        pathTransform
331
#else
332
        path
333
#endif
334
    );
335

336
    uDebug("[rsync] %s start to sync data from remote data dir to:%s, cmd:%s", id, path, command);
1!
337

338
    code = execCommand(command);
1✔
339
    if (code != TSDB_CODE_SUCCESS) {
1!
340
      uError("[rsync] %s download checkpointId:%" PRId64
1!
341
             " data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT,
342
             id, checkpointId, path, times, code, ERRNO_ERR_DATA);
343
      code = TAOS_SYSTEM_ERROR(code);
1✔
344
    } else {
345
      int32_t el = taosGetTimestampMs() - st;
×
346
      uDebug("[rsync] %s download checkpointId:%" PRId64 " data:%s successfully, elapsed time:%dms", id, checkpointId,
×
347
             path, el);
348
    }
349
  }
350
  return code;
1✔
351
}
352

353
int32_t deleteRsync(const char* id) {
×
354
  char*   tmp = "./tmp_empty/";
×
355
  int32_t code = taosMkDir(tmp);
×
356
  if (code != 0) {
×
357
    uError("[rsync] make tmp dir failed. code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
×
358
    return TAOS_SYSTEM_ERROR(ERRNO);
×
359
  }
360

361
  char command[PATH_MAX] = {0};
×
362
  snprintf(command, PATH_MAX,
×
363
           "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 %s rsync://%s/checkpoint/%s/data/",
364
           tsLogDir, tmp, tsSnodeAddress, id);
365

366
  code = execCommand(command);
×
367
  taosRemoveDir(tmp);
×
368
  if (code != 0) {
×
369
    uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
×
370
    return TAOS_SYSTEM_ERROR(ERRNO);
×
371
  }
372

373
  uDebug("[rsync] delete data:%s successful", id);
×
374
  return 0;
×
375
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc