• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3533

20 Nov 2024 07:11AM UTC coverage: 58.848% (-1.9%) from 60.78%
#3533

push

travis-ci

web-flow
Merge pull request #28823 from taosdata/fix/3.0/TD-32587

fix:[TD-32587]fix stmt segmentation fault

115578 of 252434 branches covered (45.79%)

Branch coverage included in aggregate %.

1 of 4 new or added lines in 1 file covered. (25.0%)

8038 existing lines in 233 files now uncovered.

194926 of 275199 relevant lines covered (70.83%)

1494459.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.77
/source/dnode/vnode/src/tq/tqStreamStateSnap.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17
#include "streamSnapshot.h"
18
#include "tdbInt.h"
19
#include "tq.h"
20

21
// STqSnapReader ========================================
22
struct SStreamStateReader {
23
  STQ*    pTq;
24
  int64_t sver;
25
  int64_t ever;
26
  TBC*    pCur;
27

28
  SStreamSnapReader* pReaderImpl;
29
  int32_t            complete;  // open reader or not
30
};
31

32
int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader) {
19✔
33
  int32_t             code = 0;
19✔
34
  SStreamStateReader* pReader = NULL;
19✔
35

36
  char tdir[TSDB_FILENAME_LEN * 2] = {0};
19✔
37

38
  // alloc
39
  pReader = (SStreamStateReader*)taosMemoryCalloc(1, sizeof(SStreamStateReader));
19✔
40
  if (pReader == NULL) {
19!
41
    code = terrno;
×
42
    goto _err;
×
43
  }
44

45
  SStreamMeta* meta = pTq->pStreamMeta;
19✔
46
  pReader->pTq = pTq;
19✔
47
  pReader->sver = sver;
19✔
48
  pReader->ever = ever;
19✔
49

50
  int64_t chkpId = meta ? meta->chkpId : 0;
19!
51

52
  SStreamSnapReader* pSnapReader = NULL;
19✔
53

54
  if ((code = streamSnapReaderOpen(meta, sver, chkpId, meta->path, &pSnapReader)) == 0) {
19!
55
    pReader->complete = 1;
19✔
56
  } else {
57
    taosMemoryFree(pReader);
×
58
    goto _err;
×
59
  }
60
  pReader->pReaderImpl = pSnapReader;
19✔
61

62
  tqDebug("vgId:%d, vnode %s snapshot reader opened", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER);
19✔
63

64
  *ppReader = pReader;
19✔
65
  return code;
19✔
66

67
_err:
×
68
  tqError("vgId:%d, vnode %s snapshot reader failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
×
69
          tstrerror(code));
70
  *ppReader = NULL;
×
71
  return code;
×
72
}
73

74
int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) {
19✔
75
  int32_t code = 0;
19✔
76
  tqDebug("vgId:%d, vnode %s snapshot reader closed", TD_VID(pReader->pTq->pVnode), STREAM_STATE_TRANSFER);
19✔
77
  code = streamSnapReaderClose(pReader->pReaderImpl);
19✔
78
  taosMemoryFree(pReader);
19✔
79
  return code;
19✔
80
}
81

82
int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) {
19✔
83
  tqDebug("vgId:%d, vnode %s snapshot read data", TD_VID(pReader->pTq->pVnode), STREAM_STATE_TRANSFER);
19✔
84

85
  int32_t code = 0;
19✔
86
  if (pReader->complete == 0) {
19!
87
    return 0;
×
88
  }
89

90
  uint8_t* rowData = NULL;
19✔
91
  int64_t  len;
92
  code = streamSnapRead(pReader->pReaderImpl, &rowData, &len);
19✔
93
  if (code != 0 || rowData == NULL || len == 0) {
19!
94
    return code;
19✔
95
  }
UNCOV
96
  *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + len);
×
UNCOV
97
  if (*ppData == NULL) {
×
98
    code = terrno;
×
99
    goto _err;
×
100
  }
101
  // refactor later, avoid mem/free freq
UNCOV
102
  SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
×
UNCOV
103
  pHdr->type = SNAP_DATA_STREAM_STATE_BACKEND;
×
UNCOV
104
  pHdr->size = len;
×
UNCOV
105
  memcpy(pHdr->data, rowData, len);
×
UNCOV
106
  taosMemoryFree(rowData);
×
UNCOV
107
  tqDebug("vgId:%d, vnode stream-state snapshot read data success", TD_VID(pReader->pTq->pVnode));
×
UNCOV
108
  return code;
×
109

110
_err:
×
111
  tqError("vgId:%d, vnode stream-state snapshot failed to read since %s", TD_VID(pReader->pTq->pVnode),
×
112
          tstrerror(code));
113
  return code;
×
114
}
115

116
// STqSnapWriter ========================================
117
struct SStreamStateWriter {
118
  STQ*    pTq;
119
  int64_t sver;
120
  int64_t ever;
121
  TXN*    txn;
122

123
  SStreamSnapWriter* pWriterImpl;
124
};
125

UNCOV
126
int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter) {
×
UNCOV
127
  int32_t             code = 0;
×
128
  SStreamStateWriter* pWriter;
129

130
  // alloc
UNCOV
131
  pWriter = (SStreamStateWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
×
UNCOV
132
  if (pWriter == NULL) {
×
133
    code = terrno;
×
134
    goto _err;
×
135
  }
UNCOV
136
  pWriter->pTq = pTq;
×
UNCOV
137
  pWriter->sver = sver;
×
UNCOV
138
  pWriter->ever = ever;
×
139

UNCOV
140
  if (taosMkDir(pTq->pStreamMeta->path) != 0) {
×
141
    code = TAOS_SYSTEM_ERROR(errno);
×
142
    tqError("vgId:%d, vnode %s snapshot writer failed to create directory %s since %s", TD_VID(pTq->pVnode),
×
143
            STREAM_STATE_TRANSFER, pTq->pStreamMeta->path, tstrerror(code));
144
    goto _err;
×
145
  }
146

UNCOV
147
  SStreamSnapWriter* pSnapWriter = NULL;
×
UNCOV
148
  if ((code = streamSnapWriterOpen(pTq, sver, ever, pTq->pStreamMeta->path, &pSnapWriter)) < 0) {
×
149
    goto _err;
×
150
  }
151

UNCOV
152
  tqDebug("vgId:%d, vnode %s snapshot writer opened, path:%s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
×
153
          pTq->pStreamMeta->path);
UNCOV
154
  pWriter->pWriterImpl = pSnapWriter;
×
155

UNCOV
156
  *ppWriter = pWriter;
×
UNCOV
157
  return 0;
×
158

159
_err:
×
160
  tqError("vgId:%d, vnode %s snapshot writer failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
×
161
          tstrerror(code));
162
  taosMemoryFree(pWriter);
×
163
  *ppWriter = NULL;
×
164
  return code;
×
165
}
166

UNCOV
167
int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) {
×
UNCOV
168
  tqDebug("vgId:%d, vnode %s snapshot writer closed", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
×
UNCOV
169
  return streamSnapWriterClose(pWriter->pWriterImpl, rollback);
×
170
}
171

UNCOV
172
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
×
UNCOV
173
  tqDebug("vgId:%d, vnode %s snapshot write data", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
×
UNCOV
174
  return streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
×
175
}
UNCOV
176
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
×
UNCOV
177
  tqDebug("vgId:%d, vnode %s  start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
×
UNCOV
178
  int32_t code = streamStateLoadTasks(pWriter);
×
UNCOV
179
  tqDebug("vgId:%d, vnode %s  succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
×
UNCOV
180
  taosMemoryFree(pWriter);
×
UNCOV
181
  return code;
×
182
}
183

UNCOV
184
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) {
×
UNCOV
185
  streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta);
×
UNCOV
186
  return TSDB_CODE_SUCCESS;
×
187
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc