• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4837

07 Nov 2025 09:40AM UTC coverage: 58.963% (+0.2%) from 58.728%
#4837

push

travis-ci

DuanKuanJun
coverity: cases_other.task add -R -Q2 -Q3 -Q4

150245 of 324452 branches covered (46.31%)

Branch coverage included in aggregate %.

200054 of 269646 relevant lines covered (74.19%)

317833830.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.57
/source/dnode/vnode/src/tq/tqMeta.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "tdbInt.h"
16
#include "tq.h"
17

18
/*
 * Serialize a subscription handle into pEncoder.
 *
 * Fields are written in this order: subKey, fetchMeta, consumerId, snapshotVer,
 * epoch, subType, followed by a subType-specific tail:
 *   - TOPIC_SUB_TYPE__COLUMN: execCol.qmsg
 *   - TOPIC_SUB_TYPE__DB:     entry count of pFilterOutTbUid, then every key as int64
 *   - TOPIC_SUB_TYPE__TABLE:  execTb.suid, then execTb.qmsg only when it is non-NULL
 *
 * Returns TSDB_CODE_INVALID_PARA when an argument is NULL, the code of the first
 * failing encoder call, or pEncoder->pos once everything has been written.
 */
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
  if (pEncoder == NULL || pHandle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;  // written by TAOS_CHECK_EXIT; initialized so it is never read indeterminate

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->subKey));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pHandle->fetchMeta));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->consumerId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->snapshotVer));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pHandle->epoch));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pHandle->execHandle.subType));
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg));
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    int32_t size = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
    void* pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, NULL);
    while (pIter) {
      int64_t* tbUid = (int64_t*)taosHashGetKey(pIter, NULL);
      code = tEncodeI64(pEncoder, *tbUid);
      if (code < 0) {
        lino = __LINE__;
        // Leaving the loop before the iterator is exhausted: hand the iterator back
        // to the hash table instead of abandoning it.
        taosHashCancelIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
        goto _exit;
      }
      pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
    }
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid));
    // qmsg is optional for table subscriptions; the decoder detects its absence
    // by reaching the end of the encoded record.
    if (pHandle->execHandle.execTb.qmsg != NULL) {
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->execHandle.execTb.qmsg));
    }
  }
  tEndEncode(pEncoder);
_exit:
  if (code) {
    return code;
  } else {
    return pEncoder->pos;
  }
}
58

59
/*
 * Deserialize a subscription handle from pDecoder into pHandle.
 *
 * Reads the common header (subKey, fetchMeta, consumerId, snapshotVer, epoch,
 * subType) and then the subType-specific tail. For DB subscriptions a fresh
 * pFilterOutTbUid hash is created and filled with the decoded table uids; for
 * TABLE subscriptions qmsg is read only if the record has bytes left.
 *
 * Returns TSDB_CODE_INVALID_PARA on NULL arguments, otherwise the code of the
 * first failing step or 0. Anything allocated into pHandle before a failure is
 * left in place for the caller to release.
 */
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
  if (pDecoder == NULL || pHandle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pHandle->subKey));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pHandle->fetchMeta));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->consumerId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->snapshotVer));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pHandle->epoch));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pHandle->execHandle.subType));

  switch (pHandle->execHandle.subType) {
    case TOPIC_SUB_TYPE__COLUMN: {
      TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg));
      break;
    }
    case TOPIC_SUB_TYPE__DB: {
      pHandle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
      if (pHandle->execHandle.execDb.pFilterOutTbUid == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
      int32_t nUids = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nUids));
      for (int32_t idx = 0; idx < nUids; idx++) {
        int64_t uid = 0;
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &uid));
        // The set is keyed by uid only; no value is stored.
        TAOS_CHECK_EXIT(taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &uid, sizeof(int64_t), NULL, 0));
      }
      break;
    }
    case TOPIC_SUB_TYPE__TABLE: {
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid));
      // qmsg is optional in the encoded form.
      if (!tDecodeIsEnd(pDecoder)) {
        TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execTb.qmsg));
      }
      break;
    }
    default:
      break;
  }
  tEndDecode(pDecoder);

_exit:
  return code;
}
99

100
/*
 * Decode a serialized STqCheckInfo from the buffer pVal (vLen bytes) into info.
 *
 * Returns TSDB_CODE_INVALID_PARA on NULL arguments and 0 on success. On any
 * decode failure the contents of info are released with tDeleteSTqCheckInfo
 * and TSDB_CODE_OUT_OF_MEMORY is returned.
 * NOTE(review): the decoder's own error code is discarded, so every failure is
 * reported as out-of-memory.
 */
int32_t tqMetaDecodeCheckInfo(STqCheckInfo* info, void* pVal, uint32_t vLen) {
  if (info == NULL || pVal == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SDecoder dec = {0};
  tDecoderInit(&dec, (uint8_t*)pVal, vLen);
  int32_t rc = tDecodeSTqCheckInfo(&dec, info);
  tDecoderClear(&dec);

  if (rc == 0) {
    return rc;
  }

  tDeleteSTqCheckInfo(info);
  return TSDB_CODE_OUT_OF_MEMORY;
}
115

116
/*
 * Decode a serialized STqOffset from the buffer pVal (vLen bytes) into info.
 *
 * Returns TSDB_CODE_INVALID_PARA on NULL arguments and 0 on success. On any
 * decode failure the contents of info are released with tDeleteSTqOffset and
 * TSDB_CODE_OUT_OF_MEMORY is returned.
 * NOTE(review): the decoder's own error code is discarded, so every failure is
 * reported as out-of-memory.
 */
int32_t tqMetaDecodeOffsetInfo(STqOffset* info, void* pVal, uint32_t vLen) {
  if (info == NULL || pVal == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SDecoder dec = {0};
  tDecoderInit(&dec, (uint8_t*)pVal, vLen);
  int32_t rc = tDecodeSTqOffset(&dec, info);
  tDecoderClear(&dec);

  if (rc == 0) {
    return rc;
  }

  tDeleteSTqOffset(info);
  return TSDB_CODE_OUT_OF_MEMORY;
}
131

132
/*
 * Encode pOffset and store it in pTq->pOffsetStore under pOffset->subKey.
 *
 * The encoded size is measured first, a zeroed buffer of that size is
 * allocated, the offset is encoded into it, and the result is written through
 * tqMetaSaveInfo while holding pTq->lock for writing.
 *
 * Returns TSDB_CODE_INVALID_PARA on NULL arguments, otherwise the code of the
 * first failing step or the result of tqMetaSaveInfo.
 */
int32_t tqMetaSaveOffset(STQ* pTq, STqOffset* pOffset) {
  if (pTq == NULL || pOffset == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SEncoder encoder = {0};
  void*    payload = NULL;
  uint32_t payloadLen;
  int32_t  code = TDB_CODE_SUCCESS;

  // First pass only measures how many bytes the encoding needs.
  tEncodeSize(tEncodeSTqOffset, pOffset, payloadLen, code);
  if (code < 0) {
    goto END;
  }

  payload = taosMemoryCalloc(1, payloadLen);
  if (payload == NULL) {
    code = terrno;
    goto END;
  }

  tEncoderInit(&encoder, payload, payloadLen);
  code = tEncodeSTqOffset(&encoder, pOffset);
  if (code < 0) {
    goto END;
  }

  taosWLockLatch(&pTq->lock);
  code = tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), payload, payloadLen);
  taosWUnLockLatch(&pTq->lock);

END:
  tEncoderClear(&encoder);
  taosMemoryFree(payload);
  return code;
}
165

166
/*
 * Upsert one key/value pair into the table ttb inside its own write
 * transaction on pTq->pMetaDB.
 *
 * Sequence: tdbBegin -> tdbTbUpsert -> tdbCommit -> tdbPostCommit. If any step
 * fails, the transaction is passed to tdbAbort and that step's code is returned.
 *
 * Returns TSDB_CODE_INVALID_PARA when pTq, ttb, key or value is NULL, 0 on
 * success.
 */
int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, uint32_t kLen, const void* value, uint32_t vLen) {
  if (pTq == NULL || ttb == NULL || key == NULL || value == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  TXN*    pTxn = NULL;
  int32_t code = TDB_CODE_SUCCESS;

  TQ_ERR_GO_TO_END(
      tdbBegin(pTq->pMetaDB, &pTxn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
  TQ_ERR_GO_TO_END(tdbTbUpsert(ttb, key, kLen, value, vLen, pTxn));
  TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, pTxn));
  TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, pTxn));

  return 0;

END:
  // Failure path only: success returns above.
  tdbAbort(pTq->pMetaDB, pTxn);
  return code;
}
185

186
/*
 * Delete the entry stored under key from the table ttb inside its own write
 * transaction on pTq->pMetaDB.
 *
 * Sequence: tdbBegin -> tdbTbDelete -> tdbCommit -> tdbPostCommit. If any step
 * fails, the transaction is passed to tdbAbort and that step's code is returned.
 *
 * Returns TSDB_CODE_INVALID_PARA when pTq, ttb or key is NULL, 0 on success.
 */
int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, uint32_t kLen) {
  if (pTq == NULL || ttb == NULL || key == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  TXN*    pTxn = NULL;
  int32_t code = TDB_CODE_SUCCESS;

  TQ_ERR_GO_TO_END(
      tdbBegin(pTq->pMetaDB, &pTxn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
  TQ_ERR_GO_TO_END(tdbTbDelete(ttb, key, kLen, pTxn));
  TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, pTxn));
  TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, pTxn));

  return 0;

END:
  // Failure path only: success returns above.
  tdbAbort(pTq->pMetaDB, pTxn);
  return code;
}
205

206
/*
 * Look up the offset record for subkey and return a pointer to it in *pOffset.
 *
 * The in-memory hash pTq->pOffset is consulted first. On a miss the record is
 * read from pTq->pOffsetStore (under the read latch pTq->lock), decoded, put
 * into the hash, and the hash-owned copy is returned.
 *
 * Returns TSDB_CODE_INVALID_PARA on NULL arguments and 0 on success.
 * TSDB_CODE_OUT_OF_MEMORY is returned when the store lookup fails, when
 * decoding fails, or when the entry cannot be found again after insertion;
 * terrno is returned when the hash insertion itself fails.
 * NOTE(review): a failed tdbTbGet is reported as TSDB_CODE_OUT_OF_MEMORY -
 * confirm callers do not need to tell "not found" apart.
 */
int32_t tqMetaGetOffset(STQ* pTq, const char* subkey, STqOffset** pOffset) {
  if (pTq == NULL || subkey == NULL || pOffset == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  size_t keyLen = strlen(subkey);

  // Fast path: already cached.
  void* cached = taosHashGet(pTq->pOffset, subkey, keyLen);
  if (cached != NULL) {
    *pOffset = cached;
    return 0;
  }

  void* raw = NULL;
  int   rawLen = 0;
  taosRLockLatch(&pTq->lock);
  int   getRet = tdbTbGet(pTq->pOffsetStore, subkey, keyLen, &raw, &rawLen);
  taosRUnLockLatch(&pTq->lock);
  if (getRet < 0) {
    tdbFree(raw);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STqOffset decoded = {0};
  if (tqMetaDecodeOffsetInfo(&decoded, raw, rawLen >= 0 ? rawLen : 0) != TDB_CODE_SUCCESS) {
    tdbFree(raw);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (taosHashPut(pTq->pOffset, subkey, keyLen, &decoded, sizeof(STqOffset)) != 0) {
    tDeleteSTqOffset(&decoded);
    tdbFree(raw);
    return terrno;
  }
  tdbFree(raw);

  // Return the copy owned by the hash, not the stack-local one.
  *pOffset = taosHashGet(pTq->pOffset, subkey, keyLen);
  if (*pOffset == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return 0;
}
243

244
/*
 * Encode pHandle and store it in pTq->pExecStore under key.
 *
 * The encoded size is measured first, a zeroed buffer of that size is
 * allocated, the handle is encoded into it, and the result is written through
 * tqMetaSaveInfo.
 *
 * Returns TSDB_CODE_INVALID_PARA on NULL arguments, otherwise the code of the
 * first failing step or the result of tqMetaSaveInfo.
 */
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
  if (pTq == NULL || key == NULL || pHandle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SEncoder encoder = {0};
  void*    payload = NULL;
  uint32_t payloadLen;
  int32_t  code = TDB_CODE_SUCCESS;

  // First pass only measures how many bytes the encoding needs.
  tEncodeSize(tEncodeSTqHandle, pHandle, payloadLen, code);
  if (code < 0) {
    goto END;
  }

  tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 " epoch:%d vgId:%d", pHandle->subKey,
          (int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode));

  payload = taosMemoryCalloc(1, payloadLen);
  if (payload == NULL) {
    code = terrno;
    goto END;
  }

  tEncoderInit(&encoder, payload, payloadLen);
  code = tEncodeSTqHandle(&encoder, pHandle);
  if (code < 0) {
    goto END;
  }

  TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pExecStore, key, strlen(key), payload, payloadLen));

END:
  tEncoderClear(&encoder);
  taosMemoryFree(payload);
  return code;
}
279

280
/*
 * Build the runtime state of a subscription handle whose persistent fields
 * (subType, snapshotVer, consumerId, qmsg/suid, ...) are already filled in.
 *
 * Always opens a WAL reference (handle->pRef). Then, depending on subType:
 *   - COLUMN: creates the exec task from execCol.qmsg and extracts its tq reader.
 *   - DB:     opens a WAL reader and a tq reader, builds a snapshot context and
 *             creates the exec task.
 *   - TABLE:  opens a WAL reader, parses execTb.qmsg into execTb.node when it is
 *             non-empty, builds a snapshot context, creates the exec task,
 *             fetches the table uid list and installs it on a new tq reader.
 * Finally allocates handle->tableCreateTimeHash.
 *
 * Returns TSDB_CODE_INVALID_PARA on NULL arguments, TSDB_CODE_SCH_INTERNAL_ERROR
 * when parsing qmsg or fetching the table list fails, the code set by the
 * TQ_*_GO_TO_END macros for any other failing step, or 0 on success. Resources
 * already attached to handle are left in place on failure for the caller to
 * release.
 */
static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) {
  if (pTq == NULL || handle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = TDB_CODE_SUCCESS;
  // Only populated on the TABLE path. Declared here so that every exit through
  // END releases it, including the failure paths that used to leak it.
  SArray* tbUidList = NULL;

  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  handle->pRef = walOpenRef(pVnode->pWal);
  TQ_NULL_GO_TO_END(handle->pRef);

  SReadHandle reader = {
      .vnode = pVnode,
      .initTableReader = true,
      .initTqReader = true,
      .version = handle->snapshotVer,
  };

  initStorageAPI(&reader.api);

  if (handle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    handle->execHandle.task = qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId,
                                                       &handle->execHandle.numOfCols, handle->consumerId);
    TQ_NULL_GO_TO_END(handle->execHandle.task);
    void* scanner = NULL;
    qExtractTmqScanner(handle->execHandle.task, &scanner);
    TQ_NULL_GO_TO_END(scanner);
    handle->execHandle.pTqReader = qExtractReaderFromTmqScanner(scanner);
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
  } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    handle->pWalReader = walOpenReader(pVnode->pWal, 0);
    TQ_NULL_GO_TO_END(handle->pWalReader);
    handle->execHandle.pTqReader = tqReaderOpen(pVnode);
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
    TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta,
                                      (SSnapContext**)(&reader.sContext)));
    handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
    TQ_NULL_GO_TO_END(handle->execHandle.task);
  } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
    handle->pWalReader = walOpenReader(pVnode->pWal, 0);
    TQ_NULL_GO_TO_END(handle->pWalReader);
    // An absent or empty qmsg means there is nothing to parse into execTb.node.
    if (handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
      if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) {
        tqError("nodesStringToNode error in sub stable, since %s", terrstr());
        code = TSDB_CODE_SCH_INTERNAL_ERROR;
        goto END;
      }
    }
    TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid,
                                      handle->execHandle.subType, handle->fetchMeta,
                                      (SSnapContext**)(&reader.sContext)));
    handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
    TQ_NULL_GO_TO_END(handle->execHandle.task);
    int ret = qGetTableList(handle->execHandle.execTb.suid, pVnode, handle->execHandle.execTb.node, &tbUidList,
                            handle->execHandle.task);
    if (ret != TDB_CODE_SUCCESS) {
      tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle->subKey, handle->consumerId);
      code = TSDB_CODE_SCH_INTERNAL_ERROR;
      goto END;
    }
    tqInfo("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId,
           handle->execHandle.execTb.suid);
    handle->execHandle.pTqReader = tqReaderOpen(pVnode);
    // If this fails, tbUidList is still released at END.
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
    tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL);
  }
  handle->tableCreateTimeHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
  // Report allocation failure instead of returning success with a NULL hash.
  TQ_NULL_GO_TO_END(handle->tableCreateTimeHash);

END:
  taosArrayDestroy(tbUidList);
  return code;
}
353

354
/*
 * Rebuild a subscription handle from its serialized form (pVal, vLen bytes)
 * and register it in pTq->pHandle keyed by handle->subKey.
 *
 * Steps: decode into handle, initialize its runtime state with
 * tqMetaInitHandle, then store a copy of *handle in the hash.
 *
 * Returns TSDB_CODE_INVALID_PARA on NULL arguments, otherwise the code of the
 * first failing step or the result of taosHashPut. On failure, whatever was
 * already attached to handle is left for the caller to release.
 */
static int32_t tqMetaRestoreHandle(STQ* pTq, void* pVal, uint32_t vLen, STqHandle* handle) {
  if (pTq == NULL || pVal == NULL || handle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t  code = TDB_CODE_SUCCESS;
  int32_t  vgId = TD_VID(pTq->pVnode);
  SDecoder dec = {0};

  tDecoderInit(&dec, (uint8_t*)pVal, vLen);
  TQ_ERR_GO_TO_END(tDecodeSTqHandle(&dec, handle));
  TQ_ERR_GO_TO_END(tqMetaInitHandle(pTq, handle));
  tqInfo("tqMetaRestoreHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
  code = taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));

END:
  tDecoderClear(&dec);
  return code;
}
372

373
/*
 * Populate handle from a rebalance request and register it in pTq->pHandle.
 *
 * Copies subKey, consumer id, subType and withMeta from req, sets up the
 * subType-specific payload (a duplicate of req->qmsg for COLUMN and TABLE, a
 * new pFilterOutTbUid hash for DB, plus suid for TABLE), records the WAL's
 * committed version as snapshotVer, initializes runtime state through
 * tqMetaInitHandle and finally stores a copy of *handle in the hash keyed by
 * subKey.
 *
 * Returns TSDB_CODE_INVALID_PARA on NULL arguments, terrno when an allocation
 * fails, the code from tqMetaInitHandle on failure, or the result of
 * taosHashPut.
 */
int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) {
  if (pTq == NULL || req == NULL || handle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t vgId = TD_VID(pTq->pVnode);

  (void)memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN);
  handle->consumerId = req->newConsumerId;
  handle->execHandle.subType = req->subType;
  handle->fetchMeta = req->withMeta;

  switch (req->subType) {
    case TOPIC_SUB_TYPE__COLUMN: {
      void* qmsgCopy = taosStrdup(req->qmsg);
      if (qmsgCopy == NULL) {
        return terrno;
      }
      handle->execHandle.execCol.qmsg = qmsgCopy;
      break;
    }
    case TOPIC_SUB_TYPE__DB: {
      handle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
      if (handle->execHandle.execDb.pFilterOutTbUid == NULL) {
        return terrno;
      }
      break;
    }
    case TOPIC_SUB_TYPE__TABLE: {
      handle->execHandle.execTb.suid = req->suid;
      void* qmsgCopy = taosStrdup(req->qmsg);
      if (qmsgCopy == NULL) {
        return terrno;
      }
      handle->execHandle.execTb.qmsg = qmsgCopy;
      break;
    }
    default:
      break;
  }

  handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal);

  int32_t code = tqMetaInitHandle(pTq, handle);
  if (code != 0) {
    return code;
  }
  tqInfo("tqMetaCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey,
         handle->consumerId, vgId, handle->snapshotVer);
  return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
}
415

416
/*
 * Copy every key/value pair from table pOld into table pNew.
 *
 * A cursor walks pOld from its first entry; each pair is upserted into pNew
 * inside a single write transaction on pMetaDB, which is committed once the
 * cursor is exhausted.
 *
 * Returns TSDB_CODE_INVALID_PARA on NULL arguments, otherwise the code of the
 * first failing step or 0. If a step fails after the transaction was begun,
 * the transaction is aborted, matching tqMetaSaveInfo and tqMetaDeleteInfo.
 */
static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pOld, TTB* pNew) {
  if (pMetaDB == NULL || pOld == NULL || pNew == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  TBC*  pCur = NULL;
  void* pKey = NULL;
  int   kLen = 0;
  void* pVal = NULL;
  int   vLen = 0;
  TXN*  txn = NULL;

  int32_t code = TDB_CODE_SUCCESS;

  TQ_ERR_GO_TO_END(tdbTbcOpen(pOld, &pCur, NULL));
  TQ_ERR_GO_TO_END(
      tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));

  TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur));
  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    TQ_ERR_GO_TO_END(tdbTbUpsert(pNew, pKey, kLen, pVal, vLen, txn));
  }

  TQ_ERR_GO_TO_END(tdbCommit(pMetaDB, txn));
  TQ_ERR_GO_TO_END(tdbPostCommit(pMetaDB, txn));

END:
  // END is shared by the success and failure paths, so abort only on failure,
  // and only when tdbBegin actually produced a transaction.
  if (code != 0 && txn != NULL) {
    tdbAbort(pMetaDB, txn);
  }
  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCur);
  return code;
}
447

448
/*
 * Look up the subscription handle for key and return a pointer to it in
 * *pHandle.
 *
 * The in-memory hash pTq->pHandle is consulted first. On a miss the serialized
 * handle is read from pTq->pExecStore, restored through tqMetaRestoreHandle
 * (which also inserts it into the hash), and the hash-owned copy is returned.
 *
 * Returns TSDB_CODE_INVALID_PARA on NULL arguments,
 * TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when the store lookup fails, the restore
 * code when restoring fails, terrno when the restored entry cannot be found in
 * the hash, and TDB_CODE_SUCCESS otherwise.
 */
int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle) {
  if (pTq == NULL || key == NULL || pHandle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Fast path: already cached.
  void* cached = taosHashGet(pTq->pHandle, key, strlen(key));
  if (cached != NULL) {
    *pHandle = cached;
    return TDB_CODE_SUCCESS;
  }

  void* raw = NULL;
  int   rawLen = 0;
  if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &raw, &rawLen) < 0) {
    tdbFree(raw);
    return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  }

  STqHandle restored = {0};
  int32_t   code = tqMetaRestoreHandle(pTq, raw, rawLen >= 0 ? rawLen : 0, &restored);
  tdbFree(raw);
  if (code != 0) {
    tqDestroyTqHandle(&restored);
    return code;
  }

  // Return the copy owned by the hash, not the stack-local one.
  *pHandle = taosHashGet(pTq->pHandle, key, strlen(key));
  if (*pHandle == NULL) {
    return terrno;
  }
  return TDB_CODE_SUCCESS;
}
476

477
/*
 * Open the TDB environment at pTq->path and the three tables used by tq:
 * "tq.db" (pExecStore), "tq.check.db" (pCheckStore) and "tq.offset.db"
 * (pOffsetStore).
 *
 * Returns TSDB_CODE_INVALID_PARA when pTq is NULL, otherwise the code of the
 * first open call that fails, or 0. Nothing opened before a failure is closed
 * here.
 */
int32_t tqMetaOpenTdb(STQ* pTq) {
  if (pTq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = TDB_CODE_SUCCESS;

  // Environment first, then each table inside it.
  TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0, 0, NULL));

  TQ_ERR_GO_TO_END(tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0));
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0));
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.offset.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pOffsetStore, 0));

END:
  return code;
}
490

491
static int32_t replaceTqPath(char** path) {
38,404,361✔
492
  if (path == NULL || *path == NULL) {
38,404,361!
493
    return TSDB_CODE_INVALID_PARA;
×
494
  }
495
  char*   tpath = NULL;
38,404,062✔
496
  int32_t code = tqBuildFName(&tpath, *path, TQ_SUBSCRIBE_NAME);
38,399,808✔
497
  if (code != 0) {
38,403,546!
498
    return code;
×
499
  }
500
  taosMemoryFree(*path);
38,403,546!
501
  *path = tpath;
38,393,184✔
502
  return TDB_CODE_SUCCESS;
38,393,184✔
503
}
504

505
/*
 * Load every record of pTq->pCheckStore into the in-memory hash
 * pTq->pCheckInfo, keyed by the record's topic.
 *
 * Each record is decoded into a stack-local STqCheckInfo and copied by value
 * into the hash. The copy is shallow, so after a successful insert the hash
 * entry refers to the same colIdList as the local; the local's pointer is
 * cleared right away so that no later error path can free a list the hash
 * still references.
 *
 * Returns TSDB_CODE_INVALID_PARA when pTq is NULL, otherwise the code of the
 * first failing step or 0.
 */
static int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
  if (pTq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  TBC*         pCur = NULL;
  void*        pKey = NULL;
  int          kLen = 0;
  void*        pVal = NULL;
  int          vLen = 0;
  int32_t      code = 0;
  STqCheckInfo info = {0};

  TQ_ERR_GO_TO_END(tdbTbcOpen(pTq->pCheckStore, &pCur, NULL));
  TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur));

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    code = tqMetaDecodeCheckInfo(&info, pVal, vLen >= 0 ? vLen : 0);
    if (code != 0) {
      // tqMetaDecodeCheckInfo has already released info's contents on failure;
      // clear the pointer so the cleanup at END does not release them again.
      info.colIdList = NULL;
      goto END;
    }
    // If the insert fails, info still owns colIdList and END releases it.
    TQ_ERR_GO_TO_END(taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)));
    // The hash entry now refers to the list; drop the local reference.
    info.colIdList = NULL;
  }

END:
  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCur);
  tDeleteSTqCheckInfo(&info);
  return code;
}
533

534
/*
 * Open the tq metadata store for pTq.
 *
 * If a file named TDB_MAINDB_NAME exists under pTq->path, the data is migrated
 * with tqMetaTransform and that file is removed afterwards; otherwise
 * pTq->path is switched with replaceTqPath and the store is opened directly.
 * After that, an offset file (TQ_OFFSET_NAME under the current pTq->path), if
 * present, is restored with tqOffsetRestoreFromFile and removed, and finally
 * the check-info records are loaded into memory.
 *
 * Returns TSDB_CODE_INVALID_PARA when pTq is NULL, otherwise the code of the
 * first failing step or 0.
 */
int32_t tqMetaOpen(STQ* pTq) {
  if (pTq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = TDB_CODE_SUCCESS;
  char*   mainDbFile = NULL;
  char*   offsetFile = NULL;

  TQ_ERR_GO_TO_END(tqBuildFName(&mainDbFile, pTq->path, TDB_MAINDB_NAME));
  if (taosCheckExistFile(mainDbFile)) {
    TQ_ERR_GO_TO_END(tqMetaTransform(pTq));
    TQ_ERR_GO_TO_END(taosRemoveFile(mainDbFile));
  } else {
    TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path));
    TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));
  }

  // pTq->path may have been replaced above, so build the offset name only now.
  TQ_ERR_GO_TO_END(tqBuildFName(&offsetFile, pTq->path, TQ_OFFSET_NAME));
  if (taosCheckExistFile(offsetFile)) {
    TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetFile));
    TQ_ERR_GO_TO_END(taosRemoveFile(offsetFile));
  }

  TQ_ERR_GO_TO_END(tqMetaRestoreCheckInfo(pTq));

END:
  taosMemoryFree(mainDbFile);
  taosMemoryFree(offsetFile);
  return code;
}
563

564
/*
 * Migrate tq metadata from the store at the current pTq->path into the store
 * at the path produced by replaceTqPath.
 *
 * Opens the existing environment and its "tq.db" / "tq.check.db" tables,
 * switches pTq->path, opens the new store with tqMetaOpenTdb, and copies both
 * tables across with tqMetaTransformInfo. If an offset file exists at the old
 * location it is copied to the new one; the old file is removed only when the
 * copy succeeded, while a failed copy is logged and not treated as an error.
 *
 * Returns TSDB_CODE_INVALID_PARA when pTq is NULL, otherwise the code of the
 * first failing step or 0. The old environment and its tables are always
 * closed before returning.
 */
int32_t tqMetaTransform(STQ* pTq) {
  if (pTq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = TDB_CODE_SUCCESS;
  TDB*    pOldDB = NULL;
  TTB*    pOldExec = NULL;
  TTB*    pOldCheck = NULL;
  char*   oldOffsetFile = NULL;
  char*   newOffsetFile = NULL;

  // Must be resolved before pTq->path is replaced below.
  TQ_ERR_GO_TO_END(tqBuildFName(&oldOffsetFile, pTq->path, TQ_OFFSET_NAME));

  TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pOldDB, 0, 0, NULL));
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.db", -1, -1, NULL, pOldDB, &pOldExec, 0));
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.check.db", -1, -1, NULL, pOldDB, &pOldCheck, 0));

  TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path));
  TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));

  TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pOldExec, pTq->pExecStore));
  TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pOldCheck, pTq->pCheckStore));

  TQ_ERR_GO_TO_END(tqBuildFName(&newOffsetFile, pTq->path, TQ_OFFSET_NAME));

  if (taosCheckExistFile(oldOffsetFile)) {
    if (taosCopyFile(oldOffsetFile, newOffsetFile) >= 0) {
      TQ_ERR_GO_TO_END(taosRemoveFile(oldOffsetFile));
    } else {
      tqError("copy offset file error");
    }
  }

END:
  taosMemoryFree(oldOffsetFile);
  taosMemoryFree(newOffsetFile);

  tdbTbClose(pOldExec);
  tdbTbClose(pOldCheck);
  tdbClose(pOldDB);
  return code;
}
605

606
/*
 * Close the tq metadata store: each table that is open (pExecStore,
 * pCheckStore, pOffsetStore), then the environment pMetaDB.
 *
 * Does nothing when pTq is NULL. The handles stored in pTq are not reset.
 */
void tqMetaClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }
  if (pTq->pExecStore) {
    tdbTbClose(pTq->pExecStore);
  }
  if (pTq->pCheckStore) {
    tdbTbClose(pTq->pCheckStore);
  }
  if (pTq->pOffsetStore) {
    tdbTbClose(pTq->pOffsetStore);
  }
  tdbClose(pTq->pMetaDB);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc