• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4872

04 Dec 2025 01:55AM UTC coverage: 64.678% (+0.02%) from 64.654%
#4872

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

880 of 2219 new or added lines in 36 files covered. (39.66%)

6146 existing lines in 122 files now uncovered.

159679 of 246882 relevant lines covered (64.68%)

110947965.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.48
/source/dnode/mnode/impl/src/mndArbGroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndArbGroup.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndShow.h"
21
#include "mndTrans.h"
22
#include "mndVgroup.h"
23
#include "mndSync.h"
24

25
#define ARBGROUP_VER_NUMBER   1
26
#define ARBGROUP_RESERVE_SIZE 51
27

28
static SHashObj *arbUpdateHash = NULL;
29

30
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup);
31
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew);
32
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup);
33

34
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew);
35
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index);
36
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup);
37

38
static int32_t mndArbPutUpdateArbIntoWQ(SMnode *pMnode, SArbGroup *pNewGroup);
39
static int32_t mndArbPutBatchUpdateIntoWQ(SMnode *pMnode, SArray *newGroupArray);
40

41
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq);
42
static int32_t mndArbProcessTimer(SRpcMsg *pReq);
43
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq);
44
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp);
45
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp);
46
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp);
47
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
48
static void    mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter);
49
static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq);
50

51
static int32_t mndArbCheckToken(const char *token1, const char *token2) {
214,857✔
52
  if (token1 == NULL || token2 == NULL) return -1;
214,857✔
53
  if (strlen(token1) == 0 || strlen(token2) == 0) return -1;
214,857✔
54
  return strncmp(token1, token2, TSDB_ARB_TOKEN_SIZE);
161,613✔
55
}
56

57
// Set up the arb-group module on an mnode: registers the message handlers and show handlers,
// creates the file-scope arbUpdateHash, and finally registers the SDB_ARBGROUP table.
// Returns terrno when the hash cannot be created, otherwise the result of sdbSetTable().
int32_t mndInitArbGroup(SMnode *pMnode) {
  SSdbTable arbTable = {
      .sdbType = SDB_ARBGROUP,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndArbGroupActionEncode,
      .decodeFp = (SdbDecodeFp)mndArbGroupActionDecode,
      .insertFp = (SdbInsertFp)mndArbGroupActionInsert,
      .updateFp = (SdbUpdateFp)mndArbGroupActionUpdate,
      .deleteFp = (SdbDeleteFp)mndArbGroupActionDelete,
  };

  // requests handled by this module
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_HEARTBEAT_TIMER, mndProcessArbHbTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_CHECK_SYNC_TIMER, mndArbProcessTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP_BATCH, mndProcessArbUpdateGroupBatchReq);
  // responses handled by this module
  mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_ASSIGN_LEADER, mndProcessAssignLeaderMsg);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup);

  arbUpdateHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (arbUpdateHash == NULL) {
    int32_t code = terrno;
    TAOS_RETURN(code);
  }

  return sdbSetTable(pMnode->pSdb, arbTable);
}
88

89
// Release the file-scope arbUpdateHash created by mndInitArbGroup().
// The pointer is reset afterwards so that no later code in this file can reach the
// released table through a stale pointer.
void mndCleanupArbGroup(SMnode *pMnode) {
  taosHashCleanup(arbUpdateHash);
  arbUpdateHash = NULL;
}
488,699✔
90

91
// Acquire the arb group keyed by vgId from the sdb.
// On a miss, the generic TSDB_CODE_SDB_OBJ_NOT_THERE error is translated into
// TSDB_CODE_MND_ARBGROUP_NOT_EXIST; any other terrno is left untouched. Returns NULL on failure.
static SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) {
  SArbGroup *pFound = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_ARBGROUP_NOT_EXIST;
  }
  return NULL;
}
98

99
// Hand an arb group back to the mnode's sdb via sdbRelease().
static void mndReleaseArbGroup(SMnode *pMnode, SArbGroup *pGroup) {
  sdbRelease(pMnode->pSdb, pGroup);
}
145,386✔
103

104
// Fill outGroup from a vgroup object.
// Only vgroups with exactly 2 replicas are accepted; anything else yields TSDB_CODE_INVALID_PARA.
// outGroup is zeroed first, then vgId, dbUid and the dnode id of each member are copied over.
int32_t mndArbGroupInitFromVgObj(SVgObj *pVgObj, SArbGroup *outGroup) {
  if (pVgObj->replica != 2) {
    TAOS_RETURN(TSDB_CODE_INVALID_PARA);
  }

  (void)memset(outGroup, 0, sizeof(*outGroup));
  outGroup->vgId = pVgObj->vgId;
  outGroup->dbUid = pVgObj->dbUid;

  for (int32_t idx = 0; idx < TSDB_ARB_GROUP_MEMBER_NUM; ++idx) {
    outGroup->members[idx].info.dnodeId = pVgObj->vnodeGid[idx].dnodeId;
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
118

119
// Serialize an arb group into a newly allocated sdb raw record.
// Field order: vgId, dbUid, {dnodeId, token} per member, isSync, assigned dnodeId, assigned token,
// version, assignAcked, code, updateTimeMs, then ARBGROUP_RESERVE_SIZE reserved bytes.
// Returns the raw record, or NULL (raw freed) when terrno is non-zero at the end.
SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
  int32_t code = 0;
  int32_t lino = 0;
  // pessimistic default; cleared only after every field has been written
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t  rawSize = sizeof(SArbGroup) + ARBGROUP_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_ARBGROUP, ARBGROUP_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, pGroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pGroup->dbUid, _OVER)

  for (int32_t idx = 0; idx < TSDB_ARB_GROUP_MEMBER_NUM; ++idx) {
    SDB_SET_INT32(pRaw, dataPos, pGroup->members[idx].info.dnodeId, _OVER)
    SDB_SET_BINARY(pRaw, dataPos, pGroup->members[idx].state.token, TSDB_ARB_TOKEN_SIZE, _OVER)
  }

  SDB_SET_INT8(pRaw, dataPos, pGroup->isSync, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pGroup->assignedLeader.assignedDnodeId, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pGroup->assignedLeader.assignAcked, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pGroup->code, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pGroup->updateTimeMs, _OVER)

  SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("arbgroup:%d, encode to raw:%p, row:%p", pGroup->vgId, pRaw, pGroup);
    return pRaw;
  }

  mError("arbgroup:%d, failed to encode to raw:%p since %s", pGroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
160

161
// Rebuild an arb group row from an sdb raw record written by mndArbGroupActionEncode().
// Only soft version ARBGROUP_VER_NUMBER is accepted. Per-member heartbeat state is not stored
// in the raw record: nextHbSeq is reset to 0, responsedHbSeq to -1 and lastHbMs to the current
// time, and mutexInited is cleared. Returns the row, or NULL (row freed) when terrno is non-zero.
SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) {
  int32_t code = 0;
  int32_t lino = 0;
  // pessimistic default; cleared only after every field has been read
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  SSdbRow   *pRow = NULL;
  SArbGroup *pGroup = NULL;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (sver != ARBGROUP_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SArbGroup));
  if (pRow == NULL) goto _OVER;

  pGroup = sdbGetRowObj(pRow);
  if (pGroup == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &pGroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pGroup->dbUid, _OVER)

  int64_t decodeTimeMs = taosGetTimestampMs();
  for (int32_t idx = 0; idx < TSDB_ARB_GROUP_MEMBER_NUM; ++idx) {
    SArbGroupMember *pCur = &pGroup->members[idx];
    SDB_GET_INT32(pRaw, dataPos, &pCur->info.dnodeId, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, pCur->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)

    // heartbeat bookkeeping starts fresh for a decoded row
    pCur->state.nextHbSeq = 0;
    pCur->state.responsedHbSeq = -1;
    pCur->state.lastHbMs = decodeTimeMs;
  }
  SDB_GET_INT8(pRaw, dataPos, &pGroup->isSync, _OVER)

  SArbAssignedLeader *pAssigned = &pGroup->assignedLeader;
  SDB_GET_INT32(pRaw, dataPos, &pAssigned->assignedDnodeId, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pAssigned->token, TSDB_ARB_TOKEN_SIZE, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pAssigned->assignAcked, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pGroup->code, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pGroup->updateTimeMs, _OVER)

  pGroup->mutexInited = false;

  SDB_GET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("arbgroup:%d, decode from raw:%p, row:%p", pGroup->vgId, pRaw, pGroup);
    return pRow;
  }

  // pGroup may still be NULL when the failure happened before the row was allocated
  mError("arbgroup:%d, failed to decode from raw:%p since %s", pGroup == NULL ? 0 : pGroup->vgId, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
221

222
// sdb insert hook: make sure the group's mutex exists.
// Returns 0 when the mutex is (or already was) initialized, -1 when initialization fails.
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup) {
  mTrace("arbgroup:%d, perform insert action, row:%p", pGroup->vgId, pGroup);

  if (pGroup->mutexInited) {
    return 0;
  }
  if (taosThreadMutexInit(&pGroup->mutex, NULL) != 0) {
    return -1;
  }

  pGroup->mutexInited = true;
  return 0;
}
230

231
// sdb delete hook: destroy the group's mutex if it was initialized. Always returns 0.
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup) {
  mTrace("arbgroup:%d, perform delete action, row:%p", pGroup->vgId, pGroup);

  if (!pGroup->mutexInited) {
    return 0;
  }

  (void)taosThreadMutexDestroy(&pGroup->mutex);
  pGroup->mutexInited = false;
  return 0;
}
239

240
// sdb update hook: merge pNew into the stored row pOld under pOld's mutex.
// The merge only happens when both rows carry the same version; in that case the member tokens,
// isSync, assigned-leader fields, code and updateTimeMs are copied and pOld->version is bumped.
// Afterwards, on the leader mnode, the vgId is removed from arbUpdateHash. Always returns 0.
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew) {
  mTrace("arbgroup:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);

  (void)taosThreadMutexLock(&pOld->mutex);

  if (pOld->version == pNew->version) {
    for (int32_t idx = 0; idx < TSDB_ARB_GROUP_MEMBER_NUM; ++idx) {
      tstrncpy(pOld->members[idx].state.token, pNew->members[idx].state.token, TSDB_ARB_TOKEN_SIZE);
    }
    pOld->isSync = pNew->isSync;

    SArbAssignedLeader *pDst = &pOld->assignedLeader;
    SArbAssignedLeader *pSrc = &pNew->assignedLeader;
    pDst->assignedDnodeId = pSrc->assignedDnodeId;
    tstrncpy(pDst->token, pSrc->token, TSDB_ARB_TOKEN_SIZE);
    pDst->assignAcked = pSrc->assignAcked;

    pOld->version++;
    pOld->code = pNew->code;
    pOld->updateTimeMs = pNew->updateTimeMs;

    mInfo(
        "arbgroup:%d, perform update action. members[0].token:%s, members[1].token:%s, isSync:%d, as-dnodeid:%d, "
        "as-token:%s, as-acked:%d, version:%" PRId64,
        pOld->vgId, pOld->members[0].state.token, pOld->members[1].state.token, pOld->isSync,
        pOld->assignedLeader.assignedDnodeId, pOld->assignedLeader.token, pOld->assignedLeader.assignAcked,
        pOld->version);
  } else {
    // version mismatch: leave the stored row untouched
    mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64
          " new version:%" PRId64,
          pOld->vgId, pOld, pNew, pOld->version, pNew->version);
  }

  (void)taosThreadMutexUnlock(&pOld->mutex);

  if (!mndIsLeader(pSdb->pMnode)) {
    return 0;
  }

  mInfo("arbgroup:%d, remove from arb Update Hash", pOld->vgId);
  if (taosHashRemove(arbUpdateHash, &pOld->vgId, sizeof(int32_t)) != 0) {
    mError("arbgroup:%d, failed to remove from arb Update Hash", pOld->vgId);
  }
  return 0;
}
280

281
// Encode pGroup, append it to the transaction's redo log and mark the raw as SDB_STATUS_CREATING.
// Returns 0 on success, terrno when encoding fails, or the failing call's error code.
int32_t mndSetCreateArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendRedolog(pTrans, pRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
292

293
// Encode pGroup, append it to the transaction's undo log and mark the raw as SDB_STATUS_DROPPED.
// Returns 0 on success, terrno when encoding fails, or the failing call's error code.
int32_t mndSetCreateArbGroupUndoLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendUndolog(pTrans, pRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
304

305
// Encode pGroup, append it to the transaction's commit log and mark the raw as SDB_STATUS_READY.
// Returns 0 on success, terrno when encoding fails, or the failing call's error code.
int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndArbGroupActionEncode(pGroup);
  if (pCommitRaw == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }
  // The assignment must be parenthesized on its own: `code = f() != 0` would store the
  // comparison result (1) instead of the error code returned by mndTransAppendCommitlog().
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) TAOS_RETURN(code);
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) TAOS_RETURN(code);
  return 0;
}
316

317
// Encode pGroup, append it to the transaction's prepare log and mark the raw as SDB_STATUS_DROPPING.
// Returns 0 on success, terrno when encoding fails, or the failing call's error code.
int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendPrepareLog(pTrans, pRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
328

329
// Encode pGroup, append it to the transaction's redo log and mark the raw as SDB_STATUS_DROPPING.
// Returns 0 on success, terrno when encoding fails, or the failing call's error code.
static int32_t mndSetDropArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendRedolog(pTrans, pRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
340

341
// Encode pGroup, append it to the transaction's commit log and mark the raw as SDB_STATUS_DROPPED.
// Returns 0 on success, terrno when encoding fails, or the failing call's error code.
int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
352

353
static void *mndBuildArbHeartBeatReq(int32_t *pContLen, char *arbToken, int32_t dnodeId, int64_t arbTerm,
111,722✔
354
                                     SArray *hbMembers) {
355
  SVArbHeartBeatReq req = {0};
111,722✔
356
  req.dnodeId = dnodeId;
111,722✔
357
  req.arbToken = arbToken;
111,722✔
358
  req.arbTerm = arbTerm;
111,722✔
359
  req.hbMembers = hbMembers;
111,722✔
360

361
  int32_t contLen = tSerializeSVArbHeartBeatReq(NULL, 0, &req);
111,722✔
362
  if (contLen <= 0) return NULL;
111,722✔
363

364
  void *pReq = rpcMallocCont(contLen);
111,722✔
365
  if (pReq == NULL) return NULL;
111,722✔
366

367
  if (tSerializeSVArbHeartBeatReq(pReq, contLen, &req) <= 0) {
111,722✔
368
    rpcFreeCont(pReq);
×
369
    return NULL;
×
370
  }
371
  *pContLen = contLen;
111,722✔
372
  return pReq;
111,722✔
373
}
374

375
// Build an arb heartbeat request for pDnode and send it with tmsgSendReq().
// Returns -1 when the request cannot be built or the dnode has no endpoints (the buffer is
// freed in the latter case); otherwise returns the tmsgSendReq() result.
static int32_t mndSendArbHeartBeatReq(SDnodeObj *pDnode, char *arbToken, int64_t arbTerm, SArray *hbMembers) {
  int32_t contLen = 0;
  void   *pCont = mndBuildArbHeartBeatReq(&contLen, arbToken, pDnode->id, arbTerm, hbMembers);
  if (pCont == NULL) {
    mError("arbgroup:0, dnodeId:%d, failed to build arb-hb request", pDnode->id);
    return -1;
  }

  SRpcMsg hbMsg = {.msgType = TDMT_VND_ARB_HEARTBEAT, .pCont = pCont, .contLen = contLen};

  SEpSet dnodeEps = mndGetDnodeEpset(pDnode);
  if (dnodeEps.numOfEps == 0) {
    mError("arbgroup:0, dnodeId:%d, failed to send arb-hb request to dnode since no epSet found", pDnode->id);
    rpcFreeCont(pCont);
    return -1;
  }

  int32_t sendCode = tmsgSendReq(&dnodeEps, &hbMsg);
  if (sendCode != 0) {
    mError("arbgroup:0, dnodeId:%d, failed to send arb-hb request to dnode since 0x%x", pDnode->id, sendCode);
    return sendCode;
  }

  // successful sends are logged at info level only when tsSyncLogHeartbeat is set
  if (tsSyncLogHeartbeat) {
    mInfo("arbgroup:0, dnodeId:%d, send arb-hb request to dnode", pDnode->id);
  } else {
    mTrace("arbgroup:0, dnodeId:%d, send arb-hb request to dnode", pDnode->id);
  }
  return sendCode;
}
403

404
// Arb heartbeat timer handler.
// Phase 1 walks every arb group and, under the group's mutex, appends one {vgId, hbSeq} entry
// per member to a per-dnode array (keyed by dnode id in a temporary hash), bumping the member's
// nextHbSeq. Phase 2 sends one heartbeat request to each collected dnode that is online.
// Returns terrno when the temporary hash cannot be created, the mndGetArbToken() error when the
// token is unavailable, and 0 otherwise (per-dnode send failures are only logged).
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
  int32_t    code = 0;
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;
  void      *pIter = NULL;

  SHashObj *pDnodeHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pDnodeHash == NULL) {
    // without the hash nothing can be collected; same handling as in mndInitArbGroup()
    code = terrno;
    mError("arbgroup:0, failed to init dnode hash for arb-hb timer");
    TAOS_RETURN(code);
  }

  // collect member of same dnode
  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    (void)taosThreadMutexLock(&pArbGroup->mutex);

    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      SArbGroupMember *pMember = &pArbGroup->members[i];
      int32_t          dnodeId = pMember->info.dnodeId;
      void            *pObj = taosHashGet(pDnodeHash, &dnodeId, sizeof(int32_t));
      SArray          *hbMembers = NULL;
      if (pObj) {
        hbMembers = *(SArray **)pObj;
      } else {
        hbMembers = taosArrayInit(16, sizeof(SVArbHbReqMember));
        if (hbMembers == NULL) {
          mError("arbgroup:0, dnodeId:%d, failed to init hb member array, but continue next at this timer round",
                 dnodeId);
          continue;
        }
        if (taosHashPut(pDnodeHash, &dnodeId, sizeof(int32_t), &hbMembers, POINTER_BYTES) != 0) {
          mError("arbgroup:0, dnodeId:%d, failed to push hb member into hash, but continue next at this timer round",
                 dnodeId);
          // the array is not reachable through the hash, so it must be released here
          taosArrayDestroy(hbMembers);
          continue;
        }
      }
      SVArbHbReqMember reqMember = {.vgId = pArbGroup->vgId, .hbSeq = pMember->state.nextHbSeq++};
      if (taosArrayPush(hbMembers, &reqMember) == NULL) {
        mError("arbgroup:0, dnodeId:%d, failed to push hb member, but continue next at this timer round", dnodeId);
      }
    }

    (void)taosThreadMutexUnlock(&pArbGroup->mutex);
    sdbRelease(pSdb, pArbGroup);
  }

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
    mError("arbgroup:0, failed to get arb token for arb-hb timer");
    // release every collected array before dropping the hash
    pIter = taosHashIterate(pDnodeHash, NULL);
    while (pIter) {
      SArray *hbMembers = *(SArray **)pIter;
      taosArrayDestroy(hbMembers);
      pIter = taosHashIterate(pDnodeHash, pIter);
    }
    taosHashCleanup(pDnodeHash);
    TAOS_RETURN(code);
  }

  int64_t nowMs = taosGetTimestampMs();

  pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pDnodeHash, pIter);
    if (pIter == NULL) break;

    int32_t dnodeId = *(int32_t *)taosHashGetKey(pIter, NULL);
    SArray *hbMembers = *(SArray **)pIter;

    SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
    if (pDnode == NULL) {
      mError("arbgroup:0, dnodeId:%d, timer failed to acquire dnode", dnodeId);
      taosArrayDestroy(hbMembers);
      continue;
    }

    int64_t mndTerm = mndGetTerm(pMnode);

    if (mndIsDnodeOnline(pDnode, nowMs)) {
      int32_t sendCode = mndSendArbHeartBeatReq(pDnode, arbToken, mndTerm, hbMembers);
      if (TSDB_CODE_SUCCESS != sendCode) {
        mError("arbgroup:0, dnodeId:%d, timer failed to send arb-hb request", dnodeId);
      }
    }

    mndReleaseDnode(pMnode, pDnode);
    taosArrayDestroy(hbMembers);
  }
  taosHashCleanup(pDnodeHash);

  return 0;
}
490

491
static void *mndBuildArbCheckSyncReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
43,467✔
492
                                     char *member0Token, char *member1Token) {
493
  SVArbCheckSyncReq req = {0};
43,467✔
494
  req.arbToken = arbToken;
43,467✔
495
  req.arbTerm = arbTerm;
43,467✔
496
  req.member0Token = member0Token;
43,467✔
497
  req.member1Token = member1Token;
43,467✔
498

499
  int32_t reqLen = tSerializeSVArbCheckSyncReq(NULL, 0, &req);
43,467✔
500
  int32_t contLen = reqLen + sizeof(SMsgHead);
43,467✔
501

502
  if (contLen <= 0) return NULL;
43,467✔
503
  SMsgHead *pHead = rpcMallocCont(contLen);
43,467✔
504
  if (pHead == NULL) return NULL;
43,467✔
505

506
  pHead->contLen = htonl(contLen);
43,467✔
507
  pHead->vgId = htonl(vgId);
43,467✔
508
  if (tSerializeSVArbCheckSyncReq((char *)pHead + sizeof(SMsgHead), contLen, &req) <= 0) {
43,467✔
509
    rpcFreeCont(pHead);
×
510
    return NULL;
×
511
  }
512
  *pContLen = contLen;
43,467✔
513
  return pHead;
43,467✔
514
}
515

516
// Build a check-sync request for vgroup vgId and send it to the vgroup's endpoint set.
// Returns -1 when the request cannot be built; terrno (or -1 when terrno is 0) when the vgroup
// has no endpoints, after freeing the buffer; otherwise the tmsgSendReq() result.
static int32_t mndSendArbCheckSyncReq(SMnode *pMnode, int32_t vgId, char *arbToken, int64_t term, char *member0Token,
                                      char *member1Token) {
  int32_t contLen = 0;
  void   *pCont = mndBuildArbCheckSyncReq(&contLen, vgId, arbToken, term, member0Token, member1Token);
  if (pCont == NULL) {
    mError("arbgroup:%d, failed to build check-sync request", vgId);
    return -1;
  }

  SRpcMsg syncMsg = {.msgType = TDMT_VND_ARB_CHECK_SYNC, .pCont = pCont, .contLen = contLen};
  TRACE_SET_MSGID(&(syncMsg.info.traceId), tGenIdPI64());
  TRACE_SET_ROOTID(&(syncMsg.info.traceId), tGenIdPI64());

  SEpSet vgEps = mndGetVgroupEpsetById(pMnode, vgId);
  if (vgEps.numOfEps == 0) {
    mError("arbgroup:%d, failed to send check-sync request since no epSet found", vgId);
    rpcFreeCont(pCont);
    int32_t noEpCode = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(noEpCode);
  }

  int32_t sendCode = tmsgSendReq(&vgEps, &syncMsg);
  if (sendCode == 0) {
    mDebug("arbgroup:%d, send check-sync request", vgId);
  } else {
    mError("arbgroup:%d, failed to send check-sync request since 0x%x", vgId, sendCode);
  }
  return sendCode;
}
546

547
// Report whether member `index` of the group has timed out: true when its lastHbMs is older
// than nowMs minus tsArbSetAssignedTimeoutMs.
static bool mndCheckArbMemberHbTimeout(SArbGroup *pArbGroup, int32_t index, int64_t nowMs) {
  int64_t lastHbMs = pArbGroup->members[index].state.lastHbMs;
  return lastHbMs < (nowMs - tsArbSetAssignedTimeoutMs);
}
551

UNCOV
552
static void *mndBuildArbSetAssignedLeaderReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
×
553
                                             char *memberToken, bool force) {
UNCOV
554
  SVArbSetAssignedLeaderReq req = {0};
×
UNCOV
555
  req.arbToken = arbToken;
×
UNCOV
556
  req.arbTerm = arbTerm;
×
UNCOV
557
  req.memberToken = memberToken;
×
UNCOV
558
  if (force) req.force = 1;
×
559

UNCOV
560
  int32_t reqLen = tSerializeSVArbSetAssignedLeaderReq(NULL, 0, &req);
×
UNCOV
561
  int32_t contLen = reqLen + sizeof(SMsgHead);
×
562

UNCOV
563
  if (contLen <= 0) return NULL;
×
UNCOV
564
  SMsgHead *pHead = rpcMallocCont(contLen);
×
UNCOV
565
  if (pHead == NULL) return NULL;
×
566

UNCOV
567
  pHead->contLen = htonl(contLen);
×
UNCOV
568
  pHead->vgId = htonl(vgId);
×
UNCOV
569
  if (tSerializeSVArbSetAssignedLeaderReq((char *)pHead + sizeof(SMsgHead), contLen, &req) <= 0) {
×
570
    rpcFreeCont(pHead);
×
571
    return NULL;
×
572
  }
UNCOV
573
  *pContLen = contLen;
×
UNCOV
574
  return pHead;
×
575
}
576

UNCOV
577
// Build a set-assigned-leader request for vgroup vgId and send it to dnode dnodeId.
// Returns terrno (or -1 when terrno is 0) when the request cannot be built or the dnode has no
// endpoints (the buffer is freed in the latter case); otherwise the tmsgSendReq() result.
static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, int32_t vgId, char *arbToken,
                                              int64_t term, char *memberToken, bool force) {
  int32_t contLen = 0;
  void   *pCont = mndBuildArbSetAssignedLeaderReq(&contLen, vgId, arbToken, term, memberToken, force);
  if (pCont == NULL) {
    mError("arbgroup:%d, failed to build set-assigned request", vgId);
    int32_t buildCode = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(buildCode);
  }

  SRpcMsg assignMsg = {.msgType = TDMT_SYNC_SET_ASSIGNED_LEADER, .pCont = pCont, .contLen = contLen};

  SEpSet dnodeEps = mndGetDnodeEpsetById(pMnode, dnodeId);
  if (dnodeEps.numOfEps == 0) {
    mError("arbgroup:%d, failed to send arb-set-assigned request to dnode:%d since no epSet found", vgId, dnodeId);
    rpcFreeCont(pCont);
    int32_t noEpCode = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(noEpCode);
  }

  int32_t sendCode = tmsgSendReq(&dnodeEps, &assignMsg);
  if (sendCode == 0) {
    mInfo("arbgroup:%d, send arb-set-assigned request to dnode:%d", vgId, dnodeId);
  } else {
    mError("arbgroup:%d, failed to send arb-set-assigned request to dnode:%d since 0x%x", vgId, dnodeId, sendCode);
  }
  return sendCode;
}
606

607
// Decide what the check-sync timer should do for one arb group.
// *pOp is always written (CHECK_SYNC_NONE unless a case below applies). pNewGroup is written
// only in the CHECK_SYNC_UPDATE case, where it receives a copy of pArbGroup with the surviving
// member set as assigned leader.
void mndArbCheckSync(SArbGroup *pArbGroup, int64_t nowMs, ECheckSyncOp *pOp, SArbGroup *pNewGroup) {
  *pOp = CHECK_SYNC_NONE;

  const int32_t vgId = pArbGroup->vgId;
  const bool    timeout0 = mndCheckArbMemberHbTimeout(pArbGroup, 0, nowMs);
  const bool    timeout1 = mndCheckArbMemberHbTimeout(pArbGroup, 1, nowMs);
  const int32_t assignedId = pArbGroup->assignedLeader.assignedDnodeId;

  // 1. an assigned leader exists but has not acknowledged yet => (re)send the request
  if (assignedId != 0 && !pArbGroup->assignedLeader.assignAcked) {
    *pOp = CHECK_SYNC_SET_ASSIGNED_LEADER;
    return;
  }

  // 2. both members timed out => nothing to do
  if (timeout0 && timeout1) {
    return;
  }

  // 3. neither member timed out => check sync only when there is no assigned leader and
  //    the group is not in sync
  if (!timeout0 && !timeout1) {
    if (assignedId == 0 && !pArbGroup->isSync) {
      *pOp = CHECK_SYNC_CHECK_SYNC;
    }
    return;
  }

  // 4. exactly one member timed out => the other one is the candidate
  const int32_t    aliveIdx = timeout0 ? 1 : 0;
  SArbGroupMember *pAlive = &pArbGroup->members[aliveIdx];
  const int32_t    aliveDnodeId = pAlive->info.dnodeId;

  // a different dnode is already the assigned leader => skip
  if (assignedId != 0 && assignedId != aliveDnodeId) {
    mInfo("arbgroup:%d, arb skip to set assigned leader to dnodeId:%d, assigned leader has been set to dnodeId:%d",
          vgId, pAlive->info.dnodeId, assignedId);
    return;
  }

  // group not in sync => do not assign, ask for a sync check instead
  if (!pArbGroup->isSync) {
    if (assignedId == aliveDnodeId) {
      mDebug("arbgroup:%d, arb skip to set assigned leader to dnodeId:%d, arb group is not sync", vgId,
             pAlive->info.dnodeId);
    } else {
      mInfo("arbgroup:%d, arb skip to set assigned leader to dnodeId:%d, arb group is not sync", vgId,
            pAlive->info.dnodeId);
    }
    *pOp = CHECK_SYNC_CHECK_SYNC;
    return;
  }

  // group in sync => prepare the updated group for the caller to write to sdb
  mndArbGroupDupObj(pArbGroup, pNewGroup);
  mndArbGroupSetAssignedLeader(pNewGroup, aliveIdx);
  *pOp = CHECK_SYNC_UPDATE;
}
667

668
// Handler for TDMT_MND_ARB_ASSIGN_LEADER.
// For every arb group, takes a snapshot under the group's mutex, picks the first member whose
// dnode is online, and sends that dnode a set-assigned-leader request with an empty member
// token and force = true. Send failures are ignored on purpose (best effort per group).
// Returns TSDB_CODE_INVALID_MSG when the request cannot be deserialized, an error code when the
// arb token or term cannot be obtained, and 0 otherwise. The deserialized request is released
// on every path after deserialization was attempted.
static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  int32_t    code = -1, lino = 0;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;

  SAssignLeaderReq req = {0};
  if (tDeserializeSAssignLeaderReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  mInfo("arbgroup:0, begin to process assign leader");

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));

  int64_t term = mndGetTerm(pMnode);
  if (term < 0) {
    mError("arbgroup:0, arb failed to get term since %s", terrstr());
    code = (terrno != 0) ? terrno : -1;
    // leave through _exit so that req is released
    goto _exit;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    SArbGroup arbGroupDup = {0};

    (void)taosThreadMutexLock(&pArbGroup->mutex);
    mndArbGroupDupObj(pArbGroup, &arbGroupDup);
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);

    sdbRelease(pSdb, pArbGroup);

    // pick the first member whose dnode can be acquired and is online
    int32_t dnodeId = 0;
    for (int32_t i = 0; i < 2; i++) {
      int32_t    memberDnodeId = arbGroupDup.members[i].info.dnodeId;
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, memberDnodeId);
      if (pDnode == NULL) {
        mError("arbgroup:%d, dnodeId:%d, failed to acquire dnode for assign leader", arbGroupDup.vgId, memberDnodeId);
        continue;
      }
      bool isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
      mndReleaseDnode(pMnode, pDnode);
      if (isonline) {
        dnodeId = memberDnodeId;
        break;
      }
    }

    if (dnodeId == 0) {
      // no online member found: there is no dnode to address the request to
      mError("arbgroup:%d, arb skip to send set assigned leader since no member dnode is online", arbGroupDup.vgId);
      continue;
    }

    (void)mndSendArbSetAssignedLeaderReq(pMnode, dnodeId, arbGroupDup.vgId, arbToken, term, "", true);
    mInfo("arbgroup:%d, arb send set assigned leader to dnodeId:%d", arbGroupDup.vgId, dnodeId);
  }

  code = 0;

  // auditRecord(pReq, pMnode->clusterId, "assignLeader", "", "", req.sql, req.sqlLen);

_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("arbgroup:0, failed to assign leader since %s", tstrerror(code));
  }

  tFreeSAssignLeaderReq(&req);
  TAOS_RETURN(code);
}
734

735
// Timer-driven arbitrator check.
// Walks every SDB_ARBGROUP object, snapshots it under its mutex, lets mndArbCheckSync() pick an operation and then
// either sends a set-assigned-leader / check-sync request or collects the proposed group state into a batch that is
// handed to mndArbPutBatchUpdateIntoWQ() after the walk.
// The whole pass is skipped while less than two arb heartbeat intervals have elapsed since mndGetRoleTimeMs().
// Returns terrno (or -1) when mndGetTerm() fails; every other path returns 0 and failures are only logged.
static int32_t mndArbProcessTimer(SRpcMsg *pReq) {
  int32_t    code = 0, lino = 0;
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;
  void      *pIter = NULL;
  SArray    *pUpdateArray = NULL;  // allocated lazily by the first CHECK_SYNC_UPDATE
  char       arbToken[TSDB_ARB_TOKEN_SIZE];

  TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));

  int64_t term = mndGetTerm(pMnode);
  if (term < 0) {
    mError("arbgroup:0, arb failed to get term since %s", terrstr());
    code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  int64_t roleTimeMs = mndGetRoleTimeMs(pMnode);
  int64_t nowMs = taosGetTimestampMs();
  int64_t elapsedMs = nowMs - roleTimeMs;
  if (elapsedMs < tsArbHeartBeatIntervalMs * 2) {
    mInfo("arbgroup:0, arb skip to check sync since mnd had just switch over, roleTime:%" PRId64 " now:%" PRId64,
          roleTimeMs, nowMs);
    return 0;
  }

  while ((pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup)) != NULL) {
    // Work on a private copy so the group mutex and the sdb reference are held as briefly as possible.
    SArbGroup snapshot = {0};
    (void)taosThreadMutexLock(&pArbGroup->mutex);
    mndArbGroupDupObj(pArbGroup, &snapshot);
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);
    sdbRelease(pSdb, pArbGroup);

    ECheckSyncOp syncOp = CHECK_SYNC_NONE;
    SArbGroup    proposed = {0};
    mndArbCheckSync(&snapshot, nowMs, &syncOp, &proposed);

    int32_t             vgId = snapshot.vgId;
    SArbAssignedLeader *pAssigned = &snapshot.assignedLeader;
    int32_t             assignedDnodeId = pAssigned->assignedDnodeId;

    if (syncOp == CHECK_SYNC_NONE) {
      mTrace("arbgroup:%d, arb skip to send msg by check sync", vgId);
    } else if (syncOp == CHECK_SYNC_SET_ASSIGNED_LEADER) {
      (void)mndSendArbSetAssignedLeaderReq(pMnode, assignedDnodeId, vgId, arbToken, term, pAssigned->token, false);
      mInfo("arbgroup:%d, arb send set assigned leader to dnodeId:%d", vgId, assignedDnodeId);
    } else if (syncOp == CHECK_SYNC_CHECK_SYNC) {
      (void)mndSendArbCheckSyncReq(pMnode, vgId, arbToken, term, snapshot.members[0].state.token,
                                   snapshot.members[1].state.token);
      mInfo("arbgroup:%d, send vnode-arb-check-sync request", vgId);
    } else if (syncOp == CHECK_SYNC_UPDATE) {
      if (pUpdateArray == NULL) {
        pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
        if (pUpdateArray == NULL) {
          TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
        }
      }

      if (taosArrayPush(pUpdateArray, &proposed) == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
    } else {
      mError("arbgroup:%d, arb unknown check sync op:%d", vgId, syncOp);
    }
  }

  TAOS_CHECK_EXIT(mndArbPutBatchUpdateIntoWQ(pMnode, pUpdateArray));

_exit:
  if (code != 0) {
    mError("arbgroup:0, failed to check sync at line %d since %s", lino, terrstr());
  }

  taosArrayDestroy(pUpdateArray);
  return 0;
}
823

824
// Serializes `updateArray` as an SMArbUpdateGroupBatchReq into a buffer obtained from rpcMallocCont().
// On success stores the encoded length in *pContLen and returns the buffer.
// On any failure returns NULL and leaves *pContLen untouched.
static void *mndBuildArbUpdateGroupBatchReq(int32_t *pContLen, SArray *updateArray) {
  SMArbUpdateGroupBatchReq batchReq = {.updateArray = updateArray};

  // First pass with a NULL buffer only measures the encoded size.
  int32_t encodedLen = tSerializeSMArbUpdateGroupBatchReq(NULL, 0, &batchReq);
  if (encodedLen <= 0) {
    return NULL;
  }

  SMsgHead *pCont = rpcMallocCont(encodedLen);
  if (pCont == NULL) {
    return NULL;
  }

  int32_t written = tSerializeSMArbUpdateGroupBatchReq(pCont, encodedLen, &batchReq);
  if (written <= 0) {
    rpcFreeCont(pCont);
    return NULL;
  }

  *pContLen = encodedLen;
  return pCont;
}
840

841
// Fills `outGroup` from `pGroup`.
// Token fields are NOT duplicated: outGroup keeps pointers into pGroup's token buffers, so pGroup must stay alive
// for as long as outGroup is used.
static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup) {
  // scalar fields
  outGroup->vgId = pGroup->vgId;
  outGroup->dbUid = pGroup->dbUid;
  outGroup->isSync = pGroup->isSync;
  outGroup->version = pGroup->version;
  outGroup->code = pGroup->code;
  outGroup->updateTimeMs = pGroup->updateTimeMs;

  // members: the token is shared by pointer
  for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; ++m) {
    SArbGroupMember *pSrc = &pGroup->members[m];
    outGroup->members[m].dnodeId = pSrc->info.dnodeId;
    outGroup->members[m].token = pSrc->state.token;
  }

  // assigned leader: the token is shared by pointer
  SArbAssignedLeader *pAssigned = &pGroup->assignedLeader;
  outGroup->assignedLeader.dnodeId = pAssigned->assignedDnodeId;
  outGroup->assignedLeader.token = pAssigned->token;
  outGroup->assignedLeader.acked = pAssigned->assignAcked;
}
43,074✔
856

857
// Queues a single-group arb-update-group batch request on the mnode write queue.
// Returns 0 without doing anything when the group's vgId is already present in arbUpdateHash.
// Otherwise the request is built and enqueued, and only after a successful enqueue is the vgId added to
// arbUpdateHash. Returns 0 on success, a non-zero value (default -1) on failure.
static int32_t mndArbPutUpdateArbIntoWQ(SMnode *pMnode, SArbGroup *pNewGroup) {
  if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
    mInfo("arbgroup:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
    return 0;
  }

  int32_t          ret = -1;
  SMArbUpdateGroup updateGroup = {0};
  mndInitArbUpdateGroup(pNewGroup, &updateGroup);

  SArray *pBatch = taosArrayInit(1, sizeof(SMArbUpdateGroup));

  // Single-pass block: `break` jumps to the shared cleanup below.
  do {
    if (taosArrayPush(pBatch, &updateGroup) == NULL) {
      break;
    }

    int32_t contLen = 0;
    void   *pCont = mndBuildArbUpdateGroupBatchReq(&contLen, pBatch);
    if (pCont == NULL) {
      mError("arbgroup:0, failed to build arb-update-group request");
      break;
    }

    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pCont, .contLen = contLen, .info.noResp = true};
    ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
    if (ret != 0) {
      break;
    }

    mInfo("arbgroup:%d, put into arb update hash", pNewGroup->vgId);
    ret = taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
  } while (0);

  taosArrayDestroy(pBatch);
  if (ret != 0) {
    mError("arbgroup:%d, failed to put arb group update into write queue since %s", pNewGroup->vgId, tstrerror(ret));
  }
  return ret;
}
893

894
// Queues one arb-update-group batch request covering every group in `newGroupArray` (elements are SArbGroup)
// whose vgId is not already marked in arbUpdateHash.
//
// Each group that goes into the request is marked in arbUpdateHash before the request is enqueued. If anything
// fails afterwards, exactly the markers inserted by this call are removed again; markers that belong to requests
// already in flight are left alone.
//
// newGroupArray may be NULL or empty, in which case nothing is done and 0 is returned.
// Returns 0 on success (including "nothing to send"), non-zero on failure.
static int32_t mndArbPutBatchUpdateIntoWQ(SMnode *pMnode, SArray *newGroupArray) {
  int32_t ret = -1;
  size_t  numMarked = 0;  // leading entries of pArray whose vgId this call inserted into arbUpdateHash

  size_t sz = taosArrayGetSize(newGroupArray);
  if (sz == 0) {
    // Nothing to send; avoid allocating a scratch array on this (very common) path.
    return 0;
  }

  SArray *pArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup));
  if (pArray == NULL) {
    ret = -1;
    if (terrno != 0) ret = terrno;
    goto _OVER;
  }

  for (size_t i = 0; i < sz; i++) {
    SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
    if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
      mInfo("arbgroup:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
      continue;
    }

    SMArbUpdateGroup newGroup = {0};
    mndInitArbUpdateGroup(pNewGroup, &newGroup);

    if (taosArrayPush(pArray, &newGroup) == NULL) {
      // ret may still hold 0 from the previous taosHashPut; make sure the failure is reported.
      ret = -1;
      if (terrno != 0) ret = terrno;
      goto _OVER;
    }
    mInfo("arbgroup:%d, put into arb update hash in array", pNewGroup->vgId);
    if ((ret = taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0)) != 0) {
      mError("arbgroup:%d, failed to put into arb update hash since %s", pNewGroup->vgId, tstrerror(ret));
      goto _OVER;
    }
    numMarked++;
  }

  if (taosArrayGetSize(pArray) == 0) {
    ret = 0;
    goto _OVER;
  }

  int32_t contLen = 0;
  void   *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
  if (!pHead) {
    mError("arbgroup:0, failed to build arb-update-group request");
    // ret is 0 here (last taosHashPut succeeded); turn it into an error so the markers get rolled back.
    ret = -1;
    if (terrno != 0) ret = terrno;
    goto _OVER;
  }

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
  ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);

_OVER:
  if (ret != 0) {
    mError("arbgroup:0, failed to put arb group update into write queue since %s", tstrerror(ret));
    // Roll back only the markers inserted above.
    for (size_t i = 0; i < numMarked; i++) {
      SMArbUpdateGroup *pMarked = taosArrayGet(pArray, i);
      if (taosHashRemove(arbUpdateHash, &pMarked->vgId, sizeof(int32_t)) != 0) {
        mError("arbgroup:%d, failed to remove from arb Update Hash", pMarked->vgId);
      }
    }
  }

  taosArrayDestroy(pArray);
  return ret;
}
948

949
// Handles TDMT_MND_ARB_UPDATE_GROUP_BATCH: decodes the batch, rebuilds an SArbGroup for each entry and appends it
// as a commit log to one "upd-bat-arbgroup" transaction, then checks for conflicts and prepares the transaction.
//
// Entries whose arb group no longer exists are skipped and their vgId is removed from arbUpdateHash.
// On any failure after decoding (including failure to create the transaction) every vgId of the batch is removed
// from arbUpdateHash, so a later update for those groups is not skipped as "in process".
// Returns 0 on success, otherwise the failing step's error code.
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
  int32_t code = -1;
  size_t  sz = 0;
  STrans *pTrans = NULL;
  int32_t transId = 0;  // stays 0 in log lines when no transaction could be created

  SMArbUpdateGroupBatchReq req = {0};
  if ((code = tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    mError("arbgroup:0, arb failed to decode arb-update-group request");
    TAOS_RETURN(code);
  }

  SMnode *pMnode = pReq->info.node;
  // Needed by the cleanup at _OVER, so compute it before anything that can jump there.
  sz = taosArrayGetSize(req.updateArray);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "upd-bat-arbgroup");
  if (pTrans == NULL) {
    code = -1;
    if (terrno != 0) code = terrno;
    mError("arbgroup:0, failed to create update arbgroup trans, since %s", terrstr());
    goto _OVER;
  }
  transId = pTrans->id;

  for (size_t i = 0; i < sz; i++) {
    SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
    SArbGroup         newGroup = {0};
    newGroup.vgId = pUpdateGroup->vgId;
    newGroup.dbUid = pUpdateGroup->dbUid;
    for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
      newGroup.members[m].info.dnodeId = pUpdateGroup->members[m].dnodeId;
      tstrncpy(newGroup.members[m].state.token, pUpdateGroup->members[m].token, TSDB_ARB_TOKEN_SIZE);
    }

    newGroup.isSync = pUpdateGroup->isSync;
    newGroup.assignedLeader.assignedDnodeId = pUpdateGroup->assignedLeader.dnodeId;
    tstrncpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
    newGroup.assignedLeader.assignAcked = pUpdateGroup->assignedLeader.acked;
    newGroup.version = pUpdateGroup->version;
    newGroup.code = pUpdateGroup->code;
    newGroup.updateTimeMs = pUpdateGroup->updateTimeMs;

    mInfo(
        "trans:%d, arbgroup:%d, used to update member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d], %d, "
        "%" PRId64,
        pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
        newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
        newGroup.assignedLeader.assignedDnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.assignAcked,
        pUpdateGroup->code, pUpdateGroup->updateTimeMs);

    // The old object is only acquired to confirm that the group still exists.
    SArbGroup *pOldGroup = mndAcquireArbGroup(pMnode, newGroup.vgId);
    if (!pOldGroup) {
      mError("trans:%d, arbgroup:%d, arb skip to update arbgroup, since no obj found", pTrans->id, newGroup.vgId);
      if (taosHashRemove(arbUpdateHash, &newGroup.vgId, sizeof(int32_t)) != 0) {
        mError("trans:%d, arbgroup:%d, failed to remove from arb Update Hash", pTrans->id, newGroup.vgId);
      }
      continue;
    }

    mndTransAddArbGroupId(pTrans, newGroup.vgId);

    if ((code = mndSetCreateArbGroupCommitLogs(pTrans, &newGroup)) != 0) {
      mError("trans:%d, arbgroup:%d, failed to update arbgroup in set commit log since %s", pTrans->id, newGroup.vgId,
             tstrerror(code));
      mndReleaseArbGroup(pMnode, pOldGroup);
      goto _OVER;
    }

    mInfo("trans:%d, arbgroup:%d, used to update member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
          pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
          newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
          newGroup.assignedLeader.assignedDnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.assignAcked);

    mndReleaseArbGroup(pMnode, pOldGroup);
  }

  if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) goto _OVER;
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;

  code = 0;

_OVER:
  if (code != 0) {
    // failed to update arbgroup: drop the in-process markers so the groups can be retried
    mError("trans:%d, arbgroup:0, failed to update arbgroup since %s", transId, tstrerror(code));
    for (size_t i = 0; i < sz; i++) {
      SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
      if (taosHashRemove(arbUpdateHash, &pUpdateGroup->vgId, sizeof(int32_t)) != 0) {
        mError("trans:%d, arbgroup:%d failed to remove from arb Update Hash", transId, pUpdateGroup->vgId);
      }
    }
  }

  if (pTrans != NULL) {
    mndTransDrop(pTrans);
  }
  tFreeSMArbUpdateGroupBatchReq(&req);
  return code;
}
1041

1042
// Copies the leading part of *pGroup into *pNew: every byte that precedes the `mutexInited` field.
// `mutexInited` and whatever follows it in SArbGroup are left untouched in *pNew.
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew) {
  const size_t copyLen = offsetof(SArbGroup, mutexInited);
  (void)memcpy(pNew, pGroup, copyLen);
}
144,733✔
1045

UNCOV
1046
// Makes member `index` of the group its assigned leader: copies the member's dnode id and token into
// pGroup->assignedLeader and clears the acked flag. `index` is not range-checked here.
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index) {
  SArbAssignedLeader *pAssigned = &pGroup->assignedLeader;
  SArbGroupMember    *pSrc = &pGroup->members[index];

  pAssigned->assignedDnodeId = pSrc->info.dnodeId;
  tstrncpy(pAssigned->token, pSrc->state.token, TSDB_ARB_TOKEN_SIZE);
  pAssigned->assignAcked = false;
}
×
1053

UNCOV
1054
// Clears the group's assigned leader: dnode id becomes 0, the token buffer is zero-filled and the acked flag
// is reset.
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
  SArbAssignedLeader *pAssigned = &pGroup->assignedLeader;

  pAssigned->assignAcked = false;
  pAssigned->assignedDnodeId = 0;
  (void)memset(pAssigned->token, 0, TSDB_ARB_TOKEN_SIZE);
}
×
1059

1060
// Applies one heartbeat response member to `pGroup` (under the group mutex) and reports whether the member's
// token changed.
//
// If `dnodeId` is not a member of the group, or the response's hbSeq is not newer than the recorded one, nothing
// is changed and false is returned. Otherwise the member's responsedHbSeq / lastHbMs are updated in place.
// When the stored token differs from the reported one, *pNewGroup receives a copy of the group with the new
// token, isSync cleared and - if that member was the assigned leader - the assigned leader reset; true is
// returned. pGroup's own token is not modified here.
bool mndArbIsNeedUpdateTokenByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
                                        SArbGroup *pNewGroup) {
  bool needUpdate = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  // Locate the member that belongs to the reporting dnode.
  SArbGroupMember *pMember = NULL;
  int              slot = 0;
  while (slot < TSDB_ARB_GROUP_MEMBER_NUM) {
    if (pGroup->members[slot].info.dnodeId == dnodeId) {
      pMember = &pGroup->members[slot];
      break;
    }
    slot++;
  }

  if (pMember == NULL) {
    mError("arbgroup:%d, arb token update check failed, dnodeId:%d not found", pRspMember->vgId, dnodeId);
  } else if (pMember->state.responsedHbSeq >= pRspMember->hbSeq) {
    // stale or duplicated response
    mError("arbgroup:%d, dnodeId:%d skip arb token update, heart beat seq expired, local:%d msg:%d", pRspMember->vgId,
           dnodeId, pMember->state.responsedHbSeq, pRspMember->hbSeq);
  } else {
    // record the heartbeat
    pMember->state.responsedHbSeq = pRspMember->hbSeq;
    pMember->state.lastHbMs = nowMs;

    if (mndArbCheckToken(pMember->state.token, pRspMember->memberToken) == 0) {
      mDebug("arbgroup:%d, dnodeId:%d skip arb token update, token matched", pRspMember->vgId, dnodeId);
    } else {
      // build the proposed group state carrying the new token
      mndArbGroupDupObj(pGroup, pNewGroup);
      tstrncpy(pNewGroup->members[slot].state.token, pRspMember->memberToken, TSDB_ARB_TOKEN_SIZE);
      pNewGroup->isSync = false;

      bool resetAssigned = (pMember->info.dnodeId == pGroup->assignedLeader.assignedDnodeId);
      if (resetAssigned) {
        mndArbGroupResetAssignedLeader(pNewGroup);
      }

      needUpdate = true;
      mInfo("arbgroup:%d, need to update token, by heartbeat from dnodeId:%d, resetAssigned:%d", pRspMember->vgId,
            dnodeId, resetAssigned);
    }
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return needUpdate;
}
1116

1117
// Processes the members of one arb heartbeat response from `dnodeId` (memberArray holds SVArbHbRspMember).
// For every member whose group exists, mndArbIsNeedUpdateTokenByHeartBeat() records the heartbeat; groups whose
// token changed are collected and submitted together through mndArbPutBatchUpdateIntoWQ().
// Unknown groups and failed array pushes are logged and skipped.
// Returns 0 on success, or the error returned by mndArbPutBatchUpdateIntoWQ() (also stored in terrno).
static int32_t mndArbUpdateByHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) {
  int64_t nowMs = taosGetTimestampMs();
  size_t  size = taosArrayGetSize(memberArray);
  SArray *pUpdateArray = taosArrayInit(size, sizeof(SArbGroup));

  for (size_t i = 0; i < size; i++) {
    SVArbHbRspMember *pRspMember = taosArrayGet(memberArray, i);

    SArbGroup  newGroup = {0};
    SArbGroup *pGroup = mndAcquireArbGroup(pMnode, pRspMember->vgId);
    if (pGroup == NULL) {
      mError("arbgroup:%d failed to update arb token, not found", pRspMember->vgId);
      continue;
    }

    bool updateToken = mndArbIsNeedUpdateTokenByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
    if (updateToken) {
      if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
        mError("arbgroup:%d, failed to push newGroup to updateArray, but continue at this heartbeat", pRspMember->vgId);
      }
    }

    mndReleaseArbGroup(pMnode, pGroup);
  }

  int32_t code = mndArbPutBatchUpdateIntoWQ(pMnode, pUpdateArray);

  // Release the scratch array on both the success and the failure path.
  taosArrayDestroy(pUpdateArray);

  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
1147

1148
// Decides, under the group mutex, whether a check-sync response should change the group's isSync flag.
//
// Returns false (leaving *pNewGroup untouched) when the group has an assigned leader (terrno is set to
// TSDB_CODE_SUCCESS), when either member token differs from the locally stored one (terrno is set to
// TSDB_CODE_MND_ARB_TOKEN_MISMATCH), or when isSync already equals newIsSync.
// Otherwise *pNewGroup receives a copy of the group with isSync, code and updateTimeMs (current time) set, and
// true is returned.
bool mndArbIsNeedUpdateSyncStatusByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
                                             bool newIsSync, SArbGroup *pNewGroup, int32_t code) {
  bool needUpdate = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  if (pGroup->assignedLeader.assignedDnodeId != 0) {
    terrno = TSDB_CODE_SUCCESS;
    mInfo("arbgroup:%d, skip to update arb sync, has assigned leader:%d", vgId, pGroup->assignedLeader.assignedDnodeId);
  } else {
    char *local0Token = pGroup->members[0].state.token;
    char *local1Token = pGroup->members[1].state.token;

    bool tokensMatch =
        (mndArbCheckToken(local0Token, member0Token) == 0) && (mndArbCheckToken(local1Token, member1Token) == 0);
    if (!tokensMatch) {
      terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
      mInfo("arbgroup:0, skip to update arb sync, memberToken mismatch local:[%s][%s], msg:[%s][%s]", local0Token,
            local1Token, member0Token, member1Token);
    } else if (pGroup->isSync != newIsSync) {
      mndArbGroupDupObj(pGroup, pNewGroup);
      pNewGroup->isSync = newIsSync;
      pNewGroup->code = code;
      pNewGroup->updateTimeMs = taosGetTimestampMs();

      mInfo("arbgroup:%d, need to update isSync status, new isSync:%d, timeStamp:%" PRId64, vgId, newIsSync,
            pNewGroup->updateTimeMs);
      needUpdate = true;
    }
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return needUpdate;
}
1184

1185
// Applies a check-sync result to arb group `vgId`.
// Returns terrno (or -1) when the group cannot be acquired. Otherwise returns 0: if the sync status needs to
// change, the new group state is queued via mndArbPutUpdateArbIntoWQ(), and a failure to queue is only logged.
static int32_t mndArbUpdateByCheckSync(SMnode *pMnode, int32_t vgId, char *member0Token, char *member1Token,
                                       bool newIsSync, int32_t rsp_code) {
  SArbGroup *pGroup = mndAcquireArbGroup(pMnode, vgId);
  if (pGroup == NULL) {
    mError("arbgroup:%d, failed to update arb sync, not found", vgId);
    int32_t code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  SArbGroup proposed = {0};
  bool      needUpdate =
      mndArbIsNeedUpdateSyncStatusByCheckSync(pGroup, vgId, member0Token, member1Token, newIsSync, &proposed, rsp_code);
  if (needUpdate && mndArbPutUpdateArbIntoWQ(pMnode, &proposed) != 0) {
    mError("arbgroup:%d, failed to pullup update arb sync, since %s", vgId, terrstr());
  }

  mndReleaseArbGroup(pMnode, pGroup);
  return 0;
}
1208

1209
// Handles an arb heartbeat response.
// An empty response is ignored (returns 0). Otherwise the response is decoded, its arb token is compared with the
// local one (mismatch -> TSDB_CODE_MND_ARB_TOKEN_MISMATCH) and the reported members are passed to
// mndArbUpdateByHeartBeat(). Returns 0 on success or the failing step's code.
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arbgroup:0, arb hb-rsp contLen is 0");
    return 0;
  }

  SMnode *pMnode = pRsp->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = -1;
  char    arbToken[TSDB_ARB_TOKEN_SIZE];

  code = mndGetArbToken(pMnode, arbToken);
  if (code != 0) {
    mError("arbgroup:0, failed to get arb token for arb-hb response");
    TAOS_RETURN(code);
  }

  SVArbHeartBeatRsp arbHbRsp = {0};
  code = tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &arbHbRsp);
  if (code != 0) {
    mInfo("arbgroup:0, arb hb-rsp des failed, since:%s", tstrerror(pRsp->code));
    TAOS_RETURN(code);
  }

  // A response stamped with a different arb token is not applied.
  if (mndArbCheckToken(arbToken, arbHbRsp.arbToken) != 0) {
    mInfo("arbgroup:0, arb hearbeat skip update for dnodeId:%d, arb token mismatch, local:[%s] msg:[%s]",
          arbHbRsp.dnodeId, arbToken, arbHbRsp.arbToken);
    code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
    goto _OVER;
  }

  // Same message either way; only the log level depends on tsSyncLogHeartbeat.
  if (!tsSyncLogHeartbeat) {
    mTrace("arbgroup:0, receive arb-hb rsp from dnode %d", arbHbRsp.dnodeId);
  } else {
    mInfo("arbgroup:0, receive arb-hb rsp from dnode %d", arbHbRsp.dnodeId);
  }

  TAOS_CHECK_GOTO(mndArbUpdateByHeartBeat(pMnode, arbHbRsp.dnodeId, arbHbRsp.hbMembers), NULL, _OVER);
  code = 0;

_OVER:
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
  return code;
}
1252

1253
// Handles a vnode-arb-check-sync response.
// An empty response is ignored (returns 0). If decoding fails, 0 is returned when the rpc-level code is
// TSDB_CODE_MND_ARB_TOKEN_MISMATCH, otherwise the decode error. A response whose arb token differs from the local
// one is logged, terrno is set to TSDB_CODE_MND_ARB_TOKEN_MISMATCH, and the function still finishes with code 0.
// Otherwise the result is applied through mndArbUpdateByCheckSync() and its code is returned.
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arbgroup:0, arb check-sync-rsp contLen is 0");
    return 0;
  }

  SMnode *pMnode = pRsp->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t code = -1;
  char    arbToken[TSDB_ARB_TOKEN_SIZE];

  code = mndGetArbToken(pMnode, arbToken);
  if (code != 0) {
    mError("arbgroup:0, failed to get arb token from vnode-arb-check-sync-rsp");
    TAOS_RETURN(code);
  }

  SVArbCheckSyncRsp syncRsp = {0};
  code = tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp);
  if (code != 0) {
    mInfo("arbgroup:0, arb vnode-arb-check-sync-rsp deserialize failed, since:%s", tstrerror(pRsp->code));
    if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) {
      terrno = TSDB_CODE_SUCCESS;
      return 0;
    }
    TAOS_RETURN(code);
  }

  mInfo("arbgroup:%d, vnode-arb-check-sync-rsp received, QID:0x%" PRIx64 ":0x%" PRIx64 ", seqNum:%" PRIx64
        ", errCode:%d",
        syncRsp.vgId, pRsp->info.traceId.rootId, pRsp->info.traceId.msgId, pRsp->info.seqNum, syncRsp.errCode);

  if (mndArbCheckToken(arbToken, syncRsp.arbToken) != 0) {
    // code is still 0 from the successful decode and is returned as such below
    mError("arbgroup:%d, skip update arb sync for arb token mismatch, local:[%s] msg:[%s]", syncRsp.vgId, arbToken,
           syncRsp.arbToken);
    terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  } else {
    bool newIsSync = (syncRsp.errCode == TSDB_CODE_SUCCESS);
    code = mndArbUpdateByCheckSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync,
                                   syncRsp.errCode);
    if (code != 0) {
      mError("arbgroup:%d, failed to update arb sync for since:%s", syncRsp.vgId, terrstr());
    }
  }

  tFreeSVArbCheckSyncRsp(&syncRsp);
  TAOS_RETURN(code);
}
1303

1304
// Decides, under the group mutex, whether a set-assigned-leader response should mark the assignment as acked.
// Returns false when the response's member token differs from the stored assigned-leader token, when `errcode`
// is not TSDB_CODE_SUCCESS, or when the assignment is already acked.
// Otherwise *pNewGroup receives a copy of the group with isSync cleared and assignAcked set, and true is returned.
bool mndArbIsNeedUpdateAssignedBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode,
                                                   SArbGroup *pNewGroup) {
  bool needUpdate = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  if (mndArbCheckToken(pGroup->assignedLeader.token, memberToken) != 0) {
    mError("arbgroup:%d, skip update arb assigned for member token mismatch, local:[%s] msg:[%s]", vgId,
           pGroup->assignedLeader.token, memberToken);
  } else if (errcode != TSDB_CODE_SUCCESS) {
    mError("arbgroup:%d, skip update arb assigned for since:%s", vgId, tstrerror(errcode));
  } else if (pGroup->assignedLeader.assignAcked == false) {
    mndArbGroupDupObj(pGroup, pNewGroup);
    pNewGroup->isSync = false;
    pNewGroup->assignedLeader.assignAcked = true;

    mInfo("arbgroup:%d, arb received assigned ack", vgId);
    needUpdate = true;
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return needUpdate;
}
1334

UNCOV
1335
// Handles a set-assigned-leader response.
// An empty response is ignored (returns 0). Otherwise the response is decoded and its arb token is compared with
// the local one (mismatch -> TSDB_CODE_MND_ARB_TOKEN_MISMATCH). If the group exists and the assignment still
// needs to be acked, the updated group state is queued through mndArbPutUpdateArbIntoWQ().
// The acquired group is released on every path, including a failed enqueue.
// Returns 0 on success or the failing step's code.
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arbgroup:0, arb set-assigned-rsp contLen is 0");
    return 0;
  }

  int32_t code = -1;

  SMnode *pMnode = pRsp->info.node;

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
    mError("arbgroup:0, failed to get arb token for arb-set-assigned response");
    TAOS_RETURN(code);
  }

  SVArbSetAssignedLeaderRsp setAssignedRsp = {0};
  if ((code = tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp)) != 0) {
    mError("arbgroup:0, arb set-assigned-rsp des failed, since:%s", tstrerror(pRsp->code));
    TAOS_RETURN(code);
  }

  if (mndArbCheckToken(arbToken, setAssignedRsp.arbToken) != 0) {
    mError("arbgroup:%d, skip update arb assigned for arb token mismatch, local:[%s] msg:[%s]", setAssignedRsp.vgId,
           arbToken, setAssignedRsp.arbToken);
    code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
    goto _OVER;
  }

  SArbGroup *pGroup = mndAcquireArbGroup(pMnode, setAssignedRsp.vgId);
  if (!pGroup) {
    mError("arbgroup:%d, failed to set arb assigned for since:%s", setAssignedRsp.vgId, terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  SArbGroup newGroup = {0};
  bool      updateAssigned = mndArbIsNeedUpdateAssignedBySetAssignedLeader(
      pGroup, setAssignedRsp.vgId, setAssignedRsp.memberToken, pRsp->code, &newGroup);

  code = 0;
  if (updateAssigned) {
    code = mndArbPutUpdateArbIntoWQ(pMnode, &newGroup);
    if (code != 0) {
      mError("arbgroup:%d, failed to pullup update arb assigned since:%s", setAssignedRsp.vgId, tstrerror(code));
    }
  }

  // Always drop the reference taken above, whether or not the enqueue succeeded.
  mndReleaseArbGroup(pMnode, pGroup);

_OVER:
  tFreeSVArbSetAssignedLeaderRsp(&setAssignedRsp);
  return code;
}
1391

1392
static char *formatTimestamp(char *buf, int64_t val, int precision) {
×
1393
  time_t tt;
×
1394
  if (precision == TSDB_TIME_PRECISION_MICRO) {
×
1395
    tt = (time_t)(val / 1000000);
×
1396
  }
1397
  if (precision == TSDB_TIME_PRECISION_NANO) {
×
1398
    tt = (time_t)(val / 1000000000);
×
1399
  } else {
1400
    tt = (time_t)(val / 1000);
×
1401
  }
1402

1403
  struct tm tm;
×
1404
  if (taosLocalTime(&tt, &tm, NULL, 0, NULL) == NULL) {
×
1405
    mError("failed to get local time");
×
1406
    return NULL;
×
1407
  }
1408
  size_t pos = taosStrfTime(buf, 32, "%Y-%m-%d %H:%M:%S", &tm);
×
1409

1410
  if (precision == TSDB_TIME_PRECISION_MICRO) {
×
1411
    sprintf(buf + pos, ".%06d", (int)(val % 1000000));
×
1412
  } else if (precision == TSDB_TIME_PRECISION_NANO) {
×
1413
    sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
×
1414
  } else {
1415
    sprintf(buf + pos, ".%03d", (int)(val % 1000));
×
1416
  }
1417

1418
  return buf;
×
1419
}
1420

1421
/**
 * Fill a result block with one row per arbitration group.
 *
 * Walks SDB_ARBGROUP objects starting at pShow->pIter. For every group whose
 * vgroup can be acquired it writes, in column order: db name, vgId, the dnode
 * id of each group member, isSync, "<code text>(<update time>)", and then the
 * assigned leader's dnode id, token and acked flag. The last three columns are
 * set to NULL when assignedDnodeId is 0.
 *
 * @param pReq   Request; pReq->info.node provides the mnode.
 * @param pShow  Show context; pIter is advanced and numOfRows is increased by
 *               the number of rows produced.
 * @param pBlock Destination data block.
 * @param rows   Upper bound on the rows produced by this call.
 * @return Number of rows written by this call. A failed column write is only
 *         logged; the count accumulated so far is still returned.
 */
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  SArbGroup *pGroup = NULL;
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_ARBGROUP, pShow->pIter, (void **)&pGroup);
    if (pShow->pIter == NULL) break;

    // NOTE(review): every RETRIEVE_CHECK_GOTO below jumps to _OVER on failure while this
    // mutex is still held, unless the macro itself unlocks it - confirm against the macro.
    (void)taosThreadMutexLock(&pGroup->mutex);

    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    SVgObj          *pVgObj = sdbAcquire(pSdb, SDB_VGROUP, &pGroup->vgId);
    if (!pVgObj) {
      // The vgroup cannot be acquired: skip this group without emitting a row.
      (void)taosThreadMutexUnlock(&pGroup->mutex);
      sdbRelease(pSdb, pGroup);
      continue;
    }
    // Copy the name so the vgroup can be released before the row is built.
    char dbNameInGroup[TSDB_DB_FNAME_LEN];
    tstrncpy(dbNameInGroup, pVgObj->dbName, TSDB_DB_FNAME_LEN);
    sdbRelease(pSdb, pVgObj);

    char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    // Bound the copy by the destination buffer itself, not by the token size constant.
    STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(dbNameInGroup), sizeof(dbname));
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)dbname, false), pGroup, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->vgId, false), pGroup, &lino, _OVER);

    // One column per group member, holding that member's dnode id.
    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      SArbGroupMember *pMember = &pGroup->members[i];
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pMember->info.dnodeId, false), pGroup,
                          &lino, _OVER);
    }

    mInfo("arbgroup:%d, arb group sync:%d, code:%s, update time:%" PRId64, pGroup->vgId, pGroup->isSync,
          tstrerror(pGroup->code), pGroup->updateTimeMs);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->isSync, false), pGroup, &lino, _OVER);

    // Build "<code text>(<update time>)". If formatting the time fails, bufUpdateTime
    // stays as initialized (empty) and the column shows "<code text>()".
    char strCheckSyncCode[100] = {0};
    char bufUpdateTime[40] = {0};
    (void)formatTimestamp(bufUpdateTime, pGroup->updateTimeMs, TSDB_TIME_PRECISION_MILLI);
    (void)tsnprintf(strCheckSyncCode, 100, "%s(%s)", tstrerror(pGroup->code), bufUpdateTime);

    char checkSyncCode[100 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(checkSyncCode, strCheckSyncCode, 100 + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)checkSyncCode, false), pGroup, &lino, _OVER);

    if (pGroup->assignedLeader.assignedDnodeId != 0) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(
          colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.assignedDnodeId, false), pGroup,
          &lino, _OVER);

      char token[TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE] = {0};
      STR_WITH_MAXSIZE_TO_VARSTR(token, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)token, false), pGroup, &lino, _OVER);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.assignAcked, false),
                          pGroup, &lino, _OVER);
    } else {
      // No assigned leader: the three assigned-leader columns are NULL.
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);
    }

    (void)taosThreadMutexUnlock(&pGroup->mutex);

    numOfRows++;
    sdbRelease(pSdb, pGroup);
  }

_OVER:
  if (code != 0) mError("arbgroup:0, failed to restrieve arb group at line:%d, since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;

  return numOfRows;
}
1515

1516
/* Cancel the SDB_ARBGROUP fetch identified by pIter on this mnode's sdb. */
static void mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_ARBGROUP);
}
×
1520

1521
/* Return the value sdbGetSize reports for SDB_ARBGROUP in this mnode's sdb. */
int32_t mndGetArbGroupSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_ARBGROUP);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc