• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3530

16 Nov 2024 07:44AM UTC coverage: 60.219% (-0.7%) from 60.888%
#3530

push

travis-ci

web-flow
Update 03-ad.md

118417 of 252124 branches covered (46.97%)

Branch coverage included in aggregate %.

198982 of 274951 relevant lines covered (72.37%)

6072359.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.77
/source/dnode/vnode/src/tq/tqStreamStateSnap.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17
#include "streamSnapshot.h"
18
#include "tdbInt.h"
19
#include "tq.h"
20

21
// STqSnapReader ========================================
22
struct SStreamStateReader {
23
  STQ*    pTq;
24
  int64_t sver;
25
  int64_t ever;
26
  TBC*    pCur;
27

28
  SStreamSnapReader* pReaderImpl;
29
  int32_t            complete;  // open reader or not
30
};
31

32
int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader) {
86✔
33
  int32_t             code = 0;
86✔
34
  SStreamStateReader* pReader = NULL;
86✔
35

36
  char tdir[TSDB_FILENAME_LEN * 2] = {0};
86✔
37

38
  // alloc
39
  pReader = (SStreamStateReader*)taosMemoryCalloc(1, sizeof(SStreamStateReader));
86✔
40
  if (pReader == NULL) {
86!
41
    code = terrno;
×
42
    goto _err;
×
43
  }
44

45
  SStreamMeta* meta = pTq->pStreamMeta;
86✔
46
  pReader->pTq = pTq;
86✔
47
  pReader->sver = sver;
86✔
48
  pReader->ever = ever;
86✔
49

50
  int64_t chkpId = meta ? meta->chkpId : 0;
86!
51

52
  SStreamSnapReader* pSnapReader = NULL;
86✔
53

54
  if ((code = streamSnapReaderOpen(meta, sver, chkpId, meta->path, &pSnapReader)) == 0) {
86!
55
    pReader->complete = 1;
86✔
56
  } else {
57
    taosMemoryFree(pReader);
×
58
    goto _err;
×
59
  }
60
  pReader->pReaderImpl = pSnapReader;
86✔
61

62
  tqDebug("vgId:%d, vnode %s snapshot reader opened", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER);
86✔
63

64
  *ppReader = pReader;
86✔
65
  return code;
86✔
66

67
_err:
×
68
  tqError("vgId:%d, vnode %s snapshot reader failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
×
69
          tstrerror(code));
70
  *ppReader = NULL;
×
71
  return code;
×
72
}
73

74
int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) {
86✔
75
  int32_t code = 0;
86✔
76
  tqDebug("vgId:%d, vnode %s snapshot reader closed", TD_VID(pReader->pTq->pVnode), STREAM_STATE_TRANSFER);
86✔
77
  code = streamSnapReaderClose(pReader->pReaderImpl);
86✔
78
  taosMemoryFree(pReader);
86✔
79
  return code;
86✔
80
}
81

82
int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) {
86✔
83
  tqDebug("vgId:%d, vnode %s snapshot read data", TD_VID(pReader->pTq->pVnode), STREAM_STATE_TRANSFER);
86✔
84

85
  int32_t code = 0;
86✔
86
  if (pReader->complete == 0) {
86!
87
    return 0;
×
88
  }
89

90
  uint8_t* rowData = NULL;
86✔
91
  int64_t  len;
92
  code = streamSnapRead(pReader->pReaderImpl, &rowData, &len);
86✔
93
  if (code != 0 || rowData == NULL || len == 0) {
86!
94
    return code;
86✔
95
  }
96
  *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + len);
×
97
  if (*ppData == NULL) {
×
98
    code = terrno;
×
99
    goto _err;
×
100
  }
101
  // refactor later, avoid mem/free freq
102
  SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
×
103
  pHdr->type = SNAP_DATA_STREAM_STATE_BACKEND;
×
104
  pHdr->size = len;
×
105
  memcpy(pHdr->data, rowData, len);
×
106
  taosMemoryFree(rowData);
×
107
  tqDebug("vgId:%d, vnode stream-state snapshot read data success", TD_VID(pReader->pTq->pVnode));
×
108
  return code;
×
109

110
_err:
×
111
  tqError("vgId:%d, vnode stream-state snapshot failed to read since %s", TD_VID(pReader->pTq->pVnode),
×
112
          tstrerror(code));
113
  return code;
×
114
}
115

116
// STqSnapWriter ========================================
117
struct SStreamStateWriter {
118
  STQ*    pTq;
119
  int64_t sver;
120
  int64_t ever;
121
  TXN*    txn;
122

123
  SStreamSnapWriter* pWriterImpl;
124
};
125

126
int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter) {
×
127
  int32_t             code = 0;
×
128
  SStreamStateWriter* pWriter;
129

130
  // alloc
131
  pWriter = (SStreamStateWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
×
132
  if (pWriter == NULL) {
×
133
    code = terrno;
×
134
    goto _err;
×
135
  }
136
  pWriter->pTq = pTq;
×
137
  pWriter->sver = sver;
×
138
  pWriter->ever = ever;
×
139

140
  if (taosMkDir(pTq->pStreamMeta->path) != 0) {
×
141
    code = TAOS_SYSTEM_ERROR(errno);
×
142
    tqError("vgId:%d, vnode %s snapshot writer failed to create directory %s since %s", TD_VID(pTq->pVnode),
×
143
            STREAM_STATE_TRANSFER, pTq->pStreamMeta->path, tstrerror(code));
144
    goto _err;
×
145
  }
146

147
  SStreamSnapWriter* pSnapWriter = NULL;
×
148
  if ((code = streamSnapWriterOpen(pTq, sver, ever, pTq->pStreamMeta->path, &pSnapWriter)) < 0) {
×
149
    goto _err;
×
150
  }
151

152
  tqDebug("vgId:%d, vnode %s snapshot writer opened, path:%s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
×
153
          pTq->pStreamMeta->path);
154
  pWriter->pWriterImpl = pSnapWriter;
×
155

156
  *ppWriter = pWriter;
×
157
  return 0;
×
158

159
_err:
×
160
  tqError("vgId:%d, vnode %s snapshot writer failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
×
161
          tstrerror(code));
162
  taosMemoryFree(pWriter);
×
163
  *ppWriter = NULL;
×
164
  return code;
×
165
}
166

167
int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) {
×
168
  tqDebug("vgId:%d, vnode %s snapshot writer closed", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
×
169
  return streamSnapWriterClose(pWriter->pWriterImpl, rollback);
×
170
}
171

172
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
×
173
  tqDebug("vgId:%d, vnode %s snapshot write data", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
×
174
  return streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
×
175
}
176
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
×
177
  tqDebug("vgId:%d, vnode %s  start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
×
178
  int32_t code = streamStateLoadTasks(pWriter);
×
179
  tqDebug("vgId:%d, vnode %s  succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
×
180
  taosMemoryFree(pWriter);
×
181
  return code;
×
182
}
183

184
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) {
×
185
  streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta);
×
186
  return TSDB_CODE_SUCCESS;
×
187
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc