• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4800

16 Oct 2025 09:19AM UTC coverage: 53.935% (-7.1%) from 61.083%
#4800

push

travis-ci

web-flow
Merge b32e3a393 into a190048d5

134724 of 323629 branches covered (41.63%)

Branch coverage included in aggregate %.

184803 of 268802 relevant lines covered (68.75%)

69058627.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.33
/source/dnode/vnode/src/tq/tqOffset.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#define _DEFAULT_SOURCE
16

17
#include "tq.h"
18

19
int32_t tqBuildFName(char** data, const char* path, char* name) {
4,303,630✔
20
  int32_t code = 0;
4,303,630✔
21
  int32_t lino = 0;
4,303,630✔
22
  char*   fname = NULL;
4,303,630✔
23
  TSDB_CHECK_NULL(data, code, lino, END, TSDB_CODE_INVALID_MSG);
4,303,630!
24
  TSDB_CHECK_NULL(path, code, lino, END, TSDB_CODE_INVALID_MSG);
4,303,630!
25
  TSDB_CHECK_NULL(name, code, lino, END, TSDB_CODE_INVALID_MSG);
4,303,630!
26
  int32_t len = strlen(path) + strlen(name) + 2;
4,303,630!
27
  fname = taosMemoryCalloc(1, len);
4,303,630!
28
  TSDB_CHECK_NULL(fname, code, lino, END, terrno);
4,305,503!
29
  (void)tsnprintf(fname, len, "%s%s%s", path, TD_DIRSEP, name);
4,305,503✔
30

31
  *data = fname;
4,306,284✔
32
  fname = NULL;
4,306,284✔
33

34
END:
4,306,284✔
35
  if (code != 0) {
4,306,284!
36
    tqError("%s failed at %d since %s", __func__, lino, tstrerror(code));
×
37
  }
38
  taosMemoryFree(fname);
4,306,284!
39
  return code;
4,306,284✔
40
}
41

42
int32_t tqCommitOffset(void* p) {
1,984,576✔
43
  STQ*    pTq = (STQ*)p;
1,984,576✔
44
  int32_t code = TDB_CODE_SUCCESS;
1,984,576✔
45
  void*   pIter = NULL;
1,984,576✔
46
  int32_t vgId = pTq->pVnode != NULL ? pTq->pVnode->config.vgId : -1;
1,984,576!
47
  while ((pIter = taosHashIterate(pTq->pOffset, pIter))) {
1,985,176✔
48
    STqOffset* offset = (STqOffset*)pIter;
600✔
49
    int32_t    ret = tqMetaSaveOffset(pTq, offset);
600✔
50
    if (ret != TDB_CODE_SUCCESS) {
600!
51
      code = ret;
×
52
      tqError("tq commit offset error subkey:%s, vgId:%d", offset->subKey, vgId);
×
53
    } else {
54
      if (offset->val.type == TMQ_OFFSET__LOG) {
600!
55
        tqInfo("tq commit offset success subkey:%s vgId:%d, offset(type:log) version:%" PRId64, offset->subKey, vgId,
600!
56
               offset->val.version);
57
      }
58
    }
59
  }
60
  return code;
1,984,576✔
61
}
62

63
int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) {
×
64
  int32_t    code = TDB_CODE_SUCCESS;
×
65
  int32_t    lino = 0;
×
66
  void*      pMemBuf = NULL;
×
67
  TdFilePtr  pFile = NULL;
×
68
  STqOffset* pOffset = NULL;
×
69
  void*      pIter = NULL;
×
70

71
  TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_MSG);
×
72
  TSDB_CHECK_NULL(name, code, lino, END, TSDB_CODE_INVALID_MSG);
×
73

74
  pFile = taosOpenFile(name, TD_FILE_READ);
×
75
  TSDB_CHECK_NULL(pFile, code, lino, END, TDB_CODE_SUCCESS);
×
76

77
  int64_t ret = 0;
×
78
  int32_t size = 0;
×
79
  int32_t total = 0;
×
80
  while (1) {
×
81
    if ((ret = taosReadFile(pFile, &size, INT_BYTES)) != INT_BYTES) {
×
82
      if (ret != 0) {
×
83
        code = TSDB_CODE_INVALID_MSG;
×
84
      }
85
      break;
×
86
    }
87
    total += INT_BYTES;
×
88
    size = htonl(size);
×
89
    TSDB_CHECK_CONDITION(size > 0, code, lino, END, TSDB_CODE_INVALID_MSG);
×
90

91
    pMemBuf = taosMemoryCalloc(1, size);
×
92
    TSDB_CHECK_NULL(pMemBuf, code, lino, END, terrno);
×
93
    TSDB_CHECK_CONDITION(taosReadFile(pFile, pMemBuf, size) == size, code, lino, END, TSDB_CODE_INVALID_MSG);
×
94

95
    total += size;
×
96
    STqOffset offset = {0};
×
97
    code = tqMetaDecodeOffsetInfo(&offset, pMemBuf, size);
×
98
    TSDB_CHECK_CODE(code, lino, END);
×
99
    pOffset = &offset;
×
100
    code = taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
×
101
    TSDB_CHECK_CODE(code, lino, END);
×
102
    pOffset = NULL;
×
103

104
    tqInfo("tq: offset restore from file to tdb, size:%d, hash size:%d subkey:%s", total, taosHashGetSize(pTq->pOffset),
×
105
           offset.subKey);
106
    taosMemoryFree(pMemBuf);
×
107
    pMemBuf = NULL;
×
108
  }
109

110
  code = tqCommitOffset(pTq);
×
111
  TSDB_CHECK_CODE(code, lino, END);
×
112

113
END:
×
114
  if (code != 0) {
×
115
    tqError("%s failed at %d since %s", __func__, lino, tstrerror(code));
×
116
  }
117
  (void)taosCloseFile(&pFile);
×
118
  taosMemoryFree(pMemBuf);
×
119

120
  tDeleteSTqOffset(pOffset);
×
121
  taosHashCancelIterate(pTq->pOffset, pIter);
×
122

123
  return code;
×
124
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc