• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4788

14 Oct 2025 11:21AM UTC coverage: 60.992% (-2.3%) from 63.264%
#4788

push

travis-ci

web-flow
Merge 7ca9b50f9 into 19574fe21

154868 of 324306 branches covered (47.75%)

Branch coverage included in aggregate %.

207304 of 269498 relevant lines covered (76.92%)

125773493.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.08
/source/dnode/vnode/src/tq/tqOffset.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#define _DEFAULT_SOURCE
16

17
#include "tq.h"
18

19
int32_t tqBuildFName(char** data, const char* path, char* name) {
12,069,740✔
20
  int32_t code = 0;
12,069,740✔
21
  int32_t lino = 0;
12,069,740✔
22
  char*   fname = NULL;
12,069,740✔
23
  TSDB_CHECK_NULL(data, code, lino, END, TSDB_CODE_INVALID_MSG);
12,069,740!
24
  TSDB_CHECK_NULL(path, code, lino, END, TSDB_CODE_INVALID_MSG);
12,069,740!
25
  TSDB_CHECK_NULL(name, code, lino, END, TSDB_CODE_INVALID_MSG);
12,069,740!
26
  int32_t len = strlen(path) + strlen(name) + 2;
12,069,740!
27
  fname = taosMemoryCalloc(1, len);
12,069,740!
28
  TSDB_CHECK_NULL(fname, code, lino, END, terrno);
12,072,904!
29
  (void)tsnprintf(fname, len, "%s%s%s", path, TD_DIRSEP, name);
12,072,904✔
30

31
  *data = fname;
12,078,309✔
32
  fname = NULL;
12,077,325✔
33

34
END:
12,077,325✔
35
  if (code != 0) {
12,077,325!
36
    tqError("%s failed at %d since %s", __func__, lino, tstrerror(code));
×
37
  }
38
  taosMemoryFree(fname);
12,075,953!
39
  return code;
12,075,083✔
40
}
41

42
int32_t tqCommitOffset(void* p) {
6,278,063✔
43
  STQ*    pTq = (STQ*)p;
6,278,063✔
44
  int32_t code = TDB_CODE_SUCCESS;
6,278,063✔
45
  void*   pIter = NULL;
6,278,063✔
46
  int32_t vgId = pTq->pVnode != NULL ? pTq->pVnode->config.vgId : -1;
6,278,063✔
47
  while ((pIter = taosHashIterate(pTq->pOffset, pIter))) {
6,286,871✔
48
    STqOffset* offset = (STqOffset*)pIter;
8,808✔
49
    int32_t    ret = tqMetaSaveOffset(pTq, offset);
8,808✔
50
    if (ret != TDB_CODE_SUCCESS) {
8,808!
51
      code = ret;
×
52
      tqError("tq commit offset error subkey:%s, vgId:%d", offset->subKey, vgId);
×
53
    } else {
54
      if (offset->val.type == TMQ_OFFSET__LOG) {
8,808✔
55
        tqInfo("tq commit offset success subkey:%s vgId:%d, offset(type:log) version:%" PRId64, offset->subKey, vgId,
8,461!
56
               offset->val.version);
57
      }
58
    }
59
  }
60
  return code;
6,278,063✔
61
}
62

63
/**
 * Load offsets from the file `name` into pTq->pOffset and then persist them
 * with tqCommitOffset().
 *
 * The file is consumed as a sequence of records. Each record is an INT_BYTES
 * length prefix (converted with htonl()) followed by that many bytes, which
 * are decoded with tqMetaDecodeOffsetInfo() and stored in the hash under the
 * offset's subKey.
 *
 * @param pTq   TQ handle; must not be NULL.
 * @param name  Path of the offset file; must not be NULL.
 * @return TDB_CODE_SUCCESS when all records were restored and committed, and
 *         also when the file cannot be opened (nothing is restored).
 *         TSDB_CODE_INVALID_MSG for NULL arguments or a malformed/truncated
 *         record; otherwise the error reported by allocation, decoding,
 *         taosHashPut() or tqCommitOffset().
 */
int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) {
  int32_t    code = TDB_CODE_SUCCESS;
  int32_t    lino = 0;
  void*      pMemBuf = NULL;
  TdFilePtr  pFile = NULL;
  // Function scope on purpose: END may still access it through pOffset after
  // a goto out of the read loop.
  STqOffset  offset = {0};
  STqOffset* pOffset = NULL;
  void*      pIter = NULL;
  int64_t    ret = 0;
  int32_t    size = 0;
  int32_t    total = 0;

  TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_MSG);
  TSDB_CHECK_NULL(name, code, lino, END, TSDB_CODE_INVALID_MSG);

  // Failing to open the file is not an error: leave with success and restore nothing.
  pFile = taosOpenFile(name, TD_FILE_READ);
  TSDB_CHECK_NULL(pFile, code, lino, END, TDB_CODE_SUCCESS);

  while (1) {
    ret = taosReadFile(pFile, &size, INT_BYTES);
    if (ret == 0) {
      // Nothing left to read: normal end of the record stream.
      break;
    }
    // A partial or failed read of the length prefix is a malformed file. Report
    // it instead of letting the commit result below overwrite the error.
    TSDB_CHECK_CONDITION(ret == INT_BYTES, code, lino, END, TSDB_CODE_INVALID_MSG);

    total += INT_BYTES;
    size = htonl(size);
    TSDB_CHECK_CONDITION(size > 0, code, lino, END, TSDB_CODE_INVALID_MSG);

    pMemBuf = taosMemoryCalloc(1, size);
    TSDB_CHECK_NULL(pMemBuf, code, lino, END, terrno);
    TSDB_CHECK_CONDITION(taosReadFile(pFile, pMemBuf, size) == size, code, lino, END, TSDB_CODE_INVALID_MSG);

    total += size;
    offset = (STqOffset){0};
    code = tqMetaDecodeOffsetInfo(&offset, pMemBuf, size);
    TSDB_CHECK_CODE(code, lino, END);

    // pOffset is non-NULL only between a successful decode and a successful
    // insert, so END releases the decoded offset exactly when the insert failed.
    pOffset = &offset;
    code = taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
    TSDB_CHECK_CODE(code, lino, END);
    pOffset = NULL;

    tqInfo("tq: offset restore from file to tdb, size:%d, hash size:%d subkey:%s", total, taosHashGetSize(pTq->pOffset),
           offset.subKey);
    taosMemoryFree(pMemBuf);
    pMemBuf = NULL;
  }

  code = tqCommitOffset(pTq);
  TSDB_CHECK_CODE(code, lino, END);

END:
  if (code != 0) {
    tqError("%s failed at %d since %s", __func__, lino, tstrerror(code));
  }
  (void)taosCloseFile(&pFile);
  taosMemoryFree(pMemBuf);

  tDeleteSTqOffset(pOffset);
  // The pTq == NULL guard above also lands here, so pTq must not be
  // dereferenced unconditionally.
  if (pTq != NULL) {
    taosHashCancelIterate(pTq->pOffset, pIter);
  }

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc