• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4106

19 May 2025 07:15AM UTC coverage: 62.857% (-0.2%) from 63.042%
#4106

push

travis-ci

GitHub
Merge pull request #31115 from taosdata/merge/mainto3.0

156749 of 318088 branches covered (49.28%)

Branch coverage included in aggregate %.

242535 of 317143 relevant lines covered (76.47%)

18746393.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.67
/source/dnode/vnode/src/tq/tqOffset.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#define _DEFAULT_SOURCE
16

17
#include "tq.h"
18

19
int32_t tqBuildFName(char** data, const char* path, char* name) {
44,442✔
20
  int32_t code = 0;
44,442✔
21
  int32_t lino = 0;
44,442✔
22
  char*   fname = NULL;
44,442✔
23
  TSDB_CHECK_NULL(data, code, lino, END, TSDB_CODE_INVALID_MSG);
44,442!
24
  TSDB_CHECK_NULL(path, code, lino, END, TSDB_CODE_INVALID_MSG);
44,442!
25
  TSDB_CHECK_NULL(name, code, lino, END, TSDB_CODE_INVALID_MSG);
44,442!
26
  int32_t len = strlen(path) + strlen(name) + 2;
44,442✔
27
  fname = taosMemoryCalloc(1, len);
44,442!
28
  TSDB_CHECK_NULL(fname, code, lino, END, terrno);
44,459!
29
  (void)tsnprintf(fname, len, "%s%s%s", path, TD_DIRSEP, name);
44,459✔
30

31
  *data = fname;
44,457✔
32
  fname = NULL;
44,457✔
33

34
END:
44,457✔
35
  if (code != 0) {
44,457!
36
    tqError("%s failed at %d since %s", __func__, lino, tstrerror(code));
×
37
  }
38
  taosMemoryFree(fname);
44,459!
39
  return code;
44,458✔
40
}
41

42
int32_t tqCommitOffset(void* p) {
32,855✔
43
  STQ*    pTq = (STQ*)p;
32,855✔
44
  int32_t code = TDB_CODE_SUCCESS;
32,855✔
45
  void*   pIter = NULL;
32,855✔
46
  int32_t vgId = pTq->pVnode != NULL ? pTq->pVnode->config.vgId : -1;
32,855!
47
  while ((pIter = taosHashIterate(pTq->pOffset, pIter))) {
33,171✔
48
    STqOffset* offset = (STqOffset*)pIter;
316✔
49
    int32_t    ret = tqMetaSaveOffset(pTq, offset);
316✔
50
    if (ret != TDB_CODE_SUCCESS) {
316!
51
      code = ret;
×
52
      tqError("tq commit offset error subkey:%s, vgId:%d", offset->subKey, vgId);
×
53
    } else {
54
      if (offset->val.type == TMQ_OFFSET__LOG) {
316✔
55
        tqInfo("tq commit offset success subkey:%s vgId:%d, offset(type:log) version:%" PRId64, offset->subKey, vgId,
304!
56
               offset->val.version);
57
      }
58
    }
59
  }
60
  return code;
32,856✔
61
}
62

63
/**
 * Load offset records from the file `name` into pTq->pOffset and then
 * persist them with tqCommitOffset().
 *
 * The file is read as a sequence of records, each a 4-byte length (passed
 * through htonl() before use) followed by that many payload bytes, which are
 * decoded with tqMetaDecodeOffsetInfo().
 *
 * @param pTq   TQ instance whose offset hash receives the records.
 * @param name  Path of the offset file.
 * @return TDB_CODE_SUCCESS when all records were restored and committed, and
 *         also when the file cannot be opened (the open check deliberately
 *         maps a NULL handle to success - presumably "nothing to restore").
 *         TSDB_CODE_INVALID_MSG for NULL arguments, a truncated or
 *         non-positive length field, or a short payload read; otherwise the
 *         code reported by the failing callee.
 */
int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) {
  int32_t    code = TDB_CODE_SUCCESS;
  int32_t    lino = 0;
  void*      pMemBuf = NULL;
  TdFilePtr  pFile = NULL;
  STqOffset* pOffset = NULL;
  // Function scope on purpose: pOffset may still point here when a failure
  // jumps to END, so the object must outlive the loop body.
  STqOffset  offset = {0};
  int64_t    ret = 0;
  int32_t    size = 0;
  int32_t    total = 0;

  TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_MSG);
  TSDB_CHECK_NULL(name, code, lino, END, TSDB_CODE_INVALID_MSG);

  pFile = taosOpenFile(name, TD_FILE_READ);
  TSDB_CHECK_NULL(pFile, code, lino, END, TDB_CODE_SUCCESS);

  while (1) {
    ret = taosReadFile(pFile, &size, INT_BYTES);
    if (ret == 0) {
      // Nothing left to read at a record boundary: normal end of input.
      break;
    }
    // A partial length field (or a read error) is a corrupt file. Jump to END
    // so the error is returned instead of being overwritten by the commit.
    TSDB_CHECK_CONDITION(ret == INT_BYTES, code, lino, END, TSDB_CODE_INVALID_MSG);

    total += INT_BYTES;
    size = htonl(size);
    TSDB_CHECK_CONDITION(size > 0, code, lino, END, TSDB_CODE_INVALID_MSG);

    pMemBuf = taosMemoryCalloc(1, size);
    TSDB_CHECK_NULL(pMemBuf, code, lino, END, terrno);
    TSDB_CHECK_CONDITION(taosReadFile(pFile, pMemBuf, size) == size, code, lino, END, TSDB_CODE_INVALID_MSG);

    total += size;
    offset = (STqOffset){0};
    code = tqMetaDecodeOffsetInfo(&offset, pMemBuf, size);
    TSDB_CHECK_CODE(code, lino, END);

    // From here until the put succeeds, END is responsible for releasing the
    // decoded record via tDeleteSTqOffset().
    pOffset = &offset;
    code = taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
    TSDB_CHECK_CODE(code, lino, END);
    pOffset = NULL;

    tqInfo("tq: offset restore from file to tdb, size:%d, hash size:%d subkey:%s", total, taosHashGetSize(pTq->pOffset),
           offset.subKey);
    taosMemoryFree(pMemBuf);
    pMemBuf = NULL;
  }

  code = tqCommitOffset(pTq);
  TSDB_CHECK_CODE(code, lino, END);

END:
  if (code != 0) {
    tqError("%s failed at %d since %s", __func__, lino, tstrerror(code));
  }
  (void)taosCloseFile(&pFile);
  taosMemoryFree(pMemBuf);

  // No hash iteration is started in this function, so there is no iterator to
  // cancel here; pTq may be NULL on this path and must not be dereferenced.
  tDeleteSTqOffset(pOffset);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc