• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4829

30 Oct 2025 09:25AM UTC coverage: 49.734% (-11.3%) from 61.071%
#4829

push

travis-ci

web-flow
Merge pull request #33435 from taosdata/3.0

merge 3.0

123072 of 323930 branches covered (37.99%)

Branch coverage included in aggregate %.

7 of 25 new or added lines in 3 files covered. (28.0%)

35232 existing lines in 327 files now uncovered.

172062 of 269495 relevant lines covered (63.85%)

70709785.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

21.41
/source/dnode/vnode/src/tq/tqSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17
#include "tdbInt.h"
18
#include "tq.h"
19

20
// STqSnapReader ========================================
21
struct STqSnapReader {
22
  STQ*    pTq;
23
  int64_t sver;
24
  int64_t ever;
25
  TBC*    pCur;
26
  int8_t  type;
27
};
28

29
/**
 * Open a snapshot reader positioned at the first entry of one TQ store.
 *
 * @param pTq      TQ instance whose store is read; must not be NULL.
 * @param sver     start version, recorded in the reader.
 * @param ever     end version, recorded in the reader.
 * @param type     SNAP_DATA_TQ_CHECKINFO, SNAP_DATA_TQ_HANDLE or SNAP_DATA_TQ_OFFSET;
 *                 selects pCheckStore, pExecStore or pOffsetStore respectively.
 * @param ppReader receives the new reader on success; not written on failure.
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_INVALID_MSG for a NULL argument
 *         or an unknown type; terrno when the allocation fails; otherwise the
 *         code returned by tdbTbcOpen / tdbTbcMoveToFirst.
 */
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqSnapReader** ppReader) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  STqSnapReader* pReader = NULL;
  TTB*           pTb = NULL;

  TSDB_CHECK_NULL(pTq, code, lino, end, TSDB_CODE_INVALID_MSG);
  TSDB_CHECK_NULL(ppReader, code, lino, end, TSDB_CODE_INVALID_MSG);

  // alloc
  pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader));
  TSDB_CHECK_NULL(pReader, code, lino, end, terrno);

  pReader->pTq = pTq;
  pReader->sver = sver;
  pReader->ever = ever;
  pReader->type = type;

  // impl: pick the store that matches the requested snapshot data type
  if (type == SNAP_DATA_TQ_CHECKINFO) {
    pTb = pTq->pCheckStore;
  } else if (type == SNAP_DATA_TQ_HANDLE) {
    pTb = pTq->pExecStore;
  } else if (type == SNAP_DATA_TQ_OFFSET) {
    pTb = pTq->pOffsetStore;
  } else {
    code = TSDB_CODE_INVALID_MSG;
    lino = __LINE__;  // record the failure site so the error log does not report line 0
    goto end;
  }

  code = tdbTbcOpen(pTb, &pReader->pCur, NULL);
  TSDB_CHECK_CODE(code, lino, end);
  code = tdbTbcMoveToFirst(pReader->pCur);
  TSDB_CHECK_CODE(code, lino, end);
  tqInfo("vgId:%d, vnode tq snapshot reader open success", TD_VID(pTq->pVnode));
  *ppReader = pReader;

end:
  if (code != 0) {
    tqError("%s failed at %d, vnode tq snapshot reader open failed since %s", __func__, lino, tstrerror(code));
    if (pReader != NULL) {
      // The cursor is already open when tdbTbcMoveToFirst fails; close it here,
      // otherwise it would be leaked together with the reader.
      if (pReader->pCur != NULL) {
        tdbTbcClose(pReader->pCur);
        pReader->pCur = NULL;
      }
      taosMemoryFreeClear(pReader);
    }
  }

  return code;
}
72

73
/**
 * Close the reader's cursor and release the reader through taosMemoryFreeClear.
 * Does nothing when ppReader or *ppReader is NULL.
 */
void tqSnapReaderClose(STqSnapReader** ppReader) {
  if (ppReader != NULL && *ppReader != NULL) {
    STqSnapReader* pSelf = *ppReader;
    tdbTbcClose(pSelf->pCur);
    taosMemoryFreeClear(*ppReader);
  }
}
80

81
/**
 * Read the next entry from the reader's cursor and wrap its value in a
 * snapshot data block.
 *
 * When tdbTbcNext yields an entry, *ppData is set to a buffer obtained from
 * taosMemoryMalloc that holds an SSnapDataHdr (type = reader type, size = value
 * length) followed by a copy of the value bytes.
 * When tdbTbcNext returns non-zero, the function returns success and does not
 * write *ppData.
 *
 * @param pReader reader created by tqSnapReaderOpen; must not be NULL.
 * @param ppData  output location for the data block; must not be NULL.
 * @return TSDB_CODE_INVALID_MSG for a NULL argument, terrno when the buffer
 *         allocation fails, otherwise success.
 */
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
  int           code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  void*         pKeyBuf = NULL;
  void*         pValBuf = NULL;
  int32_t       keyLen = 0;
  int32_t       valLen = 0;
  SSnapDataHdr* pHdr = NULL;

  TSDB_CHECK_NULL(pReader, code, lino, end, TSDB_CODE_INVALID_MSG);
  TSDB_CHECK_NULL(ppData, code, lino, end, TSDB_CODE_INVALID_MSG);

  // A non-zero result from the cursor is not reported as an error: the code is
  // reset to success and the function leaves without producing a block.
  code = tdbTbcNext(pReader->pCur, &pKeyBuf, &keyLen, &pValBuf, &valLen);
  TSDB_CHECK_CONDITION(code == 0, code, lino, end, TDB_CODE_SUCCESS);

  // Header and payload live in one allocation; publish it before the NULL check
  // so *ppData ends up NULL when the allocation fails.
  pHdr = (SSnapDataHdr*)taosMemoryMalloc(sizeof(SSnapDataHdr) + valLen);
  *ppData = (uint8_t*)pHdr;
  TSDB_CHECK_NULL(*ppData, code, lino, end, terrno);

  pHdr->type = pReader->type;
  pHdr->size = valLen;
  (void)memcpy(pHdr->data, pValBuf, valLen);
  tqInfo("vgId:%d, vnode tq snapshot read data, vLen:%d", TD_VID(pReader->pTq->pVnode), valLen);

end:
  if (code != 0) {
    tqError("%s failed at %d, vnode tq snapshot read data failed since %s", __func__, lino, tstrerror(code));
  }
  // Key/value buffers handed out by tdbTbcNext are released on every path.
  tdbFree(pKeyBuf);
  tdbFree(pValBuf);
  return code;
}
111

112
// STqSnapWriter ========================================
113
struct STqSnapWriter {
114
  STQ*    pTq;
115
  int64_t sver;
116
  int64_t ever;
117
  TXN*    txn;
118
};
119

UNCOV
120
/**
 * Create a snapshot writer and begin a transaction on the TQ meta database.
 *
 * @param pTq      TQ instance to write into; must not be NULL.
 * @param sver     start version, recorded in the writer.
 * @param ever     end version, recorded in the writer.
 * @param ppWriter receives the new writer on success; not written on failure.
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_INVALID_MSG for a NULL
 *         argument; terrno when the allocation fails; otherwise the code
 *         returned by tdbBegin.
 */
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
  int            code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  STqSnapWriter* pWriter = NULL;

  TSDB_CHECK_NULL(pTq, code, lino, end, TSDB_CODE_INVALID_MSG);
  TSDB_CHECK_NULL(ppWriter, code, lino, end, TSDB_CODE_INVALID_MSG);

  // alloc
  pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
  TSDB_CHECK_NULL(pWriter, code, lino, end, terrno);
  pWriter->pTq = pTq;
  pWriter->sver = sver;
  pWriter->ever = ever;

  // All subsequent snapshot writes run inside this transaction; it is finished
  // by tqSnapWriterClose (commit or abort).
  code = tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);
  TSDB_CHECK_CODE(code, lino, end);
  tqInfo("vgId:%d, tq snapshot writer open success", TD_VID(pTq->pVnode));
  *ppWriter = pWriter;

end:
  if (code != 0) {
    tqError("%s failed at %d tq snapshot writer open failed since %s", __func__, lino, tstrerror(code));
    taosMemoryFreeClear(pWriter);
  }
  return code;
}
147

UNCOV
148
/**
 * Finish the writer's transaction and release the writer.
 *
 * With rollback set the transaction is aborted; otherwise it is committed and
 * post-committed. The writer object is released on every path that received a
 * valid ppWriter, including commit failures, so the caller never has to free
 * it after this call.
 *
 * @param ppWriter writer created by tqSnapWriterOpen; ppWriter and *ppWriter
 *                 must not be NULL.
 * @param rollback non-zero to abort the transaction instead of committing it.
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_INVALID_MSG for a NULL
 *         argument; otherwise the code returned by tdbCommit / tdbPostCommit.
 */
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
  int            code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  STqSnapWriter* pWriter = NULL;

  TSDB_CHECK_NULL(ppWriter, code, lino, end, TSDB_CODE_INVALID_MSG);
  TSDB_CHECK_NULL(*ppWriter, code, lino, end, TSDB_CODE_INVALID_MSG);
  pWriter = *ppWriter;

  if (rollback) {
    tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
  } else {
    code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn);
    TSDB_CHECK_CODE(code, lino, end);

    code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn);
    TSDB_CHECK_CODE(code, lino, end);
  }
  tqInfo("vgId:%d, tq snapshot writer close success", TD_VID(pWriter->pTq->pVnode));

end:
  if (code != 0) {
    tqError("%s failed at %d, tq snapshot writer close failed since %s", __func__, lino, tstrerror(code));
  }
  // Release the writer here rather than only on the success path: a failed
  // commit used to jump past the free and leak the writer.
  // NOTE(review): the transaction itself is not aborted after a failed commit;
  // confirm whether tdbCommit/tdbPostCommit clean it up on error.
  if (ppWriter != NULL) {
    taosMemoryFreeClear(*ppWriter);
  }
  return code;
}
175

UNCOV
176
/**
 * Validate the arguments shared by the tqSnap*Write functions.
 *
 * @param pWriter snapshot writer; must not be NULL.
 * @param pData   snapshot data block; must not be NULL.
 * @param nData   size of pData in bytes; must cover at least an SSnapDataHdr.
 * @return TSDB_CODE_SUCCESS when all checks pass, TSDB_CODE_INVALID_MSG otherwise.
 */
static int32_t tqWriteCheck(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int     code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  TSDB_CHECK_NULL(pWriter, code, lino, end, TSDB_CODE_INVALID_MSG);
  TSDB_CHECK_NULL(pData, code, lino, end, TSDB_CODE_INVALID_MSG);
  // The block must be at least as large as its header, since callers subtract
  // sizeof(SSnapDataHdr) from nData to obtain the payload length.
  TSDB_CHECK_CONDITION(nData >= sizeof(SSnapDataHdr), code, lino, end, TSDB_CODE_INVALID_MSG);

end:
  if (code != 0) {
    tqError("%s failed at %d failed since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
UNCOV
188
/**
 * Write one snapshot block carrying an encoded STqHandle into pExecStore.
 *
 * The payload after the SSnapDataHdr is decoded only to obtain handle.subKey,
 * which is used as the key; the stored value is the undecoded payload bytes.
 *
 * @param pWriter snapshot writer; validated by tqWriteCheck.
 * @param pData   snapshot block (SSnapDataHdr followed by the encoded handle).
 * @param nData   total size of pData in bytes.
 * @return TSDB_CODE_SUCCESS, or the code from tqWriteCheck, tDecodeSTqHandle
 *         or tqMetaSaveInfo.
 */
int32_t tqSnapHandleWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int       code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  SDecoder  decoder = {0};
  STqHandle handle = {0};
  STQ*      pTq = NULL;
  uint8_t*  pPayload = NULL;
  uint32_t  nPayload = 0;

  code = tqWriteCheck(pWriter, pData, nData);
  TSDB_CHECK_CODE(code, lino, end);

  // Safe only after tqWriteCheck: pData is non-NULL and nData covers the header.
  pTq = pWriter->pTq;
  pPayload = pData + sizeof(SSnapDataHdr);
  nPayload = nData - sizeof(SSnapDataHdr);

  tDecoderInit(&decoder, pPayload, nPayload);
  code = tDecodeSTqHandle(&decoder, &handle);
  TSDB_CHECK_CODE(code, lino, end);

  // The store is updated under the TQ write latch.
  taosWLockLatch(&pTq->lock);
  code = tqMetaSaveInfo(pTq, pTq->pExecStore, handle.subKey, strlen(handle.subKey), pPayload, nPayload);
  taosWUnLockLatch(&pTq->lock);
  TSDB_CHECK_CODE(code, lino, end);
  tqInfo("vgId:%d, vnode tq snapshot write success", TD_VID(pTq->pVnode));

end:
  // Runs on every path, including the early exits where decoder and handle are
  // still zero-initialized.
  tDecoderClear(&decoder);
  tqDestroyTqHandle(&handle);
  if (code != 0) {
    tqError("%s failed at %d, vnode tq snapshot write failed since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
217

UNCOV
218
/**
 * Write one snapshot block carrying an encoded STqCheckInfo into pCheckStore.
 *
 * The payload after the SSnapDataHdr is decoded only to obtain info.topic,
 * which is used as the key; the stored value is the undecoded payload bytes.
 *
 * @param pWriter snapshot writer; validated by tqWriteCheck.
 * @param pData   snapshot block (SSnapDataHdr followed by the encoded check info).
 * @param nData   total size of pData in bytes.
 * @return TSDB_CODE_SUCCESS, or the code from tqWriteCheck,
 *         tqMetaDecodeCheckInfo or tqMetaSaveInfo.
 */
int32_t tqSnapCheckInfoWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int          code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  STQ*         pTq = NULL;
  STqCheckInfo info = {0};
  uint8_t*     pPayload = NULL;
  uint32_t     nPayload = 0;

  code = tqWriteCheck(pWriter, pData, nData);
  TSDB_CHECK_CODE(code, lino, end);

  // Safe only after tqWriteCheck: pData is non-NULL and nData covers the header.
  pTq = pWriter->pTq;
  pPayload = pData + sizeof(SSnapDataHdr);
  nPayload = nData - sizeof(SSnapDataHdr);

  code = tqMetaDecodeCheckInfo(&info, pPayload, nPayload);
  TSDB_CHECK_CODE(code, lino, end);

  // The store is updated under the TQ write latch.
  // NOTE(review): the key is passed as &info.topic — presumably topic is an
  // inline char array so this equals info.topic; confirm against STqCheckInfo.
  taosWLockLatch(&pTq->lock);
  code = tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.topic, strlen(info.topic), pPayload, nPayload);
  taosWUnLockLatch(&pTq->lock);

  // Release the decoded info before checking the save result so it is freed on
  // both the success and the failure path.
  tDeleteSTqCheckInfo(&info);
  TSDB_CHECK_CODE(code, lino, end);
  tqInfo("vgId:%d, vnode tq check info  write success", TD_VID(pTq->pVnode));

end:
  if (code != 0) {
    tqError("%s failed at %d, vnode tq check info write failed since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
245

UNCOV
246
/**
 * Write one snapshot block carrying an encoded STqOffset into pOffsetStore.
 *
 * The payload after the SSnapDataHdr is decoded only to obtain info.subKey,
 * which is used as the key; the stored value is the undecoded payload bytes.
 *
 * @param pWriter snapshot writer; validated by tqWriteCheck.
 * @param pData   snapshot block (SSnapDataHdr followed by the encoded offset).
 * @param nData   total size of pData in bytes.
 * @return TSDB_CODE_SUCCESS, or the code from tqWriteCheck,
 *         tqMetaDecodeOffsetInfo or tqMetaSaveInfo.
 */
int32_t tqSnapOffsetWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int       code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  STQ*      pTq = NULL;
  STqOffset info = {0};
  uint8_t*  pPayload = NULL;
  uint32_t  nPayload = 0;

  code = tqWriteCheck(pWriter, pData, nData);
  TSDB_CHECK_CODE(code, lino, end);

  // Safe only after tqWriteCheck: pData is non-NULL and nData covers the header.
  pTq = pWriter->pTq;
  pPayload = pData + sizeof(SSnapDataHdr);
  nPayload = nData - sizeof(SSnapDataHdr);

  code = tqMetaDecodeOffsetInfo(&info, pPayload, nPayload);
  TSDB_CHECK_CODE(code, lino, end);

  // The store is updated under the TQ write latch.
  taosWLockLatch(&pTq->lock);
  code = tqMetaSaveInfo(pTq, pTq->pOffsetStore, info.subKey, strlen(info.subKey), pPayload, nPayload);
  taosWUnLockLatch(&pTq->lock);

  // Release the decoded offset before checking the save result so it is freed
  // on both the success and the failure path.
  tDeleteSTqOffset(&info);
  TSDB_CHECK_CODE(code, lino, end);
  tqInfo("vgId:%d, vnode tq offset write success", TD_VID(pTq->pVnode));

end:
  if (code != 0) {
    tqError("%s failed at %d, vnode tq offset write failed since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc