• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: separate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/tq/tqSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17
#include "tdbInt.h"
18
#include "tq.h"
19

20
// STqSnapReader ========================================
21
struct STqSnapReader {
22
  STQ*    pTq;
23
  int64_t sver;
24
  int64_t ever;
25
  TBC*    pCur;
26
  int8_t  type;
27
};
28

UNCOV
29
/*
 * Open a snapshot reader over one of the tq stores and position its cursor on the first entry.
 *
 * pTq       - tq instance that owns the stores; must not be NULL
 * sver/ever - version range, only recorded on the reader here
 * type      - SNAP_DATA_TQ_CHECKINFO, SNAP_DATA_TQ_HANDLE or SNAP_DATA_TQ_OFFSET; selects the store
 * ppReader  - receives the new reader on success and NULL on failure; must not be NULL
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG for NULL arguments or an unknown type, terrno when the
 * allocation fails, or the code returned by tdbTbcOpen()/tdbTbcMoveToFirst().
 *
 * Every failure path releases what has been acquired so far (reader memory and, if opened, the cursor).
 */
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqSnapReader** ppReader) {
  if (pTq == NULL || ppReader == NULL) {
    return TSDB_CODE_INVALID_MSG;
  }
  int32_t        code = 0;
  STqSnapReader* pReader = NULL;

  // alloc
  pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader));
  if (pReader == NULL) {
    code = terrno;
    goto _err;
  }
  pReader->pTq = pTq;
  pReader->sver = sver;
  pReader->ever = ever;
  pReader->type = type;

  // impl
  TTB* pTb = NULL;
  if (type == SNAP_DATA_TQ_CHECKINFO) {
    pTb = pTq->pCheckStore;
  } else if (type == SNAP_DATA_TQ_HANDLE) {
    pTb = pTq->pExecStore;
  } else if (type == SNAP_DATA_TQ_OFFSET) {
    pTb = pTq->pOffsetStore;
  } else {
    code = TSDB_CODE_INVALID_MSG;
    goto _err;
  }

  code = tdbTbcOpen(pTb, &pReader->pCur, NULL);
  if (code) {
    // Do not rely on the out-parameter after a failed open; make sure _err will not close it.
    pReader->pCur = NULL;
    goto _err;
  }

  code = tdbTbcMoveToFirst(pReader->pCur);
  if (code) {
    goto _err;
  }

  tqInfo("vgId:%d, vnode snapshot tq reader opened", TD_VID(pTq->pVnode));

  *ppReader = pReader;
  return code;

_err:
  // Single cleanup point: close the cursor if it was opened, then free the reader itself.
  if (pReader != NULL) {
    if (pReader->pCur != NULL) {
      tdbTbcClose(pReader->pCur);
    }
    taosMemoryFree(pReader);
  }
  tqError("vgId:%d, vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
  *ppReader = NULL;
  return code;
}
81

UNCOV
82
/*
 * Close a reader created by tqSnapReaderOpen(): closes its cursor, frees the reader and
 * resets the caller's pointer to NULL. A NULL ppReader or a NULL *ppReader is a no-op.
 */
void tqSnapReaderClose(STqSnapReader** ppReader) {
  STqSnapReader* pReader = (ppReader != NULL) ? *ppReader : NULL;
  if (pReader == NULL) {
    return;
  }

  tdbTbcClose(pReader->pCur);
  taosMemoryFree(pReader);
  *ppReader = NULL;
}
90

UNCOV
91
/*
 * Read the next entry from the reader's cursor and wrap its value in a snapshot record.
 *
 * pReader - reader returned by tqSnapReaderOpen(); must not be NULL
 * ppData  - receives a newly allocated buffer holding an SSnapDataHdr (type = reader type,
 *           size = value length) followed by the value bytes; must not be NULL.
 *           It is always reset to NULL first, so after a 0 return a NULL *ppData means the
 *           cursor produced no entry.
 *           NOTE(review): the buffer comes from taosMemoryMalloc(); presumably the caller frees it - confirm.
 *
 * Returns 0 on success or when tdbTbcNext() yields no entry, TSDB_CODE_INVALID_MSG for NULL
 * arguments, or an out-of-memory code when the record buffer cannot be allocated.
 */
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
  if (pReader == NULL || ppData == NULL) {
    return TSDB_CODE_INVALID_MSG;
  }
  int32_t code = 0;
  void*   pKey = NULL;
  void*   pVal = NULL;
  int32_t kLen = 0;
  int32_t vLen = 0;

  // Never leave a stale caller value in the out-parameter when no record is produced.
  *ppData = NULL;

  // A non-zero result from the cursor is not reported as an error: code stays 0 and no buffer is returned.
  if (tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
    goto _exit;
  }

  *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
  if (*ppData == NULL) {
    code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    goto _exit;
  }

  SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
  pHdr->type = pReader->type;
  pHdr->size = vLen;
  (void)memcpy(pHdr->data, pVal, vLen);

_exit:
  // Key and value buffers handed back by tdbTbcNext() are released on every path.
  tdbFree(pKey);
  tdbFree(pVal);
  if (code) {
    tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
  } else {
    tqInfo("vgId:%d, vnode snapshot tq read data, vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);
  }
  return code;
}
128

129
// STqSnapWriter ========================================
130
struct STqSnapWriter {
131
  STQ*    pTq;
132
  int64_t sver;
133
  int64_t ever;
134
  TXN*    txn;
135
};
136

UNCOV
137
/*
 * Create a snapshot writer and begin a transaction on pTq->pMetaDB for it.
 *
 * pTq       - tq instance to write into; must not be NULL
 * sver/ever - version range, only recorded on the writer here
 * ppWriter  - receives the new writer on success and NULL on failure; must not be NULL
 *
 * Returns the (non-negative) result of tdbBegin() on success, TSDB_CODE_INVALID_MSG for NULL
 * arguments, an out-of-memory code if allocation fails, or the negative code from tdbBegin().
 */
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
  if (pTq == NULL || ppWriter == NULL) {
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t        code = 0;
  STqSnapWriter* pNew = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pNew));

  if (pNew == NULL) {
    code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
  } else {
    pNew->pTq = pTq;
    pNew->sver = sver;
    pNew->ever = ever;

    code = tdbBegin(pTq->pMetaDB, &pNew->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);
    if (code >= 0) {
      // Transaction started: hand the writer to the caller.
      *ppWriter = pNew;
      return code;
    }

    // Only negative results from tdbBegin() count as failure; drop the half-built writer.
    taosMemoryFree(pNew);
  }

  tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
  *ppWriter = NULL;
  return code;
}
169

UNCOV
170
/*
 * Finish a snapshot writer: abort its transaction when rollback is non-zero, otherwise
 * commit and post-commit it. On success the writer is freed and *ppWriter is set to NULL.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG when ppWriter or *ppWriter is NULL, or the
 * code from tdbCommit()/tdbPostCommit().
 *
 * NOTE(review): when commit or post-commit fails the writer is neither freed nor cleared, so
 * the caller still holds it - confirm whether callers release it or whether this leaks.
 */
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
  if (ppWriter == NULL || *ppWriter == NULL) {
    return TSDB_CODE_INVALID_MSG;
  }

  STqSnapWriter* pSelf = *ppWriter;
  STQ*           pTq = pSelf->pTq;
  int32_t        code = 0;

  if (rollback) {
    tdbAbort(pTq->pMetaDB, pSelf->txn);
  } else {
    code = tdbCommit(pTq->pMetaDB, pSelf->txn);
    if (code == 0) {
      // Post-commit runs only after a successful commit.
      code = tdbPostCommit(pTq->pMetaDB, pSelf->txn);
    }
    if (code != 0) {
      tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
      return code;
    }
  }

  taosMemoryFree(pSelf);
  *ppWriter = NULL;
  return code;
}
196

UNCOV
197
/*
 * Apply one snapshot record carrying an encoded STqHandle.
 *
 * The payload after the SSnapDataHdr is decoded only to obtain handle.subKey; the raw payload
 * bytes are then stored in pTq->pExecStore under that key while holding pTq->lock for writing.
 *
 * pWriter - writer from tqSnapWriterOpen(); must not be NULL
 * pData   - record buffer starting with an SSnapDataHdr; must not be NULL
 * nData   - total size of pData in bytes; must be at least sizeof(SSnapDataHdr)
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG for bad arguments, or the code from
 * tDecodeSTqHandle()/tqMetaSaveInfo(). The result is logged at info level on every path.
 */
int32_t tqSnapHandleWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  if (pWriter == NULL || pData == NULL || nData < sizeof(SSnapDataHdr)) {
    return TSDB_CODE_INVALID_MSG;
  }

  STQ*      pTq = pWriter->pTq;
  uint8_t*  pPayload = pData + sizeof(SSnapDataHdr);
  size_t    payloadLen = nData - sizeof(SSnapDataHdr);
  SDecoder  decoder = {0};
  STqHandle handle = {0};
  int32_t   code = 0;

  tDecoderInit(&decoder, pPayload, payloadLen);
  code = tDecodeSTqHandle(&decoder, &handle);
  if (code == 0) {
    taosWLockLatch(&pTq->lock);
    code = tqMetaSaveInfo(pTq, pTq->pExecStore, handle.subKey, strlen(handle.subKey), pPayload, payloadLen);
    taosWUnLockLatch(&pTq->lock);
  }

  // Decoder and decoded handle are released whether or not decoding succeeded.
  tDecoderClear(&decoder);
  tqDestroyTqHandle(&handle);
  tqInfo("vgId:%d, vnode snapshot tq write result:%d", TD_VID(pTq->pVnode), code);
  return code;
}
221

222
/*
 * Apply one snapshot record carrying an encoded STqCheckInfo.
 *
 * The payload after the SSnapDataHdr is decoded to obtain info.topic; the raw payload bytes are
 * then stored in pTq->pCheckStore under that key.
 *
 * pWriter - writer from tqSnapWriterOpen(); must not be NULL
 * pData   - record buffer starting with an SSnapDataHdr; must not be NULL
 * nData   - total size of pData in bytes; must be at least sizeof(SSnapDataHdr)
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG for bad arguments, or the code from
 * tqMetaDecodeCheckInfo()/tqMetaSaveInfo(); failures are logged.
 */
int32_t tqSnapCheckInfoWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  if (pWriter == NULL || pData == NULL || nData < sizeof(SSnapDataHdr)) {
    return TSDB_CODE_INVALID_MSG;
  }

  STQ*         pTq = pWriter->pTq;
  uint8_t*     pPayload = pData + sizeof(SSnapDataHdr);
  size_t       payloadLen = nData - sizeof(SSnapDataHdr);
  STqCheckInfo info = {0};

  int32_t code = tqMetaDecodeCheckInfo(&info, pPayload, payloadLen);
  if (code == 0) {
    // NOTE(review): the key is passed as &info.topic while its length uses info.topic - confirm
    // topic is an inline array so both refer to the same bytes.
    code = tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.topic, strlen(info.topic), pPayload, payloadLen);
    // The decoded info is released only after a successful decode, regardless of the save result.
    tDeleteSTqCheckInfo(&info);
  }

  if (code != 0) {
    tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
  }
  return code;
}
245

UNCOV
246
/*
 * Apply one snapshot record carrying an encoded STqOffset.
 *
 * The payload after the SSnapDataHdr is decoded to obtain info.subKey; the raw payload bytes are
 * then stored in pTq->pOffsetStore under that key.
 *
 * pWriter - writer from tqSnapWriterOpen(); must not be NULL
 * pData   - record buffer starting with an SSnapDataHdr; must not be NULL
 * nData   - total size of pData in bytes; must be at least sizeof(SSnapDataHdr)
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG for bad arguments, or the code from
 * tqMetaDecodeOffsetInfo()/tqMetaSaveInfo(); failures are logged.
 */
int32_t tqSnapOffsetWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  if (pWriter == NULL || pData == NULL || nData < sizeof(SSnapDataHdr)) {
    return TSDB_CODE_INVALID_MSG;
  }
  int32_t code = 0;
  STQ*    pTq = pWriter->pTq;

  STqOffset info = {0};
  code = tqMetaDecodeOffsetInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
  if (code != 0) {
    goto _err;
  }

  code = tqMetaSaveInfo(pTq, pTq->pOffsetStore, info.subKey, strlen(info.subKey), pData + sizeof(SSnapDataHdr),
                        nData - sizeof(SSnapDataHdr));
  // The decoded offset is released before the save result is examined.
  tDeleteSTqOffset(&info);
  if (code) {
    goto _err;
  }

  return code;

_err:
  // This message used to say "check info", copied from tqSnapCheckInfoWrite(); name the offset store.
  tqError("vgId:%d, vnode offset tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc