• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4131

20 May 2025 07:22AM UTC coverage: 63.096% (+0.7%) from 62.384%
#4131

push

travis-ci

web-flow
docs(datain): add topic meta options docs in tmq (#31147)

157751 of 318088 branches covered (49.59%)

Branch coverage included in aggregate %.

243052 of 317143 relevant lines covered (76.64%)

18743283.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.71
/source/libs/sync/src/syncEnv.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncEnv.h"
18
#include "syncUtil.h"
19
#include "tref.h"
20

21
// File-scope sync environment shared by every function in this file; starts zeroed and is
// re-zeroed by syncInit() and syncCleanUp().
static SSyncEnv gSyncEnv = {0};
22
// Id of the ref set that tracks sync nodes; starts at -1 and is reset to -1 by syncCleanUp().
static int32_t  gNodeRefId = -1;
23
// Id of the ref set that tracks heartbeat timer data; starts at -1 and is reset to -1 by
// syncCleanUp().
static int32_t  gHbDataRefId = -1;
24

25
// Accessor for the file-scope sync environment; always returns the address of gSyncEnv.
SSyncEnv *syncEnv() {
  return &gSyncEnv;
}
668,309✔
26

27
// Reports whether the sync environment is started, i.e. whether the isStart flag is non-zero.
bool syncIsInit() {
  return atomic_load_8(&gSyncEnv.isStart) != 0;
}
3,600,279✔
28

29
int32_t syncInit() {
4,898✔
30
  if (syncIsInit()) return 0;
4,898✔
31

32
  uint32_t seed = (uint32_t)(taosGetTimestampNs() & 0x00000000FFFFFFFF);
2,737✔
33
  taosSeedRand(seed);
2,737✔
34

35
  (void)memset(&gSyncEnv, 0, sizeof(SSyncEnv));
2,737✔
36
  gSyncEnv.pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV");
2,737✔
37

38
  gNodeRefId = taosOpenRef(200, (RefFp)syncNodeClose);
2,737✔
39
  if (gNodeRefId < 0) {
2,737!
40
    sError("failed to init node rset");
×
41
    syncCleanUp();
×
42
    return TSDB_CODE_SYN_WRONG_REF;
×
43
  }
44
  sDebug("sync node rset is open, rsetId:%d", gNodeRefId);
2,737✔
45

46
  gHbDataRefId = taosOpenRef(200, (RefFp)syncHbTimerDataFree);
2,737✔
47
  if (gHbDataRefId < 0) {
2,737!
48
    sError("failed to init hbdata rset");
×
49
    syncCleanUp();
×
50
    return TSDB_CODE_SYN_WRONG_REF;
×
51
  }
52

53
  sDebug("sync hbdata rset is open, rsetId:%d", gHbDataRefId);
2,737✔
54

55
  atomic_store_8(&gSyncEnv.isStart, 1);
2,737✔
56
  return 0;
2,737✔
57
}
58

59
void syncCleanUp() {
2,736✔
60
  atomic_store_8(&gSyncEnv.isStart, 0);
2,736✔
61
  taosTmrCleanUp(gSyncEnv.pTimerManager);
2,736✔
62
  (void)memset(&gSyncEnv, 0, sizeof(SSyncEnv));
2,736✔
63

64
  if (gNodeRefId != -1) {
2,736!
65
    sDebug("sync node rset is closed, rsetId:%d", gNodeRefId);
2,736✔
66
    taosCloseRef(gNodeRefId);
2,736✔
67
    gNodeRefId = -1;
2,736✔
68
  }
69

70
  if (gHbDataRefId != -1) {
2,736!
71
    sDebug("sync hbdata rset is closed, rsetId:%d", gHbDataRefId);
2,736✔
72
    taosCloseRef(gHbDataRefId);
2,736✔
73
    gHbDataRefId = -1;
2,736✔
74
  }
75
}
2,736✔
76

77
// Registers pNode in the sync-node ref set and stores the resulting ref id in pNode->rid.
//
// Returns the new ref id. When the id is negative, terrno is set to TSDB_CODE_SYN_WRONG_REF
// and that code is returned instead.
int64_t syncNodeAdd(SSyncNode *pNode) {
  pNode->rid = taosAddRef(gNodeRefId, pNode);

  if (pNode->rid >= 0) {
    sDebug("vgId:%d, sync node refId:%" PRId64 " is added to rsetId:%d", pNode->vgId, pNode->rid, gNodeRefId);
    return pNode->rid;
  }

  terrno = TSDB_CODE_SYN_WRONG_REF;
  return terrno;
}
86

87
void syncNodeRemove(int64_t rid) {
16,935✔
88
  sDebug("sync node refId:%" PRId64 " is removed from rsetId:%d", rid, gNodeRefId);
16,935✔
89
  if (rid > 0) {
16,935!
90
    int32_t code = 0;
16,935✔
91
    if ((code = taosRemoveRef(gNodeRefId, rid)) != 0)
16,935!
92
      sError("failed to remove sync node from refId:%" PRId64 ", rsetId:%d, since %s", rid, gNodeRefId,
×
93
             tstrerror(code));
94
  }
95
}
16,934✔
96

97
SSyncNode *syncNodeAcquire(int64_t rid) {
34,155,885✔
98
  SSyncNode *pNode = taosAcquireRef(gNodeRefId, rid);
34,155,885✔
99
  if (pNode == NULL) {
34,167,273!
100
    sError("failed to acquire sync node from refId:%" PRId64 ", rsetId:%d", rid, gNodeRefId);
×
101
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
102
  }
103

104
  return pNode;
34,167,588✔
105
}
106

107
// Releases the sync-node reference identified by pNode->rid. A NULL pNode is ignored, and a
// non-zero result from taosReleaseRef is logged as an error.
void syncNodeRelease(SSyncNode *pNode) {
  if (pNode == NULL) {
    return;
  }

  int32_t ret = taosReleaseRef(gNodeRefId, pNode->rid);
  if (ret != 0) {
    sError("failed to release sync node from refId:%" PRId64 ", rsetId:%d, since %s", pNode->rid, gNodeRefId,
           tstrerror(ret));
  }
}
34,159,689✔
115

116
// Registers pData in the heartbeat-data ref set and stores the resulting ref id in pData->rid.
//
// Returns the new ref id. When the id is negative, terrno is set to TSDB_CODE_SYN_WRONG_REF
// and that code is returned instead.
int64_t syncHbTimerDataAdd(SSyncHbTimerData *pData) {
  pData->rid = taosAddRef(gHbDataRefId, pData);

  if (pData->rid >= 0) {
    return pData->rid;
  }

  terrno = TSDB_CODE_SYN_WRONG_REF;
  return terrno;
}
124

125
void syncHbTimerDataRemove(int64_t rid) {
30,637✔
126
  if (rid > 0) {
30,637✔
127
    int32_t code = 0;
2,910✔
128
    if ((code = taosRemoveRef(gHbDataRefId, rid)) != 0)
2,910!
129
      sError("failed to remove hbdata from refId:%" PRId64 ", rsetId:%d, since %s", rid, gHbDataRefId, tstrerror(code));
×
130
  }
131
}
30,637✔
132

133
SSyncHbTimerData *syncHbTimerDataAcquire(int64_t rid) {
53,447✔
134
  SSyncHbTimerData *pData = taosAcquireRef(gHbDataRefId, rid);
53,447✔
135
  if (pData == NULL && rid > 0) {
53,447!
136
    sInfo("failed to acquire hbdata from refId:%" PRId64 ", rsetId:%d", rid, gHbDataRefId);
×
137
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
138
  }
139

140
  return pData;
53,447✔
141
}
142

143
// Releases the heartbeat-data reference identified by pData->rid. A NULL pData is ignored,
// and a non-zero result from taosReleaseRef is logged as an error.
void syncHbTimerDataRelease(SSyncHbTimerData *pData) {
  if (pData == NULL) {
    return;
  }

  int32_t ret = taosReleaseRef(gHbDataRefId, pData->rid);
  if (ret == 0) {
    return;
  }

  sError("failed to release hbdata from refId:%" PRId64 ", rsetId:%d, since %s", pData->rid, gHbDataRefId,
         tstrerror(ret));
}
50,532✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc