• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4911

04 Jan 2026 09:05AM UTC coverage: 65.028% (-0.8%) from 65.864%
#4911

push

travis-ci

web-flow
merge: from main to 3.0 branch #34156

1206 of 4524 new or added lines in 22 files covered. (26.66%)

1517 existing lines in 134 files now uncovered.

195276 of 300296 relevant lines covered (65.03%)

116931714.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.8
/source/dnode/vnode/src/tq/tqMeta.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "tdbInt.h"
16
#include "tq.h"
17

18
/**
 * Serialize a TQ handle through an encoder.
 *
 * Layout written: subKey, fetchMeta, consumerId, snapshotVer, epoch, subType,
 * followed by a payload that depends on the subscription type:
 *   - COLUMN: the query message string, then the schema wrapper;
 *   - DB:     the size of pFilterOutTbUid, then every key of that hash;
 *   - TABLE:  the super table uid and, only when it is non-NULL, the query
 *             message string.
 *
 * @param pEncoder encoder to write to
 * @param pHandle  handle to serialize
 * @return TSDB_CODE_INVALID_PARA when an argument is NULL, the code left by a
 *         failing encode step, or pEncoder->pos when everything succeeded.
 */
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
  if (!pEncoder || !pHandle) {
    return TSDB_CODE_INVALID_PARA;
  }

  // `code`, `lino` and the `_exit` label are required by TAOS_CHECK_EXIT.
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->subKey));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pHandle->fetchMeta));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->consumerId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->snapshotVer));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pHandle->epoch));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pHandle->execHandle.subType));

  switch (pHandle->execHandle.subType) {
    case TOPIC_SUB_TYPE__COLUMN:
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg));
      TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, &pHandle->execHandle.execCol.pSW));
      break;

    case TOPIC_SUB_TYPE__DB: {
      int32_t nUids = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nUids));
      for (void* pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, NULL); pIter;
           pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter)) {
        // The uid is the hash key itself; entries are stored without a value.
        int64_t* pUid = (int64_t*)taosHashGetKey(pIter, NULL);
        TAOS_CHECK_EXIT(tEncodeI64(pEncoder, *pUid));
      }
      break;
    }

    case TOPIC_SUB_TYPE__TABLE:
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid));
      if (pHandle->execHandle.execTb.qmsg != NULL) {
        TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->execHandle.execTb.qmsg));
      }
      break;

    default:
      break;
  }

  tEndEncode(pEncoder);

_exit:
  return code ? code : pEncoder->pos;
}
59

60
/**
 * Deserialize a TQ handle from a decoder (inverse of tEncodeSTqHandle).
 *
 * Reads subKey, fetchMeta, consumerId, snapshotVer, epoch and subType, then
 * the type-specific payload:
 *   - COLUMN: allocates the query message; the schema wrapper is read only if
 *             the decoder has not reached its end;
 *   - DB:     creates pFilterOutTbUid and inserts every decoded uid as a key;
 *   - TABLE:  reads the super table uid; the query message is allocated only
 *             if the decoder has not reached its end.
 *
 * Objects allocated into pHandle are not released here when a later step
 * fails; that is left to the caller.
 *
 * @param pDecoder decoder to read from
 * @param pHandle  handle to fill
 * @return TSDB_CODE_INVALID_PARA when an argument is NULL, otherwise the code
 *         left by the last TAOS_CHECK_EXIT step (0 on success).
 */
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
  if (!pDecoder || !pHandle) {
    return TSDB_CODE_INVALID_PARA;
  }

  // `code`, `lino` and the `_exit` label are required by TAOS_CHECK_EXIT.
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pHandle->subKey));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pHandle->fetchMeta));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->consumerId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->snapshotVer));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pHandle->epoch));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pHandle->execHandle.subType));

  switch (pHandle->execHandle.subType) {
    case TOPIC_SUB_TYPE__COLUMN:
      TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg));
      if (!tDecodeIsEnd(pDecoder)) {
        TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, &pHandle->execHandle.execCol.pSW));
      }
      break;

    case TOPIC_SUB_TYPE__DB: {
      pHandle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
      if (pHandle->execHandle.execDb.pFilterOutTbUid == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }

      int32_t nUids = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nUids));
      for (int32_t idx = 0; idx < nUids; ++idx) {
        int64_t uid = 0;
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &uid));
        // Key-only entry: the uid is the key, no value is stored.
        TAOS_CHECK_EXIT(taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &uid, sizeof(int64_t), NULL, 0));
      }
      break;
    }

    case TOPIC_SUB_TYPE__TABLE:
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid));
      if (!tDecodeIsEnd(pDecoder)) {
        TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execTb.qmsg));
      }
      break;

    default:
      break;
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
103

104
/**
 * Decode a stored offset record into `info`.
 *
 * @param info  destination offset object
 * @param pVal  encoded bytes
 * @param vLen  number of bytes in pVal
 * @return TSDB_CODE_INVALID_PARA when `info` or `pVal` is NULL; 0 on success.
 *         Any decode failure is reported as TSDB_CODE_OUT_OF_MEMORY, after
 *         tDeleteSTqOffset() has been applied to `info`.
 */
int32_t tqMetaDecodeOffsetInfo(STqOffset* info, void* pVal, uint32_t vLen) {
  if (!info || !pVal) {
    return TSDB_CODE_INVALID_PARA;
  }

  SDecoder dec = {0};
  tDecoderInit(&dec, (uint8_t*)pVal, vLen);
  int32_t rc = tDecodeSTqOffset(&dec, info);
  tDecoderClear(&dec);

  if (rc == 0) {
    return rc;
  }

  // Undo whatever the partial decode left in `info` before reporting failure.
  tDeleteSTqOffset(info);
  return TSDB_CODE_OUT_OF_MEMORY;
}
119

120
/**
 * Encode an offset and store it in the offset table under its subKey.
 *
 * The store call is made while holding pTq->lock as a writer. The temporary
 * encode buffer and the encoder are always released before returning.
 *
 * @param pTq     TQ instance owning the offset store
 * @param pOffset offset to persist
 * @return TSDB_CODE_INVALID_PARA when an argument is NULL; a negative code
 *         from the sizing/encode step; terrno when the buffer allocation
 *         fails; otherwise the result of tqMetaSaveInfo().
 */
int32_t tqMetaSaveOffset(STQ* pTq, STqOffset* pOffset) {
  if (!pTq || !pOffset) {
    return TSDB_CODE_INVALID_PARA;
  }

  SEncoder enc = {0};
  void*    pBuf = NULL;
  uint32_t encLen;
  int32_t  rc = TDB_CODE_SUCCESS;

  // tEncodeSize is a macro defined elsewhere; it is expected to fill encLen/rc.
  tEncodeSize(tEncodeSTqOffset, pOffset, encLen, rc);

  if (rc >= 0) {
    pBuf = taosMemoryCalloc(1, encLen);
    if (pBuf == NULL) {
      rc = terrno;
    } else {
      tEncoderInit(&enc, pBuf, encLen);
      rc = tEncodeSTqOffset(&enc, pOffset);
      if (rc >= 0) {
        taosWLockLatch(&pTq->lock);
        rc = tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), pBuf, encLen);
        taosWUnLockLatch(&pTq->lock);
      }
    }
  }

  tEncoderClear(&enc);
  taosMemoryFree(pBuf);
  return rc;
}
153

154
/**
 * Upsert one key/value pair into a TQ meta table inside its own transaction.
 *
 * Sequence: tdbBegin -> tdbTbUpsert -> tdbCommit -> tdbPostCommit. If any of
 * these steps fails the transaction is passed to tdbAbort() and the failing
 * step's code is returned.
 *
 * @param pTq   TQ instance (provides pMetaDB)
 * @param ttb   target table
 * @param key   key bytes
 * @param kLen  key length
 * @param value value bytes
 * @param vLen  value length
 * @return TSDB_CODE_INVALID_PARA when a pointer argument is NULL, 0 on
 *         success, otherwise the code of the step that failed.
 */
int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, uint32_t kLen, const void* value, uint32_t vLen) {
  if (!(pTq && ttb && key && value)) {
    return TSDB_CODE_INVALID_PARA;
  }

  // `code` and the `END` label are required by TQ_ERR_GO_TO_END.
  TXN*    pTxn = NULL;
  int32_t code = TDB_CODE_SUCCESS;

  TQ_ERR_GO_TO_END(
      tdbBegin(pTq->pMetaDB, &pTxn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
  TQ_ERR_GO_TO_END(tdbTbUpsert(ttb, key, kLen, value, vLen, pTxn));
  TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, pTxn));
  TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, pTxn));
  return 0;

END:
  // Failure path only: roll back whatever the transaction did.
  tdbAbort(pTq->pMetaDB, pTxn);
  return code;
}
173

174
/**
 * Delete one key from a TQ meta table inside its own transaction.
 *
 * Sequence: tdbBegin -> tdbTbDelete -> tdbCommit -> tdbPostCommit. If any of
 * these steps fails the transaction is passed to tdbAbort() and the failing
 * step's code is returned.
 *
 * @param pTq  TQ instance (provides pMetaDB)
 * @param ttb  target table
 * @param key  key bytes
 * @param kLen key length
 * @return TSDB_CODE_INVALID_PARA when a pointer argument is NULL, 0 on
 *         success, otherwise the code of the step that failed.
 */
int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, uint32_t kLen) {
  if (!(pTq && ttb && key)) {
    return TSDB_CODE_INVALID_PARA;
  }

  // `code` and the `END` label are required by TQ_ERR_GO_TO_END.
  TXN*    pTxn = NULL;
  int32_t code = TDB_CODE_SUCCESS;

  TQ_ERR_GO_TO_END(
      tdbBegin(pTq->pMetaDB, &pTxn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
  TQ_ERR_GO_TO_END(tdbTbDelete(ttb, key, kLen, pTxn));
  TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, pTxn));
  TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, pTxn));
  return 0;

END:
  // Failure path only: roll back whatever the transaction did.
  tdbAbort(pTq->pMetaDB, pTxn);
  return code;
}
193

194
/**
 * Look up the offset registered under `subkey`.
 *
 * The in-memory hash pTq->pOffset is consulted first. On a miss the record is
 * read from pTq->pOffsetStore (under the read side of pTq->lock), decoded,
 * inserted into the hash, and the hash-owned copy is returned.
 *
 * @param pTq     TQ instance
 * @param subkey  NUL-terminated subscription key
 * @param pOffset out: pointer to the entry stored in pTq->pOffset
 * @return 0 on success; TSDB_CODE_INVALID_PARA when an argument is NULL;
 *         TSDB_CODE_OUT_OF_MEMORY when the store lookup or the decode fails,
 *         or when the entry cannot be found again after insertion; terrno
 *         when inserting into the hash fails.
 */
int32_t tqMetaGetOffset(STQ* pTq, const char* subkey, STqOffset** pOffset) {
  if (!pTq || !subkey || !pOffset) {
    return TSDB_CODE_INVALID_PARA;
  }

  const size_t keyLen = strlen(subkey);

  void* pData = taosHashGet(pTq->pOffset, subkey, keyLen);
  if (pData != NULL) {
    // Cache hit: hand back the entry owned by the hash.
    *pOffset = pData;
    return 0;
  }

  int rawLen = 0;
  taosRLockLatch(&pTq->lock);
  int getRc = tdbTbGet(pTq->pOffsetStore, subkey, keyLen, &pData, &rawLen);
  taosRUnLockLatch(&pTq->lock);
  if (getRc < 0) {
    tdbFree(pData);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STqOffset decoded = {0};
  if (tqMetaDecodeOffsetInfo(&decoded, pData, rawLen >= 0 ? rawLen : 0) != TDB_CODE_SUCCESS) {
    tdbFree(pData);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (taosHashPut(pTq->pOffset, subkey, keyLen, &decoded, sizeof(STqOffset)) != 0) {
    tDeleteSTqOffset(&decoded);
    tdbFree(pData);
    return terrno;
  }
  tdbFree(pData);

  // Return the copy that now lives in the hash, not the stack object.
  *pOffset = taosHashGet(pTq->pOffset, subkey, keyLen);
  return (*pOffset == NULL) ? TSDB_CODE_OUT_OF_MEMORY : 0;
}
231

232
/**
 * Encode a TQ handle and store it in the exec table under `key`.
 *
 * The temporary encode buffer and the encoder are always released before
 * returning.
 *
 * @param pTq     TQ instance owning the exec store
 * @param key     NUL-terminated key to store the handle under
 * @param pHandle handle to persist
 * @return TSDB_CODE_INVALID_PARA when an argument is NULL; a negative code
 *         from the sizing/encode step; terrno when the buffer allocation
 *         fails; otherwise the code left by the tqMetaSaveInfo() step.
 */
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
  if (!pTq || !key || !pHandle) {
    return TSDB_CODE_INVALID_PARA;
  }

  // `code` and the `END` label are required by TQ_ERR_GO_TO_END.
  SEncoder enc = {0};
  void*    pBuf = NULL;
  uint32_t encLen;
  int32_t  code = TDB_CODE_SUCCESS;

  // tEncodeSize is a macro defined elsewhere; it is expected to fill encLen/code.
  tEncodeSize(tEncodeSTqHandle, pHandle, encLen, code);
  if (code < 0) {
    goto END;
  }

  tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 " epoch:%d vgId:%d", pHandle->subKey,
          (int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode));

  pBuf = taosMemoryCalloc(1, encLen);
  if (pBuf == NULL) {
    code = terrno;
    goto END;
  }

  tEncoderInit(&enc, pBuf, encLen);
  code = tEncodeSTqHandle(&enc, pHandle);
  if (code < 0) {
    goto END;
  }

  TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pExecStore, key, strlen(key), pBuf, encLen));

END:
  tEncoderClear(&enc);
  taosMemoryFree(pBuf);
  return code;
}
267

268
/**
 * Build the runtime objects of a TQ handle from its already-populated fields
 * (subType, consumerId, snapshotVer, fetchMeta and the per-type exec data).
 *
 * A WAL reference is opened first. Then, by subscription type:
 *   - COLUMN: the executor task is created from execCol.qmsg and the TQ reader
 *             is extracted from the task's TMQ scanner;
 *   - DB:     a WAL reader and a TQ reader are opened, a snapshot context is
 *             built, and the task is created without a query message;
 *   - TABLE:  a WAL reader is opened, a non-empty execTb.qmsg is parsed into
 *             execTb.node, a snapshot context is built, the task is created,
 *             the table uid list is fetched with qGetTableList() and set on a
 *             newly opened TQ reader.
 * Finally handle->tableCreateTimeHash is allocated.
 *
 * On every exit through END the local snapshot context and the local uid list
 * are released. Objects already stored into `handle` are NOT released here on
 * failure; that is left to the caller.
 *
 * @param pTq    TQ instance (provides the vnode and its WAL)
 * @param handle handle to initialise
 * @return TSDB_CODE_INVALID_PARA when an argument is NULL, 0 on success,
 *         TSDB_CODE_SCH_INTERNAL_ERROR when the TABLE query message cannot be
 *         parsed, otherwise the code set by the failing step.
 */
static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) {
  if (pTq == NULL || handle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = TDB_CODE_SUCCESS;
  SArray* tbUidList = NULL;

  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  // The read handle must be fully set up BEFORE the first statement that can
  // jump to END: the cleanup code calls reader.api.snapshotFn.destroySnapshot,
  // which would be an indeterminate function pointer if a goto skipped this
  // initialisation.
  SReadHandle reader = {
      .vnode = pVnode,
      .initTableReader = true,
      .initTqReader = true,
      .version = handle->snapshotVer,
  };
  initStorageAPI(&reader.api);

  handle->pRef = walOpenRef(pVnode->pWal);
  TQ_NULL_GO_TO_END(handle->pRef);

  if (handle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    handle->execHandle.task = qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId, handle->consumerId);
    TQ_NULL_GO_TO_END(handle->execHandle.task);
    void* scanner = NULL;
    qExtractTmqScanner(handle->execHandle.task, &scanner);
    TQ_NULL_GO_TO_END(scanner);
    handle->execHandle.pTqReader = qExtractReaderFromTmqScanner(scanner);
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
  } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    handle->pWalReader = walOpenReader(pVnode->pWal, 0);
    TQ_NULL_GO_TO_END(handle->pWalReader);
    handle->execHandle.pTqReader = tqReaderOpen(pVnode);
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
    TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta,
                                      (SSnapContext**)(&reader.sContext)));
    handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, handle->consumerId);
    TQ_NULL_GO_TO_END(handle->execHandle.task);
  } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
    handle->pWalReader = walOpenReader(pVnode->pWal, 0);
    TQ_NULL_GO_TO_END(handle->pWalReader);
    if (handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
      if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) {
        tqError("nodesStringToNode error in sub stable, since %s", terrstr());
        // Leave through END like every other failure so cleanup stays in one place.
        code = TSDB_CODE_SCH_INTERNAL_ERROR;
        goto END;
      }
    }
    TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid,
                                      handle->execHandle.subType, handle->fetchMeta,
                                      (SSnapContext**)(&reader.sContext)));
    handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, handle->consumerId);
    TQ_NULL_GO_TO_END(handle->execHandle.task);
    TQ_ERR_GO_TO_END(qGetTableList(handle->execHandle.execTb.suid, pVnode, handle->execHandle.execTb.node, &tbUidList,
                                handle->execHandle.task));
    tqInfo("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId,
           handle->execHandle.execTb.suid);
    handle->execHandle.pTqReader = tqReaderOpen(pVnode);
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
    TQ_ERR_GO_TO_END(tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL));
  }

  handle->tableCreateTimeHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
  // Report an allocation failure instead of returning success with a NULL hash.
  TQ_NULL_GO_TO_END(handle->tableCreateTimeHash);

END:
  reader.api.snapshotFn.destroySnapshot(reader.sContext);
  taosArrayDestroy(tbUidList);
  return code;
}
337

338
/**
 * Rebuild a TQ handle from its stored bytes and register it in pTq->pHandle.
 *
 * Steps: decode the bytes into `handle`, initialise its runtime objects with
 * tqMetaInitHandle(), then copy the handle into the hash keyed by its subKey.
 * Nothing stored in `handle` is released here when a step fails.
 *
 * @param pTq    TQ instance
 * @param pVal   encoded handle bytes
 * @param vLen   number of bytes in pVal
 * @param handle scratch handle that receives the decoded state
 * @return TSDB_CODE_INVALID_PARA when a pointer argument is NULL, otherwise
 *         the code of the first failing step or of the final taosHashPut().
 */
static int32_t tqMetaRestoreHandle(STQ* pTq, void* pVal, uint32_t vLen, STqHandle* handle) {
  if (!pTq || !pVal || !handle) {
    return TSDB_CODE_INVALID_PARA;
  }

  // `code` and the `END` label are required by TQ_ERR_GO_TO_END.
  int32_t  code = TDB_CODE_SUCCESS;
  int32_t  vgId = TD_VID(pTq->pVnode);
  SDecoder dec = {0};

  tDecoderInit(&dec, (uint8_t*)pVal, vLen);

  TQ_ERR_GO_TO_END(tDecodeSTqHandle(&dec, handle));
  TQ_ERR_GO_TO_END(tqMetaInitHandle(pTq, handle));

  tqInfo("tqMetaRestoreHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
  code = taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));

END:
  tDecoderClear(&dec);
  return code;
}
356

357
/**
 * Create a new TQ handle from a rebalance request and register it in
 * pTq->pHandle.
 *
 * Copies subKey, consumer id, subscription type and withMeta flag from the
 * request, then fills the type-specific part:
 *   - COLUMN: duplicates req->qmsg and takes over req->schema (the request's
 *             pSchema pointer is cleared so it is not owned twice);
 *   - DB:     creates an empty pFilterOutTbUid hash;
 *   - TABLE:  copies req->suid and duplicates req->qmsg.
 * snapshotVer is set to the WAL's committed version before the runtime
 * objects are built with tqMetaInitHandle().
 *
 * @param pTq    TQ instance
 * @param req    rebalance request
 * @param handle scratch handle to populate; a copy is stored in the hash
 * @return TSDB_CODE_INVALID_PARA when an argument is NULL; terrno when a
 *         duplication or hash allocation fails; the tqMetaInitHandle() code
 *         when it fails; otherwise the result of taosHashPut().
 */
int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) {
  if (!pTq || !req || !handle) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t vgId = TD_VID(pTq->pVnode);

  (void)memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN);
  handle->consumerId = req->newConsumerId;
  handle->execHandle.subType = req->subType;
  handle->fetchMeta = req->withMeta;

  switch (req->subType) {
    case TOPIC_SUB_TYPE__COLUMN: {
      void* qmsgCopy = taosStrdup(req->qmsg);
      if (qmsgCopy == NULL) {
        return terrno;
      }
      handle->execHandle.execCol.qmsg = qmsgCopy;
      handle->execHandle.execCol.pSW = req->schema;
      req->schema.pSchema = NULL;
      break;
    }

    case TOPIC_SUB_TYPE__DB:
      handle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
      if (handle->execHandle.execDb.pFilterOutTbUid == NULL) {
        return terrno;
      }
      break;

    case TOPIC_SUB_TYPE__TABLE: {
      handle->execHandle.execTb.suid = req->suid;
      void* qmsgCopy = taosStrdup(req->qmsg);
      if (qmsgCopy == NULL) {
        return terrno;
      }
      handle->execHandle.execTb.qmsg = qmsgCopy;
      break;
    }

    default:
      break;
  }

  handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal);

  int32_t initRc = tqMetaInitHandle(pTq, handle);
  if (initRc != 0) {
    return initRc;
  }

  tqInfo("tqMetaCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey,
         handle->consumerId, vgId, handle->snapshotVer);
  return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
}
401

402
/**
 * Copy every key/value pair of table `pOld` into table `pNew`.
 *
 * A cursor walks `pOld` from its first entry while all upserts into `pNew`
 * run inside a single write transaction on `pMetaDB`, which is committed at
 * the end. If any step fails after the transaction was started, the
 * transaction is aborted, matching tqMetaSaveInfo()/tqMetaDeleteInfo().
 *
 * @param pMetaDB database that owns `pNew` (the transaction is opened on it)
 * @param pOld    source table
 * @param pNew    destination table
 * @return TSDB_CODE_INVALID_PARA when an argument is NULL, 0 on success,
 *         otherwise the code of the step that failed.
 */
static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pOld, TTB* pNew) {
  if (pMetaDB == NULL || pOld == NULL || pNew == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  TBC*  pCur = NULL;
  void* pKey = NULL;
  int   kLen = 0;
  void* pVal = NULL;
  int   vLen = 0;
  TXN*  txn = NULL;

  int32_t code = TDB_CODE_SUCCESS;

  TQ_ERR_GO_TO_END(tdbTbcOpen(pOld, &pCur, NULL));
  TQ_ERR_GO_TO_END(
      tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));

  TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur));
  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    TQ_ERR_GO_TO_END(tdbTbUpsert(pNew, pKey, kLen, pVal, vLen, txn));
  }

  TQ_ERR_GO_TO_END(tdbCommit(pMetaDB, txn));
  TQ_ERR_GO_TO_END(tdbPostCommit(pMetaDB, txn));

END:
  // pKey/pVal are the buffers handed back by tdbTbcNext(); release them here.
  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCur);
  // Only a started transaction that did not complete needs rolling back.
  if (code != TDB_CODE_SUCCESS && txn != NULL) {
    tdbAbort(pMetaDB, txn);
  }
  return code;
}
433

434
/**
 * Look up the TQ handle registered under `key`.
 *
 * The in-memory hash pTq->pHandle is consulted first. On a miss the record is
 * read from pTq->pExecStore, restored with tqMetaRestoreHandle() (which also
 * inserts it into the hash), and the hash-owned copy is returned.
 *
 * @param pTq     TQ instance
 * @param key     NUL-terminated subscription key
 * @param pHandle out: pointer to the entry stored in pTq->pHandle
 * @return TDB_CODE_SUCCESS on success; TSDB_CODE_INVALID_PARA when an argument
 *         is NULL; TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when the store lookup
 *         fails; the restore code when restoring fails (the scratch handle is
 *         destroyed first); terrno when the entry cannot be found again.
 */
int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle) {
  if (!pTq || !key || !pHandle) {
    return TSDB_CODE_INVALID_PARA;
  }

  void* pData = taosHashGet(pTq->pHandle, key, strlen(key));
  if (pData != NULL) {
    // Cache hit: hand back the entry owned by the hash.
    *pHandle = pData;
    return TDB_CODE_SUCCESS;
  }

  int rawLen = 0;
  if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &pData, &rawLen) < 0) {
    tdbFree(pData);
    return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  }

  STqHandle restored = {0};
  int32_t   rc = tqMetaRestoreHandle(pTq, pData, rawLen >= 0 ? rawLen : 0, &restored);
  tdbFree(pData);
  if (rc != 0) {
    tqDestroyTqHandle(&restored);
    return rc;
  }

  // Return the copy that now lives in the hash, not the stack object.
  *pHandle = taosHashGet(pTq->pHandle, key, strlen(key));
  if (*pHandle == NULL) {
    return terrno;
  }
  return TDB_CODE_SUCCESS;
}
462

463
/**
 * Open the TQ meta database at pTq->path and its two tables.
 *
 * Fills pTq->pMetaDB, then pTq->pExecStore ("tq.db") and pTq->pOffsetStore
 * ("tq.offset.db"). Nothing opened earlier is closed here when a later step
 * fails.
 *
 * @param pTq TQ instance
 * @return TSDB_CODE_INVALID_PARA when pTq is NULL, otherwise the code left by
 *         the last attempted open step (success when all three succeed).
 */
int32_t tqMetaOpenTdb(STQ* pTq) {
  if (!pTq) {
    return TSDB_CODE_INVALID_PARA;
  }

  // `code` and the `END` label are required by TQ_ERR_GO_TO_END.
  int32_t code = TDB_CODE_SUCCESS;

  TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0, NULL));

  TQ_ERR_GO_TO_END(tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0));

  TQ_ERR_GO_TO_END(tdbTbOpen("tq.offset.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pOffsetStore, 0));

END:
  return code;
}
475

476
static int32_t replaceTqPath(char** path) {
3,603,363✔
477
  if (path == NULL || *path == NULL) {
3,603,363✔
478
    return TSDB_CODE_INVALID_PARA;
×
479
  }
480
  char*   tpath = NULL;
3,603,363✔
481
  int32_t code = tqBuildFName(&tpath, *path, TQ_SUBSCRIBE_NAME);
3,603,363✔
482
  if (code != 0) {
3,602,669✔
483
    return code;
×
484
  }
485
  taosMemoryFree(*path);
3,602,669✔
486
  *path = tpath;
3,601,300✔
487
  return TDB_CODE_SUCCESS;
3,601,300✔
488
}
489

490
/**
 * Open the TQ meta storage for `pTq`.
 *
 * If the file named by TDB_MAINDB_NAME exists under pTq->path,
 * tqMetaTransform() is run and that file is removed afterwards. Otherwise
 * pTq->path is switched with replaceTqPath() and the database is opened with
 * tqMetaOpenTdb().
 *
 * Afterwards, if the file named by TQ_OFFSET_NAME exists under the (possibly
 * updated) pTq->path, offsets are restored from it with
 * tqOffsetRestoreFromFile() and the file is removed.
 *
 * @param pTq TQ instance
 * @return TSDB_CODE_INVALID_PARA when pTq is NULL, otherwise the code left by
 *         the last attempted step (success when all of them succeed).
 */
int32_t tqMetaOpen(STQ* pTq) {
  if (!pTq) {
    return TSDB_CODE_INVALID_PARA;
  }

  // `code` and the `END` label are required by TQ_ERR_GO_TO_END.
  int32_t code = TDB_CODE_SUCCESS;
  char*   offsetNew = NULL;
  char*   maindb = NULL;

  TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME));

  if (taosCheckExistFile(maindb)) {
    TQ_ERR_GO_TO_END(tqMetaTransform(pTq));
    TQ_ERR_GO_TO_END(taosRemoveFile(maindb));
  } else {
    TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path));
    TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));
  }

  // pTq->path may have been replaced above, so the name is built only now.
  TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
  if (taosCheckExistFile(offsetNew)) {
    TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetNew));
    TQ_ERR_GO_TO_END(taosRemoveFile(offsetNew));
  }

END:
  taosMemoryFree(maindb);
  taosMemoryFree(offsetNew);
  return code;
}
517

518
/**
 * Move TQ meta data from the database at the current pTq->path into the
 * database at the path produced by replaceTqPath().
 *
 * Steps: open the database and its "tq.db" table at the current path, switch
 * pTq->path, open the new database with tqMetaOpenTdb(), copy all exec
 * entries with tqMetaTransformInfo(), then copy the TQ_OFFSET_NAME file from
 * the old path to the new one and remove the old file. A failed file copy is
 * only logged and does not change the return code.
 *
 * The locally opened table and database are closed on every exit path.
 *
 * @param pTq TQ instance
 * @return TSDB_CODE_INVALID_PARA when pTq is NULL, otherwise the code left by
 *         the last attempted TQ_ERR_GO_TO_END step.
 */
int32_t tqMetaTransform(STQ* pTq) {
  if (!pTq) {
    return TSDB_CODE_INVALID_PARA;
  }

  // `code` and the `END` label are required by TQ_ERR_GO_TO_END.
  int32_t code = TDB_CODE_SUCCESS;
  char*   oldOffsetPath = NULL;
  char*   newOffsetPath = NULL;
  TDB*    pOldDb = NULL;
  TTB*    pOldExec = NULL;

  // Must be built before pTq->path is replaced below.
  TQ_ERR_GO_TO_END(tqBuildFName(&oldOffsetPath, pTq->path, TQ_OFFSET_NAME));

  TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pOldDb, 0, NULL));
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.db", -1, -1, NULL, pOldDb, &pOldExec, 0));

  TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path));
  TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));

  TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pOldExec, pTq->pExecStore));

  TQ_ERR_GO_TO_END(tqBuildFName(&newOffsetPath, pTq->path, TQ_OFFSET_NAME));

  if (taosCheckExistFile(oldOffsetPath)) {
    if (taosCopyFile(oldOffsetPath, newOffsetPath) >= 0) {
      TQ_ERR_GO_TO_END(taosRemoveFile(oldOffsetPath));
    } else {
      tqError("copy offset file error");
    }
  }

END:
  taosMemoryFree(oldOffsetPath);
  taosMemoryFree(newOffsetPath);

  tdbTbClose(pOldExec);
  tdbClose(pOldDb);
  return code;
}
555

556
/**
 * Close the TQ meta tables and database held by `pTq`.
 *
 * Each table is closed only when its pointer is set; tdbClose() is then
 * called on pTq->pMetaDB unconditionally. Does nothing when pTq is NULL.
 * The pointers in `pTq` are not reset.
 *
 * @param pTq TQ instance, may be NULL
 */
void tqMetaClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }
  if (pTq->pExecStore) {
    tdbTbClose(pTq->pExecStore);
  }
  if (pTq->pOffsetStore) {
    tdbTbClose(pTq->pOffsetStore);
  }
  tdbClose(pTq->pMetaDB);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc