• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5058

17 May 2026 01:15AM UTC coverage: 73.387% (-0.02%) from 73.406%
#5058

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281656 of 383795 relevant lines covered (73.39%)

135114337.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.46
/source/dnode/vnode/src/tq/tqMeta.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "tdbInt.h"
16
#include "tq.h"
17

18
/*
 * Encode a subscription handle into pEncoder.
 *
 * The common fields (subKey, fetchMeta, consumerId, snapshotVer, epoch and
 * the subscription type) are written first, followed by a type-specific tail:
 *   - TOPIC_SUB_TYPE__COLUMN: the query message and the schema wrapper;
 *   - TOPIC_SUB_TYPE__DB:     a single int32 zero;
 *   - TOPIC_SUB_TYPE__TABLE:  the suid, plus the query message when non-NULL.
 *
 * Returns TSDB_CODE_INVALID_PARA if either argument is NULL, a non-zero
 * `code` if one was recorded by a failed step, and pEncoder->pos otherwise.
 */
int32_t tqEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
  if (pEncoder == NULL || pHandle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino;  // written by TAOS_CHECK_EXIT

  const int8_t subType = pHandle->execHandle.subType;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // Fields shared by every subscription type.
  TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->subKey));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pHandle->fetchMeta));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->consumerId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->snapshotVer));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pHandle->epoch));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, subType));

  // Type-specific tail.
  if (subType == TOPIC_SUB_TYPE__COLUMN) {
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg));
    TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, &pHandle->execHandle.execCol.pSW));
  } else if (subType == TOPIC_SUB_TYPE__DB) {
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
  } else if (subType == TOPIC_SUB_TYPE__TABLE) {
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid));
    if (pHandle->execHandle.execTb.qmsg != NULL) {
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->execHandle.execTb.qmsg));
    }
  }

  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
51

52
/*
 * Decode a subscription handle from pDecoder into pHandle.
 *
 * Reads the common fields in the order tqEncodeSTqHandle writes them, then
 * the type-specific tail:
 *   - TOPIC_SUB_TYPE__COLUMN: query message (allocated), then the schema
 *     wrapper if the decoder has not reached its end;
 *   - TOPIC_SUB_TYPE__DB:     an int32 count followed by that many int64
 *     values, which are read and discarded;
 *   - TOPIC_SUB_TYPE__TABLE:  the suid, then the query message (allocated)
 *     if the decoder has not reached its end.
 *
 * Returns TSDB_CODE_INVALID_PARA if either argument is NULL, otherwise the
 * value of `code` (0 unless a step failed).
 */
int32_t tqDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
  if (pDecoder == NULL || pHandle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino;  // written by TAOS_CHECK_EXIT

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // Fields shared by every subscription type.
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pHandle->subKey));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pHandle->fetchMeta));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->consumerId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->snapshotVer));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pHandle->epoch));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pHandle->execHandle.subType));

  // Type-specific tail.
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg));
    if (!tDecodeIsEnd(pDecoder)) {
      TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, &pHandle->execHandle.execCol.pSW));
    }
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    int32_t count = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &count));
    // The values are consumed only to advance the decoder; nothing is kept.
    for (int32_t remaining = count; remaining > 0; --remaining) {
      int64_t ignoredUid = 0;
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &ignoredUid));
    }
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid));
    if (!tDecodeIsEnd(pDecoder)) {
      TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execTb.qmsg));
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
89

90
void tqDestroySTqHandle(void* data) {
562,611✔
91
  if (data == NULL) return;
562,611✔
92
  STqHandle* pData = (STqHandle*)data;
562,611✔
93
  qDestroyTask(pData->execHandle.task);
562,611✔
94

95
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
561,455✔
96
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
406,211✔
97
    taosMemoryFreeClear(pData->execHandle.execCol.pSW.pSchema);
403,978✔
98
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
155,993✔
99
    tqReaderClose(pData->execHandle.pTqReader);
124,312✔
100
    walCloseReader(pData->pWalReader);
124,312✔
101
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
31,681✔
102
    walCloseReader(pData->pWalReader);
31,681✔
103
    tqReaderClose(pData->execHandle.pTqReader);
31,681✔
104
    taosMemoryFreeClear(pData->execHandle.execTb.qmsg);
31,681✔
105
    nodesDestroyNode(pData->execHandle.execTb.node);
31,681✔
106
  }
107
  if (pData->msg != NULL) {
559,576✔
108
    rpcFreeCont(pData->msg->pCont);
×
109
    taosMemoryFree(pData->msg);
×
110
    pData->msg = NULL;
×
111
  }
112
  if (pData->block != NULL) {
561,858✔
113
    blockDataDestroy(pData->block);
×
114
  }
115
  if (pData->pRef) {
561,831✔
116
    walCloseRef(pData->pRef->pWal, pData->pRef->refId);
545,892✔
117
  }
118
  taosHashCleanup(pData->tableCreateTimeHash);
561,033✔
119
}
120

121
/*
 * Decode an STqOffset from the vLen-byte buffer pVal into `info`.
 *
 * Returns TSDB_CODE_INVALID_PARA if `info` or `pVal` is NULL and 0 on
 * success. When tDecodeSTqOffset reports a non-zero result, `info` is passed
 * to tDeleteSTqOffset and TSDB_CODE_OUT_OF_MEMORY is returned, whatever the
 * underlying decode result was.
 */
int32_t tqMetaDecodeOffsetInfo(STqOffset* info, void* pVal, uint32_t vLen) {
  if (info == NULL || pVal == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SDecoder decoder = {0};
  tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
  int32_t decodeRc = tDecodeSTqOffset(&decoder, info);
  tDecoderClear(&decoder);

  if (decodeRc == 0) {
    return decodeRc;
  }

  tDeleteSTqOffset(info);
  return TSDB_CODE_OUT_OF_MEMORY;
}
136

137
/*
 * Encode pOffset and store it in pTq->pOffsetStore under pOffset->subKey.
 *
 * The encoded size is computed first with tEncodeSize, a zeroed buffer of
 * that size is allocated, the offset is encoded into it, and the buffer is
 * written via tqMetaSaveInfo while pTq->lock is held for writing.
 *
 * Returns TSDB_CODE_INVALID_PARA on NULL arguments, terrno if the buffer
 * cannot be allocated, a negative value if sizing or encoding fails, and
 * otherwise the result of tqMetaSaveInfo.
 */
int32_t tqMetaSaveOffset(STQ* pTq, STqOffset* pOffset) {
  if (pTq == NULL || pOffset == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = TDB_CODE_SUCCESS;
  void*    pBuf = NULL;
  uint32_t encodedLen;
  SEncoder encoder = {0};

  // Pass 1: determine how many bytes the encoded offset needs.
  tEncodeSize(tEncodeSTqOffset, pOffset, encodedLen, code);
  if (code < 0) {
    goto END;
  }

  pBuf = taosMemoryCalloc(1, encodedLen);
  if (pBuf == NULL) {
    code = terrno;
    goto END;
  }

  // Pass 2: encode into the buffer.
  tEncoderInit(&encoder, pBuf, encodedLen);
  code = tEncodeSTqOffset(&encoder, pOffset);
  if (code < 0) {
    goto END;
  }

  // Only the store write is done under the write latch.
  taosWLockLatch(&pTq->lock);
  code = tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), pBuf, encodedLen);
  taosWUnLockLatch(&pTq->lock);

END:
  tEncoderClear(&encoder);
  taosMemoryFree(pBuf);
  return code;
}
170

171
/*
 * Upsert (key, value) into table `ttb` inside its own transaction on
 * pTq->pMetaDB.
 *
 * The sequence is tdbBegin -> tdbTbUpsert -> tdbCommit -> tdbPostCommit.
 * If any step jumps to END, the transaction (when one was created) is
 * aborted and the failing step's code is returned.
 *
 * Returns TSDB_CODE_INVALID_PARA if pTq, ttb, key or value is NULL, and 0
 * when all four steps succeed.
 */
int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, uint32_t kLen, const void* value, uint32_t vLen) {
  if (pTq == NULL || ttb == NULL || key == NULL || value == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = TDB_CODE_SUCCESS;
  TXN*    pTxn = NULL;

  TQ_ERR_GO_TO_END(
      tdbBegin(pTq->pMetaDB, &pTxn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
  TQ_ERR_GO_TO_END(tdbTbUpsert(ttb, key, kLen, value, vLen, pTxn));
  TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, pTxn));
  TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, pTxn));

  return 0;

END:
  // Failure path only: roll back whatever transaction was started.
  if (pTxn != NULL) {
    tdbAbort(pTq->pMetaDB, pTxn);
  }
  return code;
}
192

193
/*
 * Delete `key` from table `ttb` inside its own transaction on pTq->pMetaDB.
 *
 * The sequence is tdbBegin -> tdbTbDelete -> tdbCommit -> tdbPostCommit.
 * If any step jumps to END, the transaction (when one was created) is
 * aborted and the failing step's code is returned.
 *
 * Returns TSDB_CODE_INVALID_PARA if pTq, ttb or key is NULL, and 0 when all
 * four steps succeed.
 */
int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, uint32_t kLen) {
  if (pTq == NULL || ttb == NULL || key == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = TDB_CODE_SUCCESS;
  TXN*    pTxn = NULL;

  TQ_ERR_GO_TO_END(
      tdbBegin(pTq->pMetaDB, &pTxn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
  TQ_ERR_GO_TO_END(tdbTbDelete(ttb, key, kLen, pTxn));
  TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, pTxn));
  TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, pTxn));

  return 0;

END:
  // Failure path only: roll back whatever transaction was started.
  if (pTxn != NULL) {
    tdbAbort(pTq->pMetaDB, pTxn);
  }
  return code;
}
214

215
/*
 * Look up the offset for `subkey` and store a pointer to it in *pOffset.
 *
 * The in-memory hash pTq->pOffset is consulted first. On a miss the record
 * is read from pTq->pOffsetStore (under the read latch), decoded, inserted
 * into the hash, and the hash entry is returned.
 *
 * Returns:
 *   TSDB_CODE_INVALID_PARA   - a NULL argument;
 *   TSDB_CODE_OUT_OF_MEMORY  - tdbTbGet reported a negative result, decoding
 *                              failed, or the entry could not be read back
 *                              from the hash after insertion;
 *   terrno                   - taosHashPut failed;
 *   0                        - success.
 */
int32_t tqMetaGetOffset(STQ* pTq, const char* subkey, STqOffset** pOffset) {
  if (pTq == NULL || subkey == NULL || pOffset == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  const size_t keyLen = strlen(subkey);

  // Fast path: already cached.
  void* pCached = taosHashGet(pTq->pOffset, subkey, keyLen);
  if (pCached != NULL) {
    *pOffset = pCached;
    return 0;
  }

  // Slow path: load the raw record from the offset store.
  void* pRaw = NULL;
  int   rawLen = 0;
  taosRLockLatch(&pTq->lock);
  int getRc = tdbTbGet(pTq->pOffsetStore, subkey, keyLen, &pRaw, &rawLen);
  taosRUnLockLatch(&pTq->lock);
  if (getRc < 0) {
    tdbFree(pRaw);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STqOffset decoded = {0};
  if (tqMetaDecodeOffsetInfo(&decoded, pRaw, rawLen >= 0 ? rawLen : 0) != TDB_CODE_SUCCESS) {
    tdbFree(pRaw);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (taosHashPut(pTq->pOffset, subkey, keyLen, &decoded, sizeof(STqOffset)) != 0) {
    tDeleteSTqOffset(&decoded);
    tdbFree(pRaw);
    return terrno;
  }
  tdbFree(pRaw);

  // Hand back the copy that now lives in the hash, not the stack object.
  *pOffset = taosHashGet(pTq->pOffset, subkey, keyLen);
  if (*pOffset == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return 0;
}
252

253
/*
 * Encode pHandle and store it in pTq->pExecStore under `key`.
 *
 * The encoded size is computed with tEncodeSize, a zeroed buffer of that
 * size is allocated, the handle is encoded into it, and the buffer is
 * written via tqMetaSaveInfo.
 *
 * Returns TSDB_CODE_INVALID_PARA on NULL arguments, terrno if the buffer
 * cannot be allocated, a negative value if sizing or encoding fails, and
 * otherwise the code left by the tqMetaSaveInfo step.
 */
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
  if (pTq == NULL || key == NULL || pHandle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = TDB_CODE_SUCCESS;
  void*    pBuf = NULL;
  uint32_t encodedLen;
  SEncoder encoder = {0};

  // Pass 1: determine how many bytes the encoded handle needs.
  tEncodeSize(tqEncodeSTqHandle, pHandle, encodedLen, code);
  if (code < 0) {
    goto END;
  }

  tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 " epoch:%d vgId:%d", pHandle->subKey,
          (int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode));

  pBuf = taosMemoryCalloc(1, encodedLen);
  if (pBuf == NULL) {
    code = terrno;
    goto END;
  }

  // Pass 2: encode into the buffer.
  tEncoderInit(&encoder, pBuf, encodedLen);
  code = tqEncodeSTqHandle(&encoder, pHandle);
  if (code < 0) {
    goto END;
  }

  TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pExecStore, key, strlen(key), pBuf, encodedLen));

END:
  tEncoderClear(&encoder);
  taosMemoryFree(pBuf);
  return code;
}
288

289
/*
 * Create the runtime objects of a subscription handle whose persistent
 * fields (subType, snapshotVer, consumerId, qmsg/suid, ...) are already set.
 *
 * A WAL reference is opened for every handle. Depending on the subscription
 * type the function additionally creates:
 *   - TOPIC_SUB_TYPE__COLUMN: the exec task from execCol.qmsg, and takes the
 *     tq reader out of the task's scanner;
 *   - TOPIC_SUB_TYPE__DB:     a WAL reader, a tq reader, a snapshot context
 *     and the exec task;
 *   - TOPIC_SUB_TYPE__TABLE:  a WAL reader, the parsed filter node (when
 *     execTb.qmsg is non-empty), a snapshot context, the exec task, and a tq
 *     reader restricted to the table uid list from qGetTableList.
 * Finally the tableCreateTimeHash is created.
 *
 * Objects stored in `handle` are NOT released here on failure; no release of
 * them is performed in this function, so the caller is expected to clean up
 * (e.g. via tqDestroySTqHandle).
 *
 * Returns TSDB_CODE_INVALID_PARA on NULL arguments, 0 on success, and the
 * failing step's code otherwise.
 */
static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) {
  if (pTq == NULL || handle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = TDB_CODE_SUCCESS;
  SArray* tbUidList = NULL;

  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  // `reader` must be fully initialized before the first possible jump to
  // END: the cleanup code there calls reader.api.snapshotFn.destroySnapshot,
  // which would otherwise be an indeterminate function pointer.
  SReadHandle reader = {
      .vnode = pVnode,
      .initTableReader = true,
      .initTqReader = true,
      .version = handle->snapshotVer,
  };
  initStorageAPI(&reader.api);

  handle->pRef = walOpenRef(pVnode->pWal);
  TQ_NULL_GO_TO_END(handle->pRef);

  if (handle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    handle->execHandle.task = qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId, handle->consumerId);
    TQ_NULL_GO_TO_END(handle->execHandle.task);
    void* scanner = NULL;
    qExtractTmqScanner(handle->execHandle.task, &scanner);
    TQ_NULL_GO_TO_END(scanner);
    handle->execHandle.pTqReader = qExtractReaderFromTmqScanner(scanner);
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
  } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    handle->pWalReader = walOpenReader(pVnode->pWal, 0);
    TQ_NULL_GO_TO_END(handle->pWalReader);
    handle->execHandle.pTqReader = tqReaderOpen(pVnode);
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
    TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta,
                                      (SSnapContext**)(&reader.sContext)));
    handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, handle->consumerId);
    TQ_NULL_GO_TO_END(handle->execHandle.task);
  } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
    handle->pWalReader = walOpenReader(pVnode->pWal, 0);
    TQ_NULL_GO_TO_END(handle->pWalReader);
    if (handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
      if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) {
        tqError("nodesStringToNode error in sub stable, since %s", terrstr());
        // Leave through END like every other failure so that the shared
        // cleanup always runs.
        code = TSDB_CODE_SCH_INTERNAL_ERROR;
        goto END;
      }
    }
    TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid,
                                      handle->execHandle.subType, handle->fetchMeta,
                                      (SSnapContext**)(&reader.sContext)));
    handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, handle->consumerId);
    TQ_NULL_GO_TO_END(handle->execHandle.task);
    TQ_ERR_GO_TO_END(qGetTableList(handle->execHandle.execTb.suid, pVnode, handle->execHandle.execTb.node, &tbUidList,
                                handle->execHandle.task));
    tqInfo("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId,
           handle->execHandle.execTb.suid);
    handle->execHandle.pTqReader = tqReaderOpen(pVnode);
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
    TQ_ERR_GO_TO_END(tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL));
  }

  handle->tableCreateTimeHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
  // Report a failed hash creation instead of returning success with a NULL
  // hash.
  TQ_NULL_GO_TO_END(handle->tableCreateTimeHash);

END:
  // Runs on success and on failure. reader.sContext is still NULL when no
  // snapshot context was built (this is also the normal case for column
  // subscriptions).
  reader.api.snapshotFn.destroySnapshot(reader.sContext);
  taosArrayDestroy(tbUidList);
  return code;
}
358

359
/*
 * Rebuild a subscription handle from its stored encoding.
 *
 * Decodes the vLen-byte buffer pVal into `handle`, creates the handle's
 * runtime objects with tqMetaInitHandle, and inserts a copy of the handle
 * into pTq->pHandle keyed by handle->subKey.
 *
 * Returns TSDB_CODE_INVALID_PARA on NULL arguments, otherwise the code of
 * the first failing step, or the taosHashPut result when decoding and
 * initialization succeed. Nothing stored in `handle` is released here.
 */
static int32_t tqMetaRestoreHandle(STQ* pTq, void* pVal, uint32_t vLen, STqHandle* handle) {
  if (pTq == NULL || pVal == NULL || handle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = TDB_CODE_SUCCESS;
  int32_t  vgId = TD_VID(pTq->pVnode);
  SDecoder decoder = {0};

  tDecoderInit(&decoder, (uint8_t*)pVal, vLen);

  TQ_ERR_GO_TO_END(tqDecodeSTqHandle(&decoder, handle));
  TQ_ERR_GO_TO_END(tqMetaInitHandle(pTq, handle));

  tqInfo("tqMetaRestoreHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
  code = taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));

END:
  tDecoderClear(&decoder);
  return code;
}
377

378
/*
 * Fill `handle` from a rebalance request, initialize it, and register it.
 *
 * Copies subKey, the new consumer id, the subscription type and the
 * with-meta flag from `req`. For column subscriptions the query message is
 * duplicated and the request's schema is moved into the handle
 * (req->schema.pSchema is set to NULL afterwards). For table subscriptions
 * the suid is copied and the query message duplicated. snapshotVer is taken
 * from the WAL's committed version before tqMetaInitHandle is called.
 *
 * Returns TSDB_CODE_INVALID_PARA on NULL arguments, terrno if a string
 * duplication fails, the tqMetaInitHandle code if it is non-zero, and
 * otherwise the result of inserting the handle into pTq->pHandle. Nothing
 * stored in `handle` is released here on failure.
 */
int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) {
  if (pTq == NULL || req == NULL || handle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t vgId = TD_VID(pTq->pVnode);

  (void)memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN);
  handle->consumerId = req->newConsumerId;
  handle->execHandle.subType = req->subType;
  handle->fetchMeta = req->withMeta;

  if (req->subType == TOPIC_SUB_TYPE__COLUMN) {
    void* qmsgCopy = taosStrdup(req->qmsg);
    if (qmsgCopy == NULL) {
      return terrno;
    }
    handle->execHandle.execCol.qmsg = qmsgCopy;
    // Move the schema: the handle takes over pSchema from the request.
    handle->execHandle.execCol.pSW = req->schema;
    req->schema.pSchema = NULL;
  } else if (req->subType == TOPIC_SUB_TYPE__TABLE) {
    handle->execHandle.execTb.suid = req->suid;
    void* qmsgCopy = taosStrdup(req->qmsg);
    if (qmsgCopy == NULL) {
      return terrno;
    }
    handle->execHandle.execTb.qmsg = qmsgCopy;
  }

  handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal);

  int32_t initCode = tqMetaInitHandle(pTq, handle);
  if (initCode != 0) {
    return initCode;
  }

  tqInfo("tqMetaCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey,
         handle->consumerId, vgId, handle->snapshotVer);
  return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
}
416

417
/*
 * Copy every key/value pair of table pOld into table pNew.
 *
 * A cursor is opened on pOld and a write transaction is started on pMetaDB;
 * each entry yielded by the cursor is upserted into pNew, and the
 * transaction is then committed and post-committed.
 *
 * On failure the transaction (if one was started) is aborted, matching the
 * error handling of tqMetaSaveInfo / tqMetaDeleteInfo, so that it is not
 * left open.
 *
 * Returns TSDB_CODE_INVALID_PARA on NULL arguments, 0 on success, and the
 * failing step's code otherwise.
 */
static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pOld, TTB* pNew) {
  if (pMetaDB == NULL || pOld == NULL || pNew == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  TBC*  pCur = NULL;
  void* pKey = NULL;
  int   kLen = 0;
  void* pVal = NULL;
  int   vLen = 0;
  TXN*  txn = NULL;

  int32_t code = TDB_CODE_SUCCESS;

  TQ_ERR_GO_TO_END(tdbTbcOpen(pOld, &pCur, NULL));
  TQ_ERR_GO_TO_END(
      tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));

  TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur));
  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    TQ_ERR_GO_TO_END(tdbTbUpsert(pNew, pKey, kLen, pVal, vLen, txn));
  }

  TQ_ERR_GO_TO_END(tdbCommit(pMetaDB, txn));
  TQ_ERR_GO_TO_END(tdbPostCommit(pMetaDB, txn));

END:
  // Reached on success and on failure.
  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCur);
  // Failure only: roll back the write transaction instead of leaking it.
  if (code != TDB_CODE_SUCCESS && txn != NULL) {
    tdbAbort(pMetaDB, txn);
  }
  return code;
}
448

449
/*
 * Look up the subscription handle for `key` and store a pointer to it in
 * *pHandle.
 *
 * The in-memory hash pTq->pHandle is consulted first. On a miss the encoded
 * handle is read from pTq->pExecStore and rebuilt with tqMetaRestoreHandle
 * (which also inserts it into the hash); the hash entry is then returned.
 *
 * Returns:
 *   TSDB_CODE_INVALID_PARA             - a NULL argument;
 *   TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST  - tdbTbGet reported a negative result;
 *   the tqMetaRestoreHandle code       - restore failed (the partially built
 *                                        handle is destroyed first);
 *   terrno                             - the entry could not be read back
 *                                        from the hash;
 *   TDB_CODE_SUCCESS                   - success.
 */
int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle) {
  if (pTq == NULL || key == NULL || pHandle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  const size_t keyLen = strlen(key);

  // Fast path: already cached.
  void* pCached = taosHashGet(pTq->pHandle, key, keyLen);
  if (pCached != NULL) {
    *pHandle = pCached;
    return TDB_CODE_SUCCESS;
  }

  // Slow path: load the encoded handle from the exec store.
  void* pRaw = NULL;
  int   rawLen = 0;
  if (tdbTbGet(pTq->pExecStore, key, (int)keyLen, &pRaw, &rawLen) < 0) {
    tdbFree(pRaw);
    return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  }

  STqHandle restored = {0};
  int32_t   code = tqMetaRestoreHandle(pTq, pRaw, rawLen >= 0 ? rawLen : 0, &restored);
  tdbFree(pRaw);
  if (code != 0) {
    tqDestroySTqHandle(&restored);
    return code;
  }

  // Hand back the copy that now lives in the hash, not the stack object.
  *pHandle = taosHashGet(pTq->pHandle, key, keyLen);
  if (*pHandle == NULL) {
    return terrno;
  }
  return TDB_CODE_SUCCESS;
}
477

478
/*
 * Open the meta database at pTq->path together with its two tables,
 * "tq.db" (stored in pTq->pExecStore) and "tq.offset.db" (stored in
 * pTq->pOffsetStore).
 *
 * If any step fails, whichever tables were opened are closed, the database
 * is closed, and the three pointers in pTq are reset to NULL.
 *
 * Returns TSDB_CODE_INVALID_PARA if pTq is NULL, TDB_CODE_SUCCESS on
 * success, and the failing step's code otherwise.
 */
int32_t tqMetaOpenTdb(STQ* pTq) {
  if (pTq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = TDB_CODE_SUCCESS;

  TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0, NULL));
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0));
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.offset.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pOffsetStore, 0));

END:
  if (code == TDB_CODE_SUCCESS) {
    return code;
  }

  // Failure: undo whatever was opened and leave no stale pointers behind.
  if (pTq->pExecStore) {
    tdbTbClose(pTq->pExecStore);
    pTq->pExecStore = NULL;
  }
  if (pTq->pOffsetStore) {
    tdbTbClose(pTq->pOffsetStore);
    pTq->pOffsetStore = NULL;
  }
  tdbClose(pTq->pMetaDB);
  pTq->pMetaDB = NULL;

  return code;
}
502

503
static int32_t tqBuildFName(char** data, const char* path, char* name) {
14,993,422✔
504
  int32_t code = 0;
14,993,422✔
505
  int32_t lino = 0;
14,993,422✔
506
  char*   fname = NULL;
14,993,422✔
507
  TSDB_CHECK_NULL(data, code, lino, END, TSDB_CODE_INVALID_MSG);
14,993,422✔
508
  TSDB_CHECK_NULL(path, code, lino, END, TSDB_CODE_INVALID_MSG);
14,993,422✔
509
  TSDB_CHECK_NULL(name, code, lino, END, TSDB_CODE_INVALID_MSG);
14,993,422✔
510
  int32_t len = strlen(path) + strlen(name) + 2;
14,993,422✔
511
  fname = taosMemoryCalloc(1, len);
14,993,422✔
512
  TSDB_CHECK_NULL(fname, code, lino, END, terrno);
14,991,442✔
513
  (void)snprintf(fname, len, "%s%s%s", path, TD_DIRSEP, name);
14,991,442✔
514

515
  *data = fname;
14,991,442✔
516
  fname = NULL;
14,993,941✔
517

518
END:
14,993,941✔
519
  if (code != 0) {
14,993,941✔
520
    tqError("%s failed at %d since %s", __func__, lino, tstrerror(code));
×
521
  }
522
  taosMemoryFree(fname);
14,991,911✔
523
  return code;
14,991,863✔
524
}
525

526
static int32_t tqReplacePath(char** path) {
4,997,059✔
527
  if (path == NULL || *path == NULL) {
4,997,059✔
528
    return TSDB_CODE_INVALID_PARA;
×
529
  }
530
  char*   tpath = NULL;
4,997,059✔
531
  int32_t code = tqBuildFName(&tpath, *path, TQ_SUBSCRIBE_NAME);
4,998,216✔
532
  if (code != 0) {
4,998,318✔
533
    return code;
×
534
  }
535
  taosMemoryFree(*path);
4,998,318✔
536
  *path = tpath;
4,997,571✔
537
  return TDB_CODE_SUCCESS;
4,997,571✔
538
}
539

540
/*
 * Move tq metadata from the database at the current pTq->path into a
 * database under the subscribe sub-directory.
 *
 * Steps, in order:
 *   1. remember the offset file name under the current path;
 *   2. open the existing database and its "tq.db" table;
 *   3. switch pTq->path to the sub-directory (tqReplacePath) and open the
 *      new database there (tqMetaOpenTdb);
 *   4. copy all "tq.db" entries into the new exec store;
 *   5. if the old offset file exists, copy it to the new path and, when the
 *      copy succeeds, remove the old one. A failed copy is only logged.
 *
 * The old table and database opened in step 2 are closed before returning,
 * on success and on failure alike.
 *
 * Returns TSDB_CODE_INVALID_PARA if pTq is NULL, TDB_CODE_SUCCESS on
 * success, and the failing step's code otherwise.
 */
static int32_t tqMetaTransform(STQ* pTq) {
  if (pTq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = TDB_CODE_SUCCESS;
  TDB*    pOldDB = NULL;
  TTB*    pOldExecStore = NULL;
  char*   oldOffsetFile = NULL;
  char*   newOffsetFile = NULL;

  // Must be computed before pTq->path is replaced below.
  TQ_ERR_GO_TO_END(tqBuildFName(&oldOffsetFile, pTq->path, TQ_OFFSET_NAME));

  TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pOldDB, 0, NULL));
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.db", -1, -1, NULL, pOldDB, &pOldExecStore, 0));

  TQ_ERR_GO_TO_END(tqReplacePath(&pTq->path));
  TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));

  TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pOldExecStore, pTq->pExecStore));

  TQ_ERR_GO_TO_END(tqBuildFName(&newOffsetFile, pTq->path, TQ_OFFSET_NAME));

  if (taosCheckExistFile(oldOffsetFile)) {
    if (taosCopyFile(oldOffsetFile, newOffsetFile) >= 0) {
      TQ_ERR_GO_TO_END(taosRemoveFile(oldOffsetFile));
    } else {
      tqError("copy offset file error");
    }
  }

END:
  taosMemoryFree(oldOffsetFile);
  taosMemoryFree(newOffsetFile);

  tdbTbClose(pOldExecStore);
  tdbClose(pOldDB);
  return code;
}
577

578
/*
 * Open the tq metadata store for pTq.
 *
 * If a TDB_MAINDB_NAME file exists directly under pTq->path, its contents
 * are transformed into the new location (tqMetaTransform) and that file is
 * removed. Otherwise pTq->path is switched to the subscribe sub-directory
 * and the database is opened there. Afterwards, if a TQ_OFFSET_NAME file
 * exists under the (now updated) path, offsets are restored from it and the
 * file is removed.
 *
 * Returns TSDB_CODE_INVALID_PARA if pTq is NULL, TDB_CODE_SUCCESS on
 * success, and the failing step's code otherwise.
 */
int32_t tqMetaOpen(STQ* pTq) {
  if (pTq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = TDB_CODE_SUCCESS;
  char*   mainDbFile = NULL;
  char*   offsetFile = NULL;

  TQ_ERR_GO_TO_END(tqBuildFName(&mainDbFile, pTq->path, TDB_MAINDB_NAME));
  if (taosCheckExistFile(mainDbFile)) {
    // A main db file sits directly under the original path: transform it.
    TQ_ERR_GO_TO_END(tqMetaTransform(pTq));
    TQ_ERR_GO_TO_END(taosRemoveFile(mainDbFile));
  } else {
    TQ_ERR_GO_TO_END(tqReplacePath(&pTq->path));
    TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));
  }

  TQ_ERR_GO_TO_END(tqBuildFName(&offsetFile, pTq->path, TQ_OFFSET_NAME));
  if (taosCheckExistFile(offsetFile)) {
    TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetFile));
    TQ_ERR_GO_TO_END(taosRemoveFile(offsetFile));
  }

END:
  taosMemoryFree(mainDbFile);
  taosMemoryFree(offsetFile);
  return code;
}
605

606
/*
 * Close the tq metadata store of pTq: the exec store and the offset store
 * (when open) and then the database itself.
 *
 * Each pointer is reset to NULL after it has been closed, the same way the
 * failure path of tqMetaOpenTdb does, so pTq does not keep dangling handles.
 * Does nothing when pTq is NULL.
 */
void tqMetaClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }
  if (pTq->pExecStore) {
    tdbTbClose(pTq->pExecStore);
    pTq->pExecStore = NULL;
  }
  if (pTq->pOffsetStore) {
    tdbTbClose(pTq->pOffsetStore);
    pTq->pOffsetStore = NULL;
  }
  tdbClose(pTq->pMetaDB);
  pTq->pMetaDB = NULL;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc