• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4879

11 Dec 2025 02:43AM UTC coverage: 64.544% (-0.03%) from 64.569%
#4879

push

travis-ci

guanshengliang
feat(TS-7270): internal dependence

307 of 617 new or added lines in 24 files covered. (49.76%)

3883 existing lines in 125 files now uncovered.

163565 of 253417 relevant lines covered (64.54%)

105600506.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.29
/source/dnode/vnode/src/tq/tqMeta.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "tdbInt.h"
16
#include "tq.h"
17

18
/**
 * Serialize a TQ handle into the encoder.
 *
 * Written in order: subKey, fetchMeta, consumerId, snapshotVer, epoch, subType,
 * followed by a subType-specific tail:
 *   - TOPIC_SUB_TYPE__COLUMN: execCol.qmsg
 *   - TOPIC_SUB_TYPE__DB:     entry count of execDb.pFilterOutTbUid, then every key as int64
 *   - TOPIC_SUB_TYPE__TABLE:  execTb.suid, then execTb.qmsg when it is non-NULL
 *
 * @param pEncoder destination encoder
 * @param pHandle  handle to serialize
 * @return TSDB_CODE_INVALID_PARA when an argument is NULL, the non-zero code left by a
 *         failed step, otherwise pEncoder->pos.
 */
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
  int32_t code = 0;
  int32_t lino;

  if (!pEncoder || !pHandle) {
    return TSDB_CODE_INVALID_PARA;
  }

  /* common header shared by every subscription type */
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->subKey));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pHandle->fetchMeta));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->consumerId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->snapshotVer));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pHandle->epoch));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pHandle->execHandle.subType));

  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg));
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    int32_t nFiltered = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nFiltered));

    /* NOTE(review): a failed encode leaves this loop mid-iteration without notifying the
     * hash - confirm whether taosHashIterate requires an explicit cancel in that case. */
    for (void* pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, NULL); pIter != NULL;
         pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter)) {
      int64_t* pUid = (int64_t*)taosHashGetKey(pIter, NULL);
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, *pUid));
    }
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid));
    /* qmsg is optional for table subscriptions; the decoder checks for end-of-data */
    if (pHandle->execHandle.execTb.qmsg != NULL) {
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->execHandle.execTb.qmsg));
    }
  }
  tEndEncode(pEncoder);

_exit:
  if (code == 0) {
    return pEncoder->pos;
  }
  return code;
}
58

59
/**
 * Deserialize a TQ handle, mirroring the field order written by tEncodeSTqHandle.
 *
 * For TOPIC_SUB_TYPE__DB a fresh pFilterOutTbUid hash is created and filled with the
 * decoded table uids. For TOPIC_SUB_TYPE__TABLE the qmsg string is read only when the
 * decoder has not reached its end.
 *
 * @param pDecoder source decoder
 * @param pHandle  handle to populate
 * @return TSDB_CODE_INVALID_PARA when an argument is NULL, otherwise the value of `code`
 *         after the last executed step (0 when every step succeeded).
 */
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
  int32_t code = 0;
  int32_t lino;

  if (!pDecoder || !pHandle) {
    return TSDB_CODE_INVALID_PARA;
  }

  /* common header */
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pHandle->subKey));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pHandle->fetchMeta));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->consumerId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->snapshotVer));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pHandle->epoch));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pHandle->execHandle.subType));

  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg));
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    pHandle->execHandle.execDb.pFilterOutTbUid =
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    if (pHandle->execHandle.execDb.pFilterOutTbUid == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    int32_t nFiltered = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nFiltered));
    for (int32_t left = nFiltered; left > 0; left--) {
      int64_t uid = 0;
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &uid));
      /* the uid is the key; no value payload is stored */
      TAOS_CHECK_EXIT(taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &uid, sizeof(int64_t), NULL, 0));
    }
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid));
    /* qmsg is only present when the encoder had a non-NULL string */
    if (!tDecodeIsEnd(pDecoder)) {
      TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execTb.qmsg));
    }
  }
  tEndDecode(pDecoder);

_exit:
  return code;
}
99

100
/**
 * Decode a STqCheckInfo from a raw value buffer.
 *
 * @param info destination structure
 * @param pVal encoded bytes
 * @param vLen number of bytes in pVal
 * @return TSDB_CODE_INVALID_PARA when info or pVal is NULL; 0 on success; on any decode
 *         failure `info` is released with tDeleteSTqCheckInfo and
 *         TSDB_CODE_OUT_OF_MEMORY is returned.
 */
int32_t tqMetaDecodeCheckInfo(STqCheckInfo* info, void* pVal, uint32_t vLen) {
  if (!info || !pVal) {
    return TSDB_CODE_INVALID_PARA;
  }

  SDecoder dec = {0};
  tDecoderInit(&dec, (uint8_t*)pVal, vLen);
  int32_t rc = tDecodeSTqCheckInfo(&dec, info);
  tDecoderClear(&dec);

  if (rc == 0) {
    return rc;
  }

  /* every decode failure is reported with the same code */
  tDeleteSTqCheckInfo(info);
  return TSDB_CODE_OUT_OF_MEMORY;
}
115

116
/**
 * Decode a STqOffset from a raw value buffer.
 *
 * @param info destination structure
 * @param pVal encoded bytes
 * @param vLen number of bytes in pVal
 * @return TSDB_CODE_INVALID_PARA when info or pVal is NULL; 0 on success; on any decode
 *         failure `info` is released with tDeleteSTqOffset and
 *         TSDB_CODE_OUT_OF_MEMORY is returned.
 */
int32_t tqMetaDecodeOffsetInfo(STqOffset* info, void* pVal, uint32_t vLen) {
  if (!info || !pVal) {
    return TSDB_CODE_INVALID_PARA;
  }

  SDecoder dec = {0};
  tDecoderInit(&dec, (uint8_t*)pVal, vLen);
  int32_t rc = tDecodeSTqOffset(&dec, info);
  tDecoderClear(&dec);

  if (rc == 0) {
    return rc;
  }

  /* every decode failure is reported with the same code */
  tDeleteSTqOffset(info);
  return TSDB_CODE_OUT_OF_MEMORY;
}
131

132
/**
 * Encode an offset record and upsert it into the offset store, keyed by its subKey.
 *
 * The record is sized with tEncodeSize, encoded into a zeroed buffer of that size, and
 * written through tqMetaSaveInfo while holding the write latch on pTq->lock.
 *
 * @param pTq     TQ instance owning the offset store
 * @param pOffset offset record to persist
 * @return TSDB_CODE_INVALID_PARA when an argument is NULL; a negative code from sizing
 *         or encoding; terrno when the buffer allocation fails; otherwise the result of
 *         tqMetaSaveInfo.
 */
int32_t tqMetaSaveOffset(STQ* pTq, STqOffset* pOffset) {
  if (!pTq || !pOffset) {
    return TSDB_CODE_INVALID_PARA;
  }

  SEncoder encoder = {0};
  void*    pBuf = NULL;
  uint32_t vlen;
  int32_t  code = TDB_CODE_SUCCESS;

  tEncodeSize(tEncodeSTqOffset, pOffset, vlen, code);
  if (code >= 0) {
    pBuf = taosMemoryCalloc(1, vlen);
    if (pBuf == NULL) {
      code = terrno;
    } else {
      tEncoderInit(&encoder, pBuf, vlen);
      code = tEncodeSTqOffset(&encoder, pOffset);
      if (code >= 0) {
        /* the store write is serialized by the TQ latch */
        taosWLockLatch(&pTq->lock);
        code = tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), pBuf, vlen);
        taosWUnLockLatch(&pTq->lock);
      }
    }
  }

  /* cleanup runs on every path, including the early failures above */
  tEncoderClear(&encoder);
  taosMemoryFree(pBuf);
  return code;
}
165

166
/**
 * Upsert one key/value pair into a TQ meta table inside its own transaction.
 *
 * Steps: tdbBegin -> tdbTbUpsert -> tdbCommit -> tdbPostCommit. If any step fails the
 * transaction is passed to tdbAbort and that step's code is returned.
 *
 * @param pTq   TQ instance owning pMetaDB
 * @param ttb   target table
 * @param key   key bytes
 * @param kLen  key length
 * @param value value bytes
 * @param vLen  value length
 * @return TSDB_CODE_INVALID_PARA when a pointer argument is NULL, 0 on success, otherwise
 *         the failing step's code.
 */
int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, uint32_t kLen, const void* value, uint32_t vLen) {
  if (!(pTq && ttb && key && value)) {
    return TSDB_CODE_INVALID_PARA;
  }

  TXN*    pTxn = NULL;
  int32_t code = TDB_CODE_SUCCESS;

  TQ_ERR_GO_TO_END(
      tdbBegin(pTq->pMetaDB, &pTxn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
  TQ_ERR_GO_TO_END(tdbTbUpsert(ttb, key, kLen, value, vLen, pTxn));
  TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, pTxn));
  TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, pTxn));
  return 0;

END:
  /* reached only when one of the steps above failed */
  tdbAbort(pTq->pMetaDB, pTxn);
  return code;
}
185

186
/**
 * Delete one key from a TQ meta table inside its own transaction.
 *
 * Steps: tdbBegin -> tdbTbDelete -> tdbCommit -> tdbPostCommit. If any step fails the
 * transaction is passed to tdbAbort and that step's code is returned.
 *
 * @param pTq  TQ instance owning pMetaDB
 * @param ttb  target table
 * @param key  key bytes
 * @param kLen key length
 * @return TSDB_CODE_INVALID_PARA when a pointer argument is NULL, 0 on success, otherwise
 *         the failing step's code.
 */
int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, uint32_t kLen) {
  if (!(pTq && ttb && key)) {
    return TSDB_CODE_INVALID_PARA;
  }

  TXN*    pTxn = NULL;
  int32_t code = TDB_CODE_SUCCESS;

  TQ_ERR_GO_TO_END(
      tdbBegin(pTq->pMetaDB, &pTxn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
  TQ_ERR_GO_TO_END(tdbTbDelete(ttb, key, kLen, pTxn));
  TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, pTxn));
  TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, pTxn));
  return 0;

END:
  /* reached only when one of the steps above failed */
  tdbAbort(pTq->pMetaDB, pTxn);
  return code;
}
205

206
/**
 * Look up the offset record for a subscription key.
 *
 * The in-memory hash pTq->pOffset is consulted first. On a miss the record is read from
 * pTq->pOffsetStore under the read latch, decoded, inserted into the hash, and the
 * hash-owned copy is returned through pOffset.
 *
 * @param pTq     TQ instance
 * @param subkey  NUL-terminated subscription key
 * @param pOffset receives a pointer to the hash-resident record
 * @return 0 on success; TSDB_CODE_INVALID_PARA when an argument is NULL;
 *         TSDB_CODE_OUT_OF_MEMORY when the store read fails, decoding fails, or the
 *         re-lookup after insertion finds nothing; terrno when the hash insert fails.
 */
int32_t tqMetaGetOffset(STQ* pTq, const char* subkey, STqOffset** pOffset) {
  if (!pTq || !subkey || !pOffset) {
    return TSDB_CODE_INVALID_PARA;
  }

  const size_t keyLen = strlen(subkey);

  /* fast path: already cached */
  void* cached = taosHashGet(pTq->pOffset, subkey, keyLen);
  if (cached != NULL) {
    *pOffset = cached;
    return 0;
  }

  /* slow path: load from the persistent store */
  void* raw = NULL;
  int   vLen = 0;
  taosRLockLatch(&pTq->lock);
  int rc = tdbTbGet(pTq->pOffsetStore, subkey, keyLen, &raw, &vLen);
  taosRUnLockLatch(&pTq->lock);
  if (rc < 0) {
    tdbFree(raw);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STqOffset offset = {0};
  if (tqMetaDecodeOffsetInfo(&offset, raw, vLen >= 0 ? vLen : 0) != TDB_CODE_SUCCESS) {
    tdbFree(raw);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (taosHashPut(pTq->pOffset, subkey, keyLen, &offset, sizeof(STqOffset)) != 0) {
    tDeleteSTqOffset(&offset);
    tdbFree(raw);
    return terrno;
  }
  tdbFree(raw);

  /* hand back the hash-owned copy rather than the stack temporary */
  *pOffset = taosHashGet(pTq->pOffset, subkey, keyLen);
  if (*pOffset == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return 0;
}
243

244
/**
 * Encode a TQ handle and upsert it into the exec store under `key`.
 *
 * @param pTq     TQ instance owning the exec store
 * @param key     NUL-terminated store key
 * @param pHandle handle to persist
 * @return TSDB_CODE_INVALID_PARA when an argument is NULL; a negative code from sizing
 *         or encoding; terrno when the buffer allocation fails; otherwise the result of
 *         tqMetaSaveInfo.
 */
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
  if (!pTq || !key || !pHandle) {
    return TSDB_CODE_INVALID_PARA;
  }

  SEncoder encoder = {0};
  void*    pBuf = NULL;
  uint32_t vlen;
  int32_t  code = TDB_CODE_SUCCESS;

  /* first pass only measures the encoded size */
  tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
  if (code < 0) {
    goto END;
  }

  tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 " epoch:%d vgId:%d", pHandle->subKey,
          (int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode));

  pBuf = taosMemoryCalloc(1, vlen);
  if (pBuf == NULL) {
    code = terrno;
    goto END;
  }

  /* second pass writes into the buffer */
  tEncoderInit(&encoder, pBuf, vlen);
  code = tEncodeSTqHandle(&encoder, pHandle);
  if (code < 0) {
    goto END;
  }

  TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pExecStore, key, strlen(key), pBuf, vlen));

END:
  tEncoderClear(&encoder);
  taosMemoryFree(pBuf);
  return code;
}
279

280
/**
 * Build the runtime state of a TQ handle (WAL ref/reader, executor task, TQ reader,
 * table-create-time hash) according to its subscription type.
 *
 * Changes relative to the previous version:
 *   - `reader` is fully initialized, including its storage API, before the first
 *     statement that can jump to END. Previously a NULL walOpenRef() result jumped over
 *     the initialization and END then called an indeterminate function pointer.
 *   - a nodesStringToNode failure now leaves through END like every other error.
 *   - the taosHashInit result for tableCreateTimeHash is checked.
 *
 * @param pTq    TQ instance; supplies the vnode and its WAL
 * @param handle handle whose subKey/subType/snapshotVer/exec fields are already set
 * @return TSDB_CODE_INVALID_PARA when an argument is NULL, TDB_CODE_SUCCESS on success,
 *         TSDB_CODE_SCH_INTERNAL_ERROR when the table filter cannot be parsed, otherwise
 *         the code set by the failing TQ_*_GO_TO_END step.
 */
static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) {
  if (pTq == NULL || handle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = TDB_CODE_SUCCESS;
  SArray* tbUidList = NULL;

  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  /* Must be ready before any goto END: the cleanup path calls through reader.api. */
  SReadHandle reader = {
      .vnode = pVnode,
      .initTableReader = true,
      .initTqReader = true,
      .version = handle->snapshotVer,
  };
  initStorageAPI(&reader.api);

  handle->pRef = walOpenRef(pVnode->pWal);
  TQ_NULL_GO_TO_END(handle->pRef);

  if (handle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    handle->execHandle.task = qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId,
                                                       &handle->execHandle.numOfCols, handle->consumerId);
    TQ_NULL_GO_TO_END(handle->execHandle.task);
    void* scanner = NULL;
    qExtractTmqScanner(handle->execHandle.task, &scanner);
    TQ_NULL_GO_TO_END(scanner);
    handle->execHandle.pTqReader = qExtractReaderFromTmqScanner(scanner);
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
  } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    handle->pWalReader = walOpenReader(pVnode->pWal, 0);
    TQ_NULL_GO_TO_END(handle->pWalReader);
    handle->execHandle.pTqReader = tqReaderOpen(pVnode);
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
    TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta,
                                      (SSnapContext**)(&reader.sContext)));
    handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
    TQ_NULL_GO_TO_END(handle->execHandle.task);
  } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
    handle->pWalReader = walOpenReader(pVnode->pWal, 0);
    TQ_NULL_GO_TO_END(handle->pWalReader);
    /* an empty qmsg means there is no filter expression to parse */
    if (handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
      if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) {
        tqError("nodesStringToNode error in sub stable, since %s", terrstr());
        code = TSDB_CODE_SCH_INTERNAL_ERROR;
        goto END;
      }
    }
    TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid,
                                      handle->execHandle.subType, handle->fetchMeta,
                                      (SSnapContext**)(&reader.sContext)));
    handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
    TQ_NULL_GO_TO_END(handle->execHandle.task);
    TQ_ERR_GO_TO_END(qGetTableList(handle->execHandle.execTb.suid, pVnode, handle->execHandle.execTb.node, &tbUidList,
                                handle->execHandle.task));
    tqInfo("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId,
           handle->execHandle.execTb.suid);
    handle->execHandle.pTqReader = tqReaderOpen(pVnode);
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
    TQ_ERR_GO_TO_END(tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL));
  }

  handle->tableCreateTimeHash =
      (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
  TQ_NULL_GO_TO_END(handle->tableCreateTimeHash);

END:
  /* runs on success and failure alike; sContext is still NULL on the paths that never built one */
  reader.api.snapshotFn.destroySnapshot(reader.sContext);
  taosArrayDestroy(tbUidList);
  return code;
}
350

351
/**
 * Rebuild a handle from its persisted bytes and register it in pTq->pHandle.
 *
 * The bytes are decoded into `handle`, its runtime state is created with
 * tqMetaInitHandle, and a copy of the struct is stored in the hash under handle->subKey.
 *
 * @param pTq    TQ instance
 * @param pVal   encoded handle bytes
 * @param vLen   number of bytes in pVal
 * @param handle caller-provided handle to fill
 * @return TSDB_CODE_INVALID_PARA when a pointer argument is NULL, otherwise the code of
 *         the first failing step, or the taosHashPut result.
 */
static int32_t tqMetaRestoreHandle(STQ* pTq, void* pVal, uint32_t vLen, STqHandle* handle) {
  if (!pTq || !pVal || !handle) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = TDB_CODE_SUCCESS;
  int32_t  vgId = TD_VID(pTq->pVnode);
  SDecoder dec = {0};

  tDecoderInit(&dec, (uint8_t*)pVal, vLen);
  TQ_ERR_GO_TO_END(tDecodeSTqHandle(&dec, handle));
  TQ_ERR_GO_TO_END(tqMetaInitHandle(pTq, handle));

  tqInfo("tqMetaRestoreHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
  /* the hash stores a copy of the struct */
  code = taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));

END:
  tDecoderClear(&dec);
  return code;
}
369

370
/**
 * Populate a new handle from a rebalance request, initialize its runtime state and
 * register it in pTq->pHandle.
 *
 * subKey, consumerId, subType and fetchMeta are taken from the request. Per subType:
 *   - COLUMN: execCol.qmsg becomes a duplicate of req->qmsg
 *   - DB:     an empty pFilterOutTbUid hash is created
 *   - TABLE:  execTb.suid is copied and execTb.qmsg becomes a duplicate of req->qmsg
 * snapshotVer is set to the WAL's committed version.
 *
 * @param pTq    TQ instance
 * @param req    rebalance request
 * @param handle caller-provided handle to fill
 * @return TSDB_CODE_INVALID_PARA when an argument is NULL, terrno when a duplicate or
 *         hash allocation fails, the tqMetaInitHandle code when that fails, otherwise
 *         the taosHashPut result.
 */
int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) {
  if (!pTq || !req || !handle) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t vgId = TD_VID(pTq->pVnode);

  (void)memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN);
  handle->consumerId = req->newConsumerId;
  handle->execHandle.subType = req->subType;
  handle->fetchMeta = req->withMeta;

  if (req->subType == TOPIC_SUB_TYPE__COLUMN) {
    void* qmsgCopy = taosStrdup(req->qmsg);
    if (qmsgCopy == NULL) {
      return terrno;
    }
    handle->execHandle.execCol.qmsg = qmsgCopy;
  } else if (req->subType == TOPIC_SUB_TYPE__DB) {
    handle->execHandle.execDb.pFilterOutTbUid =
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    if (handle->execHandle.execDb.pFilterOutTbUid == NULL) {
      return terrno;
    }
  } else if (req->subType == TOPIC_SUB_TYPE__TABLE) {
    handle->execHandle.execTb.suid = req->suid;
    void* qmsgCopy = taosStrdup(req->qmsg);
    if (qmsgCopy == NULL) {
      return terrno;
    }
    handle->execHandle.execTb.qmsg = qmsgCopy;
  }

  handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal);

  int32_t initCode = tqMetaInitHandle(pTq, handle);
  if (initCode != 0) {
    return initCode;
  }

  tqInfo("tqMetaCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey,
         handle->consumerId, vgId, handle->snapshotVer);
  /* the hash stores a copy of the struct */
  return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
}
412

UNCOV
413
/**
 * Copy every key/value pair of table pOld into table pNew within one write transaction
 * on pMetaDB.
 *
 * Change relative to the previous version: when a step fails after the transaction has
 * been started, the transaction is now passed to tdbAbort, matching what tqMetaSaveInfo
 * and tqMetaDeleteInfo do on their failure paths. Previously it was simply dropped.
 *
 * @param pMetaDB database that owns pNew and the write transaction
 * @param pOld    source table, read through a cursor
 * @param pNew    destination table
 * @return TSDB_CODE_INVALID_PARA when an argument is NULL, TDB_CODE_SUCCESS on success,
 *         otherwise the failing step's code.
 */
static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pOld, TTB* pNew) {
  if (pMetaDB == NULL || pOld == NULL || pNew == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  TBC*  pCur = NULL;
  void* pKey = NULL;
  int   kLen = 0;
  void* pVal = NULL;
  int   vLen = 0;
  TXN*  txn = NULL;

  int32_t code = TDB_CODE_SUCCESS;

  TQ_ERR_GO_TO_END(tdbTbcOpen(pOld, &pCur, NULL));
  TQ_ERR_GO_TO_END(
      tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));

  TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur));
  /* the cursor hands back pKey/pVal buffers that are released once at END */
  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    TQ_ERR_GO_TO_END(tdbTbUpsert(pNew, pKey, kLen, pVal, vLen, txn));
  }

  TQ_ERR_GO_TO_END(tdbCommit(pMetaDB, txn));
  TQ_ERR_GO_TO_END(tdbPostCommit(pMetaDB, txn));

END:
  /* txn stays NULL when the failure happened before tdbBegin produced one */
  if (code != TDB_CODE_SUCCESS && txn != NULL) {
    tdbAbort(pMetaDB, txn);
  }
  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCur);
  return code;
}
444

445
/**
 * Look up the handle registered under `key`.
 *
 * The in-memory hash pTq->pHandle is consulted first. On a miss the persisted bytes are
 * read from pTq->pExecStore and passed to tqMetaRestoreHandle, which registers the
 * handle in the hash; the hash-resident copy is then returned through pHandle.
 *
 * @param pTq     TQ instance
 * @param key     NUL-terminated subscription key
 * @param pHandle receives a pointer to the hash-resident handle
 * @return TDB_CODE_SUCCESS on success; TSDB_CODE_INVALID_PARA when an argument is NULL;
 *         TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when the store read fails; the restore code
 *         when restoring fails; terrno when the re-lookup finds nothing.
 */
int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle) {
  if (!pTq || !key || !pHandle) {
    return TSDB_CODE_INVALID_PARA;
  }

  /* fast path: already loaded */
  void* cached = taosHashGet(pTq->pHandle, key, strlen(key));
  if (cached != NULL) {
    *pHandle = cached;
    return TDB_CODE_SUCCESS;
  }

  /* slow path: restore from the persistent store */
  void* raw = NULL;
  int   vLen = 0;
  if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &raw, &vLen) < 0) {
    tdbFree(raw);
    return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  }

  STqHandle handle = {0};
  int32_t   code = tqMetaRestoreHandle(pTq, raw, vLen >= 0 ? vLen : 0, &handle);
  if (code != 0) {
    tdbFree(raw);
    tqDestroyTqHandle(&handle);
    return code;
  }
  tdbFree(raw);

  /* hand back the hash-owned copy rather than the stack temporary */
  *pHandle = taosHashGet(pTq->pHandle, key, strlen(key));
  if (*pHandle == NULL) {
    return terrno;
  }
  return TDB_CODE_SUCCESS;
}
473

474
/**
 * Open the TQ meta database at pTq->path and its three tables.
 *
 * Tables are opened in order "tq.db" (pExecStore), "tq.check.db" (pCheckStore) and
 * "tq.offset.db" (pOffsetStore); the first failure stops the sequence.
 *
 * @param pTq TQ instance whose pMetaDB and store pointers are filled in
 * @return TSDB_CODE_INVALID_PARA when pTq is NULL, TDB_CODE_SUCCESS on success, otherwise
 *         the failing open's code.
 */
int32_t tqMetaOpenTdb(STQ* pTq) {
  if (!pTq) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = TDB_CODE_SUCCESS;
  const struct {
    const char* name;
    TTB**       ppStore;
  } tables[] = {
      {"tq.db", &pTq->pExecStore},
      {"tq.check.db", &pTq->pCheckStore},
      {"tq.offset.db", &pTq->pOffsetStore},
  };

  TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0, NULL));
  for (size_t i = 0; i < sizeof(tables) / sizeof(tables[0]); i++) {
    TQ_ERR_GO_TO_END(tdbTbOpen(tables[i].name, -1, -1, NULL, pTq->pMetaDB, tables[i].ppStore, 0));
  }

END:
  return code;
}
487

488
static int32_t replaceTqPath(char** path) {
4,114,037✔
489
  if (path == NULL || *path == NULL) {
4,114,037✔
UNCOV
490
    return TSDB_CODE_INVALID_PARA;
×
491
  }
492
  char*   tpath = NULL;
4,114,804✔
493
  int32_t code = tqBuildFName(&tpath, *path, TQ_SUBSCRIBE_NAME);
4,114,895✔
494
  if (code != 0) {
4,115,486✔
UNCOV
495
    return code;
×
496
  }
497
  taosMemoryFree(*path);
4,115,486✔
498
  *path = tpath;
4,113,757✔
499
  return TDB_CODE_SUCCESS;
4,113,757✔
500
}
501

502
/**
 * Load every record of pTq->pCheckStore into the pTq->pCheckInfo hash, keyed by topic.
 *
 * Change relative to the previous version: info.colIdList is cleared as soon as someone
 * else has taken care of it, so the tDeleteSTqCheckInfo call at END can never release
 * the same list a second time:
 *   - after a failed tqMetaDecodeCheckInfo, which already released `info` itself;
 *   - after each successful taosHashPut, which stored a copy of the struct (and with it
 *     the colIdList pointer). Previously this was done only once, after the loop, so a
 *     failure in a later iteration could release a list the hash still referenced.
 *
 * @param pTq TQ instance
 * @return TSDB_CODE_INVALID_PARA when pTq is NULL, 0 on success, otherwise the failing
 *         step's code.
 */
static int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
  if (pTq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  TBC*         pCur = NULL;
  void*        pKey = NULL;
  int          kLen = 0;
  void*        pVal = NULL;
  int          vLen = 0;
  int32_t      code = 0;
  STqCheckInfo info = {0};

  TQ_ERR_GO_TO_END(tdbTbcOpen(pTq->pCheckStore, &pCur, NULL));
  TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur));

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    code = tqMetaDecodeCheckInfo(&info, pVal, vLen >= 0 ? vLen : 0);
    if (code != 0) {
      /* the decoder released `info` on failure; do not release it again at END */
      info.colIdList = NULL;
      goto END;
    }
    TQ_ERR_GO_TO_END(taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)));
    /* the hash copy now holds the list */
    info.colIdList = NULL;
  }

END:
  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCur);
  /* only a list decoded but not stored (failed taosHashPut) is still set here */
  tDeleteSTqCheckInfo(&info);
  return code;
}
530

531
/**
 * Open the TQ meta storage for this instance.
 *
 * If a file named TDB_MAINDB_NAME exists under pTq->path, the old layout is migrated
 * with tqMetaTransform and that file is removed; otherwise pTq->path is switched with
 * replaceTqPath and the database is opened directly. Afterwards an offset file named
 * TQ_OFFSET_NAME under the (possibly updated) path, if present, is restored and
 * removed, and finally the check-info records are loaded.
 *
 * @param pTq TQ instance
 * @return TSDB_CODE_INVALID_PARA when pTq is NULL, TDB_CODE_SUCCESS on success, otherwise
 *         the failing step's code.
 */
int32_t tqMetaOpen(STQ* pTq) {
  if (!pTq) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = TDB_CODE_SUCCESS;
  char*   mainDbFile = NULL;
  char*   offsetFile = NULL;

  TQ_ERR_GO_TO_END(tqBuildFName(&mainDbFile, pTq->path, TDB_MAINDB_NAME));
  if (taosCheckExistFile(mainDbFile)) {
    /* old layout present: migrate, then drop the old main db file */
    TQ_ERR_GO_TO_END(tqMetaTransform(pTq));
    TQ_ERR_GO_TO_END(taosRemoveFile(mainDbFile));
  } else {
    TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path));
    TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));
  }

  /* built from pTq->path after the branch above may have replaced it */
  TQ_ERR_GO_TO_END(tqBuildFName(&offsetFile, pTq->path, TQ_OFFSET_NAME));
  if (taosCheckExistFile(offsetFile)) {
    TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetFile));
    TQ_ERR_GO_TO_END(taosRemoveFile(offsetFile));
  }

  TQ_ERR_GO_TO_END(tqMetaRestoreCheckInfo(pTq));

END:
  taosMemoryFree(mainDbFile);
  taosMemoryFree(offsetFile);
  return code;
}
560

UNCOV
561
/**
 * Migrate TQ meta data from the database at the current pTq->path into the database at
 * the path produced by replaceTqPath.
 *
 * The old database and its "tq.db" / "tq.check.db" tables are opened locally, pTq->path
 * is replaced, the new database is opened through tqMetaOpenTdb, and both tables are
 * copied with tqMetaTransformInfo. An offset file under the old path, if present, is
 * copied to the new path and then removed; a failed copy is only logged.
 *
 * @param pTq TQ instance
 * @return TSDB_CODE_INVALID_PARA when pTq is NULL, TDB_CODE_SUCCESS on success, otherwise
 *         the failing step's code.
 */
int32_t tqMetaTransform(STQ* pTq) {
  if (!pTq) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = TDB_CODE_SUCCESS;
  TDB*    pOldDb = NULL;
  TTB*    pOldExec = NULL;
  TTB*    pOldCheck = NULL;
  char*   oldOffsetFile = NULL;
  char*   newOffsetFile = NULL;

  /* must be built before pTq->path is replaced below */
  TQ_ERR_GO_TO_END(tqBuildFName(&oldOffsetFile, pTq->path, TQ_OFFSET_NAME));

  TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pOldDb, 0, NULL));
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.db", -1, -1, NULL, pOldDb, &pOldExec, 0));
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.check.db", -1, -1, NULL, pOldDb, &pOldCheck, 0));

  TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path));
  TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));

  TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pOldExec, pTq->pExecStore));
  TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pOldCheck, pTq->pCheckStore));

  TQ_ERR_GO_TO_END(tqBuildFName(&newOffsetFile, pTq->path, TQ_OFFSET_NAME));

  if (taosCheckExistFile(oldOffsetFile)) {
    if (taosCopyFile(oldOffsetFile, newOffsetFile) >= 0) {
      TQ_ERR_GO_TO_END(taosRemoveFile(oldOffsetFile));
    } else {
      /* a failed copy is not treated as an error; the old file stays in place */
      tqError("copy offset file error");
    }
  }

END:
  taosMemoryFree(oldOffsetFile);
  taosMemoryFree(newOffsetFile);

  tdbTbClose(pOldExec);
  tdbTbClose(pOldCheck);
  tdbClose(pOldDb);
  return code;
}
602

603
/**
 * Close the TQ meta tables that were opened, then the meta database itself.
 *
 * Each store is closed only when its pointer is non-NULL; pTq->pMetaDB is passed to
 * tdbClose unconditionally. The unused local `ret` of the previous version was removed.
 *
 * @param pTq TQ instance; a NULL pointer is ignored
 */
void tqMetaClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }
  if (pTq->pExecStore) {
    tdbTbClose(pTq->pExecStore);
  }
  if (pTq->pCheckStore) {
    tdbTbClose(pTq->pCheckStore);
  }
  if (pTq->pOffsetStore) {
    tdbTbClose(pTq->pOffsetStore);
  }
  tdbClose(pTq->pMetaDB);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc