• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3561

19 Dec 2024 03:15AM UTC coverage: 58.812% (-1.3%) from 60.124%
#3561

push

travis-ci

web-flow
Merge pull request #29213 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

130770 of 287658 branches covered (45.46%)

Branch coverage included in aggregate %.

32 of 78 new or added lines in 4 files covered. (41.03%)

7347 existing lines in 166 files now uncovered.

205356 of 283866 relevant lines covered (72.34%)

7187865.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.07
/source/dnode/vnode/src/tq/tqMeta.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "tdbInt.h"
16
#include "tq.h"
17

18
/*
 * Serialize a STqHandle into pEncoder.
 *
 * Written in order: subKey, fetchMeta, consumerId, snapshotVer, epoch and
 * execHandle.subType, followed by a payload selected by the subscription type:
 *   - TOPIC_SUB_TYPE__COLUMN: execCol.qmsg
 *   - TOPIC_SUB_TYPE__DB:     entry count of execDb.pFilterOutTbUid, then every key as an int64
 *   - TOPIC_SUB_TYPE__TABLE:  execTb.suid, then execTb.qmsg only when it is non-NULL
 *
 * Returns TSDB_CODE_INVALID_PARA when an argument is NULL, the failing call's
 * code when encoding fails, and pEncoder->pos on success. Success is therefore
 * NOT reported as 0; callers in this file test the result with "< 0".
 */
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
  if (pEncoder == NULL || pHandle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;  // never read in this function; presumably written by TAOS_CHECK_EXIT

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->subKey));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pHandle->fetchMeta));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->consumerId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->snapshotVer));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pHandle->epoch));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pHandle->execHandle.subType));
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg));
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    int32_t size = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
    void* pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, NULL);
    while (pIter) {
      int64_t* tbUid = (int64_t*)taosHashGetKey(pIter, NULL);
      code = tEncodeI64(pEncoder, *tbUid);
      if (code != 0) {
        // Leaving the loop early: release the in-flight iterator before bailing out,
        // otherwise the entry it is parked on stays referenced.
        taosHashCancelIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
        goto _exit;
      }
      pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
    }
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid));
    // qmsg is optional; the decoder detects its absence by reaching end-of-record.
    if (pHandle->execHandle.execTb.qmsg != NULL) {
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->execHandle.execTb.qmsg));
    }
  }
  tEndEncode(pEncoder);
_exit:
  if (code) {
    return code;
  } else {
    return pEncoder->pos;
  }
}
58

59
/*
 * Deserialize a STqHandle from pDecoder; the field order mirrors tEncodeSTqHandle.
 *
 * For TOPIC_SUB_TYPE__DB a fresh pFilterOutTbUid hash is created and filled with the
 * decoded table uids. For TOPIC_SUB_TYPE__TABLE the qmsg string is read only if the
 * decoder has not reached the end of the record.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the code of the
 * first failing step. Nothing allocated here is released on failure.
 */
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
  if (!pDecoder || !pHandle) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino;

  // Common header shared by every subscription type.
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pHandle->subKey));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pHandle->fetchMeta));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->consumerId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->snapshotVer));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pHandle->epoch));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pHandle->execHandle.subType));

  // Type-specific payload.
  switch (pHandle->execHandle.subType) {
    case TOPIC_SUB_TYPE__COLUMN:
      TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg));
      break;

    case TOPIC_SUB_TYPE__DB: {
      pHandle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
      if (pHandle->execHandle.execDb.pFilterOutTbUid == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }

      int32_t nUids = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nUids));
      for (int32_t remaining = nUids; remaining > 0; remaining--) {
        int64_t uid = 0;
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &uid));
        TAOS_CHECK_EXIT(taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &uid, sizeof(int64_t), NULL, 0));
      }
      break;
    }

    case TOPIC_SUB_TYPE__TABLE:
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid));
      if (!tDecodeIsEnd(pDecoder)) {
        TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execTb.qmsg));
      }
      break;

    default:
      break;
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
99

100
/*
 * Decode a STqCheckInfo from the vLen-byte buffer pVal into *info.
 *
 * Returns 0 on success and TSDB_CODE_INVALID_PARA for NULL arguments. On any decode
 * failure *info is passed to tDeleteSTqCheckInfo and TSDB_CODE_OUT_OF_MEMORY is
 * returned, whatever the decoder's own error code was.
 */
int32_t tqMetaDecodeCheckInfo(STqCheckInfo* info, void* pVal, uint32_t vLen) {
  if (!info || !pVal) {
    return TSDB_CODE_INVALID_PARA;
  }

  SDecoder dec = {0};
  tDecoderInit(&dec, (uint8_t*)pVal, vLen);
  int32_t rc = tDecodeSTqCheckInfo(&dec, info);
  tDecoderClear(&dec);

  if (rc == 0) {
    return rc;
  }

  tDeleteSTqCheckInfo(info);
  return TSDB_CODE_OUT_OF_MEMORY;
}
115

116
/*
 * Decode a STqOffset from the vLen-byte buffer pVal into *info.
 *
 * Returns 0 on success and TSDB_CODE_INVALID_PARA for NULL arguments. On any decode
 * failure *info is passed to tDeleteSTqOffset and TSDB_CODE_OUT_OF_MEMORY is returned,
 * whatever the decoder's own error code was.
 */
int32_t tqMetaDecodeOffsetInfo(STqOffset* info, void* pVal, uint32_t vLen) {
  if (!info || !pVal) {
    return TSDB_CODE_INVALID_PARA;
  }

  SDecoder dec = {0};
  tDecoderInit(&dec, (uint8_t*)pVal, vLen);
  int32_t rc = tDecodeSTqOffset(&dec, info);
  tDecoderClear(&dec);

  if (rc == 0) {
    return rc;
  }

  tDeleteSTqOffset(info);
  return TSDB_CODE_OUT_OF_MEMORY;
}
131

132
/*
 * Encode pOffset and store it in pTq->pOffsetStore under pOffset->subKey.
 *
 * The offset is encoded twice: once through tEncodeSize to learn the buffer size,
 * then into a freshly allocated buffer of that size. The buffer and the encoder are
 * released on every path.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the code of the first
 * failing step, or the result of tqMetaSaveInfo.
 */
int32_t tqMetaSaveOffset(STQ* pTq, STqOffset* pOffset) {
  if (!pTq || !pOffset) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = TDB_CODE_SUCCESS;
  uint32_t encLen;
  void*    payload = NULL;
  SEncoder enc = {0};

  // Pass 1: measure the encoded size.
  tEncodeSize(tEncodeSTqOffset, pOffset, encLen, code);
  if (code < 0) {
    goto END;
  }

  payload = taosMemoryCalloc(1, encLen);
  if (!payload) {
    code = terrno;
    goto END;
  }

  // Pass 2: encode into the real buffer.
  tEncoderInit(&enc, payload, encLen);
  code = tEncodeSTqOffset(&enc, pOffset);
  if (code < 0) {
    goto END;
  }

  TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), payload, encLen));

END:
  tEncoderClear(&enc);
  taosMemoryFree(payload);
  return code;
}
164

165
/*
 * Upsert one key/value pair into table ttb inside its own write transaction on
 * pTq->pMetaDB (begin -> upsert -> commit -> post-commit).
 *
 * Returns 0 on success and TSDB_CODE_INVALID_PARA when pTq, ttb, key or value is NULL.
 * If any transaction step fails, tdbAbort is called and that step's code is returned.
 */
int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, uint32_t kLen, const void* value, uint32_t vLen) {
  if (!pTq || !ttb || !key || !value) {
    return TSDB_CODE_INVALID_PARA;
  }

  TXN*    txn = NULL;
  int32_t code = TDB_CODE_SUCCESS;

  TQ_ERR_GO_TO_END(
      tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
  TQ_ERR_GO_TO_END(tdbTbUpsert(ttb, key, kLen, value, vLen, txn));
  TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn));
  TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn));

  // Success leaves here; END is reached only through a failed step.
  return 0;

END:
  tdbAbort(pTq->pMetaDB, txn);
  return code;
}
184

185
/*
 * Delete one key from table ttb inside its own write transaction on pTq->pMetaDB
 * (begin -> delete -> commit -> post-commit).
 *
 * Returns 0 on success and TSDB_CODE_INVALID_PARA when pTq, ttb or key is NULL.
 * If any transaction step fails, tdbAbort is called and that step's code is returned.
 */
int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, uint32_t kLen) {
  if (!pTq || !ttb || !key) {
    return TSDB_CODE_INVALID_PARA;
  }

  TXN*    txn = NULL;
  int32_t code = TDB_CODE_SUCCESS;

  TQ_ERR_GO_TO_END(
      tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
  TQ_ERR_GO_TO_END(tdbTbDelete(ttb, key, kLen, txn));
  TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn));
  TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn));

  // Success leaves here; END is reached only through a failed step.
  return 0;

END:
  tdbAbort(pTq->pMetaDB, txn);
  return code;
}
204

205
/*
 * Look up the offset stored for subkey.
 *
 * The in-memory hash pTq->pOffset is consulted first. On a miss the record is read
 * from pTq->pOffsetStore, decoded, inserted into the hash, and the hash's entry is
 * returned through *pOffset.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments,
 * TSDB_CODE_OUT_OF_MEMORY when the store lookup, the decode or the final hash lookup
 * fails, and terrno when inserting into the hash fails. *pOffset is written only on
 * success or when the final hash lookup yields NULL.
 */
int32_t tqMetaGetOffset(STQ* pTq, const char* subkey, STqOffset** pOffset) {
  if (!pTq || !subkey || !pOffset) {
    return TSDB_CODE_INVALID_PARA;
  }

  size_t keyLen = strlen(subkey);

  // Fast path: already cached.
  void* data = taosHashGet(pTq->pOffset, subkey, keyLen);
  if (data != NULL) {
    *pOffset = data;
    return 0;
  }

  // Slow path: load from the persistent store.
  int vLen = 0;
  if (tdbTbGet(pTq->pOffsetStore, subkey, keyLen, &data, &vLen) < 0) {
    tdbFree(data);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STqOffset offset = {0};
  if (tqMetaDecodeOffsetInfo(&offset, data, vLen >= 0 ? vLen : 0) != TDB_CODE_SUCCESS) {
    tdbFree(data);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (taosHashPut(pTq->pOffset, subkey, keyLen, &offset, sizeof(STqOffset)) != 0) {
    tDeleteSTqOffset(&offset);
    tdbFree(data);
    return terrno;
  }
  tdbFree(data);

  // Hand back the hash's entry rather than the stack-local struct.
  *pOffset = taosHashGet(pTq->pOffset, subkey, keyLen);
  return (*pOffset == NULL) ? TSDB_CODE_OUT_OF_MEMORY : 0;
}
239

240
/*
 * Encode pHandle and store it in pTq->pExecStore under key.
 *
 * The handle is encoded twice: once through tEncodeSize to learn the buffer size,
 * then into a freshly allocated buffer of that size. The buffer and the encoder are
 * released on every path.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the code of the first
 * failing step, or the result of tqMetaSaveInfo.
 */
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
  if (!pTq || !key || !pHandle) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = TDB_CODE_SUCCESS;
  uint32_t encLen;
  void*    payload = NULL;
  SEncoder enc = {0};

  // Pass 1: measure the encoded size.
  tEncodeSize(tEncodeSTqHandle, pHandle, encLen, code);
  if (code < 0) {
    goto END;
  }

  tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 " epoch:%d vgId:%d", pHandle->subKey,
          (int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode));

  payload = taosMemoryCalloc(1, encLen);
  if (!payload) {
    code = terrno;
    goto END;
  }

  // Pass 2: encode into the real buffer.
  tEncoderInit(&enc, payload, encLen);
  code = tEncodeSTqHandle(&enc, pHandle);
  if (code < 0) {
    goto END;
  }

  TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pExecStore, key, strlen(key), payload, encLen));

END:
  tEncoderClear(&enc);
  taosMemoryFree(payload);
  return code;
}
275

276
/*
 * Build the runtime parts of a handle whose persistent fields are already filled in
 * (subType, snapshotVer, consumerId, fetchMeta and the per-type qmsg/suid).
 *
 * Always opens a WAL ref and pins it at handle->snapshotVer, then per subscription type:
 *   - COLUMN: creates the exec task from execCol.qmsg and extracts its tq reader
 *   - DB:     opens a WAL reader and tq reader, builds a snapshot context, creates the task
 *   - TABLE:  opens a WAL reader, parses execTb.qmsg into execTb.node when it is non-empty,
 *             builds a snapshot context, creates the task, and hands the table uid list
 *             returned by qGetTableList to a newly opened tq reader
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments,
 * TSDB_CODE_SCH_INTERNAL_ERROR when qmsg parsing or qGetTableList fails, otherwise the
 * code set by the failing TQ_*_GO_TO_END step. Resources already attached to the handle
 * are not released here on failure.
 */
static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) {
  if (pTq == NULL || handle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = TDB_CODE_SUCCESS;

  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  handle->pRef = walOpenRef(pVnode->pWal);

  TQ_NULL_GO_TO_END(handle->pRef);
  TQ_ERR_GO_TO_END(walSetRefVer(handle->pRef, handle->snapshotVer));

  SReadHandle reader = {
      .vnode = pVnode,
      .initTableReader = true,
      .initTqReader = true,
      .version = handle->snapshotVer,
  };

  initStorageAPI(&reader.api);

  if (handle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    handle->execHandle.task = qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId,
                                                       &handle->execHandle.numOfCols, handle->consumerId);
    TQ_NULL_GO_TO_END(handle->execHandle.task);
    void* scanner = NULL;
    qExtractStreamScanner(handle->execHandle.task, &scanner);
    TQ_NULL_GO_TO_END(scanner);
    handle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
  } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
    TQ_NULL_GO_TO_END(handle->pWalReader);
    handle->execHandle.pTqReader = tqReaderOpen(pVnode);
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
    TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta,
                                      (SSnapContext**)(&reader.sContext)));
    handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
    TQ_NULL_GO_TO_END(handle->execHandle.task);
  } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
    handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
    TQ_NULL_GO_TO_END(handle->pWalReader);
    // An absent or empty qmsg leaves execTb.node untouched.
    if (handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
      if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) {
        tqError("nodesStringToNode error in sub stable, since %s", terrstr());
        return TSDB_CODE_SCH_INTERNAL_ERROR;
      }
    }
    TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid,
                                      handle->execHandle.subType, handle->fetchMeta,
                                      (SSnapContext**)(&reader.sContext)));
    handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
    TQ_NULL_GO_TO_END(handle->execHandle.task);
    SArray* tbUidList = NULL;
    int     ret = qGetTableList(handle->execHandle.execTb.suid, pVnode, handle->execHandle.execTb.node, &tbUidList,
                                handle->execHandle.task);
    if (ret != TDB_CODE_SUCCESS) {
      tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle->subKey, handle->consumerId);
      taosArrayDestroy(tbUidList);
      return TSDB_CODE_SCH_INTERNAL_ERROR;
    }
    tqInfo("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId,
           handle->execHandle.execTb.suid);
    handle->execHandle.pTqReader = tqReaderOpen(pVnode);
    if (handle->execHandle.pTqReader == NULL) {
      // The jump to END below would skip the destroy at the end of this branch,
      // so release the uid list here to avoid leaking it.
      taosArrayDestroy(tbUidList);
      tbUidList = NULL;
    }
    TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
    tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL);
    taosArrayDestroy(tbUidList);
  }

END:
  return code;
}
350

351
/*
 * Rebuild a handle from its stored encoding: decode pVal into *handle, initialise its
 * runtime parts with tqMetaInitHandle, then insert a copy into pTq->pHandle keyed by
 * handle->subKey.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the code of the
 * first failing step. *handle is not released here on failure.
 */
static int32_t tqMetaRestoreHandle(STQ* pTq, void* pVal, uint32_t vLen, STqHandle* handle) {
  if (!pTq || !pVal || !handle) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = TDB_CODE_SUCCESS;
  SDecoder dec = {0};
  int32_t  vgId = TD_VID(pTq->pVnode);

  tDecoderInit(&dec, (uint8_t*)pVal, vLen);

  TQ_ERR_GO_TO_END(tDecodeSTqHandle(&dec, handle));
  TQ_ERR_GO_TO_END(tqMetaInitHandle(pTq, handle));

  tqInfo("tqMetaRestoreHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
  code = taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));

END:
  tDecoderClear(&dec);
  return code;
}
369

370
/*
 * Populate *handle from a rebalance request and register it in pTq->pHandle.
 *
 * Copies subKey, consumer id, subscription type and withMeta from req, duplicates the
 * type-specific payload (qmsg for COLUMN/TABLE, an empty filter-out hash for DB), sets
 * snapshotVer to the WAL's committed version, then runs tqMetaInitHandle.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, terrno when a duplication or hash
 * allocation fails, the code from tqMetaInitHandle when that fails, otherwise the result
 * of taosHashPut. Nothing attached to *handle is released here on failure.
 */
int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) {
  if (!pTq || !req || !handle) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t vgId = TD_VID(pTq->pVnode);

  (void)memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN);
  handle->consumerId = req->newConsumerId;
  handle->execHandle.subType = req->subType;
  handle->fetchMeta = req->withMeta;

  switch (req->subType) {
    case TOPIC_SUB_TYPE__COLUMN: {
      void* qmsgCopy = taosStrdup(req->qmsg);
      if (qmsgCopy == NULL) {
        return terrno;
      }
      handle->execHandle.execCol.qmsg = qmsgCopy;
      break;
    }

    case TOPIC_SUB_TYPE__DB:
      handle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
      if (handle->execHandle.execDb.pFilterOutTbUid == NULL) {
        return terrno;
      }
      break;

    case TOPIC_SUB_TYPE__TABLE: {
      handle->execHandle.execTb.suid = req->suid;
      void* qmsgCopy = taosStrdup(req->qmsg);
      if (qmsgCopy == NULL) {
        return terrno;
      }
      handle->execHandle.execTb.qmsg = qmsgCopy;
      break;
    }

    default:
      break;
  }

  handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal);

  int32_t rc = tqMetaInitHandle(pTq, handle);
  if (rc != 0) {
    return rc;
  }

  tqInfo("tqMetaCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey,
         handle->consumerId, vgId, handle->snapshotVer);
  return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
}
412

UNCOV
413
/*
 * Copy every key/value pair of table pOld into table pNew, using a single write
 * transaction on pMetaDB.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the code of the
 * first failing step. On failure a transaction that was successfully begun is aborted,
 * matching the error handling of tqMetaSaveInfo / tqMetaDeleteInfo. The cursor and the
 * last key/value buffers handed out by tdbTbcNext are released on every path.
 */
static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pOld, TTB* pNew) {
  if (pMetaDB == NULL || pOld == NULL || pNew == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  TBC*  pCur = NULL;
  void* pKey = NULL;
  int   kLen = 0;
  void* pVal = NULL;
  int   vLen = 0;
  TXN*  txn = NULL;

  int32_t code = TDB_CODE_SUCCESS;

  TQ_ERR_GO_TO_END(tdbTbcOpen(pOld, &pCur, NULL));
  TQ_ERR_GO_TO_END(
      tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));

  TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur));
  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    TQ_ERR_GO_TO_END(tdbTbUpsert(pNew, pKey, kLen, pVal, vLen, txn));
  }

  TQ_ERR_GO_TO_END(tdbCommit(pMetaDB, txn));
  TQ_ERR_GO_TO_END(tdbPostCommit(pMetaDB, txn));

END:
  // END is shared by the success path, so abort only when a step failed and a
  // transaction was actually begun.
  if (code != TDB_CODE_SUCCESS && txn != NULL) {
    tdbAbort(pMetaDB, txn);
  }
  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCur);
  return code;
}
444

445
/*
 * Look up the handle registered under key.
 *
 * The in-memory hash pTq->pHandle is consulted first. On a miss the record is read from
 * pTq->pExecStore and rebuilt with tqMetaRestoreHandle (which also inserts it into the
 * hash); the hash's entry is then returned through *pHandle.
 *
 * Returns TDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA for NULL arguments,
 * TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when the store lookup fails, and
 * TSDB_CODE_OUT_OF_MEMORY when the restore or the final hash lookup fails.
 */
int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle) {
  if (!pTq || !key || !pHandle) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Fast path: already cached.
  void* data = taosHashGet(pTq->pHandle, key, strlen(key));
  if (data != NULL) {
    *pHandle = data;
    return TDB_CODE_SUCCESS;
  }

  // Slow path: load from the persistent store.
  int vLen = 0;
  if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &data, &vLen) < 0) {
    tdbFree(data);
    return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  }

  STqHandle handle = {0};
  int32_t   rc = tqMetaRestoreHandle(pTq, data, vLen >= 0 ? vLen : 0, &handle);
  tdbFree(data);
  if (rc != 0) {
    tqDestroyTqHandle(&handle);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Hand back the hash's entry rather than the stack-local struct.
  *pHandle = taosHashGet(pTq->pHandle, key, strlen(key));
  if (*pHandle == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TDB_CODE_SUCCESS;
}
472

473
/*
 * Open the tdb environment at pTq->path and its three tables: "tq.db" (pExecStore),
 * "tq.check.db" (pCheckStore) and "tq.offset.db" (pOffsetStore).
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when pTq is NULL, or the code of the
 * first open that fails. Whatever was opened before a failure stays open.
 */
int32_t tqMetaOpenTdb(STQ* pTq) {
  if (!pTq) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = TDB_CODE_SUCCESS;

  // Environment first, then each table inside it.
  TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0, 0, NULL));

  TQ_ERR_GO_TO_END(tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0));
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0));
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.offset.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pOffsetStore, 0));

END:
  return code;
}
486

487
static int32_t replaceTqPath(char** path) {
9,596✔
488
  if (path == NULL || *path == NULL) {
9,596!
489
    return TSDB_CODE_INVALID_PARA;
×
490
  }
491
  char*   tpath = NULL;
9,597✔
492
  int32_t code = tqBuildFName(&tpath, *path, TQ_SUBSCRIBE_NAME);
9,597✔
493
  if (code != 0) {
9,597!
494
    return code;
×
495
  }
496
  taosMemoryFree(*path);
9,597!
497
  *path = tpath;
9,596✔
498
  return TDB_CODE_SUCCESS;
9,596✔
499
}
500

501
/*
 * Load every record of pTq->pCheckStore into the in-memory hash pTq->pCheckInfo,
 * keyed by the decoded topic name.
 *
 * Ownership: taosHashPut stores a byte copy of `info`, so after a successful put the
 * colIdList pointer belongs to the hash entry. The local pointer is cleared right away
 * so that neither a later failed decode nor the cleanup at END can free a list the
 * hash still references.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when pTq is NULL, or the code of the
 * first failing step. The cursor and the last key/value buffers are released on every
 * path.
 */
static int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
  if (pTq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  TBC*         pCur = NULL;
  void*        pKey = NULL;
  int          kLen = 0;
  void*        pVal = NULL;
  int          vLen = 0;
  int32_t      code = 0;
  STqCheckInfo info = {0};

  TQ_ERR_GO_TO_END(tdbTbcOpen(pTq->pCheckStore, &pCur, NULL));
  TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur));

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    code = tqMetaDecodeCheckInfo(&info, pVal, vLen >= 0 ? vLen : 0);
    if (code != 0) {
      // tqMetaDecodeCheckInfo has already passed `info` to tDeleteSTqCheckInfo on
      // failure; drop the pointer so the cleanup at END cannot release it again.
      info.colIdList = NULL;
      goto END;
    }
    TQ_ERR_GO_TO_END(taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)));
    // The hash entry now owns the list.
    info.colIdList = NULL;
  }

END:
  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCur);
  tDeleteSTqCheckInfo(&info);
  return code;
}
529

530
/*
 * Open the tq metadata store for pTq.
 *
 * If a file named TDB_MAINDB_NAME exists under the current pTq->path, tqMetaTransform
 * is run and that file is then removed. Otherwise pTq->path is switched with
 * replaceTqPath and the store is opened directly. Afterwards, if an offset file
 * (TQ_OFFSET_NAME) exists under the resulting path it is restored and removed, and
 * finally the check-info records are loaded.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when pTq is NULL, or the code of the
 * first failing step. Both temporary file names are freed on every path.
 */
int32_t tqMetaOpen(STQ* pTq) {
  if (!pTq) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = TDB_CODE_SUCCESS;
  char*   maindb = NULL;
  char*   offsetNew = NULL;

  TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME));
  if (taosCheckExistFile(maindb)) {
    // A main db file is present under the current path: run the transform, then drop it.
    TQ_ERR_GO_TO_END(tqMetaTransform(pTq));
    TQ_ERR_GO_TO_END(taosRemoveFile(maindb));
  } else {
    TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path));
    TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));
  }

  TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
  if (taosCheckExistFile(offsetNew)) {
    TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetNew));
    TQ_ERR_GO_TO_END(taosRemoveFile(offsetNew));
  }

  TQ_ERR_GO_TO_END(tqMetaRestoreCheckInfo(pTq));

END:
  taosMemoryFree(maindb);
  taosMemoryFree(offsetNew);
  return code;
}
559

UNCOV
560
/*
 * Move tq metadata from the store at the current pTq->path into the store at the path
 * produced by replaceTqPath.
 *
 * Steps: open the existing environment and its "tq.db" / "tq.check.db" tables, switch
 * pTq->path, open the new store with tqMetaOpenTdb, copy both tables across, and, if an
 * offset file exists under the old path, copy it to the new path and remove the old one.
 * A failed offset-file copy is only logged; it does not change the return code.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when pTq is NULL, or the code of the
 * first failing step. The old environment, its tables and both file names are released
 * on every path.
 */
int32_t tqMetaTransform(STQ* pTq) {
  if (!pTq) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = TDB_CODE_SUCCESS;
  TDB*    pOldDB = NULL;
  TTB*    pOldExec = NULL;
  TTB*    pOldCheck = NULL;
  char*   offset = NULL;
  char*   offsetNew = NULL;

  // The old offset file name must be built before pTq->path is replaced.
  TQ_ERR_GO_TO_END(tqBuildFName(&offset, pTq->path, TQ_OFFSET_NAME));

  TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pOldDB, 0, 0, NULL));
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.db", -1, -1, NULL, pOldDB, &pOldExec, 0));
  TQ_ERR_GO_TO_END(tdbTbOpen("tq.check.db", -1, -1, NULL, pOldDB, &pOldCheck, 0));

  TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path));
  TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));

  TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pOldExec, pTq->pExecStore));
  TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pOldCheck, pTq->pCheckStore));

  TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));

  if (taosCheckExistFile(offset)) {
    if (taosCopyFile(offset, offsetNew) >= 0) {
      TQ_ERR_GO_TO_END(taosRemoveFile(offset));
    } else {
      tqError("copy offset file error");
    }
  }

END:
  taosMemoryFree(offset);
  taosMemoryFree(offsetNew);

  tdbTbClose(pOldExec);
  tdbTbClose(pOldCheck);
  tdbClose(pOldDB);
  return code;
}
601

602
/*
 * Close the three tq metadata tables that are open (exec, check, offset) and then the
 * tdb environment itself. Does nothing when pTq is NULL.
 */
void tqMetaClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }
  if (pTq->pExecStore != NULL) {
    tdbTbClose(pTq->pExecStore);
  }
  if (pTq->pCheckStore != NULL) {
    tdbTbClose(pTq->pCheckStore);
  }
  if (pTq->pOffsetStore != NULL) {
    tdbTbClose(pTq->pOffsetStore);
  }
  tdbClose(pTq->pMetaDB);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc