• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4986

15 Mar 2026 08:32AM UTC coverage: 37.305% (-31.3%) from 68.601%
#4986

push

travis-ci

tomchon
test: keep docs and unit test

125478 of 336361 relevant lines covered (37.3%)

1134847.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/tq/tqSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17
#include "tdbInt.h"
18
#include "tq.h"
19

20
// STqSnapReader ========================================
21
struct STqSnapReader {
22
  STQ*    pTq;
23
  int64_t sver;
24
  int64_t ever;
25
  TBC*    pCur;
26
  int8_t  type;
27
};
28

29
/**
 * Create a snapshot reader that iterates one of the TQ stores.
 *
 * @param pTq      TQ instance whose store is read; must not be NULL.
 * @param sver     Start version, recorded on the reader.
 * @param ever     End version, recorded on the reader.
 * @param type     SNAP_DATA_TQ_HANDLE selects pTq->pExecStore,
 *                 SNAP_DATA_TQ_OFFSET selects pTq->pOffsetStore; any other
 *                 value is rejected with TSDB_CODE_INVALID_MSG.
 * @param ppReader Receives the new reader on success; must not be NULL.
 *                 It is not written on failure.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise the error code of the
 *         failing step. On failure every resource acquired here (reader and
 *         cursor) is released before returning.
 */
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqSnapReader** ppReader) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  STqSnapReader* pReader = NULL;
  TTB*           pTb = NULL;

  TSDB_CHECK_NULL(pTq, code, lino, end, TSDB_CODE_INVALID_MSG);
  TSDB_CHECK_NULL(ppReader, code, lino, end, TSDB_CODE_INVALID_MSG);

  // alloc (zero-filled, so pCur starts out NULL)
  pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader));
  TSDB_CHECK_NULL(pReader, code, lino, end, terrno);

  pReader->pTq = pTq;
  pReader->sver = sver;
  pReader->ever = ever;
  pReader->type = type;

  // pick the store that matches the requested snapshot type
  if (type == SNAP_DATA_TQ_HANDLE) {
    pTb = pTq->pExecStore;
  } else if (type == SNAP_DATA_TQ_OFFSET) {
    pTb = pTq->pOffsetStore;
  } else {
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  code = tdbTbcOpen(pTb, &pReader->pCur, NULL);
  TSDB_CHECK_CODE(code, lino, end);
  code = tdbTbcMoveToFirst(pReader->pCur);
  TSDB_CHECK_CODE(code, lino, end);
  tqInfo("vgId:%d, vnode tq snapshot reader open success", TD_VID(pTq->pVnode));
  *ppReader = pReader;

end:
  if (code != 0) {
    tqError("%s failed at %d, vnode tq snapshot reader open failed since %s", __func__, lino, tstrerror(code));
    if (pReader != NULL) {
      // The cursor is already open when positioning it fails; close it so
      // it is not leaked together with the reader.
      if (pReader->pCur != NULL) {
        tdbTbcClose(pReader->pCur);
        pReader->pCur = NULL;
      }
      taosMemoryFreeClear(pReader);
    }
  }

  return code;
}
70

71
/**
 * Release a reader created by tqSnapReaderOpen().
 *
 * Closes the reader's cursor, frees the reader and clears the caller's
 * pointer. Does nothing when ppReader or *ppReader is NULL.
 *
 * @param ppReader Address of the reader pointer to release.
 */
void tqSnapReaderClose(STqSnapReader** ppReader) {
  if (ppReader != NULL && *ppReader != NULL) {
    STqSnapReader* pReader = *ppReader;

    tdbTbcClose(pReader->pCur);
    taosMemoryFreeClear(*ppReader);
  }
}
78

79
/**
 * Read the next record from the snapshot cursor and wrap it in a freshly
 * allocated SSnapDataHdr-prefixed buffer.
 *
 * @param pReader Reader obtained from tqSnapReaderOpen(); must not be NULL.
 * @param ppData  Receives the allocated buffer (header followed by the
 *                record value); must not be NULL. It is set to NULL when no
 *                buffer is produced, including when the cursor yields no
 *                further record, so the caller can tell "no data" apart
 *                from "data" without pre-initialising it.
 *
 * @return TSDB_CODE_SUCCESS when a record was produced or when tdbTbcNext()
 *         reported a non-zero status (treated as "nothing more to read");
 *         TSDB_CODE_INVALID_MSG for NULL arguments; terrno when the buffer
 *         allocation fails.
 */
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  void*         pKey = NULL;
  void*         pVal = NULL;
  int32_t       kLen = 0;
  int32_t       vLen = 0;
  SSnapDataHdr* pHdr = NULL;

  TSDB_CHECK_NULL(pReader, code, lino, end, TSDB_CODE_INVALID_MSG);
  TSDB_CHECK_NULL(ppData, code, lino, end, TSDB_CODE_INVALID_MSG);

  // Give the output a defined value on every path that does not allocate.
  *ppData = NULL;

  code = tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen);
  // A non-zero status from the cursor is not an error here: report success
  // and leave *ppData NULL.
  TSDB_CHECK_CONDITION(code == 0, code, lino, end, TDB_CODE_SUCCESS);

  *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
  TSDB_CHECK_NULL(*ppData, code, lino, end, terrno);

  pHdr = (SSnapDataHdr*)(*ppData);
  // The buffer comes from malloc; zero the header so that any field not
  // assigned below does not carry indeterminate bytes.
  (void)memset(pHdr, 0, sizeof(SSnapDataHdr));
  pHdr->type = pReader->type;
  pHdr->size = vLen;
  (void)memcpy(pHdr->data, pVal, vLen);
  tqInfo("vgId:%d, vnode tq snapshot read data, vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);

end:
  if (code != 0) {
    tqError("%s failed at %d, vnode tq snapshot read data failed since %s", __func__, lino, tstrerror(code));
  }
  // Key/value buffers handed back by tdbTbcNext() are released here on every path.
  tdbFree(pKey);
  tdbFree(pVal);
  return code;
}
109

110
// STqSnapWriter ========================================
111
struct STqSnapWriter {
112
  STQ*    pTq;
113
  int64_t sver;
114
  int64_t ever;
115
  TXN*    txn;
116
};
117

118
/**
 * Create a snapshot writer and begin a transaction on the TQ meta database.
 *
 * @param pTq      TQ instance to write into; must not be NULL.
 * @param sver     Start version, recorded on the writer.
 * @param ever     End version, recorded on the writer.
 * @param ppWriter Receives the new writer on success; must not be NULL.
 *
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_INVALID_MSG for NULL
 *         arguments; terrno when allocation fails; otherwise the code
 *         returned by tdbBegin(). The writer is freed on failure.
 */
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
  int            code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  STqSnapWriter* pNew = NULL;

  TSDB_CHECK_NULL(pTq, code, lino, end, TSDB_CODE_INVALID_MSG);
  TSDB_CHECK_NULL(ppWriter, code, lino, end, TSDB_CODE_INVALID_MSG);

  /* allocate a zero-filled writer and record the arguments */
  pNew = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(STqSnapWriter));
  TSDB_CHECK_NULL(pNew, code, lino, end, terrno);
  pNew->pTq = pTq;
  pNew->sver = sver;
  pNew->ever = ever;

  /* every write made through this writer happens inside this transaction */
  code = tdbBegin(pTq->pMetaDB, &pNew->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);
  TSDB_CHECK_CODE(code, lino, end);

  tqInfo("vgId:%d, tq snapshot writer opene success", TD_VID(pTq->pVnode));
  *ppWriter = pNew;

end:
  if (code != 0) {
    tqError("%s failed at %d tq snapshot writer open failed since %s", __func__, lino, tstrerror(code));
    taosMemoryFreeClear(pNew);
  }
  return code;
}
145

146
/**
 * Finish the writer's transaction and release the writer.
 *
 * @param ppWriter Address of the writer pointer; neither it nor *ppWriter
 *                 may be NULL. Cleared once the writer has been freed.
 * @param rollback Non-zero aborts the transaction; zero commits it and then
 *                 runs the post-commit step.
 *
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_INVALID_MSG for NULL
 *         arguments; otherwise the code returned by tdbCommit() or
 *         tdbPostCommit().
 *
 * NOTE(review): when commit or post-commit fails the writer is neither
 * freed nor cleared, so *ppWriter is still valid on return. Confirm whether
 * callers rely on that (e.g. to retry with rollback) or whether it is a leak.
 */
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
  int            code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  STqSnapWriter* pWriter = NULL;
  STQ*           pTq = NULL;

  TSDB_CHECK_NULL(ppWriter, code, lino, end, TSDB_CODE_INVALID_MSG);
  TSDB_CHECK_NULL(*ppWriter, code, lino, end, TSDB_CODE_INVALID_MSG);

  pWriter = *ppWriter;
  pTq = pWriter->pTq;

  if (!rollback) {
    code = tdbCommit(pTq->pMetaDB, pWriter->txn);
    TSDB_CHECK_CODE(code, lino, end);

    code = tdbPostCommit(pTq->pMetaDB, pWriter->txn);
    TSDB_CHECK_CODE(code, lino, end);
  } else {
    tdbAbort(pTq->pMetaDB, pWriter->txn);
  }

  tqInfo("vgId:%d, tq snapshot writer close success", TD_VID(pTq->pVnode));
  taosMemoryFreeClear(*ppWriter);

end:
  if (code != 0) {
    tqError("%s failed at %d, tq snapshot writer close failed since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
173

174
/**
 * Validate the arguments shared by the snapshot write entry points.
 *
 * @param pWriter Writer to check; must not be NULL.
 * @param pData   Snapshot buffer; must not be NULL.
 * @param nData   Size of pData in bytes; must cover at least one SSnapDataHdr.
 *
 * @return TSDB_CODE_SUCCESS when all checks pass, TSDB_CODE_INVALID_MSG otherwise.
 */
static int32_t tqWriteCheck(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int     rc = TSDB_CODE_SUCCESS;
  int32_t line = 0;

  TSDB_CHECK_NULL(pWriter, rc, line, end, TSDB_CODE_INVALID_MSG);
  TSDB_CHECK_NULL(pData, rc, line, end, TSDB_CODE_INVALID_MSG);
  /* the payload is read from behind the header, so the header must fit */
  TSDB_CHECK_CONDITION(nData >= sizeof(SSnapDataHdr), rc, line, end, TSDB_CODE_INVALID_MSG);

end:
  if (rc != 0) {
    tqError("%s failed at %d failed since %s", __func__, line, tstrerror(rc));
  }
  return rc;
}
186
/**
 * Store one snapshot "handle" record into the TQ exec store.
 *
 * The bytes after the SSnapDataHdr are decoded as an STqHandle only to obtain
 * its subKey; the same undecoded bytes are then saved under that key.
 *
 * @param pWriter Writer obtained from tqSnapWriterOpen().
 * @param pData   Buffer starting with an SSnapDataHdr followed by the encoded handle.
 * @param nData   Total size of pData in bytes.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise the code of the failing
 *         step (argument check, decode, or save).
 */
int32_t tqSnapHandleWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int       code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  SDecoder  decoder = {0};
  STqHandle handle = {0};
  STQ*      pTq = NULL;
  uint8_t*  pBody = NULL;
  uint32_t  nBody = 0;

  code = tqWriteCheck(pWriter, pData, nData);
  TSDB_CHECK_CODE(code, lino, end);

  /* the check above guarantees nData covers the header */
  pTq = pWriter->pTq;
  pBody = pData + sizeof(SSnapDataHdr);
  nBody = nData - sizeof(SSnapDataHdr);

  tDecoderInit(&decoder, pBody, nBody);
  code = tDecodeSTqHandle(&decoder, &handle);
  TSDB_CHECK_CODE(code, lino, end);

  /* the save runs under the TQ write latch */
  taosWLockLatch(&pTq->lock);
  code = tqMetaSaveInfo(pTq, pTq->pExecStore, handle.subKey, strlen(handle.subKey), pBody, nBody);
  taosWUnLockLatch(&pTq->lock);
  TSDB_CHECK_CODE(code, lino, end);

  tqInfo("vgId:%d, vnode tq snapshot write success", TD_VID(pTq->pVnode));

end:
  /* runs on every path, including early failures where both are still zero-initialised */
  tDecoderClear(&decoder);
  tqDestroyTqHandle(&handle);
  if (code != 0) {
    tqError("%s failed at %d, vnode tq snapshot write failed since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
215

216
/**
 * Store one snapshot "offset" record into the TQ offset store.
 *
 * The bytes after the SSnapDataHdr are decoded as an STqOffset only to obtain
 * its subKey; the same undecoded bytes are then saved under that key.
 *
 * @param pWriter Writer obtained from tqSnapWriterOpen().
 * @param pData   Buffer starting with an SSnapDataHdr followed by the encoded offset.
 * @param nData   Total size of pData in bytes.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise the code of the failing
 *         step (argument check, decode, or save).
 *
 * NOTE(review): when tqMetaDecodeOffsetInfo() fails, `info` is not passed to
 * tDeleteSTqOffset() here; confirm the decoder releases anything it
 * allocated before returning an error.
 */
int32_t tqSnapOffsetWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int       code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  STqOffset info = {0};
  STQ*      pTq = NULL;
  uint8_t*  pBody = NULL;
  uint32_t  nBody = 0;

  code = tqWriteCheck(pWriter, pData, nData);
  TSDB_CHECK_CODE(code, lino, end);

  /* the check above guarantees nData covers the header */
  pTq = pWriter->pTq;
  pBody = pData + sizeof(SSnapDataHdr);
  nBody = nData - sizeof(SSnapDataHdr);

  code = tqMetaDecodeOffsetInfo(&info, pBody, nBody);
  TSDB_CHECK_CODE(code, lino, end);

  /* the save runs under the TQ write latch */
  taosWLockLatch(&pTq->lock);
  code = tqMetaSaveInfo(pTq, pTq->pOffsetStore, info.subKey, strlen(info.subKey), pBody, nBody);
  taosWUnLockLatch(&pTq->lock);

  /* the decoded copy was only needed for its key; drop it before checking the result */
  tDeleteSTqOffset(&info);
  TSDB_CHECK_CODE(code, lino, end);

  tqInfo("vgId:%d, vnode tq offset write success", TD_VID(pTq->pVnode));

end:
  if (code != 0) {
    tqError("%s failed at %d, vnode tq offset write failed since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc