• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4761

28 Sep 2025 10:49AM UTC coverage: 57.837% (-1.0%) from 58.866%
#4761

push

travis-ci

web-flow
merge: set version (#33122)

136913 of 302095 branches covered (45.32%)

Branch coverage included in aggregate %.

207750 of 293830 relevant lines covered (70.7%)

5673932.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

18.86
/source/dnode/vnode/src/vnd/vnodeRetention.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "vnd.h"
17

18
extern int32_t tsdbAsyncRetention(STsdb *tsdb, STimeWindow tw, int8_t optrType, int8_t triggerType);
19
extern int32_t tsdbListSsMigrateFileSets(STsdb *tsdb, SArray *fidArr);
20
extern int32_t tsdbAsyncSsMigrateFileSet(STsdb *tsdb, SSsMigrateFileSetReq *pReq);
21
extern int32_t tsdbQuerySsMigrateProgress(STsdb *tsdb, SSsMigrateProgress *pProgress);
22
extern int32_t tsdbUpdateSsMigrateProgress(STsdb *tsdb, SSsMigrateProgress *pProgress);
23
extern void    tsdbStopSsMigrateTask(STsdb *tsdb, int32_t ssMigrateId);
24
extern int32_t tsdbRetentionMonitorGetInfo(STsdb *tsdb, SQueryRetentionProgressRsp *rsp);
25

26
int32_t vnodeAsyncRetention(SVnode *pVnode, STimeWindow tw, int8_t optrType, int8_t triggerType) {
47✔
27
  // async retention
28
  return tsdbAsyncRetention(pVnode->pTsdb, tw, optrType, triggerType);
47✔
29
}
30

31

32

33
int32_t vnodeQuerySsMigrateProgress(SVnode *pVnode, SRpcMsg *pMsg) {
×
34
#ifdef USE_SHARED_STORAGE
35

36
  int32_t code = 0;
×
37

38
  SSsMigrateProgress req = {0};
×
39

40
  int32_t                  rspSize = 0;
×
41
  SRpcMsg                  rspMsg = {0};
×
42
  void                    *pRsp = NULL;
×
43

44
  char* buf = (char*)pMsg->pCont + sizeof(SMsgHead);
×
45
  code = tDeserializeSSsMigrateProgress(buf, pMsg->contLen - sizeof(SMsgHead), &req);
×
46
  if (code) {
×
47
    code = TSDB_CODE_INVALID_MSG;
×
48
    goto _exit;
×
49
  }
50

51
  vDebug("vgId:%d, ssMigrateId:%d, processing query ss migrate progress request", req.vgId, req.ssMigrateId);
×
52
  SSsMigrateProgress rsp = req;
×
53
  code = tsdbQuerySsMigrateProgress(pVnode->pTsdb, &rsp);
×
54
  if (code) {
×
55
    goto _exit;
×
56
  }
57

58
  rspSize = tSerializeSSsMigrateProgress(NULL, 0, &rsp);
×
59
  pRsp = rpcMallocCont(rspSize);
×
60
  if (pRsp == NULL) {
×
61
    code = TSDB_CODE_OUT_OF_MEMORY;
×
62
    vError("vgId:%d, ssMigrateId:%d, failed to allocate response buffer since %s", req.vgId, req.ssMigrateId, tstrerror(code));
×
63
    rspSize = 0;
×
64
    goto _exit;
×
65
  }
66
  TAOS_UNUSED(tSerializeSSsMigrateProgress(pRsp, rspSize, &rsp));
×
67

68
_exit:
×
69
  rspMsg.info = pMsg->info;
×
70
  rspMsg.pCont = pRsp;
×
71
  rspMsg.contLen = rspSize;
×
72
  rspMsg.code = code;
×
73
  rspMsg.msgType = TDMT_VND_QUERY_SSMIGRATE_PROGRESS_RSP;
×
74

75
  tmsgSendRsp(&rspMsg);
×
76
  return 0;
×
77

78
#else
79
  return TSDB_CODE_OPS_NOT_SUPPORT;
80
#endif
81
}
82

83

84

85
int32_t vnodeListSsMigrateFileSets(SVnode *pVnode, SRpcMsg *pMsg) {
×
86
#ifdef USE_SHARED_STORAGE
87

88
  int32_t code = 0, vgId = TD_VID(pVnode);
×
89
  SListSsMigrateFileSetsReq req = {0};
×
90
  SArray* fidArr = NULL;
×
91

92
  int32_t                  rspSize = 0;
×
93
  SRpcMsg                  rspMsg = {0};
×
94
  void                    *pRsp = NULL;
×
95
  SListSsMigrateFileSetsRsp rsp = {0};
×
96

97
  // deserialize request
98
  char* buf = (char*)pMsg->pCont + sizeof(SMsgHead);
×
99
  code = tDeserializeSListSsMigrateFileSetsReq(buf, pMsg->contLen - sizeof(SMsgHead), &req);
×
100
  if (code) {
×
101
    vError("vgId:%d, failed to deserialize ss migrate query file sets request since %s", vgId, tstrerror(code));
×
102
    goto _exit;
×
103
  }
104

105
  fidArr = taosArrayInit(10, sizeof(int32_t));
×
106
  if (fidArr == NULL) {
×
107
    code = terrno;
×
108
    vError("vgId:%d, failed to initialize file set id array since %s", TD_VID(pVnode), tstrerror(code));
×
109
    goto _exit;
×
110
  }
111

112
  code = tsdbListSsMigrateFileSets(pVnode->pTsdb, fidArr);
×
113
  if (code != TSDB_CODE_SUCCESS) {
×
114
    vError("vgId:%d, %s failed since %s", TD_VID(pVnode), __func__, tstrerror(code));
×
115
    goto _exit;
×
116
  }
117

118
  rsp.ssMigrateId = req.ssMigrateId;
×
119
  rsp.vgId = vgId;
×
120
  rsp.pFileSets = fidArr;
×
121
  rspSize = tSerializeSListSsMigrateFileSetsRsp(NULL, 0, &rsp);
×
122
  pRsp = rpcMallocCont(rspSize);
×
123
  if (pRsp == NULL) {
×
124
    code = TSDB_CODE_OUT_OF_MEMORY;
×
125
    vError("vgId:%d, failed to allocate response buffer of size %d since %s", vgId, rspSize, tstrerror(code));
×
126
    rspSize = 0;
×
127
    goto _exit;
×
128
  }
129
  TAOS_UNUSED(tSerializeSListSsMigrateFileSetsRsp(pRsp, rspSize, &rsp));
×
130

131
_exit:
×
132
  taosArrayDestroy(fidArr);
×
133
  rspMsg.info = pMsg->info;
×
134
  rspMsg.pCont = pRsp;
×
135
  rspMsg.contLen = rspSize;
×
136
  rspMsg.code = code;
×
137
  rspMsg.msgType = TDMT_VND_LIST_SSMIGRATE_FILESETS_RSP;
×
138

139
  tmsgSendRsp(&rspMsg);
×
140
  return 0;
×
141

142
#else
143
  return TSDB_CODE_OPS_NOT_SUPPORT;
144
#endif
145
}
146

147

148

149
int32_t vnodeAsyncSsMigrateFileSet(SVnode *pVnode, SSsMigrateFileSetReq *pReq) {
×
150
  // async migration
151
#ifdef USE_SHARED_STORAGE
152
  if (tsSsEnabled) {
×
153
    return tsdbAsyncSsMigrateFileSet(pVnode->pTsdb, pReq);
×
154
  }
155
#endif
156
  return TSDB_CODE_OPS_NOT_SUPPORT;
×
157
}
158

159

160

161
int32_t vnodeFollowerSsMigrate(SVnode *pVnode, SSsMigrateProgress *pReq) {
×
162
#ifdef USE_SHARED_STORAGE
163
  return tsdbUpdateSsMigrateProgress(pVnode->pTsdb, pReq);
×
164
#else
165
  return TSDB_CODE_OPS_NOT_SUPPORT;
166
#endif
167
}
168

169

170

171
extern int32_t vnodeKillSsMigrate(SVnode *pVnode, SVnodeKillSsMigrateReq *pReq) {
×
172
#ifdef USE_SHARED_STORAGE
173
  tsdbStopSsMigrateTask(pVnode->pTsdb, pReq->ssMigrateId);
×
174
  return TSDB_CODE_SUCCESS;
×
175
#else
176
  return TSDB_CODE_OPS_NOT_SUPPORT;
177
#endif
178
}
179

180
int32_t vnodeQueryRetentionProgress(SVnode *pVnode, SRpcMsg *pMsg) {
97✔
181
  int32_t                    code = 0;
97✔
182
  SQueryRetentionProgressReq req = {0};
97✔
183
  int32_t                    rspSize = 0;
97✔
184
  SRpcMsg                    rspMsg = {0};
97✔
185
  void                      *pRsp = NULL;
97✔
186
  SQueryRetentionProgressRsp rsp = {0}; // same as SQueryCompactProgressRsp
97✔
187

188
  code = tDeserializeSQueryCompactProgressReq(pMsg->pCont, pMsg->contLen, &req);
97✔
189
  if (code) {
97!
190
    code = TSDB_CODE_INVALID_MSG;
×
191
    goto _exit;
×
192
  }
193

194
  rsp.dnodeId = req.dnodeId;
97✔
195
  TAOS_UNUSED(tsdbRetentionMonitorGetInfo(pVnode->pTsdb, &rsp));
97✔
196
  vInfo("update retention progress, id:%d vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", rsp.id, rsp.vgId,
97!
197
        rsp.dnodeId, rsp.numberFileset, rsp.finished);
198
  rsp.id = req.id;
97✔
199

200
  rspSize = tSerializeSQueryCompactProgressRsp(NULL, 0, &rsp);
97✔
201
  if (rspSize < 0) {
97!
202
    code = TSDB_CODE_INVALID_MSG;
×
203
    goto _exit;
×
204
  }
205
  if (!(pRsp = rpcMallocCont(rspSize))) {
97!
206
    code = TSDB_CODE_OUT_OF_MEMORY;
×
207
    goto _exit;
×
208
  }
209
  if ((code = tSerializeSQueryCompactProgressRsp(pRsp, rspSize, &rsp)) < 0) {
97!
210
    goto _exit;
×
211
  }
212
  code = 0; // set to 0 since tSerializeSQueryCompactProgressRsp may return the rspSize
97✔
213
_exit:
97✔
214
  rspMsg.info = pMsg->info;
97✔
215
  rspMsg.pCont = pRsp;
97✔
216
  rspMsg.contLen = rspSize;
97✔
217
  rspMsg.code = code;
97✔
218
  rspMsg.msgType = TDMT_VND_QUERY_TRIM_PROGRESS_RSP;
97✔
219
  tmsgSendRsp(&rspMsg);
97✔
220

221
  return 0;
97✔
222
}
223

224
extern void tsdbStopAllRetentionTask(STsdb *tsdb);
225

226
int32_t vnodeProcessKillRetentionReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
×
227
  SVKillRetentionReq req = {0};  // same as SVKillCompactReq
×
228

229
  vDebug("vgId:%d, kill retention msg will be processed, pReq:%p, len:%d", TD_VID(pVnode), pReq, len);
×
230
  int32_t code = tDeserializeSVKillCompactReq(pReq, len, &req);
×
231
  if (code) {
×
232
    return TSDB_CODE_INVALID_MSG;
×
233
  }
234
  vInfo("vgId:%d, kill retention msg will be processed, taskId:%d, dnodeId:%d, vgId:%d", TD_VID(pVnode), req.taskId,
×
235
        req.dnodeId, req.vgId);
236

237
  tsdbStopAllRetentionTask(pVnode->pTsdb);
×
238

239
  pRsp->msgType = TDMT_VND_KILL_TRIM_RSP;
×
240
  pRsp->code = TSDB_CODE_SUCCESS;
×
241
  pRsp->pCont = NULL;
×
242
  pRsp->contLen = 0;
×
243

244
  return 0;
×
245
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc