• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.31
/source/dnode/vnode/src/tq/tqMeta.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "tdbInt.h"
16
#include "tq.h"
17

18
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
6,127✔
19
  if (pEncoder == NULL || pHandle == NULL) {
6,127!
20
    return TSDB_CODE_INVALID_PARA;
×
21
  }
22
  int32_t code = 0;
6,131✔
23
  int32_t lino;
24

25
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
6,131!
26
  TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->subKey));
12,260!
27
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pHandle->fetchMeta));
12,260!
28
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->consumerId));
12,260!
29
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->snapshotVer));
12,260!
30
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pHandle->epoch));
12,260!
31
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pHandle->execHandle.subType));
12,260!
32
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
6,130✔
33
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg));
9,726!
34
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
1,267✔
35
    int32_t size = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid);
1,056✔
36
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
1,056!
37
    void* pIter = NULL;
1,056✔
38
    pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
1,056✔
39
    while (pIter) {
1,056!
40
      int64_t* tbUid = (int64_t*)taosHashGetKey(pIter, NULL);
×
41
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, *tbUid));
×
42
      pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
×
43
    }
44
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
211!
45
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid));
424!
46
    if (pHandle->execHandle.execTb.qmsg != NULL) {
212!
47
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->execHandle.execTb.qmsg));
424!
48
    }
49
  }
50
  tEndEncode(pEncoder);
6,130✔
51
_exit:
6,130✔
52
  if (code) {
6,130!
53
    return code;
×
54
  } else {
55
    return pEncoder->pos;
6,130✔
56
  }
57
}
58

59
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
2,748✔
60
  if (pDecoder == NULL || pHandle == NULL) {
2,748!
61
    return TSDB_CODE_INVALID_PARA;
×
62
  }
63
  int32_t code = 0;
2,748✔
64
  int32_t lino;
65

66
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
2,748!
67
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pHandle->subKey));
2,748!
68
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pHandle->fetchMeta));
5,496!
69
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->consumerId));
5,496!
70
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->snapshotVer));
5,496!
71
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pHandle->epoch));
5,496!
72
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pHandle->execHandle.subType));
5,496!
73
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
2,748✔
74
    TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg));
5,410!
75
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
43✔
76
    pHandle->execHandle.execDb.pFilterOutTbUid =
15✔
77
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
15✔
78
    if (pHandle->execHandle.execDb.pFilterOutTbUid == NULL) {
15!
79
      TAOS_CHECK_EXIT(terrno);
×
80
    }
81
    int32_t size = 0;
15✔
82
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
15!
83
    for (int32_t i = 0; i < size; i++) {
15!
84
      int64_t tbUid = 0;
×
85
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &tbUid));
×
86
      TAOS_CHECK_EXIT(taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0));
×
87
    }
88
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
28!
89
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid));
56!
90
    if (!tDecodeIsEnd(pDecoder)) {
28!
91
      TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execTb.qmsg));
56!
92
    }
93
  }
94
  tEndDecode(pDecoder);
2,748✔
95

96
_exit:
2,748✔
97
  return code;
2,748✔
98
}
99

100
int32_t tqMetaDecodeCheckInfo(STqCheckInfo* info, void* pVal, uint32_t vLen) {
209✔
101
  if (info == NULL || pVal == NULL) {
209!
102
    return TSDB_CODE_INVALID_PARA;
×
103
  }
104
  SDecoder decoder = {0};
210✔
105
  tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
210✔
106
  int32_t code = tDecodeSTqCheckInfo(&decoder, info);
210✔
107
  tDecoderClear(&decoder);
210✔
108

109
  if (code != 0) {
210!
110
    tDeleteSTqCheckInfo(info);
×
111
    return TSDB_CODE_OUT_OF_MEMORY;
×
112
  }
113
  return code;
210✔
114
}
115

116
int32_t tqMetaDecodeOffsetInfo(STqOffset* info, void* pVal, uint32_t vLen) {
181✔
117
  if (info == NULL || pVal == NULL) {
181!
118
    return TSDB_CODE_INVALID_PARA;
×
119
  }
120
  SDecoder decoder = {0};
181✔
121
  tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
181✔
122
  int32_t code = tDecodeSTqOffset(&decoder, info);
181✔
123
  tDecoderClear(&decoder);
181✔
124

125
  if (code != 0) {
181!
126
    tDeleteSTqOffset(info);
×
127
    return TSDB_CODE_OUT_OF_MEMORY;
×
128
  }
129
  return code;
181✔
130
}
131

132
int32_t tqMetaSaveOffset(STQ* pTq, STqOffset* pOffset) {
1✔
133
  if (pTq == NULL || pOffset == NULL) {
1!
134
    return TSDB_CODE_INVALID_PARA;
×
135
  }
136
  void*    buf = NULL;
1✔
137
  int32_t  code = TDB_CODE_SUCCESS;
1✔
138
  uint32_t  vlen;
139
  SEncoder encoder = {0};
1✔
140
  tEncodeSize(tEncodeSTqOffset, pOffset, vlen, code);
1!
141
  if (code < 0) {
1!
142
    goto END;
×
143
  }
144

145
  buf = taosMemoryCalloc(1, vlen);
1!
146
  if (buf == NULL) {
1!
147
    code = terrno;
×
148
    goto END;
×
149
  }
150

151
  tEncoderInit(&encoder, buf, vlen);
1✔
152
  code = tEncodeSTqOffset(&encoder, pOffset);
1✔
153
  if (code < 0) {
1!
154
    goto END;
×
155
  }
156

157
  TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), buf, vlen));
1!
158

159
END:
1✔
160
  tEncoderClear(&encoder);
1✔
161
  taosMemoryFree(buf);
1!
162
  return code;
1✔
163
}
164

165
int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, uint32_t kLen, const void* value, uint32_t vLen) {
15,689✔
166
  if (pTq == NULL || ttb == NULL || key == NULL || value == NULL) {
15,689!
167
    return TSDB_CODE_INVALID_PARA;
×
168
  }
169
  int32_t code = TDB_CODE_SUCCESS;
15,689✔
170
  TXN*    txn = NULL;
15,689✔
171

172
  TQ_ERR_GO_TO_END(
15,689!
173
      tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
174
  TQ_ERR_GO_TO_END(tdbTbUpsert(ttb, key, kLen, value, vLen, txn));
15,669!
175
  TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn));
15,675!
176
  TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn));
15,685!
177

178
  return 0;
15,684✔
179

180
END:
×
181
  tdbAbort(pTq->pMetaDB, txn);
×
182
  return code;
×
183
}
184

185
int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, uint32_t kLen) {
1,864✔
186
  if (pTq == NULL || ttb == NULL || key == NULL) {
1,864!
187
    return TSDB_CODE_INVALID_PARA;
×
188
  }
189
  int32_t code = TDB_CODE_SUCCESS;
1,864✔
190
  TXN*    txn = NULL;
1,864✔
191

192
  TQ_ERR_GO_TO_END(
1,864!
193
      tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
194
  TQ_ERR_GO_TO_END(tdbTbDelete(ttb, key, kLen, txn));
1,861✔
195
  TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn));
1,606!
196
  TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn));
1,610!
197

198
  return 0;
1,609✔
199

200
END:
254✔
201
  tdbAbort(pTq->pMetaDB, txn);
254✔
202
  return code;
254✔
203
}
204

205
int32_t tqMetaGetOffset(STQ* pTq, const char* subkey, STqOffset** pOffset) {
318,197✔
206
  if (pTq == NULL || subkey == NULL || pOffset == NULL) {
318,197!
207
    return TSDB_CODE_INVALID_PARA;
×
208
  }
209
  void* data = taosHashGet(pTq->pOffset, subkey, strlen(subkey));
318,197✔
210
  if (data == NULL) {
318,196✔
211
    int vLen = 0;
306,707✔
212
    if (tdbTbGet(pTq->pOffsetStore, subkey, strlen(subkey), &data, &vLen) < 0) {
306,707✔
213
      tdbFree(data);
306,559✔
214
      return TSDB_CODE_OUT_OF_MEMORY;
306,555✔
215
    }
216

217
    STqOffset offset = {0};
143✔
218
    if (tqMetaDecodeOffsetInfo(&offset, data, vLen >= 0 ? vLen : 0) != TDB_CODE_SUCCESS) {
143!
219
      tdbFree(data);
×
220
      return TSDB_CODE_OUT_OF_MEMORY;
×
221
    }
222

223
    if (taosHashPut(pTq->pOffset, subkey, strlen(subkey), &offset, sizeof(STqOffset)) != 0) {
143!
224
      tDeleteSTqOffset(&offset);
×
225
      tdbFree(data);
×
226
      return terrno;
×
227
    }
228
    tdbFree(data);
143✔
229

230
    *pOffset = taosHashGet(pTq->pOffset, subkey, strlen(subkey));
143✔
231
    if (*pOffset == NULL) {
143!
232
      return TSDB_CODE_OUT_OF_MEMORY;
×
233
    }
234
  } else {
235
    *pOffset = data;
11,489✔
236
  }
237
  return 0;
11,632✔
238
}
239

240
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
3,065✔
241
  if (pTq == NULL || key == NULL || pHandle == NULL) {
3,065!
UNCOV
242
    return TSDB_CODE_INVALID_PARA;
×
243
  }
244
  int32_t  code = TDB_CODE_SUCCESS;
3,065✔
245
  uint32_t  vlen;
246
  void*    buf = NULL;
3,065✔
247
  SEncoder encoder = {0};
3,065✔
248
  tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
3,065!
249
  if (code < 0) {
3,064!
250
    goto END;
×
251
  }
252

253
  tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 " epoch:%d vgId:%d", pHandle->subKey,
3,064✔
254
          (int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode));
255

256
  buf = taosMemoryCalloc(1, vlen);
3,066!
257
  if (buf == NULL) {
3,066!
258
    code = terrno;
×
259
    goto END;
×
260
  }
261

262
  tEncoderInit(&encoder, buf, vlen);
3,066✔
263
  code = tEncodeSTqHandle(&encoder, pHandle);
3,066✔
264
  if (code < 0) {
3,066!
265
    goto END;
×
266
  }
267

268
  TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pExecStore, key, strlen(key), buf, vlen));
3,066!
269

270
END:
3,064✔
271
  tEncoderClear(&encoder);
3,064✔
272
  taosMemoryFree(buf);
3,064!
273
  return code;
3,066✔
274
}
275

276
static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) {
4,154✔
277
  if (pTq == NULL || handle == NULL) {
4,154!
278
    return TSDB_CODE_INVALID_PARA;
×
279
  }
280
  int32_t code = TDB_CODE_SUCCESS;
4,154✔
281

282
  SVnode* pVnode = pTq->pVnode;
4,154✔
283
  int32_t vgId = TD_VID(pVnode);
4,154✔
284

285
  handle->pRef = walOpenRef(pVnode->pWal);
4,154✔
286

287
  TQ_NULL_GO_TO_END(handle->pRef);
4,154!
288
  TQ_ERR_GO_TO_END(walSetRefVer(handle->pRef, handle->snapshotVer));
4,154✔
289

290
  SReadHandle reader = {
1,552✔
291
      .vnode = pVnode,
292
      .initTableReader = true,
293
      .initTqReader = true,
294
      .version = handle->snapshotVer,
1,552✔
295
  };
296

297
  initStorageAPI(&reader.api);
1,552✔
298

299
  if (handle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
1,552✔
300
    handle->execHandle.task = qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId,
2,485✔
301
                                                       &handle->execHandle.numOfCols, handle->consumerId);
1,244✔
302
    TQ_NULL_GO_TO_END(handle->execHandle.task);
1,241!
303
    void* scanner = NULL;
1,241✔
304
    qExtractStreamScanner(handle->execHandle.task, &scanner);
1,241✔
305
    TQ_NULL_GO_TO_END(scanner);
1,243!
306
    handle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
1,243✔
307
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
1,242!
308
  } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
308✔
309
    handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
244✔
310
    TQ_NULL_GO_TO_END(handle->pWalReader);
244!
311
    handle->execHandle.pTqReader = tqReaderOpen(pVnode);
244✔
312
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
244!
313
    TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta,
244!
314
                                      (SSnapContext**)(&reader.sContext)));
315
    handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
244✔
316
    TQ_NULL_GO_TO_END(handle->execHandle.task);
244!
317
  } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
64!
318
    handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
64✔
319
    TQ_NULL_GO_TO_END(handle->pWalReader);
64!
320
    if (handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
64!
321
      if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) {
21!
322
        tqError("nodesStringToNode error in sub stable, since %s", terrstr());
×
323
        return TSDB_CODE_SCH_INTERNAL_ERROR;
×
324
      }
325
    }
326
    TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid,
64!
327
                                      handle->execHandle.subType, handle->fetchMeta,
328
                                      (SSnapContext**)(&reader.sContext)));
329
    handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
64✔
330
    TQ_NULL_GO_TO_END(handle->execHandle.task);
64!
331
    SArray* tbUidList = NULL;
64✔
332
    int     ret = qGetTableList(handle->execHandle.execTb.suid, pVnode, handle->execHandle.execTb.node, &tbUidList,
64✔
333
                                handle->execHandle.task);
334
    if (ret != TDB_CODE_SUCCESS) {
64!
335
      tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle->subKey, handle->consumerId);
×
336
      taosArrayDestroy(tbUidList);
×
337
      return TSDB_CODE_SCH_INTERNAL_ERROR;
×
338
    }
339
    tqInfo("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId,
64!
340
           handle->execHandle.execTb.suid);
341
    handle->execHandle.pTqReader = tqReaderOpen(pVnode);
64✔
342
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
64!
343
    tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL);
64✔
344
    taosArrayDestroy(tbUidList);
64✔
345
  }
346
  handle->tableCreateTimeHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
1,550✔
347

348
END:
4,154✔
349
  return code;
4,154✔
350
}
351

352
static int32_t tqMetaRestoreHandle(STQ* pTq, void* pVal, uint32_t vLen, STqHandle* handle) {
2,711✔
353
  if (pTq == NULL || pVal == NULL || handle == NULL) {
2,711!
354
    return TSDB_CODE_INVALID_PARA;
×
355
  }
356
  int32_t  vgId = TD_VID(pTq->pVnode);
2,711✔
357
  SDecoder decoder = {0};
2,711✔
358
  int32_t  code = TDB_CODE_SUCCESS;
2,711✔
359

360
  tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
2,711✔
361
  TQ_ERR_GO_TO_END(tDecodeSTqHandle(&decoder, handle));
2,711!
362
  TQ_ERR_GO_TO_END(tqMetaInitHandle(pTq, handle));
2,711✔
363
  tqInfo("tqMetaRestoreHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
109!
364
  code = taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
109✔
365

366
END:
2,711✔
367
  tDecoderClear(&decoder);
2,711✔
368
  return code;
2,711✔
369
}
370

371
int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) {
1,443✔
372
  if (pTq == NULL || req == NULL || handle == NULL) {
1,443!
373
    return TSDB_CODE_INVALID_PARA;
×
374
  }
375
  int32_t vgId = TD_VID(pTq->pVnode);
1,443✔
376

377
  (void)memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN);
1,443✔
378
  handle->consumerId = req->newConsumerId;
1,443✔
379

380
  handle->execHandle.subType = req->subType;
1,443✔
381
  handle->fetchMeta = req->withMeta;
1,443✔
382
  if (req->subType == TOPIC_SUB_TYPE__COLUMN) {
1,443✔
383
    void* tmp = taosStrdup(req->qmsg);
1,148!
384
    if (tmp == NULL) {
1,148!
385
      return terrno;
×
386
    }
387
    handle->execHandle.execCol.qmsg = tmp;
1,148✔
388
  } else if (req->subType == TOPIC_SUB_TYPE__DB) {
295✔
389
    handle->execHandle.execDb.pFilterOutTbUid =
239✔
390
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
239✔
391
    if (handle->execHandle.execDb.pFilterOutTbUid == NULL) {
239!
392
      return terrno;
×
393
    }
394
  } else if (req->subType == TOPIC_SUB_TYPE__TABLE) {
56!
395
    handle->execHandle.execTb.suid = req->suid;
56✔
396
    void* tmp = taosStrdup(req->qmsg);
56!
397
    if (tmp == NULL) {
56!
398
      return terrno;
×
399
    }
400
    handle->execHandle.execTb.qmsg = tmp;
56✔
401
  }
402

403
  handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal);
1,443✔
404

405
  int32_t code = tqMetaInitHandle(pTq, handle);
1,443✔
406
  if (code != 0) {
1,441!
407
    return code;
×
408
  }
409
  tqInfo("tqMetaCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey,
1,441!
410
         handle->consumerId, vgId, handle->snapshotVer);
411
  return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
1,443✔
412
}
413

414
static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pOld, TTB* pNew) {
×
415
  if (pMetaDB == NULL || pOld == NULL || pNew == NULL) {
×
416
    return TSDB_CODE_INVALID_PARA;
×
417
  }
418
  TBC*  pCur = NULL;
×
419
  void* pKey = NULL;
×
420
  int   kLen = 0;
×
421
  void* pVal = NULL;
×
422
  int   vLen = 0;
×
423
  TXN*  txn = NULL;
×
424

425
  int32_t code = TDB_CODE_SUCCESS;
×
426

427
  TQ_ERR_GO_TO_END(tdbTbcOpen(pOld, &pCur, NULL));
×
428
  TQ_ERR_GO_TO_END(
×
429
      tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
430

431
  TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur));
×
432
  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
×
433
    TQ_ERR_GO_TO_END(tdbTbUpsert(pNew, pKey, kLen, pVal, vLen, txn));
×
434
  }
435

436
  TQ_ERR_GO_TO_END(tdbCommit(pMetaDB, txn));
×
437
  TQ_ERR_GO_TO_END(tdbPostCommit(pMetaDB, txn));
×
438

439
END:
×
440
  tdbFree(pKey);
×
441
  tdbFree(pVal);
×
442
  tdbTbcClose(pCur);
×
443
  return code;
×
444
}
445

446
int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle) {
3,180,394✔
447
  if (pTq == NULL || key == NULL || pHandle == NULL) {
3,180,394!
448
    return TSDB_CODE_INVALID_PARA;
×
449
  }
450
  void* data = taosHashGet(pTq->pHandle, key, strlen(key));
3,181,912✔
451
  if (data == NULL) {
3,181,972✔
452
    int vLen = 0;
6,767✔
453
    if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &data, &vLen) < 0) {
6,767✔
454
      tdbFree(data);
1,393✔
455
      return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
3,994✔
456
    }
457
    STqHandle handle = {0};
2,710✔
458
    if (tqMetaRestoreHandle(pTq, data, vLen >= 0 ? vLen : 0, &handle) != 0) {
2,710✔
459
      tdbFree(data);
2,602✔
460
      tqDestroyTqHandle(&handle);
2,602✔
461
      return TSDB_CODE_OUT_OF_MEMORY;
2,602✔
462
    }
463
    tdbFree(data);
109✔
464
    *pHandle = taosHashGet(pTq->pHandle, key, strlen(key));
109✔
465
    if (*pHandle == NULL) {
109!
466
      return TSDB_CODE_OUT_OF_MEMORY;
×
467
    }
468
  } else {
469
    *pHandle = data;
3,175,205✔
470
  }
471
  return TDB_CODE_SUCCESS;
3,175,314✔
472
}
473

474
int32_t tqMetaOpenTdb(STQ* pTq) {
11,905✔
475
  if (pTq == NULL) {
11,905!
476
    return TSDB_CODE_INVALID_PARA;
×
477
  }
478
  int32_t code = TDB_CODE_SUCCESS;
11,905✔
479
  TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0, 0, NULL));
11,905!
480
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0));
11,906!
481
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0));
11,905!
482
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.offset.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pOffsetStore, 0));
11,907!
483

484
END:
11,905✔
485
  return code;
11,905✔
486
}
487

488
static int32_t replaceTqPath(char** path) {
11,906✔
489
  if (path == NULL || *path == NULL) {
11,906!
490
    return TSDB_CODE_INVALID_PARA;
×
491
  }
492
  char*   tpath = NULL;
11,907✔
493
  int32_t code = tqBuildFName(&tpath, *path, TQ_SUBSCRIBE_NAME);
11,907✔
494
  if (code != 0) {
11,906!
495
    return code;
×
496
  }
497
  taosMemoryFree(*path);
11,906!
498
  *path = tpath;
11,906✔
499
  return TDB_CODE_SUCCESS;
11,906✔
500
}
501

502
static int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
11,903✔
503
  if (pTq == NULL) {
11,903!
504
    return TSDB_CODE_INVALID_PARA;
×
505
  }
506
  TBC*         pCur = NULL;
11,903✔
507
  void*        pKey = NULL;
11,903✔
508
  int          kLen = 0;
11,903✔
509
  void*        pVal = NULL;
11,903✔
510
  int          vLen = 0;
11,903✔
511
  int32_t      code = 0;
11,903✔
512
  STqCheckInfo info = {0};
11,903✔
513

514
  TQ_ERR_GO_TO_END(tdbTbcOpen(pTq->pCheckStore, &pCur, NULL));
11,903!
515
  TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur));
11,902!
516

517
  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
11,986✔
518
    TQ_ERR_GO_TO_END(tqMetaDecodeCheckInfo(&info, pVal, vLen >= 0 ? vLen : 0));
85!
519
    TQ_ERR_GO_TO_END(taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)));
85!
520
  }
521
  info.colIdList = NULL;
11,905✔
522

523
END:
11,905✔
524
  tdbFree(pKey);
11,905✔
525
  tdbFree(pVal);
11,900✔
526
  tdbTbcClose(pCur);
11,903✔
527
  tDeleteSTqCheckInfo(&info);
11,906✔
528
  return code;
11,902✔
529
}
530

531
int32_t tqMetaOpen(STQ* pTq) {
11,895✔
532
  if (pTq == NULL) {
11,895!
533
    return TSDB_CODE_INVALID_PARA;
×
534
  }
535
  char*   maindb = NULL;
11,895✔
536
  char*   offsetNew = NULL;
11,895✔
537
  int32_t code = TDB_CODE_SUCCESS;
11,895✔
538
  TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME));
11,895!
539
  if (!taosCheckExistFile(maindb)) {
11,906!
540
    TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path));
11,906!
541
    TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));
11,907!
542
  } else {
543
    TQ_ERR_GO_TO_END(tqMetaTransform(pTq));
×
544
    TQ_ERR_GO_TO_END(taosRemoveFile(maindb));
×
545
  }
546

547
  TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
11,905!
548
  if (taosCheckExistFile(offsetNew)) {
11,906!
549
    TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetNew));
×
550
    TQ_ERR_GO_TO_END(taosRemoveFile(offsetNew));
×
551
  }
552

553
  TQ_ERR_GO_TO_END(tqMetaRestoreCheckInfo(pTq));
11,905!
554

555
END:
11,901✔
556
  taosMemoryFree(maindb);
11,901!
557
  taosMemoryFree(offsetNew);
11,904!
558
  return code;
11,906✔
559
}
560

561
int32_t tqMetaTransform(STQ* pTq) {
×
562
  if (pTq == NULL) {
×
563
    return TSDB_CODE_INVALID_PARA;
×
564
  }
565
  int32_t code = TDB_CODE_SUCCESS;
×
566
  TDB*    pMetaDB = NULL;
×
567
  TTB*    pExecStore = NULL;
×
568
  TTB*    pCheckStore = NULL;
×
569
  char*   offsetNew = NULL;
×
570
  char*   offset = NULL;
×
571
  TQ_ERR_GO_TO_END(tqBuildFName(&offset, pTq->path, TQ_OFFSET_NAME));
×
572

573
  TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pMetaDB, 0, 0, NULL));
×
574
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.db", -1, -1, NULL, pMetaDB, &pExecStore, 0));
×
575
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.check.db", -1, -1, NULL, pMetaDB, &pCheckStore, 0));
×
576

577
  TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path));
×
578
  TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));
×
579

580
  TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pExecStore, pTq->pExecStore));
×
581
  TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore));
×
582

583
  TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
×
584

585
  if (taosCheckExistFile(offset)) {
×
586
    if (taosCopyFile(offset, offsetNew) < 0) {
×
587
      tqError("copy offset file error");
×
588
    } else {
589
      TQ_ERR_GO_TO_END(taosRemoveFile(offset));
×
590
    }
591
  }
592

593
END:
×
594
  taosMemoryFree(offset);
×
595
  taosMemoryFree(offsetNew);
×
596

597
  tdbTbClose(pExecStore);
×
598
  tdbTbClose(pCheckStore);
×
599
  tdbClose(pMetaDB);
×
600
  return code;
×
601
}
602

603
void tqMetaClose(STQ* pTq) {
11,906✔
604
  if (pTq == NULL) {
11,906!
605
    return;
×
606
  }
607
  int32_t ret = 0;
11,906✔
608
  if (pTq->pExecStore) {
11,906!
609
    tdbTbClose(pTq->pExecStore);
11,907✔
610
  }
611
  if (pTq->pCheckStore) {
11,906!
612
    tdbTbClose(pTq->pCheckStore);
11,907✔
613
  }
614
  if (pTq->pOffsetStore) {
11,906!
615
    tdbTbClose(pTq->pOffsetStore);
11,908✔
616
  }
617
  tdbClose(pTq->pMetaDB);
11,906✔
618
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc