• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3942

25 Apr 2025 11:21AM UTC coverage: 62.853% (+0.3%) from 62.507%
#3942

push

travis-ci

web-flow
docs: jdbc tmq supports database subscription. [TS-6222] (#30819)

* docs: jdbc tmq supports database subscription. [TS-6222]

* Update docs/zh/07-develop/07-tmq.md

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* Update 07-tmq.md

---------

Co-authored-by: haoranchen <haoran920c@163.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

156603 of 317531 branches covered (49.32%)

Branch coverage included in aggregate %.

241895 of 316485 relevant lines covered (76.43%)

6664240.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.1
/source/libs/sync/src/syncIndexMgr.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncIndexMgr.h"
18
#include "syncUtil.h"
19

20
/*
 * Allocate an index manager with taosMemoryCalloc() and bind it to pNode.
 *
 * The manager keeps a pointer to pNode->replicasId (it does not copy the
 * array), copies both replica counts, and then resets its per-replica state
 * through syncIndexMgrClear().
 *
 * Returns the new manager, or NULL with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when the allocation fails.
 */
SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pNode) {
  SSyncIndexMgr *pMgr = taosMemoryCalloc(1, sizeof(*pMgr));
  if (pMgr != NULL) {
    pMgr->pNode = pNode;
    pMgr->replicas = &pNode->replicasId;
    pMgr->replicaNum = pNode->replicaNum;
    pMgr->totalReplicaNum = pNode->totalReplicaNum;

    // Clear iterates up to totalReplicaNum, so it must run after the counts are set.
    syncIndexMgrClear(pMgr);
    return pMgr;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
35

36
/*
 * Re-bind an existing index manager to pNode: refresh the replica-id array
 * pointer and both replica counts from the node, then reset the per-replica
 * state through syncIndexMgrClear().
 */
void syncIndexMgrUpdate(SSyncIndexMgr *pIndexMgr, SSyncNode *pNode) {
  pIndexMgr->pNode = pNode;
  pIndexMgr->totalReplicaNum = pNode->totalReplicaNum;
  pIndexMgr->replicaNum = pNode->replicaNum;
  pIndexMgr->replicas = &pNode->replicasId;

  // Must come last: the reset loop is bounded by the refreshed totalReplicaNum.
  syncIndexMgrClear(pIndexMgr);
}
620✔
43

44
/* Free an index manager with taosMemoryFree(); a NULL pointer is ignored. */
void syncIndexMgrDestroy(SSyncIndexMgr *pIndexMgr) {
  if (pIndexMgr == NULL) {
    return;
  }

  taosMemoryFree(pIndexMgr);
}
31,196✔
49

50
/*
 * Reset the per-replica bookkeeping of an index manager.
 *
 * The whole index[] and privateTerm[] arrays are zeroed. For the first
 * totalReplicaNum slots, the start time is set to 0 and the receive time to
 * the current timestamp (taosGetTimestampMs(), sampled once).
 * sentTimeArr[] is not modified by this function.
 */
void syncIndexMgrClear(SSyncIndexMgr *pIndexMgr) {
  memset(pIndexMgr->privateTerm, 0, sizeof(pIndexMgr->privateTerm));
  memset(pIndexMgr->index, 0, sizeof(pIndexMgr->index));

  const int64_t now = taosGetTimestampMs();
  int           slot = 0;
  while (slot < pIndexMgr->totalReplicaNum) {
    pIndexMgr->recvTimeArr[slot] = now;
    pIndexMgr->startTimeArr[slot] = 0;
    ++slot;
  }
}
31,829✔
60

61
/*
 * Store `index` in the slot of the first replica whose id matches pRaftId
 * (as decided by syncUtilSameId). If no replica matches, nothing is stored
 * and an error is logged.
 */
void syncIndexMgrSetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncIndex index) {
  int slot = 0;
  while (slot < pIndexMgr->totalReplicaNum) {
    const SRaftId *pMember = &(*pIndexMgr->replicas)[slot];
    if (syncUtilSameId(pMember, pRaftId)) {
      pIndexMgr->index[slot] = index;
      return;
    }
    ++slot;
  }

  sError("vgId:%d, indexmgr set index:%" PRId64 " for dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, index,
         DID(pRaftId), CID(pRaftId));
}
72

73
/*
 * Carry per-replica state over from an old index manager into a new one.
 *
 * For every slot i of pNewIndex and every slot j of pOldIndex, when
 * oldReplicasId[j] and the new manager's replica id i are the same id
 * (syncUtilSameId), the index, private term, start time and receive time of
 * old slot j are copied into new slot i. The scan does not stop at the first
 * match, so a later matching old slot overwrites an earlier one.
 *
 * The old indexes are dumped at debug level before the copy and the new
 * indexes after it.
 *
 * NOTE(review): sentTimeArr is not copied here - confirm this is intentional.
 */
void syncIndexMgrCopyIfExist(SSyncIndexMgr *pNewIndex, SSyncIndexMgr *pOldIndex, SRaftId *oldReplicasId) {
  int oldSlot = 0;
  while (oldSlot < pOldIndex->totalReplicaNum) {
    sDebug("old Index j:%d, index:%" PRId64, oldSlot, pOldIndex->index[oldSlot]);
    ++oldSlot;
  }

  for (int newSlot = 0; newSlot < pNewIndex->totalReplicaNum; ++newSlot) {
    for (oldSlot = 0; oldSlot < pOldIndex->totalReplicaNum; ++oldSlot) {
      if (!syncUtilSameId(&oldReplicasId[oldSlot], &(*pNewIndex->replicas)[newSlot])) {
        continue;
      }

      pNewIndex->index[newSlot] = pOldIndex->index[oldSlot];
      pNewIndex->privateTerm[newSlot] = pOldIndex->privateTerm[oldSlot];
      pNewIndex->startTimeArr[newSlot] = pOldIndex->startTimeArr[oldSlot];
      pNewIndex->recvTimeArr[newSlot] = pOldIndex->recvTimeArr[oldSlot];
    }
  }

  int newSlot = 0;
  while (newSlot < pNewIndex->totalReplicaNum) {
    sDebug("new index i:%d, index:%" PRId64, newSlot, pNewIndex->index[newSlot]);
    ++newSlot;
  }
}
×
93

94
/*
 * Return the log replication manager stored at the slot of the first replica
 * of pNode whose id matches pRaftId (syncUtilSameId).
 *
 * When no replica matches, terrno is set to TSDB_CODE_SYN_INVALID_ID, an
 * error is logged and NULL is returned.
 */
SSyncLogReplMgr *syncNodeGetLogReplMgr(SSyncNode *pNode, SRaftId *pRaftId) {
  for (int slot = 0; slot < pNode->totalReplicaNum; slot++) {
    if (!syncUtilSameId(&pNode->replicasId[slot], pRaftId)) {
      continue;
    }
    return pNode->logReplMgrs[slot];
  }

  terrno = TSDB_CODE_SYN_INVALID_ID;
  sError("vgId:%d, indexmgr get replmgr from dnode:%d cluster:%d failed", pNode->vgId, DID(pRaftId), CID(pRaftId));
  return NULL;
}
105

106
/*
 * Return the index stored for the first replica whose id matches pRaftId
 * (syncUtilSameId).
 *
 * When no replica matches, terrno is set to TSDB_CODE_SYN_INVALID_ID, an
 * error is logged and SYNC_INDEX_INVALID is returned.
 */
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
  int slot = 0;
  while (slot < pIndexMgr->totalReplicaNum) {
    const SRaftId *pMember = &(*pIndexMgr->replicas)[slot];
    if (syncUtilSameId(pMember, pRaftId)) {
      return pIndexMgr->index[slot];
    }
    ++slot;
  }

  terrno = TSDB_CODE_SYN_INVALID_ID;
  sError("vgId:%d, indexmgr get index from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
         CID(pRaftId));
  return SYNC_INDEX_INVALID;
}
119

120
/*
 * Store `startTime` in the slot of the first replica whose id matches
 * pRaftId (syncUtilSameId). If no replica matches, nothing is stored and an
 * error is logged.
 */
void syncIndexMgrSetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t startTime) {
  for (int slot = 0; slot < pIndexMgr->totalReplicaNum; ++slot) {
    if (!syncUtilSameId(&(*pIndexMgr->replicas)[slot], pRaftId)) {
      continue;
    }
    pIndexMgr->startTimeArr[slot] = startTime;
    return;
  }

  sError("vgId:%d, indexmgr set start-time:%" PRId64 " for dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId,
         startTime, DID(pRaftId), CID(pRaftId));
}
131

132
/*
 * Return the start time stored for the first replica whose id matches
 * pRaftId (syncUtilSameId).
 *
 * When no replica matches, an error is logged and TSDB_CODE_SYN_INVALID_ID
 * is returned in-band as the int64_t result; terrno is not set on this path.
 */
int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
  int slot = 0;
  while (slot < pIndexMgr->totalReplicaNum) {
    const SRaftId *pMember = &(*pIndexMgr->replicas)[slot];
    if (syncUtilSameId(pMember, pRaftId)) {
      return pIndexMgr->startTimeArr[slot];
    }
    ++slot;
  }

  sError("vgId:%d, indexmgr get start-time from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
         CID(pRaftId));
  return TSDB_CODE_SYN_INVALID_ID;
}
145

146
/*
 * Store `recvTime` in the slot of the first replica whose id matches
 * pRaftId (syncUtilSameId). If no replica matches, nothing is stored and an
 * error is logged.
 */
void syncIndexMgrSetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t recvTime) {
  for (int slot = 0; slot < pIndexMgr->totalReplicaNum; ++slot) {
    if (!syncUtilSameId(&(*pIndexMgr->replicas)[slot], pRaftId)) {
      continue;
    }
    pIndexMgr->recvTimeArr[slot] = recvTime;
    return;
  }

  sError("vgId:%d, indexmgr set recv-time:%" PRId64 " for dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, recvTime,
         DID(pRaftId), CID(pRaftId));
}
157

158
/*
 * Store `sentTime` in the slot of the first replica whose id matches
 * pRaftId (syncUtilSameId). If no replica matches, nothing is stored and an
 * error is logged.
 */
void syncIndexMgrSetSentTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t sentTime) {
  int slot = 0;
  while (slot < pIndexMgr->totalReplicaNum) {
    const SRaftId *pMember = &(*pIndexMgr->replicas)[slot];
    if (syncUtilSameId(pMember, pRaftId)) {
      pIndexMgr->sentTimeArr[slot] = sentTime;
      return;
    }
    ++slot;
  }

  sError("vgId:%d, indexmgr set sent-time:%" PRId64 " for dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, sentTime,
         DID(pRaftId), CID(pRaftId));
}
169

170
/*
 * Return the receive time stored for the first replica whose id matches
 * pRaftId (syncUtilSameId).
 *
 * When no replica matches, an error is logged and TSDB_CODE_SYN_INVALID_ID
 * is returned in-band as the int64_t result; terrno is not set on this path.
 */
int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
  for (int slot = 0; slot < pIndexMgr->totalReplicaNum; ++slot) {
    if (!syncUtilSameId(&(*pIndexMgr->replicas)[slot], pRaftId)) {
      continue;
    }
    return pIndexMgr->recvTimeArr[slot];
  }

  sError("vgId:%d, indexmgr get recv-time from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
         CID(pRaftId));
  return TSDB_CODE_SYN_INVALID_ID;
}
182

183
/*
 * Return the sent time stored for the first replica whose id matches
 * pRaftId (syncUtilSameId).
 *
 * When no replica matches, an error is logged and TSDB_CODE_SYN_INVALID_ID
 * is returned in-band as the int64_t result; terrno is not set on this path.
 */
int64_t syncIndexMgrGetSentTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
  int slot = 0;
  while (slot < pIndexMgr->totalReplicaNum) {
    const SRaftId *pMember = &(*pIndexMgr->replicas)[slot];
    if (syncUtilSameId(pMember, pRaftId)) {
      return pIndexMgr->sentTimeArr[slot];
    }
    ++slot;
  }

  sError("vgId:%d, indexmgr get sent-time from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
         CID(pRaftId));
  return TSDB_CODE_SYN_INVALID_ID;
}
195

196
/*
 * Store `term` in the privateTerm slot of the first replica whose id matches
 * pRaftId (syncUtilSameId). If no replica matches, nothing is stored and an
 * error is logged.
 */
void syncIndexMgrSetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncTerm term) {
  for (int slot = 0; slot < pIndexMgr->totalReplicaNum; ++slot) {
    if (!syncUtilSameId(&(*pIndexMgr->replicas)[slot], pRaftId)) {
      continue;
    }
    pIndexMgr->privateTerm[slot] = term;
    return;
  }

  sError("vgId:%d, indexmgr set term:%" PRId64 " for dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, term,
         DID(pRaftId), CID(pRaftId));
}
207

208
/*
 * Return the private term stored for the first replica whose id matches
 * pRaftId (syncUtilSameId).
 *
 * When no replica matches, an error is logged and TSDB_CODE_SYN_INVALID_ID
 * is returned in-band as the SyncTerm result; terrno is not set on this path.
 */
SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
  int slot = 0;
  while (slot < pIndexMgr->totalReplicaNum) {
    const SRaftId *pMember = &(*pIndexMgr->replicas)[slot];
    if (syncUtilSameId(pMember, pRaftId)) {
      return pIndexMgr->privateTerm[slot];
    }
    ++slot;
  }

  sError("vgId:%d, indexmgr get term from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
         CID(pRaftId));
  return TSDB_CODE_SYN_INVALID_ID;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc