• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4829

30 Oct 2025 09:25AM UTC coverage: 49.734% (-11.3%) from 61.071%
#4829

push

travis-ci

web-flow
Merge pull request #33435 from taosdata/3.0

merge 3.0

123072 of 323930 branches covered (37.99%)

Branch coverage included in aggregate %.

7 of 25 new or added lines in 3 files covered. (28.0%)

35232 existing lines in 327 files now uncovered.

172062 of 269495 relevant lines covered (63.85%)

70709785.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.93
/source/dnode/vnode/src/tq/tqOffset.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#define _DEFAULT_SOURCE
16

17
#include "tq.h"
18

19
int32_t tqBuildFName(char** data, const char* path, char* name) {
2,094,379✔
20
  int32_t code = 0;
2,094,379✔
21
  int32_t lino = 0;
2,094,379✔
22
  char*   fname = NULL;
2,094,379✔
23
  TSDB_CHECK_NULL(data, code, lino, END, TSDB_CODE_INVALID_MSG);
2,094,379!
24
  TSDB_CHECK_NULL(path, code, lino, END, TSDB_CODE_INVALID_MSG);
2,094,379!
25
  TSDB_CHECK_NULL(name, code, lino, END, TSDB_CODE_INVALID_MSG);
2,094,379!
26
  int32_t len = strlen(path) + strlen(name) + 2;
2,094,379!
27
  fname = taosMemoryCalloc(1, len);
2,094,379!
28
  TSDB_CHECK_NULL(fname, code, lino, END, terrno);
2,094,867!
29
  (void)tsnprintf(fname, len, "%s%s%s", path, TD_DIRSEP, name);
2,094,867✔
30

31
  *data = fname;
2,095,623✔
32
  fname = NULL;
2,095,452✔
33

34
END:
2,095,452✔
35
  if (code != 0) {
2,095,452!
36
    tqError("%s failed at %d since %s", __func__, lino, tstrerror(code));
×
37
  }
38
  taosMemoryFree(fname);
2,095,623!
39
  return code;
2,095,794✔
40
}
41

42
int32_t tqCommitOffset(void* p) {
837,980✔
43
  STQ*    pTq = (STQ*)p;
837,980✔
44
  int32_t code = TDB_CODE_SUCCESS;
837,980✔
45
  void*   pIter = NULL;
837,980✔
46
  int32_t vgId = pTq->pVnode != NULL ? pTq->pVnode->config.vgId : -1;
837,980!
47
  while ((pIter = taosHashIterate(pTq->pOffset, pIter))) {
841,543✔
48
    STqOffset* offset = (STqOffset*)pIter;
2,635✔
49
    int32_t    ret = tqMetaSaveOffset(pTq, offset);
2,635✔
50
    if (ret != TDB_CODE_SUCCESS) {
2,635!
51
      code = ret;
×
52
      tqError("tq commit offset error subkey:%s, vgId:%d", offset->subKey, vgId);
×
53
    } else {
54
      if (offset->val.type == TMQ_OFFSET__LOG) {
2,635✔
55
        tqInfo("tq commit offset success subkey:%s vgId:%d, offset(type:log) version:%" PRId64, offset->subKey, vgId,
2,377!
56
               offset->val.version);
57
      }
58
    }
59
  }
60
  return code;
838,908✔
61
}
62

UNCOV
63
/*
 * Load offset records from the file `name` into pTq->pOffset, then persist
 * them with tqCommitOffset().
 *
 * The file is read as a sequence of records: an INT_BYTES-sized length field
 * (byte-swapped with htonl after reading) followed by that many payload
 * bytes, which are decoded with tqMetaDecodeOffsetInfo().
 *
 * Returns:
 *   - TDB_CODE_SUCCESS when the file cannot be opened (treated as nothing to
 *     restore) or when all records were loaded and committed;
 *   - TSDB_CODE_INVALID_MSG when an argument is NULL, a length field is
 *     truncated or not positive, or a payload is shorter than its length;
 *   - terrno when the payload buffer cannot be allocated;
 *   - the code returned by tqMetaDecodeOffsetInfo(), taosHashPut() or
 *     tqCommitOffset() when one of them fails.
 *
 * On any failure the function returns without calling tqCommitOffset();
 * records already inserted into pTq->pOffset stay there.
 */
int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) {
  int32_t    code = TDB_CODE_SUCCESS;
  int32_t    lino = 0;
  void*      pMemBuf = NULL;
  TdFilePtr  pFile = NULL;
  // Function scope on purpose: pOffset may point at this object when control
  // jumps to END, so it must outlive the read loop.
  STqOffset  offset = {0};
  STqOffset* pOffset = NULL;

  TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_MSG);
  TSDB_CHECK_NULL(name, code, lino, END, TSDB_CODE_INVALID_MSG);

  pFile = taosOpenFile(name, TD_FILE_READ);
  // A file that cannot be opened is not an error: leave with success.
  TSDB_CHECK_NULL(pFile, code, lino, END, TDB_CODE_SUCCESS);

  int64_t ret = 0;
  int32_t size = 0;
  int32_t total = 0;
  while (1) {
    ret = taosReadFile(pFile, &size, INT_BYTES);
    if (ret == 0) {
      // Nothing more to read at a record boundary: normal end of input.
      break;
    }
    // A partial length field (or a read error) means the file is damaged.
    // Fail here so the error is not overwritten by the commit below.
    TSDB_CHECK_CONDITION(ret == INT_BYTES, code, lino, END, TSDB_CODE_INVALID_MSG);

    total += INT_BYTES;
    size = htonl(size);
    TSDB_CHECK_CONDITION(size > 0, code, lino, END, TSDB_CODE_INVALID_MSG);

    pMemBuf = taosMemoryCalloc(1, size);
    TSDB_CHECK_NULL(pMemBuf, code, lino, END, terrno);
    TSDB_CHECK_CONDITION(taosReadFile(pFile, pMemBuf, size) == size, code, lino, END, TSDB_CODE_INVALID_MSG);

    total += size;
    offset = (STqOffset){0};
    code = tqMetaDecodeOffsetInfo(&offset, pMemBuf, size);
    TSDB_CHECK_CODE(code, lino, END);

    // While pOffset is set, END releases the decoded record. It is cleared
    // once the hash table holds its copy.
    pOffset = &offset;
    code = taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
    TSDB_CHECK_CODE(code, lino, END);
    pOffset = NULL;

    tqInfo("tq: offset restore from file to tdb, size:%d, hash size:%d subkey:%s", total, taosHashGetSize(pTq->pOffset),
           offset.subKey);
    taosMemoryFree(pMemBuf);
    pMemBuf = NULL;
  }

  code = tqCommitOffset(pTq);
  TSDB_CHECK_CODE(code, lino, END);

END:
  if (code != 0) {
    tqError("%s failed at %d since %s", __func__, lino, tstrerror(code));
  }
  (void)taosCloseFile(&pFile);
  taosMemoryFree(pMemBuf);

  // No hash iteration is started in this function, so there is nothing to
  // cancel here; pTq may also be NULL on this path and must not be
  // dereferenced.
  tDeleteSTqOffset(pOffset);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc