• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3558

17 Dec 2024 06:05AM UTC coverage: 59.778% (+1.6%) from 58.204%
#3558

push

travis-ci

web-flow
Merge pull request #29179 from taosdata/merge/mainto3.0

merge: form main to 3.0 branch

132787 of 287595 branches covered (46.17%)

Branch coverage included in aggregate %.

104 of 191 new or added lines in 5 files covered. (54.45%)

6085 existing lines in 168 files now uncovered.

209348 of 284746 relevant lines covered (73.52%)

8164844.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.0
/source/dnode/vnode/src/tq/tqSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17
#include "tdbInt.h"
18
#include "tq.h"
19

20
// STqSnapReader ========================================
21
struct STqSnapReader {
22
  STQ*    pTq;
23
  int64_t sver;
24
  int64_t ever;
25
  TBC*    pCur;
26
  int8_t  type;
27
};
28

29
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqSnapReader** ppReader) {
207✔
30
  int32_t        code = 0;
207✔
31
  STqSnapReader* pReader = NULL;
207✔
32

33
  // alloc
34
  pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader));
207!
35
  if (pReader == NULL) {
207!
36
    code = terrno;
×
37
    goto _err;
×
38
  }
39
  pReader->pTq = pTq;
207✔
40
  pReader->sver = sver;
207✔
41
  pReader->ever = ever;
207✔
42
  pReader->type = type;
207✔
43

44
  // impl
45
  TTB* pTb = NULL;
207✔
46
  if (type == SNAP_DATA_TQ_CHECKINFO) {
207✔
47
    pTb = pTq->pCheckStore;
69✔
48
  } else if (type == SNAP_DATA_TQ_HANDLE) {
138✔
49
    pTb = pTq->pExecStore;
69✔
50
  } else if (type == SNAP_DATA_TQ_OFFSET) {
69!
51
    pTb = pTq->pOffsetStore;
69✔
52
  } else {
53
    code = TSDB_CODE_INVALID_MSG;
×
54
    goto _err;
×
55
  }
56
  code = tdbTbcOpen(pTb, &pReader->pCur, NULL);
207✔
57
  if (code) {
207!
58
    taosMemoryFree(pReader);
×
59
    goto _err;
×
60
  }
61

62
  code = tdbTbcMoveToFirst(pReader->pCur);
207✔
63
  if (code) {
207!
64
    taosMemoryFree(pReader);
×
65
    goto _err;
×
66
  }
67

68
  tqInfo("vgId:%d, vnode snapshot tq reader opened", TD_VID(pTq->pVnode));
207!
69

70
  *ppReader = pReader;
207✔
71
  return code;
207✔
72

73
_err:
×
74
  tqError("vgId:%d, vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
×
75
  *ppReader = NULL;
×
76
  return code;
×
77
}
78

79
void tqSnapReaderClose(STqSnapReader** ppReader) {
207✔
80
  tdbTbcClose((*ppReader)->pCur);
207✔
81
  taosMemoryFree(*ppReader);
207!
82
  *ppReader = NULL;
207✔
83
}
207✔
84

85
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
291✔
86
  int32_t code = 0;
291✔
87
  void*   pKey = NULL;
291✔
88
  void*   pVal = NULL;
291✔
89
  int32_t kLen = 0;
291✔
90
  int32_t vLen = 0;
291✔
91

92
  if (tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
291✔
93
    goto _exit;
207✔
94
  }
95

96
  *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
84!
97
  if (*ppData == NULL) {
84!
98
    code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
99
    goto _err;
×
100
  }
101

102
  SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
84✔
103
  pHdr->type = pReader->type;
84✔
104
  pHdr->size = vLen;
84✔
105
  (void)memcpy(pHdr->data, pVal, vLen);
84✔
106

107
_exit:
291✔
108
  tdbFree(pKey);
291✔
109
  tdbFree(pVal);
291✔
110
  tqInfo("vgId:%d, vnode snapshot tq read data, vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);
291!
111
  return code;
291✔
112

113
_err:
×
114
  tdbFree(pKey);
×
115
  tdbFree(pVal);
×
116
  tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
×
117
  return code;
×
118
}
119

120
// STqSnapWriter ========================================
121
struct STqSnapWriter {
122
  STQ*    pTq;
123
  int64_t sver;
124
  int64_t ever;
125
  TXN*    txn;
126
};
127

128
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
84✔
129
  int32_t        code = 0;
84✔
130
  STqSnapWriter* pWriter = NULL;
84✔
131

132
  // alloc
133
  pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
84!
134
  if (pWriter == NULL) {
84!
135
    code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
136
    ;
137
    goto _err;
×
138
  }
139
  pWriter->pTq = pTq;
84✔
140
  pWriter->sver = sver;
84✔
141
  pWriter->ever = ever;
84✔
142

143
  code = tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);
84✔
144
  if (code < 0) {
84!
145
    taosMemoryFree(pWriter);
×
146
    goto _err;
×
147
  }
148

149
  *ppWriter = pWriter;
84✔
150
  return code;
84✔
151

152
_err:
×
153
  tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
×
154
  *ppWriter = NULL;
×
155
  return code;
×
156
}
157

158
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
84✔
159
  int32_t        code = 0;
84✔
160
  STqSnapWriter* pWriter = *ppWriter;
84✔
161
  STQ*           pTq = pWriter->pTq;
84✔
162

163
  if (rollback) {
84!
164
    tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
×
165
  } else {
166
    code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn);
84✔
167
    if (code) goto _err;
84!
168
    code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn);
84✔
169
    if (code) goto _err;
84!
170
  }
171

172
  taosMemoryFree(pWriter);
84!
173
  *ppWriter = NULL;
84✔
174

175
  return code;
84✔
176

177
_err:
×
178
  tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
×
179
  return code;
×
180
}
181

182
int32_t tqSnapHandleWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
42✔
183
  int32_t   code = 0;
42✔
184
  STQ*      pTq = pWriter->pTq;
42✔
185
  SDecoder  decoder = {0};
42✔
186
  SDecoder* pDecoder = &decoder;
42✔
187
  STqHandle handle = {0};
42✔
188

189
  tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
42✔
190
  code = tDecodeSTqHandle(pDecoder, &handle);
42✔
191
  if (code) goto end;
42!
192
  taosWLockLatch(&pTq->lock);
42✔
193
  code = tqMetaSaveInfo(pTq, pTq->pExecStore, handle.subKey, (int)strlen(handle.subKey), pData + sizeof(SSnapDataHdr),
42✔
194
                        nData - sizeof(SSnapDataHdr));
42✔
195
  taosWUnLockLatch(&pTq->lock);
42✔
196

197
end:
42✔
198
  tDecoderClear(pDecoder);
42✔
199
  tqDestroyTqHandle(&handle);
42✔
200
  tqInfo("vgId:%d, vnode snapshot tq write result:%d", TD_VID(pTq->pVnode), code);
42!
201
  return code;
42✔
202
}
203

UNCOV
204
int32_t tqSnapCheckInfoWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
×
UNCOV
205
  int32_t      code = 0;
×
UNCOV
206
  STQ*         pTq = pWriter->pTq;
×
UNCOV
207
  STqCheckInfo info = {0};
×
UNCOV
208
  code = tqMetaDecodeCheckInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
×
UNCOV
209
  if (code != 0) {
×
210
    goto _err;
×
211
  }
212

UNCOV
213
  code = tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.topic, strlen(info.topic), pData + sizeof(SSnapDataHdr),
×
UNCOV
214
                        nData - sizeof(SSnapDataHdr));
×
UNCOV
215
  tDeleteSTqCheckInfo(&info);
×
UNCOV
216
  if (code) goto _err;
×
217

UNCOV
218
  return code;
×
219

220
_err:
×
221
  tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
×
222
  return code;
×
223
}
224

225
int32_t tqSnapOffsetWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
42✔
226
  int32_t code = 0;
42✔
227
  STQ*    pTq = pWriter->pTq;
42✔
228

229
  STqOffset info = {0};
42✔
230
  code = tqMetaDecodeOffsetInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
42✔
231
  if (code != 0) {
42!
232
    goto _err;
×
233
  }
234

235
  code = tqMetaSaveInfo(pTq, pTq->pOffsetStore, info.subKey, strlen(info.subKey), pData + sizeof(SSnapDataHdr),
42✔
236
                        nData - sizeof(SSnapDataHdr));
42✔
237
  tDeleteSTqOffset(&info);
42✔
238
  if (code) goto _err;
42!
239

240
  return code;
42✔
241

242
_err:
×
243
  tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
×
244
  return code;
×
245
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc