• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3544

30 Nov 2024 03:06AM UTC coverage: 60.88% (+0.04%) from 60.842%
#3544

push

travis-ci

web-flow
Merge pull request #28988 from taosdata/main

merge: from main to 3.0 branch

120724 of 253479 branches covered (47.63%)

Branch coverage included in aggregate %.

407 of 489 new or added lines in 21 files covered. (83.23%)

1148 existing lines in 113 files now uncovered.

201919 of 276488 relevant lines covered (73.03%)

18898587.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.63
/source/dnode/vnode/src/tq/tqMeta.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "tdbInt.h"
16
#include "tq.h"
17

18
/*
 * Serialize a subscription handle into pEncoder.
 *
 * The common fields (subKey, fetchMeta, consumerId, snapshotVer, epoch,
 * subType) are written first, followed by a subType-specific payload:
 *   - COLUMN: the query message string
 *   - DB:     the size of pFilterOutTbUid, then every key of that hash
 *   - TABLE:  the suid, then the query message when it is not NULL
 *
 * Returns pEncoder->pos when every step succeeded, otherwise the code left
 * behind by the failing TAOS_CHECK_EXIT step.
 */
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
  int32_t code = 0;
  int32_t lino;  // NOTE(review): presumably consumed by TAOS_CHECK_EXIT - confirm against the macro

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // Common header shared by all subscription types.
  TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->subKey));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pHandle->fetchMeta));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->consumerId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->snapshotVer));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pHandle->epoch));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pHandle->execHandle.subType));

  // Type-specific payload.
  switch (pHandle->execHandle.subType) {
    case TOPIC_SUB_TYPE__COLUMN:
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg));
      break;

    case TOPIC_SUB_TYPE__DB: {
      int32_t nFiltered = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nFiltered));

      // Only the keys (table uids) of the hash are persisted.
      for (void* pEntry = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, NULL); pEntry != NULL;
           pEntry = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pEntry)) {
        int64_t* pUid = (int64_t*)taosHashGetKey(pEntry, NULL);
        TAOS_CHECK_EXIT(tEncodeI64(pEncoder, *pUid));
      }
      break;
    }

    case TOPIC_SUB_TYPE__TABLE:
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid));
      if (pHandle->execHandle.execTb.qmsg != NULL) {
        TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->execHandle.execTb.qmsg));
      }
      break;

    default:
      break;
  }

  tEndEncode(pEncoder);

_exit:
  if (code) {
    return code;
  }
  return pEncoder->pos;
}
55

56
/*
 * Deserialize a subscription handle from pDecoder into pHandle.
 *
 * Reads the common fields in the order tEncodeSTqHandle wrote them, then the
 * subType-specific payload:
 *   - COLUMN: allocates and reads the query message
 *   - DB:     creates pFilterOutTbUid and inserts every decoded table uid
 *   - TABLE:  reads the suid and, if the decoder is not at its end, the
 *             query message
 *
 * Returns 0 on success or the code of the first failing step.
 */
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
  int32_t code = 0;
  int32_t lino;  // NOTE(review): presumably consumed by TAOS_CHECK_EXIT - confirm against the macro

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // Common header.
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pHandle->subKey));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pHandle->fetchMeta));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->consumerId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->snapshotVer));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pHandle->epoch));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pHandle->execHandle.subType));

  switch (pHandle->execHandle.subType) {
    case TOPIC_SUB_TYPE__COLUMN:
      TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg));
      break;

    case TOPIC_SUB_TYPE__DB: {
      pHandle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
      if (pHandle->execHandle.execDb.pFilterOutTbUid == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }

      int32_t nFiltered = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nFiltered));
      for (int32_t idx = 0; idx < nFiltered; idx++) {
        int64_t uid = 0;
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &uid));
        // The uid is the key; no value is stored with it.
        TAOS_CHECK_EXIT(taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &uid, sizeof(int64_t), NULL, 0));
      }
      break;
    }

    case TOPIC_SUB_TYPE__TABLE:
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid));
      // The query message is only present when the encoder had a non-NULL one.
      if (!tDecodeIsEnd(pDecoder)) {
        TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execTb.qmsg));
      }
      break;

    default:
      break;
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
93

94
/*
 * Decode an STqCheckInfo from the vLen bytes at pVal.
 *
 * Returns 0 on success. On any decode failure the partially filled info is
 * released with tDeleteSTqCheckInfo and TSDB_CODE_OUT_OF_MEMORY is returned.
 */
int32_t tqMetaDecodeCheckInfo(STqCheckInfo* info, void* pVal, int32_t vLen) {
  SDecoder dec = {0};

  tDecoderInit(&dec, (uint8_t*)pVal, vLen);
  int32_t rc = tDecodeSTqCheckInfo(&dec, info);
  tDecoderClear(&dec);

  if (rc == 0) {
    return rc;
  }

  tDeleteSTqCheckInfo(info);
  return TSDB_CODE_OUT_OF_MEMORY;
}
106

107
/*
 * Decode an STqOffset from the vLen bytes at pVal.
 *
 * Returns 0 on success. On any decode failure the partially filled info is
 * released with tDeleteSTqOffset and TSDB_CODE_OUT_OF_MEMORY is returned.
 */
int32_t tqMetaDecodeOffsetInfo(STqOffset* info, void* pVal, int32_t vLen) {
  SDecoder dec = {0};

  tDecoderInit(&dec, (uint8_t*)pVal, vLen);
  int32_t rc = tDecodeSTqOffset(&dec, info);
  tDecoderClear(&dec);

  if (rc == 0) {
    return rc;
  }

  tDeleteSTqOffset(info);
  return TSDB_CODE_OUT_OF_MEMORY;
}
119

120
/*
 * Encode pOffset and store it in pTq->pOffsetStore under pOffset->subKey.
 *
 * Returns the result of tqMetaSaveInfo on the normal path, or the failing
 * code when sizing, allocation or encoding fails.
 */
int32_t tqMetaSaveOffset(STQ* pTq, STqOffset* pOffset) {
  int32_t  code = TDB_CODE_SUCCESS;
  int32_t  encLen;
  void*    encBuf = NULL;
  SEncoder enc = {0};

  // Sizing pass: encLen is used below as the buffer size.
  tEncodeSize(tEncodeSTqOffset, pOffset, encLen, code);
  if (code < 0) {
    goto END;
  }

  encBuf = taosMemoryCalloc(1, encLen);
  if (encBuf == NULL) {
    code = terrno;
    goto END;
  }

  // Real encoding pass into the freshly allocated buffer.
  tEncoderInit(&enc, encBuf, encLen);
  code = tEncodeSTqOffset(&enc, pOffset);
  if (code < 0) {
    goto END;
  }

  TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), encBuf, encLen));

END:
  // Shared cleanup for success and failure.
  tEncoderClear(&enc);
  taosMemoryFree(encBuf);
  return code;
}
149

150
/*
 * Upsert (key, value) into ttb inside a dedicated transaction on
 * pTq->pMetaDB: begin, upsert, commit, post-commit.
 *
 * Returns 0 when all four steps succeed. If any step fails the transaction
 * is passed to tdbAbort and the failing code is returned.
 */
int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const void* value, int32_t vLen) {
  TXN*    pTxn = NULL;
  int32_t code = TDB_CODE_SUCCESS;

  TQ_ERR_GO_TO_END(
      tdbBegin(pTq->pMetaDB, &pTxn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));

  TQ_ERR_GO_TO_END(tdbTbUpsert(ttb, key, kLen, value, vLen, pTxn));

  TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, pTxn));
  TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, pTxn));

  // Success: skip the abort below.
  return 0;

END:
  // Failure path only.
  tdbAbort(pTq->pMetaDB, pTxn);
  return code;
}
166

167
/*
 * Delete key from ttb inside a dedicated transaction on pTq->pMetaDB:
 * begin, delete, commit, post-commit.
 *
 * Returns 0 when all four steps succeed. If any step fails the transaction
 * is passed to tdbAbort and the failing code is returned.
 */
int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen) {
  TXN*    pTxn = NULL;
  int32_t code = TDB_CODE_SUCCESS;

  TQ_ERR_GO_TO_END(
      tdbBegin(pTq->pMetaDB, &pTxn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));

  TQ_ERR_GO_TO_END(tdbTbDelete(ttb, key, kLen, pTxn));

  TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, pTxn));
  TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, pTxn));

  // Success: skip the abort below.
  return 0;

END:
  // Failure path only.
  tdbAbort(pTq->pMetaDB, pTxn);
  return code;
}
183

184
/*
 * Look up the offset stored for subkey.
 *
 * The in-memory hash pTq->pOffset is consulted first. On a miss the record
 * is read from pTq->pOffsetStore, decoded, inserted into the hash, and the
 * hash-resident copy is returned through pOffset.
 *
 * Returns 0 on success. Returns TSDB_CODE_OUT_OF_MEMORY when the store read,
 * the decode, or the final hash lookup fails, and terrno when the hash
 * insert fails. *pOffset is only written on the paths that reach a hash
 * lookup result.
 */
int32_t tqMetaGetOffset(STQ* pTq, const char* subkey, STqOffset** pOffset) {
  size_t keyLen = strlen(subkey);

  // Fast path: already cached.
  void* cached = taosHashGet(pTq->pOffset, subkey, keyLen);
  if (cached != NULL) {
    *pOffset = cached;
    return 0;
  }

  // Slow path: load the encoded record from the store.
  void* raw = NULL;
  int   rawLen = 0;
  if (tdbTbGet(pTq->pOffsetStore, subkey, keyLen, &raw, &rawLen) < 0) {
    tdbFree(raw);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STqOffset decoded = {0};
  if (tqMetaDecodeOffsetInfo(&decoded, raw, rawLen) != TDB_CODE_SUCCESS) {
    tdbFree(raw);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (taosHashPut(pTq->pOffset, subkey, keyLen, &decoded, sizeof(STqOffset)) != 0) {
    tDeleteSTqOffset(&decoded);
    tdbFree(raw);
    return terrno;
  }
  tdbFree(raw);

  // Hand back the copy that now lives inside the hash.
  *pOffset = taosHashGet(pTq->pOffset, subkey, keyLen);
  if (*pOffset == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return 0;
}
215

216
/*
 * Encode pHandle and store it in pTq->pExecStore under key.
 *
 * Returns the result of tqMetaSaveInfo on the normal path, or the failing
 * code when sizing, allocation or encoding fails.
 */
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
  SEncoder enc = {0};
  void*    encBuf = NULL;
  int32_t  encLen;
  int32_t  code = TDB_CODE_SUCCESS;

  // Sizing pass: encLen is used below as the buffer size.
  tEncodeSize(tEncodeSTqHandle, pHandle, encLen, code);
  if (code < 0) {
    goto END;
  }

  tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 " epoch:%d vgId:%d", pHandle->subKey,
          (int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode));

  encBuf = taosMemoryCalloc(1, encLen);
  if (encBuf == NULL) {
    code = terrno;
    goto END;
  }

  // Real encoding pass into the freshly allocated buffer.
  tEncoderInit(&enc, encBuf, encLen);
  code = tEncodeSTqHandle(&enc, pHandle);
  if (code < 0) {
    goto END;
  }

  TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pExecStore, key, (int)strlen(key), encBuf, encLen));

END:
  // Shared cleanup for success and failure.
  tEncoderClear(&enc);
  taosMemoryFree(encBuf);
  return code;
}
248

249
/*
 * Attach the runtime objects a subscription handle needs: a WAL reference
 * pinned at handle->snapshotVer and, depending on execHandle.subType, an
 * executor task, a WAL reader and a tq reader.
 *
 *   - COLUMN: builds the task from execCol.qmsg and takes the tq reader from
 *             the task's stream scanner.
 *   - DB:     opens a WAL reader and a tq reader, builds a snapshot context
 *             and creates the task without a query message.
 *   - TABLE:  like DB, plus parses execTb.qmsg (when non-empty) into
 *             execTb.node and seeds the tq reader with the table uid list
 *             returned by qGetTableList.
 *
 * Returns 0 on success, TSDB_CODE_SCH_INTERNAL_ERROR when parsing the query
 * message or fetching the table list fails, and otherwise the code set by
 * the failing TQ_*_GO_TO_END step. Objects already stored in handle are NOT
 * released here on failure.
 */
static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) {
  int32_t code = TDB_CODE_SUCCESS;

  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  handle->pRef = walOpenRef(pVnode->pWal);

  TQ_NULL_GO_TO_END(handle->pRef);
  TQ_ERR_GO_TO_END(walSetRefVer(handle->pRef, handle->snapshotVer));

  SReadHandle reader = {
      .vnode = pVnode,
      .initTableReader = true,
      .initTqReader = true,
      .version = handle->snapshotVer,
  };

  initStorageAPI(&reader.api);

  if (handle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    handle->execHandle.task = qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId,
                                                       &handle->execHandle.numOfCols, handle->consumerId);
    TQ_NULL_GO_TO_END(handle->execHandle.task);
    void* scanner = NULL;
    qExtractStreamScanner(handle->execHandle.task, &scanner);
    TQ_NULL_GO_TO_END(scanner);
    handle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
  } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
    TQ_NULL_GO_TO_END(handle->pWalReader);
    handle->execHandle.pTqReader = tqReaderOpen(pVnode);
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
    TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta,
                                      (SSnapContext**)(&reader.sContext)));
    handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
    TQ_NULL_GO_TO_END(handle->execHandle.task);
  } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
    handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
    TQ_NULL_GO_TO_END(handle->pWalReader);
    // An absent or empty query message means there is nothing to parse.
    if (handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
      if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) {
        tqError("nodesStringToNode error in sub stable, since %s", terrstr());
        return TSDB_CODE_SCH_INTERNAL_ERROR;
      }
    }
    TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid,
                                      handle->execHandle.subType, handle->fetchMeta,
                                      (SSnapContext**)(&reader.sContext)));
    handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
    TQ_NULL_GO_TO_END(handle->execHandle.task);
    SArray* tbUidList = NULL;
    int     ret = qGetTableList(handle->execHandle.execTb.suid, pVnode, handle->execHandle.execTb.node, &tbUidList,
                                handle->execHandle.task);
    if (ret != TDB_CODE_SUCCESS) {
      tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle->subKey, handle->consumerId);
      taosArrayDestroy(tbUidList);
      return TSDB_CODE_SCH_INTERNAL_ERROR;
    }
    tqInfo("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId,
           handle->execHandle.execTb.suid);
    handle->execHandle.pTqReader = tqReaderOpen(pVnode);
    if (handle->execHandle.pTqReader == NULL) {
      // tbUidList is owned by this function (it is destroyed on every other
      // path), so release it before the macro below jumps to END.
      taosArrayDestroy(tbUidList);
      tbUidList = NULL;
    }
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
    tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL);
    taosArrayDestroy(tbUidList);
  }

END:
  return code;
}
320

321
/*
 * Rebuild a handle from its encoded form (vLen bytes at pVal): decode it,
 * initialise its runtime objects with tqMetaInitHandle, and insert it into
 * pTq->pHandle keyed by handle->subKey.
 *
 * Returns the code of the first failing step, or the result of the hash
 * insert. The decoder is cleared on every path.
 */
static int32_t tqMetaRestoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle) {
  int32_t  code = TDB_CODE_SUCCESS;
  int32_t  vgId = TD_VID(pTq->pVnode);
  SDecoder dec = {0};

  tDecoderInit(&dec, (uint8_t*)pVal, vLen);

  TQ_ERR_GO_TO_END(tDecodeSTqHandle(&dec, handle));
  TQ_ERR_GO_TO_END(tqMetaInitHandle(pTq, handle));

  tqInfo("tqMetaRestoreHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);

  // The hash stores a by-value copy of the handle struct.
  code = taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));

END:
  tDecoderClear(&dec);
  return code;
}
336

337
/*
 * Populate handle from a rebalance request, initialise its runtime objects
 * and insert it into pTq->pHandle keyed by handle->subKey.
 *
 * The subType-specific part copies req->qmsg (COLUMN, TABLE), records
 * req->suid (TABLE), or creates an empty pFilterOutTbUid hash (DB).
 * snapshotVer is taken from walGetCommittedVer before initialisation.
 *
 * Returns terrno when a copy or hash allocation fails, the code from
 * tqMetaInitHandle when that fails, and otherwise the result of the hash
 * insert.
 */
int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) {
  int32_t vgId = TD_VID(pTq->pVnode);

  (void)memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN);
  handle->consumerId = req->newConsumerId;

  handle->execHandle.subType = req->subType;
  handle->fetchMeta = req->withMeta;

  switch (req->subType) {
    case TOPIC_SUB_TYPE__COLUMN: {
      void* qmsgCopy = taosStrdup(req->qmsg);
      if (qmsgCopy == NULL) {
        return terrno;
      }
      handle->execHandle.execCol.qmsg = qmsgCopy;
      break;
    }

    case TOPIC_SUB_TYPE__DB:
      handle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
      if (handle->execHandle.execDb.pFilterOutTbUid == NULL) {
        return terrno;
      }
      break;

    case TOPIC_SUB_TYPE__TABLE: {
      handle->execHandle.execTb.suid = req->suid;
      void* qmsgCopy = taosStrdup(req->qmsg);
      if (qmsgCopy == NULL) {
        return terrno;
      }
      handle->execHandle.execTb.qmsg = qmsgCopy;
      break;
    }

    default:
      break;
  }

  handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal);

  int32_t rc = tqMetaInitHandle(pTq, handle);
  if (rc != 0) {
    return rc;
  }

  tqInfo("tqMetaCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey,
         handle->consumerId, vgId, handle->snapshotVer);

  // The hash stores a by-value copy of the handle struct.
  return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
}
376

377
/*
 * Copy every key/value pair of table pOld into table pNew.
 *
 * A cursor walks pOld from its first entry while all upserts into pNew run
 * inside a single transaction on pMetaDB, which is committed and
 * post-committed once the cursor is exhausted.
 *
 * Returns 0 on success or the code of the first failing step. On failure the
 * transaction, if one was started, is aborted - matching the error handling
 * of tqMetaSaveInfo / tqMetaDeleteInfo - so it is not left open.
 */
static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pOld, TTB* pNew) {
  TBC*  pCur = NULL;
  void* pKey = NULL;
  int   kLen = 0;
  void* pVal = NULL;
  int   vLen = 0;
  TXN*  txn = NULL;

  int32_t code = TDB_CODE_SUCCESS;

  TQ_ERR_GO_TO_END(tdbTbcOpen(pOld, &pCur, NULL));
  TQ_ERR_GO_TO_END(
      tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));

  TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur));
  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    TQ_ERR_GO_TO_END(tdbTbUpsert(pNew, pKey, kLen, pVal, vLen, txn));
  }

  TQ_ERR_GO_TO_END(tdbCommit(pMetaDB, txn));
  TQ_ERR_GO_TO_END(tdbPostCommit(pMetaDB, txn));

END:
  // pKey/pVal are the buffers handed out by tdbTbcNext.
  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCur);
  // txn stays NULL when the failure happened before tdbBegin produced one.
  if (code != TDB_CODE_SUCCESS && txn != NULL) {
    tdbAbort(pMetaDB, txn);
  }
  return code;
}
405

406
/*
 * Look up the subscription handle stored for key.
 *
 * The in-memory hash pTq->pHandle is consulted first. On a miss the encoded
 * handle is read from pTq->pExecStore, restored with tqMetaRestoreHandle
 * (which also inserts it into the hash), and the hash-resident copy is
 * returned through pHandle.
 *
 * Returns TDB_CODE_SUCCESS on success, TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when
 * the store read fails, and TSDB_CODE_OUT_OF_MEMORY when the restore or the
 * final hash lookup fails.
 */
int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle) {
  // Fast path: already cached.
  void* cached = taosHashGet(pTq->pHandle, key, strlen(key));
  if (cached != NULL) {
    *pHandle = cached;
    return TDB_CODE_SUCCESS;
  }

  // Slow path: load the encoded handle from the store.
  void* raw = NULL;
  int   rawLen = 0;
  if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &raw, &rawLen) < 0) {
    tdbFree(raw);
    return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  }

  STqHandle restored = {0};
  int32_t   rc = tqMetaRestoreHandle(pTq, raw, rawLen, &restored);
  tdbFree(raw);
  if (rc != 0) {
    tqDestroyTqHandle(&restored);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Hand back the copy that now lives inside the hash.
  *pHandle = taosHashGet(pTq->pHandle, key, strlen(key));
  if (*pHandle == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TDB_CODE_SUCCESS;
}
430

431
/*
 * Open the database at pTq->path into pTq->pMetaDB and then the three tables
 * "tq.db", "tq.check.db" and "tq.offset.db" into pExecStore, pCheckStore and
 * pOffsetStore respectively.
 *
 * Returns 0 on success or the code of the first open that failed; later
 * opens are skipped after a failure.
 */
int32_t tqMetaOpenTdb(STQ* pTq) {
  int32_t code = TDB_CODE_SUCCESS;

  // Database environment first; the tables below are opened inside it.
  TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0, 0, NULL));

  TQ_ERR_GO_TO_END(tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0));
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0));
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.offset.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pOffsetStore, 0));

END:
  return code;
}
441

442
static int32_t replaceTqPath(char** path) {
13,799✔
443
  char*   tpath = NULL;
13,799✔
444
  int32_t code = tqBuildFName(&tpath, *path, TQ_SUBSCRIBE_NAME);
13,799✔
445
  if (code != 0) {
13,799!
446
    return code;
×
447
  }
448
  taosMemoryFree(*path);
13,799✔
449
  *path = tpath;
13,799✔
450
  return TDB_CODE_SUCCESS;
13,799✔
451
}
452

453
/*
 * Load every record of pTq->pCheckStore into the pTq->pCheckInfo hash, keyed
 * by the decoded info.topic.
 *
 * Ownership of info.colIdList: taosHashPut stores a by-value copy of info, so
 * after a successful insert the list belongs to the hash entry. The local
 * pointer is therefore cleared after EVERY insert (not just once after the
 * loop); otherwise a failure on a later iteration would release a list the
 * hash still references. After a decode failure the pointer is cleared too,
 * because tqMetaDecodeCheckInfo has already released info and the shared
 * cleanup below would release it a second time.
 *
 * Returns 0 on success or the code of the first failing step.
 */
static int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
  TBC*         pCur = NULL;
  void*        pKey = NULL;
  int          kLen = 0;
  void*        pVal = NULL;
  int          vLen = 0;
  int32_t      code = 0;
  STqCheckInfo info = {0};

  TQ_ERR_GO_TO_END(tdbTbcOpen(pTq->pCheckStore, &pCur, NULL));
  TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur));

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    code = tqMetaDecodeCheckInfo(&info, pVal, vLen);
    if (code != 0) {
      // Already released by tqMetaDecodeCheckInfo; avoid releasing it again at END.
      info.colIdList = NULL;
      goto END;
    }

    // On failure here the list is still ours and is released at END.
    TQ_ERR_GO_TO_END(taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)));

    // The hash entry now owns the list.
    info.colIdList = NULL;
  }

END:
  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCur);
  tDeleteSTqCheckInfo(&info);
  return code;
}
478

479
/*
 * Open the tq metadata storage for pTq.
 *
 * If the file named by TDB_MAINDB_NAME exists under the current pTq->path,
 * tqMetaTransform is run and that file is removed afterwards; otherwise the
 * path is switched with replaceTqPath and the database is opened directly.
 * Next, if the file named by TQ_OFFSET_NAME exists under the (possibly
 * updated) path, offsets are restored from it and it is removed. Finally the
 * check-info records are loaded.
 *
 * Returns 0 on success or the code of the first failing step.
 */
int32_t tqMetaOpen(STQ* pTq) {
  int32_t code = TDB_CODE_SUCCESS;
  char*   mainDbFile = NULL;
  char*   offsetFile = NULL;

  TQ_ERR_GO_TO_END(tqBuildFName(&mainDbFile, pTq->path, TDB_MAINDB_NAME));
  if (taosCheckExistFile(mainDbFile)) {
    // Existing main db file: transform it, then drop the old file.
    TQ_ERR_GO_TO_END(tqMetaTransform(pTq));
    TQ_ERR_GO_TO_END(taosRemoveFile(mainDbFile));
  } else {
    TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path));
    TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));
  }

  // pTq->path has been updated by replaceTqPath on both branches above.
  TQ_ERR_GO_TO_END(tqBuildFName(&offsetFile, pTq->path, TQ_OFFSET_NAME));
  if (taosCheckExistFile(offsetFile)) {
    TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetFile));
    TQ_ERR_GO_TO_END(taosRemoveFile(offsetFile));
  }

  TQ_ERR_GO_TO_END(tqMetaRestoreCheckInfo(pTq));

END:
  taosMemoryFree(mainDbFile);
  taosMemoryFree(offsetFile);
  return code;
}
505

506
/*
 * Move tq metadata from the database at the current pTq->path into the
 * database at the path produced by replaceTqPath.
 *
 * Steps: open the source database and its "tq.db" / "tq.check.db" tables,
 * switch pTq->path, open the destination with tqMetaOpenTdb, copy both
 * tables with tqMetaTransformInfo, then copy the TQ_OFFSET_NAME file to the
 * new path and remove the old one. A failed file copy is only logged and
 * does not change the return code.
 *
 * Returns 0 on success or the code of the first failing step. The source
 * tables and database are closed on every path.
 */
int32_t tqMetaTransform(STQ* pTq) {
  int32_t code = TDB_CODE_SUCCESS;
  TDB*    pSrcDB = NULL;
  TTB*    pSrcExec = NULL;
  TTB*    pSrcCheck = NULL;
  char*   srcOffsetFile = NULL;
  char*   dstOffsetFile = NULL;

  // Must be built before pTq->path is replaced below.
  TQ_ERR_GO_TO_END(tqBuildFName(&srcOffsetFile, pTq->path, TQ_OFFSET_NAME));

  // Source side.
  TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pSrcDB, 0, 0, NULL));
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.db", -1, -1, NULL, pSrcDB, &pSrcExec, 0));
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.check.db", -1, -1, NULL, pSrcDB, &pSrcCheck, 0));

  // Destination side.
  TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path));
  TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));

  TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pSrcExec, pTq->pExecStore));
  TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pSrcCheck, pTq->pCheckStore));

  TQ_ERR_GO_TO_END(tqBuildFName(&dstOffsetFile, pTq->path, TQ_OFFSET_NAME));

  if (taosCheckExistFile(srcOffsetFile)) {
    if (taosCopyFile(srcOffsetFile, dstOffsetFile) >= 0) {
      TQ_ERR_GO_TO_END(taosRemoveFile(srcOffsetFile));
    } else {
      tqError("copy offset file error");
    }
  }

END:
  taosMemoryFree(srcOffsetFile);
  taosMemoryFree(dstOffsetFile);

  tdbTbClose(pSrcExec);
  tdbTbClose(pSrcCheck);
  tdbClose(pSrcDB);
  return code;
}
544

545
/*
 * Close the three tq tables (exec, check, offset) that are open, then close
 * the database pTq->pMetaDB. Return values of the close calls are ignored.
 */
void tqMetaClose(STQ* pTq) {
  if (pTq->pExecStore) {
    tdbTbClose(pTq->pExecStore);
  }
  if (pTq->pCheckStore) {
    tdbTbClose(pTq->pCheckStore);
  }
  if (pTq->pOffsetStore) {
    tdbTbClose(pTq->pOffsetStore);
  }
  tdbClose(pTq->pMetaDB);
}
13,796✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc