• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: separate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/tq/tqStreamStateSnap.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17
#include "streamSnapshot.h"
18
#include "tdbInt.h"
19
#include "tq.h"
20

21
// SStreamStateReader ========================================
22
struct SStreamStateReader {
23
  STQ*    pTq;
24
  int64_t sver;
25
  int64_t ever;
26
  TBC*    pCur;
27

28
  SStreamSnapReader* pReaderImpl;
29
  int32_t            complete;  // open reader or not
30
};
31

UNCOV
32
// Open a stream-state snapshot reader on top of the stream meta owned by pTq.
//
// pTq       TQ instance; its pStreamMeta supplies the checkpoint id and the path
// sver      start version, stored in the reader and forwarded to streamSnapReaderOpen
// ever      end version, stored in the reader
// ppReader  out: the new reader on success, NULL on any failure
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA when pTq has no stream meta, terrno
// when the reader cannot be allocated, otherwise the code from streamSnapReaderOpen.
int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader) {
  int32_t             code = 0;
  SStreamStateReader* pReader = NULL;
  SStreamSnapReader*  pSnapReader = NULL;
  SStreamMeta*        meta = pTq->pStreamMeta;

  // meta->path is required below, so a missing meta is rejected here instead of
  // being dereferenced later
  if (meta == NULL) {
    code = TSDB_CODE_INVALID_PARA;
    goto _err;
  }

  // alloc
  pReader = (SStreamStateReader*)taosMemoryCalloc(1, sizeof(SStreamStateReader));
  if (pReader == NULL) {
    code = terrno;
    goto _err;
  }

  pReader->pTq = pTq;
  pReader->sver = sver;
  pReader->ever = ever;

  code = streamSnapReaderOpen(meta, sver, meta->chkpId, meta->path, &pSnapReader);
  if (code != 0) {
    goto _err;
  }

  pReader->complete = 1;
  pReader->pReaderImpl = pSnapReader;

  tqDebug("vgId:%d, vnode %s snapshot reader opened", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER);

  *ppReader = pReader;
  return code;

_err:
  tqError("vgId:%d, vnode %s snapshot reader failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
          tstrerror(code));
  // single cleanup point; pReader is still NULL when the failure happened before the allocation
  taosMemoryFree(pReader);
  *ppReader = NULL;
  return code;
}
73

UNCOV
74
// Close the backing snapshot reader and release the wrapper.
//
// pReader  reader created by streamStateSnapReaderOpen; it is freed here and must
//          not be used afterwards
//
// Returns whatever streamSnapReaderClose reports; the wrapper is freed in either case.
int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) {
  tqDebug("vgId:%d, vnode %s snapshot reader closed", TD_VID(pReader->pTq->pVnode), STREAM_STATE_TRANSFER);

  int32_t ret = streamSnapReaderClose(pReader->pReaderImpl);

  taosMemoryFree(pReader);
  return ret;
}
81

UNCOV
82
// Read the next chunk of stream-state snapshot data and wrap it in a SSnapDataHdr.
//
// pReader  reader created by streamStateSnapReaderOpen
// ppData   out: newly allocated buffer holding a SSnapDataHdr followed by the payload;
//          left untouched when there is nothing to read or streamSnapRead fails,
//          and set to NULL when the buffer cannot be allocated
//
// Returns 0 on success or when no data is available, the code from streamSnapRead
// when the read fails, or terrno when the output buffer cannot be allocated.
int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) {
  tqDebug("vgId:%d, vnode %s snapshot read data", TD_VID(pReader->pTq->pVnode), STREAM_STATE_TRANSFER);

  int32_t  code = 0;
  uint8_t* rowData = NULL;
  int64_t  len = 0;

  // the backing reader was never opened, so there is nothing to hand out
  if (pReader->complete == 0) {
    return 0;
  }

  code = streamSnapRead(pReader->pReaderImpl, &rowData, &len);
  if (code != 0 || rowData == NULL || len == 0) {
    return code;
  }

  *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + len);
  if (*ppData == NULL) {
    code = terrno;
    goto _err;
  }

  // refactor later, avoid mem/free freq
  SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
  pHdr->type = SNAP_DATA_STREAM_STATE_BACKEND;
  pHdr->size = len;
  memcpy(pHdr->data, rowData, len);

  // rowData is owned by this function once streamSnapRead returned it
  taosMemoryFree(rowData);
  tqDebug("vgId:%d, vnode stream-state snapshot read data success", TD_VID(pReader->pTq->pVnode));
  return code;

_err:
  // the chunk was already read; release it so the allocation failure does not leak it
  taosMemoryFree(rowData);
  tqError("vgId:%d, vnode stream-state snapshot failed to read since %s", TD_VID(pReader->pTq->pVnode),
          tstrerror(code));
  return code;
}
115

116
// SStreamStateWriter ========================================
117
struct SStreamStateWriter {
118
  STQ*    pTq;
119
  int64_t sver;
120
  int64_t ever;
121
  TXN*    txn;
122

123
  SStreamSnapWriter* pWriterImpl;
124
};
125

UNCOV
126
// Open a stream-state snapshot writer that writes below the stream meta path of pTq.
//
// pTq       TQ instance; pTq->pStreamMeta->path is created if needed and used as target
// sver      start version, stored in the writer and forwarded to streamSnapWriterOpen
// ever      end version, stored in the writer and forwarded to streamSnapWriterOpen
// ppWriter  out: the new writer on success, NULL on failure
//
// Returns 0 on success, terrno when the writer cannot be allocated, a system error
// code when the directory cannot be created, or the negative code reported by
// streamSnapWriterOpen.
int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter) {
  int32_t             code = 0;
  SStreamSnapWriter*  pImpl = NULL;
  SStreamStateWriter* pWriter = NULL;

  // alloc
  pWriter = (SStreamStateWriter*)taosMemoryCalloc(1, sizeof(SStreamStateWriter));
  if (pWriter == NULL) {
    code = terrno;
    goto _err;
  }

  pWriter->pTq = pTq;
  pWriter->sver = sver;
  pWriter->ever = ever;

  // make sure the target directory exists before the backing writer is opened
  if (taosMkDir(pTq->pStreamMeta->path) != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    tqError("vgId:%d, vnode %s snapshot writer failed to create directory %s since %s", TD_VID(pTq->pVnode),
            STREAM_STATE_TRANSFER, pTq->pStreamMeta->path, tstrerror(code));
    goto _err;
  }

  code = streamSnapWriterOpen(pTq, sver, ever, pTq->pStreamMeta->path, &pImpl);
  if (code < 0) {
    goto _err;
  }

  tqDebug("vgId:%d, vnode %s snapshot writer opened, path:%s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
          pTq->pStreamMeta->path);

  pWriter->pWriterImpl = pImpl;
  *ppWriter = pWriter;
  return 0;

_err:
  tqError("vgId:%d, vnode %s snapshot writer failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
          tstrerror(code));
  // pWriter is NULL here when the allocation itself failed
  taosMemoryFree(pWriter);
  *ppWriter = NULL;
  return code;
}
166

UNCOV
167
// Close the backing snapshot writer.
//
// pWriter   writer created by streamStateSnapWriterOpen; the wrapper itself is NOT
//           freed here (streamStateRebuildFromSnap in this file releases it)
// rollback  forwarded unchanged to streamSnapWriterClose
//
// Returns the code reported by streamSnapWriterClose.
int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) {
  tqDebug("vgId:%d, vnode %s snapshot writer closed", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);

  int32_t code = streamSnapWriterClose(pWriter->pWriterImpl, rollback);
  return code;
}
171

UNCOV
172
// Write one snapshot block to the backing writer, stripping the leading SSnapDataHdr.
//
// pWriter  writer created by streamStateSnapWriterOpen
// pData    buffer that starts with a SSnapDataHdr followed by the payload
// nData    total size of pData in bytes, header included
//
// Returns TSDB_CODE_INVALID_PARA when nData is smaller than the header, otherwise
// the code reported by streamSnapWrite.
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
  tqDebug("vgId:%d, vnode %s snapshot write data", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);

  // the payload length is computed with unsigned arithmetic; without this guard an
  // undersized buffer would wrap around to a huge length
  if (nData < sizeof(SSnapDataHdr)) {
    tqError("vgId:%d, vnode %s snapshot write data too short, size:%u", TD_VID(pWriter->pTq->pVnode),
            STREAM_STATE_TRANSFER, (unsigned)nData);
    return TSDB_CODE_INVALID_PARA;
  }

  return streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
}
UNCOV
176
// Reload the stream tasks after a snapshot has been written, then release the writer.
//
// pWriter  writer created by streamStateSnapWriterOpen; it is freed here and must
//          not be used afterwards
// chkpId   not used by this function
//
// Returns the code reported by streamStateLoadTasks.
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
  tqDebug("vgId:%d, vnode %s  start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);

  int32_t code = streamStateLoadTasks(pWriter);
  if (code == 0) {
    tqDebug("vgId:%d, vnode %s  succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
  } else {
    // report the failure instead of claiming success in the log
    tqError("vgId:%d, vnode %s failed to rebuild stream-state since %s", TD_VID(pWriter->pTq->pVnode),
            STREAM_STATE_TRANSFER, tstrerror(code));
  }

  taosMemoryFree(pWriter);
  return code;
}
183

UNCOV
184
// Ask the stream meta of the writer's TQ to load all of its tasks.
//
// pWriter  writer whose pTq->pStreamMeta is passed to streamMetaLoadAllTasks
//
// Always returns TSDB_CODE_SUCCESS; no result of streamMetaLoadAllTasks is inspected.
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) {
  SStreamMeta* pMeta = pWriter->pTq->pStreamMeta;

  streamMetaLoadAllTasks(pMeta);
  return TSDB_CODE_SUCCESS;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc