• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4380

25 Jun 2025 06:58AM UTC coverage: 62.307% (-0.09%) from 62.393%
#4380

push

travis-ci

web-flow
feat(mqtt): mqtt subscription (#30127)

* feat(mqtt): Initial commit for mqtt

* chore(xnode/mnd): xnode message handlers for mnode

* chore(mnd/xnode): mnode part for xnode

* chore(xnode/translater): fix show commands

* fix(ast/creater): fix xnode create option

* fix(xnode/ci): fix ci & doc's error codes

* chore(xnode/sql): make create/drop/show work properly

* fix(xnode/sql): commit new files

* fix(xnode/sql): commit cmake files

* fix: fix testing cases

* fix(xnode/tsc): fix tokens

* fix(ast/anode): fix anode update decl.

* fix(xnode/error): fix xnode error codes

* fix: xnode make/destroy

* chore: xnode with option & dnode id

* chore: use taosmqtt for xnode

* chore: new error code for xnode launching

* chore(xnode): new error code

* chore: header for _xnode_mgmt_mqtt

* chore: source for _xnode_mgmt_mqtt

* chore: remove test directory from cmake

* chore: remove taosmqtt for ci to compile

* chore: remove taosudf header from xnode

* chore: new window macro

* chore: remove xnode mgmt mqtt for windows compilation

* Revert "chore: remove xnode mgmt mqtt for windows compilation"

This reverts commit 197e1640c.

* chore: cleanup code

* chore: xnode mgmt comment windows part out

* chore: mgmt/mqtt, move uv head toppest

* xnode/mnode: create xnode once per dnode

* fix(xnode/systable/test): fix column count

* xnode/sdb: renumber sdb type for xnode to make start/stop order correct

* xnode/mqtt: new param mqttPort

* fix SXnode's struct type

* transfer dnode id to mqtt subscription

* tmqtt: remove uv_a linking

* tmqtt/tools: sources for tools

* tools: fix windows compilation

* tools/producer: fix windows sleep param

* tools/producer: fix uninited var rc

* make tools only for linux

* test/mnodes: wail 1 or 2 seconds for offline to be leader

* update topic producer tool for geometry data type testing

* format tool sql statements

* show xnodes' ep

* make shell auto complete xnodes

* use usleep... (continued)

156642 of 320746 branches covered (48.84%)

Branch coverage included in aggregate %.

61 of 1020 new or added lines in 21 files covered. (5.98%)

1736 existing lines in 172 files now uncovered.

242538 of 319922 relevant lines covered (75.81%)

6277604.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

29.24
/source/dnode/vnode/src/tq/tqStreamTaskSnap.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17
#include "tdbInt.h"
18
#include "tq.h"
19

20
// STqSnapReader ========================================
21

22
typedef struct {
23
  int8_t type;
24
  TTB*   tbl;
25
} STablePair;
26
struct SStreamTaskReader {
27
  STQ*    pTq;
28
  int64_t sver;
29
  int64_t ever;
30
  TBC*    pCur;
31
  SArray* tdbTbList;
32
  int8_t  pos;
33
};
34

35
int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskReader** ppReader) {
54✔
36
  int32_t            code = 0;
54✔
37
  SStreamTaskReader* pReader = NULL;
54✔
38

39
  // alloc
40
  pReader = (SStreamTaskReader*)taosMemoryCalloc(1, sizeof(SStreamTaskReader));
54!
41
  if (pReader == NULL) {
54!
42
    TAOS_CHECK_RETURN(terrno);
×
43
  }
44
  pReader->pTq = pTq;
54✔
45
  pReader->sver = sver;
54✔
46
  pReader->ever = ever;
54✔
47
  pReader->tdbTbList = taosArrayInit(4, sizeof(STablePair));
54✔
48
  if (pReader->tdbTbList == NULL) {
54!
49
    TAOS_CHECK_GOTO(terrno, NULL, _err);
×
50
  }
51

52
  STablePair pair1 = {.tbl = pTq->pStreamMeta->pTaskDb, .type = SNAP_DATA_STREAM_TASK};
54✔
53
  if (NULL == taosArrayPush(pReader->tdbTbList, &pair1)) {
108!
54
    TAOS_CHECK_GOTO(terrno, NULL, _err);
×
55
  }
56

57
  STablePair pair2 = {.tbl = pTq->pStreamMeta->pCheckpointDb, .type = SNAP_DATA_STREAM_TASK_CHECKPOINT};
54✔
58
  if (NULL == taosArrayPush(pReader->tdbTbList, &pair2)) {
108!
59
    TAOS_CHECK_GOTO(terrno, NULL, _err);
×
60
  }
61

62
  pReader->pos = 0;
54✔
63

64
  STablePair* pPair = taosArrayGet(pReader->tdbTbList, pReader->pos);
54✔
65
  code = tdbTbcOpen(pPair->tbl, &pReader->pCur, NULL);
54✔
66
  if (code) {
54!
67
    tqInfo("vgId:%d, vnode stream-task snapshot reader failed to open, reason: %s", TD_VID(pTq->pVnode),
×
68
           tstrerror(code));
69
    TAOS_CHECK_GOTO(code, NULL, _err);
×
70
  }
71

72
  code = tdbTbcMoveToFirst(pReader->pCur);
54✔
73
  if (code) {
54!
74
    tqInfo("vgId:%d, vnode stream-task snapshot reader failed to iterate, reason: %s", TD_VID(pTq->pVnode),
×
75
           tstrerror(code));
76
    TAOS_CHECK_GOTO(code, NULL, _err);
×
77
  }
78

79
  tqDebug("vgId:%d, vnode stream-task snapshot reader opened", TD_VID(pTq->pVnode));
54!
80

81
  *ppReader = pReader;
54✔
82
  return code;
54✔
83

84
_err:
×
85
  tqError("vgId:%d, vnode stream-task snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
×
86
  int32_t ret = streamTaskSnapReaderClose(pReader);
×
87
  *ppReader = NULL;
×
88
  return code;
×
89
}
90

91
int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) {
54✔
92
  if (pReader == NULL) {
54!
93
    return 0;
×
94
  }
95

96
  int32_t code = 0;
54✔
97
  int32_t vgId = TD_VID(pReader->pTq->pVnode);
54✔
98

99
  taosArrayDestroy(pReader->tdbTbList);
54✔
100
  tdbTbcClose(pReader->pCur);
54✔
101
  tqInfo("vgId:%d, vnode stream-task snapshot reader closed", vgId);
54!
102

103
  taosMemoryFree(pReader);
54!
104
  return code;
54✔
105
}
106

107
int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) {
54✔
108
  int32_t     code = 0;
54✔
109
  const void* pKey = NULL;
54✔
110
  void*       pVal = NULL;
54✔
111
  int32_t     kLen = 0;
54✔
112
  int32_t     vLen = 0;
54✔
113
  SDecoder    decoder;
114
  STqHandle   handle;
115

116
  *ppData = NULL;
54✔
117
  int8_t except = 0;
54✔
118
  tqDebug("vgId:%d, vnode stream-task snapshot start read data", TD_VID(pReader->pTq->pVnode));
54!
119

120
  STablePair* pPair = taosArrayGet(pReader->tdbTbList, pReader->pos);
54✔
121

122
NextTbl:
108✔
123
  except = 0;
108✔
124
  for (;;) {
125
    const void* tVal = NULL;
108✔
126
    int32_t     tLen = 0;
108✔
127
    if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &tVal, &tLen)) {
108!
128
      except = 1;
108✔
129
      break;
108✔
130
    } else {
UNCOV
131
      pVal = taosMemoryCalloc(1, tLen);
×
UNCOV
132
      if (pVal == NULL) {
×
133
        code = terrno;
×
134
        goto _err;
×
135
      }
136

UNCOV
137
      memcpy(pVal, tVal, tLen);
×
UNCOV
138
      vLen = tLen;
×
139
    }
UNCOV
140
    TAOS_UNUSED(tdbTbcMoveToNext(pReader->pCur));
×
UNCOV
141
    break;
×
142
  }
143
  if (except == 1) {
108!
144
    if (pReader->pos + 1 < taosArrayGetSize(pReader->tdbTbList)) {
108✔
145
      tdbTbcClose(pReader->pCur);
54✔
146

147
      pReader->pos += 1;
54✔
148
      pPair = taosArrayGet(pReader->tdbTbList, pReader->pos);
54✔
149
      code = tdbTbcOpen(pPair->tbl, &pReader->pCur, NULL);
54✔
150
      TAOS_UNUSED(tdbTbcMoveToFirst(pReader->pCur));
54✔
151

152
      goto NextTbl;
54✔
153
    }
154
  }
155
  if (pVal == NULL || vLen == 0) {
54!
156
    *ppData = NULL;
54✔
157
    tqDebug("vgId:%d, vnode stream-task snapshot finished read data", TD_VID(pReader->pTq->pVnode));
54!
158
    return code;
54✔
159
  }
UNCOV
160
  *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
×
UNCOV
161
  if (*ppData == NULL) {
×
162
    code = terrno;
×
163
    goto _err;
×
164
  }
165

UNCOV
166
  SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
×
UNCOV
167
  pHdr->type = pPair->type;
×
UNCOV
168
  pHdr->size = vLen;
×
UNCOV
169
  memcpy(pHdr->data, pVal, vLen);
×
UNCOV
170
  taosMemoryFree(pVal);
×
171

UNCOV
172
  tqDebug("vgId:%d, vnode stream-task snapshot read data vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);
×
UNCOV
173
  return code;
×
174

175
_err:
×
176
  tqError("vgId:%d, vnode stream-task snapshot read data failed since %s", TD_VID(pReader->pTq->pVnode),
×
177
          tstrerror(code));
178
  return code;
×
179
}
180

181
// STqSnapWriter ========================================
182
struct SStreamTaskWriter {
183
  STQ*    pTq;
184
  int64_t sver;
185
  int64_t ever;
186
};
187

UNCOV
188
int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskWriter** ppWriter) {
×
UNCOV
189
  int32_t            code = 0;
×
190
  SStreamTaskWriter* pWriter;
191

192
  // alloc
UNCOV
193
  pWriter = (SStreamTaskWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
×
UNCOV
194
  if (pWriter == NULL) {
×
195
    TAOS_CHECK_RETURN(terrno);
×
196
  }
UNCOV
197
  pWriter->pTq = pTq;
×
UNCOV
198
  pWriter->sver = sver;
×
UNCOV
199
  pWriter->ever = ever;
×
200

UNCOV
201
  *ppWriter = pWriter;
×
UNCOV
202
  tqDebug("vgId:%d, vnode stream-task snapshot writer opened", TD_VID(pTq->pVnode));
×
UNCOV
203
  return code;
×
204
}
205

UNCOV
206
int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback, int8_t loadTask) {
×
UNCOV
207
  int32_t code = 0;
×
UNCOV
208
  STQ*    pTq = pWriter->pTq;
×
209

UNCOV
210
  streamMetaWLock(pTq->pStreamMeta);
×
UNCOV
211
  tqDebug("vgId:%d, vnode stream-task snapshot writer closed", TD_VID(pTq->pVnode));
×
UNCOV
212
  if (rollback) {
×
213
    TAOS_UNUSED(tdbAbort(pTq->pStreamMeta->db, pTq->pStreamMeta->txn));
×
214
  } else {
UNCOV
215
    code = tdbCommit(pTq->pStreamMeta->db, pTq->pStreamMeta->txn);
×
UNCOV
216
    if (code) goto _err;
×
UNCOV
217
    code = tdbPostCommit(pTq->pStreamMeta->db, pTq->pStreamMeta->txn);
×
UNCOV
218
    if (code) goto _err;
×
219
  }
220

UNCOV
221
  if ((code = tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0)) < 0) {
×
222
    taosMemoryFree(pWriter);
×
223
    goto _err;
×
224
  }
UNCOV
225
  streamMetaWUnLock(pTq->pStreamMeta);
×
UNCOV
226
  taosMemoryFree(pWriter);
×
227

UNCOV
228
  if (loadTask == 1) {
×
UNCOV
229
    streamMetaLoadAllTasks(pTq->pStreamMeta);
×
230
  }
UNCOV
231
  return code;
×
232

233
_err:
×
234
  tqError("vgId:%d, vnode stream-task snapshot writer failed to close since %s", TD_VID(pWriter->pTq->pVnode),
×
235
          tstrerror(code));
236
  streamMetaWUnLock(pTq->pStreamMeta);
×
237
  return code;
×
238
}
239

UNCOV
240
int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) {
×
UNCOV
241
  int32_t       code = 0;
×
UNCOV
242
  STQ*          pTq = pWriter->pTq;
×
UNCOV
243
  SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
×
UNCOV
244
  if (pHdr->type == SNAP_DATA_STREAM_TASK) {
×
UNCOV
245
    STaskId taskId = {0};
×
246

247
    SDecoder decoder;
UNCOV
248
    tDecoderInit(&decoder, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
×
UNCOV
249
    code = tDecodeStreamTaskId(&decoder, &taskId);
×
UNCOV
250
    if (code < 0) {
×
251
      tDecoderClear(&decoder);
×
252
      goto _err;
×
253
    }
UNCOV
254
    tDecoderClear(&decoder);
×
255

UNCOV
256
    int64_t key[2] = {taskId.streamId, taskId.taskId};
×
UNCOV
257
    streamMetaWLock(pTq->pStreamMeta);
×
UNCOV
258
    if ((code =
×
UNCOV
259
             tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr),
×
UNCOV
260
                         nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn)) < 0) {
×
261
      streamMetaWUnLock(pTq->pStreamMeta);
×
262
      return code;
×
263
    }
UNCOV
264
    streamMetaWUnLock(pTq->pStreamMeta);
×
265
  } else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) {
×
266
    // do nothing
267
  }
UNCOV
268
  tqDebug("vgId:%d, vnode stream-task snapshot write", TD_VID(pTq->pVnode));
×
269

UNCOV
270
  return code;
×
271

272
_err:
×
273
  tqError("vgId:%d, vnode stream-task snapshot failed to write since %s", TD_VID(pTq->pVnode), tstrerror(code));
×
274
  return code;
×
275
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc