• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4911

04 Jan 2026 09:05AM UTC coverage: 65.028% (-0.8%) from 65.864%
#4911

push

travis-ci

web-flow
merge: from main to 3.0 branch #34156

1206 of 4524 new or added lines in 22 files covered. (26.66%)

1517 existing lines in 134 files now uncovered.

195276 of 300296 relevant lines covered (65.03%)

116931714.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.81
/source/dnode/mnode/impl/src/mndArbGroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndArbGroup.h"
18
#include "audit.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndShow.h"
22
#include "mndSync.h"
23
#include "mndTrans.h"
24
#include "mndVgroup.h"
25

26
#define ARBGROUP_VER_NUMBER   1
27
#define ARBGROUP_RESERVE_SIZE 51
28

29
static SHashObj *arbUpdateHash = NULL;
30

31
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup);
32
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew);
33
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup);
34

35
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew);
36
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index);
37
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup);
38

39
static int32_t mndArbPutUpdateArbIntoWQ(SMnode *pMnode, SArbGroup *pNewGroup);
40
static int32_t mndArbPutBatchUpdateIntoWQ(SMnode *pMnode, SArray *newGroupArray);
41

42
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq);
43
static int32_t mndArbProcessTimer(SRpcMsg *pReq);
44
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq);
45
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp);
46
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp);
47
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp);
48
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void    mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter);
50
static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq);
51

52
static int32_t mndArbCheckToken(const char *token1, const char *token2) {
209,567✔
53
  if (token1 == NULL || token2 == NULL) return -1;
209,567✔
54
  if (strlen(token1) == 0 || strlen(token2) == 0) return -1;
209,567✔
55
  return strncmp(token1, token2, TSDB_ARB_TOKEN_SIZE);
165,083✔
56
}
57

58
/**
 * Initialize the arbitration-group module of the mnode.
 *
 * Registers the message handlers and the show/retrieve handlers, creates the
 * file-scope arbUpdateHash, and finally registers the SDB_ARBGROUP table.
 *
 * @param pMnode mnode instance
 * @return terrno when the hash cannot be created, otherwise the result of
 *         sdbSetTable()
 */
int32_t mndInitArbGroup(SMnode *pMnode) {
  SSdbTable arbTable = {0};
  arbTable.sdbType = SDB_ARBGROUP;
  arbTable.keyType = SDB_KEY_INT32;
  arbTable.encodeFp = (SdbEncodeFp)mndArbGroupActionEncode;
  arbTable.decodeFp = (SdbDecodeFp)mndArbGroupActionDecode;
  arbTable.insertFp = (SdbInsertFp)mndArbGroupActionInsert;
  arbTable.updateFp = (SdbUpdateFp)mndArbGroupActionUpdate;
  arbTable.deleteFp = (SdbDeleteFp)mndArbGroupActionDelete;

  // Timer- and response-driven message handlers.
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_HEARTBEAT_TIMER, mndProcessArbHbTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_CHECK_SYNC_TIMER, mndArbProcessTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP_BATCH, mndProcessArbUpdateGroupBatchReq);
  mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_ASSIGN_LEADER, mndProcessAssignLeaderMsg);

  // Handlers backing the arbgroup management table.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup);

  arbUpdateHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (arbUpdateHash == NULL) {
    int32_t initCode = terrno;
    TAOS_RETURN(initCode);
  }

  return sdbSetTable(pMnode->pSdb, arbTable);
}
89

90
/**
 * Release the resources owned by the arbitration-group module.
 *
 * The file-scope arbUpdateHash is reset to NULL after it is cleaned up so
 * that no dangling pointer is left behind for later users of the global.
 *
 * @param pMnode mnode instance (not used by this function)
 */
void mndCleanupArbGroup(SMnode *pMnode) {
  taosHashCleanup(arbUpdateHash);
  arbUpdateHash = NULL;
}
384,676✔
91

92
/**
 * Acquire the arbitration group of a vgroup from the SDB.
 *
 * @param pMnode mnode instance
 * @param vgId   vgroup id used as the SDB key
 * @return the acquired group, or NULL on failure. When the failure is
 *         TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
 *         TSDB_CODE_MND_ARBGROUP_NOT_EXIST.
 */
static SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) {
  SArbGroup *pFound = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_ARBGROUP_NOT_EXIST;
  }
  return NULL;
}
99

100
/**
 * Release an arbitration group back to the SDB of the given mnode.
 *
 * @param pMnode mnode instance whose SDB owns the group
 * @param pGroup group to release
 */
static void mndReleaseArbGroup(SMnode *pMnode, SArbGroup *pGroup) {
  sdbRelease(pMnode->pSdb, pGroup);
}
138,580✔
104

105
/**
 * Build a fresh arbitration group from a vgroup object.
 *
 * The output group is zeroed, then dbUid, vgId and the dnode id of each of
 * the TSDB_ARB_GROUP_MEMBER_NUM members are copied from the vgroup.
 *
 * @param pVgObj   source vgroup; must have exactly 2 replicas
 * @param outGroup group to initialize
 * @return TSDB_CODE_INVALID_PARA when replica != 2, else TSDB_CODE_SUCCESS
 */
int32_t mndArbGroupInitFromVgObj(SVgObj *pVgObj, SArbGroup *outGroup) {
  if (pVgObj->replica != 2) {
    TAOS_RETURN(TSDB_CODE_INVALID_PARA);
  }

  (void)memset(outGroup, 0, sizeof(*outGroup));
  outGroup->vgId = pVgObj->vgId;
  outGroup->dbUid = pVgObj->dbUid;

  int idx = 0;
  while (idx < TSDB_ARB_GROUP_MEMBER_NUM) {
    outGroup->members[idx].info.dnodeId = pVgObj->vnodeGid[idx].dnodeId;
    idx++;
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
119

120
/**
 * Serialize an arbitration group into a newly allocated SDB raw record.
 *
 * Field order (must match mndArbGroupActionDecode): vgId, dbUid, then per
 * member {dnodeId, token}, isSync, assigned-leader dnodeId, assigned-leader
 * token, version, assignAcked, code, updateTimeMs, reserved bytes.
 *
 * terrno is preset to TSDB_CODE_OUT_OF_MEMORY and only cleared once every
 * field has been written, so any early jump to _OVER is reported as failure.
 *
 * @param pGroup group to encode
 * @return the raw record, or NULL on failure (the raw record is freed)
 */
SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
  int32_t             code = 0;
  int32_t             lino = 0;
  int32_t             dataPos = 0;
  SSdbRaw            *pRaw = NULL;
  SArbAssignedLeader *pAssigned = &pGroup->assignedLeader;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t rawSize = sizeof(SArbGroup) + ARBGROUP_RESERVE_SIZE;
  pRaw = sdbAllocRaw(SDB_ARBGROUP, ARBGROUP_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pGroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pGroup->dbUid, _OVER)
  for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
    SArbGroupMember *pCur = &pGroup->members[m];
    SDB_SET_INT32(pRaw, dataPos, pCur->info.dnodeId, _OVER)
    SDB_SET_BINARY(pRaw, dataPos, pCur->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)
  }
  SDB_SET_INT8(pRaw, dataPos, pGroup->isSync, _OVER)

  SDB_SET_INT32(pRaw, dataPos, pAssigned->assignedDnodeId, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pAssigned->token, TSDB_ARB_TOKEN_SIZE, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pAssigned->assignAcked, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pGroup->code, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pGroup->updateTimeMs, _OVER)

  SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)

  // Every field was written: clear the pessimistic error preset above.
  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("arbgroup:%d, encode to raw:%p, row:%p", pGroup->vgId, pRaw, pGroup);
    return pRaw;
  }

  mError("arbgroup:%d, failed to encode to raw:%p since %s", pGroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
161

162
/**
 * Deserialize an SDB raw record into a newly allocated arbitration-group row.
 *
 * Fields are read in the same order mndArbGroupActionEncode writes them.
 * Per-member heartbeat state is not persisted: nextHbSeq is reset to 0,
 * responsedHbSeq to -1 and lastHbMs to the time of decoding. The group mutex
 * is marked as not initialized.
 *
 * terrno is preset to TSDB_CODE_OUT_OF_MEMORY and only cleared once every
 * field has been read, so any early jump to _OVER is reported as failure.
 *
 * @param pRaw raw record to decode
 * @return the decoded row, or NULL on failure (the row is freed)
 */
SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) {
  int32_t             code = 0;
  int32_t             lino = 0;
  int32_t             dataPos = 0;
  int8_t              sver = 0;
  int64_t             decodedAtMs = 0;
  SSdbRow            *pRow = NULL;
  SArbGroup          *pGroup = NULL;
  SArbAssignedLeader *pAssigned = NULL;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver != ARBGROUP_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SArbGroup));
  if (pRow == NULL) goto _OVER;

  pGroup = sdbGetRowObj(pRow);
  if (pGroup == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pGroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pGroup->dbUid, _OVER)
  decodedAtMs = taosGetTimestampMs();
  for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
    SArbGroupMember *pCur = &pGroup->members[m];
    SDB_GET_INT32(pRaw, dataPos, &pCur->info.dnodeId, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, pCur->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)

    // Runtime-only heartbeat bookkeeping starts fresh after a decode.
    pCur->state.nextHbSeq = 0;
    pCur->state.responsedHbSeq = -1;
    pCur->state.lastHbMs = decodedAtMs;
  }
  SDB_GET_INT8(pRaw, dataPos, &pGroup->isSync, _OVER)

  pAssigned = &pGroup->assignedLeader;
  SDB_GET_INT32(pRaw, dataPos, &pAssigned->assignedDnodeId, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pAssigned->token, TSDB_ARB_TOKEN_SIZE, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pAssigned->assignAcked, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pGroup->code, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pGroup->updateTimeMs, _OVER)

  pGroup->mutexInited = false;

  SDB_GET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)

  // Every field was read: clear the pessimistic error preset above.
  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("arbgroup:%d, decode from raw:%p, row:%p", pGroup->vgId, pRaw, pGroup);
    return pRow;
  }

  mError("arbgroup:%d, failed to decode from raw:%p since %s", pGroup == NULL ? 0 : pGroup->vgId, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
222

223
/**
 * SDB insert callback: make sure the group's mutex is initialized.
 *
 * @param pSdb   SDB instance (not used by this function)
 * @param pGroup row being inserted
 * @return 0 when the mutex is (or already was) initialized, -1 otherwise
 */
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup) {
  mTrace("arbgroup:%d, perform insert action, row:%p", pGroup->vgId, pGroup);

  if (pGroup->mutexInited) {
    return 0;
  }
  if (taosThreadMutexInit(&pGroup->mutex, NULL) != 0) {
    return -1;
  }
  pGroup->mutexInited = true;
  return 0;
}
231

232
/**
 * SDB delete callback: destroy the group's mutex if it was initialized.
 *
 * @param pSdb   SDB instance (not used by this function)
 * @param pGroup row being deleted
 * @return always 0
 */
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup) {
  mTrace("arbgroup:%d, perform delete action, row:%p", pGroup->vgId, pGroup);

  if (!pGroup->mutexInited) {
    return 0;
  }

  (void)taosThreadMutexDestroy(&pGroup->mutex);
  pGroup->mutexInited = false;
  return 0;
}
240

241
/**
 * SDB update callback: merge the persisted fields of pNew into pOld.
 *
 * The merge happens under pOld's mutex and only when both rows carry the same
 * version; on success pOld's version is incremented. On a version mismatch
 * the update is skipped. In both cases, when this mnode is the leader, the
 * group's vgId is removed from arbUpdateHash afterwards.
 *
 * @param pSdb SDB instance
 * @param pOld row currently stored
 * @param pNew row carrying the new values
 * @return always 0
 */
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew) {
  mTrace("arbgroup:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);
  (void)taosThreadMutexLock(&pOld->mutex);

  if (pOld->version == pNew->version) {
    SArbAssignedLeader *pDstLeader = &pOld->assignedLeader;
    SArbAssignedLeader *pSrcLeader = &pNew->assignedLeader;

    for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
      tstrncpy(pOld->members[m].state.token, pNew->members[m].state.token, TSDB_ARB_TOKEN_SIZE);
    }
    pOld->isSync = pNew->isSync;
    pDstLeader->assignedDnodeId = pSrcLeader->assignedDnodeId;
    tstrncpy(pDstLeader->token, pSrcLeader->token, TSDB_ARB_TOKEN_SIZE);
    pDstLeader->assignAcked = pSrcLeader->assignAcked;
    pOld->version++;
    pOld->code = pNew->code;
    pOld->updateTimeMs = pNew->updateTimeMs;

    mInfo(
        "arbgroup:%d, perform update action. members[0].token:%s, members[1].token:%s, isSync:%d, as-dnodeid:%d, "
        "as-token:%s, as-acked:%d, version:%" PRId64,
        pOld->vgId, pOld->members[0].state.token, pOld->members[1].state.token, pOld->isSync,
        pDstLeader->assignedDnodeId, pDstLeader->token, pDstLeader->assignAcked, pOld->version);
  } else {
    mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64
          " new version:%" PRId64,
          pOld->vgId, pOld, pNew, pOld->version, pNew->version);
  }

  (void)taosThreadMutexUnlock(&pOld->mutex);

  if (mndIsLeader(pSdb->pMnode)) {
    mInfo("arbgroup:%d, remove from arb Update Hash", pOld->vgId);
    if (taosHashRemove(arbUpdateHash, &pOld->vgId, sizeof(int32_t)) != 0) {
      mError("arbgroup:%d, failed to remove from arb Update Hash", pOld->vgId);
    }
  }
  return 0;
}
281

282
/**
 * Encode the group and append it to the transaction's redo log with status
 * SDB_STATUS_CREATING.
 *
 * @param pTrans transaction to append to
 * @param pGroup group to encode
 * @return 0 on success, terrno when encoding fails, otherwise the error code
 *         of the failing append/set-status call
 */
int32_t mndSetCreateArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendRedolog(pTrans, pRaw);
  if (code != 0) TAOS_RETURN(code);

  code = sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
  if (code != 0) TAOS_RETURN(code);

  return 0;
}
293

294
/**
 * Encode the group and append it to the transaction's undo log with status
 * SDB_STATUS_DROPPED.
 *
 * @param pTrans transaction to append to
 * @param pGroup group to encode
 * @return 0 on success, terrno when encoding fails, otherwise the error code
 *         of the failing append/set-status call
 */
int32_t mndSetCreateArbGroupUndoLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendUndolog(pTrans, pRaw);
  if (code != 0) TAOS_RETURN(code);

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
  if (code != 0) TAOS_RETURN(code);

  return 0;
}
305

306
/**
 * Encode the group and append it to the transaction's commit log with status
 * SDB_STATUS_READY.
 *
 * @param pTrans transaction to append to
 * @param pGroup group to encode
 * @return 0 on success, terrno when encoding fails, otherwise the error code
 *         of the failing append/set-status call
 */
int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndArbGroupActionEncode(pGroup);
  if (pCommitRaw == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }
  // The assignment must be parenthesized on its own: `code = f() != 0` stores
  // the comparison result (0/1) and loses the real error code.
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) TAOS_RETURN(code);
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) TAOS_RETURN(code);
  return 0;
}
317

318
/**
 * Encode the group and append it to the transaction's prepare log with status
 * SDB_STATUS_DROPPING.
 *
 * @param pTrans transaction to append to
 * @param pGroup group to encode
 * @return 0 on success, terrno when encoding fails, otherwise the error code
 *         of the failing append/set-status call
 */
int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendPrepareLog(pTrans, pRaw);
  if (code != 0) TAOS_RETURN(code);

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING);
  if (code != 0) TAOS_RETURN(code);

  return 0;
}
329

330
/**
 * Encode the group and append it to the transaction's redo log with status
 * SDB_STATUS_DROPPING.
 *
 * @param pTrans transaction to append to
 * @param pGroup group to encode
 * @return 0 on success, terrno when encoding fails, otherwise the error code
 *         of the failing append/set-status call
 */
static int32_t mndSetDropArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendRedolog(pTrans, pRaw);
  if (code != 0) TAOS_RETURN(code);

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING);
  if (code != 0) TAOS_RETURN(code);

  return 0;
}
341

342
/**
 * Encode the group and append it to the transaction's commit log with status
 * SDB_STATUS_DROPPED.
 *
 * @param pTrans transaction to append to
 * @param pGroup group to encode
 * @return 0 on success, terrno when encoding fails, otherwise the error code
 *         of the failing append/set-status call
 */
int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) TAOS_RETURN(code);

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
  if (code != 0) TAOS_RETURN(code);

  return 0;
}
353

354
static void *mndBuildArbHeartBeatReq(int32_t *pContLen, char *arbToken, int32_t dnodeId, int64_t arbTerm,
107,836✔
355
                                     SArray *hbMembers) {
356
  SVArbHeartBeatReq req = {0};
107,836✔
357
  req.dnodeId = dnodeId;
107,836✔
358
  req.arbToken = arbToken;
107,836✔
359
  req.arbTerm = arbTerm;
107,836✔
360
  req.hbMembers = hbMembers;
107,836✔
361

362
  int32_t contLen = tSerializeSVArbHeartBeatReq(NULL, 0, &req);
107,836✔
363
  if (contLen <= 0) return NULL;
107,836✔
364

365
  void *pReq = rpcMallocCont(contLen);
107,836✔
366
  if (pReq == NULL) return NULL;
107,836✔
367

368
  if (tSerializeSVArbHeartBeatReq(pReq, contLen, &req) <= 0) {
107,836✔
369
    rpcFreeCont(pReq);
×
370
    return NULL;
×
371
  }
372
  *pContLen = contLen;
107,836✔
373
  return pReq;
107,836✔
374
}
375

376
/**
 * Build an arb-heartbeat request and send it to the given dnode.
 *
 * @param pDnode    target dnode
 * @param arbToken  arbitrator token
 * @param arbTerm   arbitrator term
 * @param hbMembers heartbeat members hosted on that dnode
 * @return -1 when the request cannot be built or the dnode has no endpoint,
 *         otherwise the result of tmsgSendReq()
 */
static int32_t mndSendArbHeartBeatReq(SDnodeObj *pDnode, char *arbToken, int64_t arbTerm, SArray *hbMembers) {
  int32_t msgLen = 0;
  void   *pMsg = mndBuildArbHeartBeatReq(&msgLen, arbToken, pDnode->id, arbTerm, hbMembers);
  if (pMsg == NULL) {
    mError("arbgroup:0, dnodeId:%d, failed to build arb-hb request", pDnode->id);
    return -1;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_ARB_HEARTBEAT, .pCont = pMsg, .contLen = msgLen};
  SEpSet  epSet = mndGetDnodeEpset(pDnode);
  if (epSet.numOfEps == 0) {
    mError("arbgroup:0, dnodeId:%d, failed to send arb-hb request to dnode since no epSet found", pDnode->id);
    rpcFreeCont(pMsg);
    return -1;
  }

  int32_t sendCode = tmsgSendReq(&epSet, &rpcMsg);
  if (sendCode != 0) {
    mError("arbgroup:0, dnodeId:%d, failed to send arb-hb request to dnode since 0x%x", pDnode->id, sendCode);
    return sendCode;
  }

  // Successful sends are logged at info level only when heartbeat logging is on.
  if (tsSyncLogHeartbeat) {
    mInfo("arbgroup:0, dnodeId:%d, send arb-hb request to dnode", pDnode->id);
  } else {
    mTrace("arbgroup:0, dnodeId:%d, send arb-hb request to dnode", pDnode->id);
  }
  return sendCode;
}
404

405
/**
 * Heartbeat timer handler: send one arb-heartbeat request per dnode.
 *
 * Phase 1 walks every arbitration group and, under the group's mutex, appends
 * one SVArbHbReqMember per member to a per-dnode array (dnodeId -> SArray*),
 * consuming the member's nextHbSeq.
 * Phase 2 iterates the per-dnode arrays and sends a heartbeat to every dnode
 * that is currently online; each array is destroyed after use.
 *
 * Failures on a single member or dnode are logged and skipped so the rest of
 * the round still runs.
 *
 * @param pReq timer message; pReq->info.node is the mnode
 * @return terrno when the dnode hash cannot be created, the error of
 *         mndGetArbToken() when the token is unavailable, otherwise 0
 */
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
  int32_t    code = 0;
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;
  void      *pIter = NULL;

  SHashObj *pDnodeHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pDnodeHash == NULL) {
    code = terrno;
    mError("arbgroup:0, failed to init dnode hash for arb-hb timer");
    TAOS_RETURN(code);
  }

  // collect member of same dnode
  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    (void)taosThreadMutexLock(&pArbGroup->mutex);

    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      SArbGroupMember *pMember = &pArbGroup->members[i];
      int32_t          dnodeId = pMember->info.dnodeId;
      void            *pObj = taosHashGet(pDnodeHash, &dnodeId, sizeof(int32_t));
      SArray          *hbMembers = NULL;
      if (pObj) {
        hbMembers = *(SArray **)pObj;
      } else {
        hbMembers = taosArrayInit(16, sizeof(SVArbHbReqMember));
        if (hbMembers == NULL) {
          mError("arbgroup:0, dnodeId:%d, failed to alloc hb member array, but continue next at this timer round",
                 dnodeId);
          continue;
        }
        if (taosHashPut(pDnodeHash, &dnodeId, sizeof(int32_t), &hbMembers, POINTER_BYTES) != 0) {
          mError("arbgroup:0, dnodeId:%d, failed to push hb member into hash, but continue next at this timer round",
                 dnodeId);
          // The array is not reachable through the hash, so nobody else would
          // ever destroy it: free it here and skip this member for this round.
          taosArrayDestroy(hbMembers);
          continue;
        }
      }
      SVArbHbReqMember reqMember = {.vgId = pArbGroup->vgId, .hbSeq = pMember->state.nextHbSeq++};
      if (taosArrayPush(hbMembers, &reqMember) == NULL) {
        mError("arbgroup:0, dnodeId:%d, failed to push hb member, but continue next at this timer round", dnodeId);
      }
    }

    (void)taosThreadMutexUnlock(&pArbGroup->mutex);
    sdbRelease(pSdb, pArbGroup);
  }

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
    mError("arbgroup:0, failed to get arb token for arb-hb timer");
    // Nothing will be sent: release every per-dnode array before bailing out.
    pIter = taosHashIterate(pDnodeHash, NULL);
    while (pIter) {
      SArray *hbMembers = *(SArray **)pIter;
      taosArrayDestroy(hbMembers);
      pIter = taosHashIterate(pDnodeHash, pIter);
    }
    taosHashCleanup(pDnodeHash);
    TAOS_RETURN(code);
  }

  int64_t nowMs = taosGetTimestampMs();

  pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pDnodeHash, pIter);
    if (pIter == NULL) break;

    int32_t dnodeId = *(int32_t *)taosHashGetKey(pIter, NULL);
    SArray *hbMembers = *(SArray **)pIter;

    SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
    if (pDnode == NULL) {
      mError("arbgroup:0, dnodeId:%d, timer failed to acquire dnode", dnodeId);
      taosArrayDestroy(hbMembers);
      continue;
    }

    int64_t mndTerm = mndGetTerm(pMnode);

    if (mndIsDnodeOnline(pDnode, nowMs)) {
      int32_t sendCode = mndSendArbHeartBeatReq(pDnode, arbToken, mndTerm, hbMembers);
      if (TSDB_CODE_SUCCESS != sendCode) {
        mError("arbgroup:0, dnodeId:%d, timer failed to send arb-hb request", dnodeId);
      }
    }

    mndReleaseDnode(pMnode, pDnode);
    taosArrayDestroy(hbMembers);
  }
  taosHashCleanup(pDnodeHash);

  return 0;
}
491

492
static void *mndBuildArbCheckSyncReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
17,223✔
493
                                     char *member0Token, char *member1Token) {
494
  SVArbCheckSyncReq req = {0};
17,223✔
495
  req.arbToken = arbToken;
17,223✔
496
  req.arbTerm = arbTerm;
17,223✔
497
  req.member0Token = member0Token;
17,223✔
498
  req.member1Token = member1Token;
17,223✔
499

500
  int32_t reqLen = tSerializeSVArbCheckSyncReq(NULL, 0, &req);
17,223✔
501
  int32_t contLen = reqLen + sizeof(SMsgHead);
17,223✔
502

503
  if (contLen <= 0) return NULL;
17,223✔
504
  SMsgHead *pHead = rpcMallocCont(contLen);
17,223✔
505
  if (pHead == NULL) return NULL;
17,223✔
506

507
  pHead->contLen = htonl(contLen);
17,223✔
508
  pHead->vgId = htonl(vgId);
17,223✔
509
  if (tSerializeSVArbCheckSyncReq((char *)pHead + sizeof(SMsgHead), contLen, &req) <= 0) {
17,223✔
510
    rpcFreeCont(pHead);
×
511
    return NULL;
×
512
  }
513
  *pContLen = contLen;
17,223✔
514
  return pHead;
17,223✔
515
}
516

517
/**
 * Build a check-sync request and send it to the endpoint set of the vgroup.
 *
 * @param pMnode       mnode instance
 * @param vgId         target vgroup id
 * @param arbToken     arbitrator token
 * @param term         arbitrator term
 * @param member0Token token of member 0
 * @param member1Token token of member 1
 * @return -1 when the request cannot be built; terrno (or -1 when terrno is
 *         0) when the vgroup has no endpoint; otherwise the result of
 *         tmsgSendReq()
 */
static int32_t mndSendArbCheckSyncReq(SMnode *pMnode, int32_t vgId, char *arbToken, int64_t term, char *member0Token,
                                      char *member1Token) {
  int32_t msgLen = 0;
  void   *pMsg = mndBuildArbCheckSyncReq(&msgLen, vgId, arbToken, term, member0Token, member1Token);
  if (pMsg == NULL) {
    mError("arbgroup:%d, failed to build check-sync request", vgId);
    return -1;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_ARB_CHECK_SYNC, .pCont = pMsg, .contLen = msgLen};
  TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
  TRACE_SET_ROOTID(&(rpcMsg.info.traceId), tGenIdPI64());

  SEpSet epSet = mndGetVgroupEpsetById(pMnode, vgId);
  if (epSet.numOfEps == 0) {
    mError("arbgroup:%d, failed to send check-sync request since no epSet found", vgId);
    rpcFreeCont(pMsg);
    int32_t failCode = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(failCode);
  }

  int32_t sendCode = tmsgSendReq(&epSet, &rpcMsg);
  if (sendCode == 0) {
    mDebug("arbgroup:%d, send check-sync request", vgId);
  } else {
    mError("arbgroup:%d, failed to send check-sync request since 0x%x", vgId, sendCode);
  }
  return sendCode;
}
547

548
/**
 * Tell whether a member's last heartbeat is too old.
 *
 * @param pArbGroup group holding the member
 * @param index     member index inside pArbGroup->members
 * @param nowMs     current time in milliseconds
 * @return true when lastHbMs is strictly older than
 *         nowMs - tsArbSetAssignedTimeoutMs
 */
static bool mndCheckArbMemberHbTimeout(SArbGroup *pArbGroup, int32_t index, int64_t nowMs) {
  int64_t lastSeenMs = pArbGroup->members[index].state.lastHbMs;
  return lastSeenMs < (nowMs - tsArbSetAssignedTimeoutMs);
}
552

553
static void *mndBuildArbSetAssignedLeaderReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
10,784✔
554
                                             char *memberToken, bool force) {
555
  SVArbSetAssignedLeaderReq req = {0};
10,784✔
556
  req.arbToken = arbToken;
10,784✔
557
  req.arbTerm = arbTerm;
10,784✔
558
  req.memberToken = memberToken;
10,784✔
559
  if (force) req.force = 1;
10,784✔
560

561
  int32_t reqLen = tSerializeSVArbSetAssignedLeaderReq(NULL, 0, &req);
10,784✔
562
  int32_t contLen = reqLen + sizeof(SMsgHead);
10,784✔
563

564
  if (contLen <= 0) return NULL;
10,784✔
565
  SMsgHead *pHead = rpcMallocCont(contLen);
10,784✔
566
  if (pHead == NULL) return NULL;
10,784✔
567

568
  pHead->contLen = htonl(contLen);
10,784✔
569
  pHead->vgId = htonl(vgId);
10,784✔
570
  if (tSerializeSVArbSetAssignedLeaderReq((char *)pHead + sizeof(SMsgHead), contLen, &req) <= 0) {
10,784✔
571
    rpcFreeCont(pHead);
×
572
    return NULL;
×
573
  }
574
  *pContLen = contLen;
10,784✔
575
  return pHead;
10,784✔
576
}
577

578
/**
 * Build a set-assigned-leader request and send it to the given dnode.
 *
 * @param pMnode      mnode instance
 * @param dnodeId     dnode that should receive the request
 * @param vgId        vgroup id
 * @param arbToken    arbitrator token
 * @param term        arbitrator term
 * @param memberToken token of the member that should become assigned leader
 * @param force       passed through to the request builder
 * @return terrno (or -1 when terrno is 0) when the request cannot be built or
 *         the dnode has no endpoint; otherwise the result of tmsgSendReq()
 */
static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, int32_t vgId, char *arbToken,
                                              int64_t term, char *memberToken, bool force) {
  int32_t msgLen = 0;
  void   *pMsg = mndBuildArbSetAssignedLeaderReq(&msgLen, vgId, arbToken, term, memberToken, force);
  if (pMsg == NULL) {
    mError("arbgroup:%d, failed to build set-assigned request", vgId);
    int32_t buildCode = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(buildCode);
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_SYNC_SET_ASSIGNED_LEADER, .pCont = pMsg, .contLen = msgLen};
  SEpSet  epSet = mndGetDnodeEpsetById(pMnode, dnodeId);
  if (epSet.numOfEps == 0) {
    mError("arbgroup:%d, failed to send arb-set-assigned request to dnode:%d since no epSet found", vgId, dnodeId);
    rpcFreeCont(pMsg);
    int32_t epCode = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(epCode);
  }

  int32_t sendCode = tmsgSendReq(&epSet, &rpcMsg);
  if (sendCode == 0) {
    mInfo("arbgroup:%d, send arb-set-assigned request to dnode:%d", vgId, dnodeId);
  } else {
    mError("arbgroup:%d, failed to send arb-set-assigned request to dnode:%d since 0x%x", vgId, dnodeId, sendCode);
  }
  return sendCode;
}
607

608
/**
 * Decide which arbitration action a group needs right now.
 *
 * Decision order:
 *  1. an assigned leader exists but is not acked -> CHECK_SYNC_SET_ASSIGNED_LEADER
 *  2. both members timed out                     -> CHECK_SYNC_NONE
 *  3. no member timed out                        -> CHECK_SYNC_CHECK_SYNC when there
 *     is no assigned leader and the group is not in sync, else CHECK_SYNC_NONE
 *  4. exactly one member timed out; the other one is the candidate:
 *     - a different dnode is already assigned    -> CHECK_SYNC_NONE
 *     - the group is not in sync                 -> CHECK_SYNC_CHECK_SYNC
 *     - otherwise pNewGroup becomes a copy of the group with the candidate
 *       set as assigned leader                   -> CHECK_SYNC_UPDATE
 *
 * @param pArbGroup group to inspect (read only here)
 * @param nowMs     current time in milliseconds
 * @param pOp       out: the chosen operation
 * @param pNewGroup out: filled only for CHECK_SYNC_UPDATE
 */
void mndArbCheckSync(SArbGroup *pArbGroup, int64_t nowMs, ECheckSyncOp *pOp, SArbGroup *pNewGroup) {
  *pOp = CHECK_SYNC_NONE;

  SArbAssignedLeader *pAssigned = &pArbGroup->assignedLeader;
  int32_t             groupId = pArbGroup->vgId;
  int32_t             assignedId = pAssigned->assignedDnodeId;
  bool                timeout0 = mndCheckArbMemberHbTimeout(pArbGroup, 0, nowMs);
  bool                timeout1 = mndCheckArbMemberHbTimeout(pArbGroup, 1, nowMs);

  // 1. assigned but not yet acknowledged: the request has to be (re)sent
  if (assignedId != 0 && !pAssigned->assignAcked) {
    *pOp = CHECK_SYNC_SET_ASSIGNED_LEADER;
    return;
  }

  // 2. nobody is alive: nothing can be decided
  if (timeout0 && timeout1) {
    return;
  }

  // 3. everybody is alive: only a sync check may be needed
  if (!timeout0 && !timeout1) {
    if (assignedId == 0 && !pArbGroup->isSync) {
      *pOp = CHECK_SYNC_CHECK_SYNC;
    }
    return;
  }

  // 4. exactly one member is alive: it is the assigned-leader candidate
  int32_t survivorIdx = timeout0 ? 1 : 0;
  int32_t survivorDnodeId = pArbGroup->members[survivorIdx].info.dnodeId;

  if (assignedId != 0 && assignedId != survivorDnodeId) {
    mInfo("arbgroup:%d, arb skip to set assigned leader to dnodeId:%d, assigned leader has been set to dnodeId:%d",
          groupId, survivorDnodeId, assignedId);
    return;
  }

  if (!pArbGroup->isSync) {
    // Same message either way; it is only less noisy when the candidate is
    // already the assigned leader.
    if (assignedId == survivorDnodeId) {
      mDebug("arbgroup:%d, arb skip to set assigned leader to dnodeId:%d, arb group is not sync", groupId,
             survivorDnodeId);
    } else {
      mInfo("arbgroup:%d, arb skip to set assigned leader to dnodeId:%d, arb group is not sync", groupId,
            survivorDnodeId);
    }
    *pOp = CHECK_SYNC_CHECK_SYNC;
    return;
  }

  // in sync: hand back an updated copy to be written to the sdb
  mndArbGroupDupObj(pArbGroup, pNewGroup);
  mndArbGroupSetAssignedLeader(pNewGroup, survivorIdx);
  *pOp = CHECK_SYNC_UPDATE;
}
668

669
/**
 * Handle an "assign leader" request: for every arbitration group, send a
 * forced set-assigned-leader request to the first member dnode that is online.
 *
 * Each group is copied under its mutex and released before any dnode lookup
 * or network send. Groups with no online member are skipped. Send failures
 * are ignored (best effort) so the remaining groups are still processed.
 * When the audit level is at least AUDIT_LEVEL_CLUSTER an audit record with
 * the elapsed time is written.
 *
 * @param pReq request message; pReq->info.node is the mnode and pReq->pCont
 *             holds a serialized SAssignLeaderReq
 * @return 0 on success; TSDB_CODE_INVALID_MSG when the request cannot be
 *         deserialized; the error of mndGetArbToken(); or terrno (-1 when
 *         terrno is 0) when the mnode term is negative
 */
static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  int32_t    code = -1, lino = 0;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;
  int64_t    tss = taosGetTimestampMs();
  char       arbToken[TSDB_ARB_TOKEN_SIZE];

  SAssignLeaderReq req = {0};
  if (tDeserializeSAssignLeaderReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  mInfo("arbgroup:0, begin to process assign leader");

  TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));

  int64_t term = mndGetTerm(pMnode);
  if (term < 0) {
    mError("arbgroup:0, arb failed to get term since %s", terrstr());
    code = (terrno != 0) ? terrno : -1;
    // Leave through _exit so the deserialized request is freed.
    goto _exit;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    SArbGroup arbGroupDup = {0};

    (void)taosThreadMutexLock(&pArbGroup->mutex);
    mndArbGroupDupObj(pArbGroup, &arbGroupDup);
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);

    sdbRelease(pSdb, pArbGroup);

    // Pick the first member whose dnode is currently online.
    int32_t dnodeId = 0;
    for (int32_t i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      int32_t    memberDnodeId = arbGroupDup.members[i].info.dnodeId;
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, memberDnodeId);
      if (pDnode == NULL) {
        mError("arbgroup:%d, dnodeId:%d, failed to acquire dnode while assigning leader", arbGroupDup.vgId,
               memberDnodeId);
        continue;
      }
      bool isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
      mndReleaseDnode(pMnode, pDnode);
      if (isonline) {
        dnodeId = memberDnodeId;
        break;
      }
    }

    if (dnodeId == 0) {
      mInfo("arbgroup:%d, arb skip to send set assigned leader since no member dnode is online", arbGroupDup.vgId);
      continue;
    }

    (void)mndSendArbSetAssignedLeaderReq(pMnode, dnodeId, arbGroupDup.vgId, arbToken, term, "", true);
    mInfo("arbgroup:%d, arb send set assigned leader to dnodeId:%d", arbGroupDup.vgId, dnodeId);
  }

  code = 0;
  if (tsAuditLevel >= AUDIT_LEVEL_CLUSTER) {
    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss);
    duration = duration / 1000;
    auditRecord(pReq, pMnode->clusterId, "assignLeader", "", "", req.sql, req.sqlLen, duration, 0);
  }
_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("arbgroup:0, failed to assign leader since %s", tstrerror(code));
  }

  tFreeSAssignLeaderReq(&req);
  TAOS_RETURN(code);
}
739

740
// Timer handler: walks every arb group in the sdb and, based on the result of
// mndArbCheckSync() on a private copy of the group, either
//   - does nothing (CHECK_SYNC_NONE),
//   - sends a set-assigned-leader request,
//   - sends a check-sync request, or
//   - collects the new group state so all updates are queued in one batch.
//
// The scan is skipped when less than two heartbeat intervals have elapsed since
// the time reported by mndGetRoleTimeMs().
//
// Returns the error code when the mnode term cannot be obtained; every other
// failure is only logged and 0 is returned.
static int32_t mndArbProcessTimer(SRpcMsg *pReq) {
  int32_t    code = 0, lino = 0;
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;
  void      *pIter = NULL;
  SArray    *pUpdateArray = NULL;

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));

  int64_t term = mndGetTerm(pMnode);
  if (term < 0) {
    mError("arbgroup:0, arb failed to get term since %s", terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  int64_t roleTimeMs = mndGetRoleTimeMs(pMnode);
  int64_t nowMs = taosGetTimestampMs();
  if (nowMs - roleTimeMs < tsArbHeartBeatIntervalMs * 2) {
    mInfo("arbgroup:0, arb skip to check sync since mnd had just switch over, roleTime:%" PRId64 " now:%" PRId64,
          roleTimeMs, nowMs);
    return 0;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    // Work on a snapshot so the group mutex is held only for the copy.
    SArbGroup arbGroupDup = {0};

    (void)taosThreadMutexLock(&pArbGroup->mutex);
    mndArbGroupDupObj(pArbGroup, &arbGroupDup);
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);

    sdbRelease(pSdb, pArbGroup);

    ECheckSyncOp op = CHECK_SYNC_NONE;
    SArbGroup    newGroup = {0};
    mndArbCheckSync(&arbGroupDup, nowMs, &op, &newGroup);

    int32_t             vgId = arbGroupDup.vgId;
    SArbAssignedLeader *pAssgndLeader = &arbGroupDup.assignedLeader;
    int32_t             assgndDnodeId = pAssgndLeader->assignedDnodeId;

    switch (op) {
      case CHECK_SYNC_NONE:
        mTrace("arbgroup:%d, arb skip to send msg by check sync", vgId);
        break;
      case CHECK_SYNC_SET_ASSIGNED_LEADER:
        (void)mndSendArbSetAssignedLeaderReq(pMnode, assgndDnodeId, vgId, arbToken, term, pAssgndLeader->token, false);
        mInfo("arbgroup:%d, arb send set assigned leader to dnodeId:%d", vgId, assgndDnodeId);
        break;
      case CHECK_SYNC_CHECK_SYNC:
        (void)mndSendArbCheckSyncReq(pMnode, vgId, arbToken, term, arbGroupDup.members[0].state.token,
                                     arbGroupDup.members[1].state.token);
        mInfo("arbgroup:%d, send vnode-arb-check-sync request", vgId);
        break;
      case CHECK_SYNC_UPDATE:
        // The update array is created lazily: most ticks have nothing to update.
        if (!pUpdateArray) {
          pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
          if (!pUpdateArray) {
            TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
          }
        }

        if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
        break;
      default:
        mError("arbgroup:%d, arb unknown check sync op:%d", vgId, op);
        break;
    }
  }

  TAOS_CHECK_EXIT(mndArbPutBatchUpdateIntoWQ(pMnode, pUpdateArray));

_exit:
  // pIter is non-NULL only when the loop above was left early through
  // TAOS_CHECK_EXIT; hand the unfinished iterator back to the sdb.
  // NOTE(review): relies on sdbCancelFetch() being the counterpart of sdbFetch() - confirm.
  if (pIter != NULL) {
    sdbCancelFetch(pSdb, pIter);
  }

  if (code != 0) {
    // Report the captured code, not whatever terrno currently holds.
    mError("arbgroup:0, failed to check sync at line %d since %s", lino, tstrerror(code));
  }

  taosArrayDestroy(pUpdateArray);
  return 0;
}
828

829
// Serialize `updateArray` into a freshly rpcMallocCont()-allocated message body.
// On success the buffer is returned and its length is stored in *pContLen;
// on any failure NULL is returned and *pContLen is left untouched.
static void *mndBuildArbUpdateGroupBatchReq(int32_t *pContLen, SArray *updateArray) {
  SMArbUpdateGroupBatchReq batchReq = {0};
  batchReq.updateArray = updateArray;

  // The value returned for a NULL buffer is used as the size of the real buffer.
  int32_t encodedLen = tSerializeSMArbUpdateGroupBatchReq(NULL, 0, &batchReq);
  if (encodedLen <= 0) {
    return NULL;
  }

  SMsgHead *pMsg = rpcMallocCont(encodedLen);
  if (pMsg == NULL) {
    return NULL;
  }

  if (tSerializeSMArbUpdateGroupBatchReq(pMsg, encodedLen, &batchReq) > 0) {
    *pContLen = encodedLen;
    return pMsg;
  }

  // Second pass failed: the buffer is ours to release.
  rpcFreeCont(pMsg);
  return NULL;
}
845

846
// Populate the update record `outGroup` from the arb group `pGroup`.
// Token fields are NOT duplicated: outGroup receives pointers into the token
// buffers of pGroup.
static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup) {
  // Group-level scalars.
  outGroup->vgId = pGroup->vgId;
  outGroup->dbUid = pGroup->dbUid;
  outGroup->isSync = pGroup->isSync;
  outGroup->version = pGroup->version;
  outGroup->code = pGroup->code;
  outGroup->updateTimeMs = pGroup->updateTimeMs;

  // Per-member identity and token.
  for (int idx = 0; idx < TSDB_ARB_GROUP_MEMBER_NUM; ++idx) {
    SArbGroupMember *pSrc = &pGroup->members[idx];
    outGroup->members[idx].dnodeId = pSrc->info.dnodeId;
    outGroup->members[idx].token = pSrc->state.token;  // pointer copy only
  }

  // Assigned-leader state.
  SArbAssignedLeader *pLeader = &pGroup->assignedLeader;
  outGroup->assignedLeader.dnodeId = pLeader->assignedDnodeId;
  outGroup->assignedLeader.token = pLeader->token;  // pointer copy only
  outGroup->assignedLeader.acked = pLeader->assignAcked;
}
40,547✔
861

862
// Queue a single arb-group update on the mnode write queue.
//
// A vgId that is already present in arbUpdateHash is treated as "update in
// progress" and skipped (returns 0). Otherwise the vgId is recorded in
// arbUpdateHash first and the request is enqueued afterwards, in the same order
// as mndArbPutBatchUpdateIntoWQ(): a handler that removes the key on failure
// can then never run before the key has been inserted, and a failed hash
// insert no longer leaves an already-queued message behind. If enqueueing
// fails, the key is removed again.
//
// Returns 0 on success or skip, non-zero on failure.
static int32_t mndArbPutUpdateArbIntoWQ(SMnode *pMnode, SArbGroup *pNewGroup) {
  if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
    mInfo("arbgroup:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
    return 0;
  }

  int32_t ret = -1;
  bool    inHash = false;  // true once this call has inserted the vgId

  SMArbUpdateGroup newGroup = {0};
  mndInitArbUpdateGroup(pNewGroup, &newGroup);

  SArray *pArray = taosArrayInit(1, sizeof(SMArbUpdateGroup));
  if (pArray == NULL) goto _OVER;
  if (taosArrayPush(pArray, &newGroup) == NULL) goto _OVER;

  int32_t contLen = 0;
  void   *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
  if (!pHead) {
    mError("arbgroup:0, failed to build arb-update-group request");
    goto _OVER;
  }

  mInfo("arbgroup:%d, put into arb update hash", pNewGroup->vgId);
  if ((ret = taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0)) != 0) {
    // Nothing was queued yet, so the message body still belongs to us.
    rpcFreeCont(pHead);
    goto _OVER;
  }
  inHash = true;

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
  ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);

_OVER:
  taosArrayDestroy(pArray);
  if (ret != 0) {
    mError("arbgroup:%d, failed to put arb group update into write queue since %s", pNewGroup->vgId, tstrerror(ret));
    // Undo the in-progress marker so later updates of this group are not blocked.
    if (inHash && taosHashRemove(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != 0) {
      mError("arbgroup:%d, failed to remove from arb Update Hash", pNewGroup->vgId);
    }
  }
  return ret;
}
898

899
// Queue a batch of arb-group updates (elements of `newGroupArray` are SArbGroup)
// on the mnode write queue as one TDMT_MND_ARB_UPDATE_GROUP_BATCH message.
//
// Groups whose vgId is already present in arbUpdateHash are skipped. Every
// group that is included is first recorded in arbUpdateHash. On failure only
// the vgIds recorded by THIS call are removed again, so markers belonging to
// other in-flight updates are left alone.
//
// Returns 0 when the message was queued or when there was nothing to queue,
// non-zero on failure.
static int32_t mndArbPutBatchUpdateIntoWQ(SMnode *pMnode, SArray *newGroupArray) {
  size_t sz = taosArrayGetSize(newGroupArray);
  if (sz == 0) {
    // Nothing to do; avoid allocating a scratch array on every call.
    return 0;
  }

  int32_t ret = -1;

  // pArray holds exactly the groups whose vgId this call put into arbUpdateHash.
  SArray *pArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup));
  if (pArray == NULL) {
    if (terrno != 0) ret = terrno;
    goto _OVER;
  }

  for (size_t i = 0; i < sz; i++) {
    SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
    if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
      mInfo("arbgroup:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
      continue;
    }

    SMArbUpdateGroup newGroup = {0};
    mndInitArbUpdateGroup(pNewGroup, &newGroup);

    mInfo("arbgroup:%d, put into arb update hash in array", pNewGroup->vgId);
    if ((ret = taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0)) != 0) {
      mError("arbgroup:%d, failed to put into arb update hash since %s", pNewGroup->vgId, tstrerror(ret));
      goto _OVER;
    }

    if (taosArrayPush(pArray, &newGroup) == NULL) {
      // ret is 0 here from the successful hash put; set a real failure code.
      ret = -1;
      if (terrno != 0) ret = terrno;
      // This vgId is in the hash but not in pArray, so undo it right here.
      if (taosHashRemove(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != 0) {
        mError("arbgroup:%d, failed to remove from arb Update Hash", pNewGroup->vgId);
      }
      goto _OVER;
    }
  }

  if (taosArrayGetSize(pArray) == 0) {
    ret = 0;
    goto _OVER;
  }

  int32_t contLen = 0;
  void   *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
  if (!pHead) {
    mError("arbgroup:0, failed to build arb-update-group request");
    // ret still holds 0 from the last hash put; report the failure explicitly.
    ret = -1;
    if (terrno != 0) ret = terrno;
    goto _OVER;
  }

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
  ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);

_OVER:
  if (ret != 0) {
    mError("arbgroup:0, failed to put arb group update into write queue since %s", tstrerror(ret));
    // Roll back only the markers created by this call.
    size_t added = taosArrayGetSize(pArray);
    for (size_t i = 0; i < added; i++) {
      SMArbUpdateGroup *pAdded = taosArrayGet(pArray, i);
      if (taosHashRemove(arbUpdateHash, &pAdded->vgId, sizeof(pAdded->vgId)) != 0) {
        mError("arbgroup:%d, failed to remove from arb Update Hash", pAdded->vgId);
      }
    }
  }

  taosArrayDestroy(pArray);
  return ret;
}
953

954
// Handler for TDMT_MND_ARB_UPDATE_GROUP_BATCH: decodes the batch, builds one
// "upd-bat-arbgroup" transaction that carries a commit log for every group in
// the batch that still exists, then checks for conflicts and prepares it.
//
// On any failure after decoding (including failure to create the transaction)
// every vgId of the batch is removed from arbUpdateHash, so that the groups
// are no longer considered "update in progress".
//
// Returns 0 on success, otherwise the failing step's error code.
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
  int    code = -1;
  size_t sz = 0;

  SMArbUpdateGroupBatchReq req = {0};
  if ((code = tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    mError("arbgroup:0, arb failed to decode arb-update-group request");
    TAOS_RETURN(code);
  }

  // Needed by the cleanup path below, so compute it before anything can fail.
  sz = taosArrayGetSize(req.updateArray);

  SMnode *pMnode = pReq->info.node;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "upd-bat-arbgroup");
  if (pTrans == NULL) {
    code = -1;
    if (terrno != 0) code = terrno;
    mError("arbgroup:0, failed to create update arbgroup trans, since %s", terrstr());
    goto _OVER;
  }

  for (size_t i = 0; i < sz; i++) {
    SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);

    // Rebuild a full SArbGroup from the decoded update record.
    SArbGroup newGroup = {0};
    newGroup.vgId = pUpdateGroup->vgId;
    newGroup.dbUid = pUpdateGroup->dbUid;
    for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
      newGroup.members[m].info.dnodeId = pUpdateGroup->members[m].dnodeId;
      tstrncpy(newGroup.members[m].state.token, pUpdateGroup->members[m].token, TSDB_ARB_TOKEN_SIZE);
    }

    newGroup.isSync = pUpdateGroup->isSync;
    newGroup.assignedLeader.assignedDnodeId = pUpdateGroup->assignedLeader.dnodeId;
    tstrncpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
    newGroup.assignedLeader.assignAcked = pUpdateGroup->assignedLeader.acked;
    newGroup.version = pUpdateGroup->version;
    newGroup.code = pUpdateGroup->code;
    newGroup.updateTimeMs = pUpdateGroup->updateTimeMs;

    mInfo(
        "trans:%d, arbgroup:%d, used to update member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d], %d, "
        "%" PRId64,
        pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
        newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
        newGroup.assignedLeader.assignedDnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.assignAcked,
        pUpdateGroup->code, pUpdateGroup->updateTimeMs);

    // The group is acquired only to confirm it still exists.
    SArbGroup *pOldGroup = mndAcquireArbGroup(pMnode, newGroup.vgId);
    if (!pOldGroup) {
      mError("trans:%d, arbgroup:%d, arb skip to update arbgroup, since no obj found", pTrans->id, newGroup.vgId);
      if (taosHashRemove(arbUpdateHash, &newGroup.vgId, sizeof(int32_t)) != 0) {
        mError("trans:%d, arbgroup:%d, failed to remove from arb Update Hash", pTrans->id, newGroup.vgId);
      }
      continue;
    }

    mndTransAddArbGroupId(pTrans, newGroup.vgId);

    if ((code = mndSetCreateArbGroupCommitLogs(pTrans, &newGroup)) != 0) {
      mError("trans:%d, arbgroup:%d, failed to update arbgroup in set commit log since %s", pTrans->id, newGroup.vgId,
             tstrerror(code));
      mndReleaseArbGroup(pMnode, pOldGroup);
      goto _OVER;
    }

    mInfo("trans:%d, arbgroup:%d, used to update member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
          pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
          newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
          newGroup.assignedLeader.assignedDnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.assignAcked);

    mndReleaseArbGroup(pMnode, pOldGroup);
  }

  if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) goto _OVER;
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;

  code = 0;

_OVER:
  if (code != 0) {
    // failed to update arbgroup
    int32_t transId = (pTrans != NULL) ? pTrans->id : 0;  // no trans exists when creation failed
    mError("trans:%d, arbgroup:0, failed to update arbgroup since %s", transId, tstrerror(code));
    for (size_t i = 0; i < sz; i++) {
      SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
      if (taosHashRemove(arbUpdateHash, &pUpdateGroup->vgId, sizeof(int32_t)) != 0) {
        mError("trans:%d, arbgroup:%d failed to remove from arb Update Hash", transId, pUpdateGroup->vgId);
      }
    }
  }

  if (pTrans != NULL) {
    mndTransDrop(pTrans);
  }
  tFreeSMArbUpdateGroupBatchReq(&req);
  return code;
}
1046

1047
// Copy the leading part of *pGroup into *pNew: every byte that precedes the
// `mutexInited` field. `mutexInited` and whatever follows it in SArbGroup are
// left untouched in pNew.
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew) {
  const size_t copyLen = offsetof(SArbGroup, mutexInited);
  (void)memcpy(pNew, pGroup, copyLen);
}
132,573✔
1050

1051
// Record member `index` of the group as its assigned leader: copy the member's
// dnode id and token into assignedLeader and clear the acknowledged flag.
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index) {
  SArbAssignedLeader *pLeader = &pGroup->assignedLeader;
  SArbGroupMember    *pSrc = &pGroup->members[index];

  pLeader->assignedDnodeId = pSrc->info.dnodeId;
  tstrncpy(pLeader->token, pSrc->state.token, TSDB_ARB_TOKEN_SIZE);
  pLeader->assignAcked = false;
}
4,718✔
1058

1059
// Clear the group's assigned-leader state: dnode id 0, zeroed token buffer,
// acknowledged flag off.
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
  SArbAssignedLeader *pLeader = &pGroup->assignedLeader;

  pLeader->assignAcked = false;
  pLeader->assignedDnodeId = 0;
  (void)memset(pLeader->token, 0, TSDB_ARB_TOKEN_SIZE);
}
7,414✔
1064

1065
// Apply one heartbeat response member to `pGroup` (under the group mutex) and
// decide whether the group's stored token must be updated.
//
// The member slot is located by `dnodeId`. A response whose hbSeq is not newer
// than the recorded one is ignored. Otherwise the heartbeat sequence and time
// are recorded in pGroup. If the reported token differs from the stored one,
// *pNewGroup receives a copy of the group with the new token and isSync
// cleared, plus a reset assigned leader when the reporting dnode is the
// assigned leader.
//
// Returns true when *pNewGroup was filled and should be persisted.
bool mndArbIsNeedUpdateTokenByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
                                        SArbGroup *pNewGroup) {
  bool needUpdate = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  do {
    // Find the slot owned by the reporting dnode.
    int slot = 0;
    while (slot < TSDB_ARB_GROUP_MEMBER_NUM && pGroup->members[slot].info.dnodeId != dnodeId) {
      slot++;
    }
    if (slot >= TSDB_ARB_GROUP_MEMBER_NUM) {
      mError("arbgroup:%d, arb token update check failed, dnodeId:%d not found", pRspMember->vgId, dnodeId);
      break;
    }

    SArbGroupMember *pMember = &pGroup->members[slot];

    if (pMember->state.responsedHbSeq >= pRspMember->hbSeq) {
      // stale or duplicated response
      mError("arbgroup:%d, dnodeId:%d skip arb token update, heart beat seq expired, local:%d msg:%d", pRspMember->vgId,
             dnodeId, pMember->state.responsedHbSeq, pRspMember->hbSeq);
      break;
    }

    // Heartbeat bookkeeping is written straight into the live group.
    pMember->state.responsedHbSeq = pRspMember->hbSeq;
    pMember->state.lastHbMs = nowMs;

    if (mndArbCheckToken(pMember->state.token, pRspMember->memberToken) == 0) {
      // token unchanged
      mDebug("arbgroup:%d, dnodeId:%d skip arb token update, token matched", pRspMember->vgId, dnodeId);
      break;
    }

    // Token changed: build the replacement group state in pNewGroup.
    mndArbGroupDupObj(pGroup, pNewGroup);
    tstrncpy(pNewGroup->members[slot].state.token, pRspMember->memberToken, TSDB_ARB_TOKEN_SIZE);
    pNewGroup->isSync = false;

    bool resetAssigned = (pMember->info.dnodeId == pGroup->assignedLeader.assignedDnodeId);
    if (resetAssigned) {
      mndArbGroupResetAssignedLeader(pNewGroup);
    }

    needUpdate = true;
    mInfo("arbgroup:%d, need to update token, by heartbeat from dnodeId:%d, resetAssigned:%d", pRspMember->vgId, dnodeId,
          resetAssigned);
  } while (0);

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return needUpdate;
}
1121

1122
// Process the member list of one heartbeat response from `dnodeId`.
// Every member (SVArbHbRspMember) is applied to its arb group; groups whose
// token changed are collected and queued as one batch update.
//
// Returns 0 on success, or the error from array allocation / batch queueing.
// The temporary update array is released on every path.
static int32_t mndArbUpdateByHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) {
  int64_t nowMs = taosGetTimestampMs();
  size_t  size = taosArrayGetSize(memberArray);
  SArray *pUpdateArray = taosArrayInit(size, sizeof(SArbGroup));
  if (pUpdateArray == NULL) {
    int32_t initCode = -1;
    if (terrno != 0) initCode = terrno;
    mError("arbgroup:0, failed to init update array for arb heartbeat since %s", tstrerror(initCode));
    TAOS_RETURN(initCode);
  }

  for (size_t i = 0; i < size; i++) {
    SVArbHbRspMember *pRspMember = taosArrayGet(memberArray, i);

    SArbGroup  newGroup = {0};
    SArbGroup *pGroup = mndAcquireArbGroup(pMnode, pRspMember->vgId);
    if (pGroup == NULL) {
      mError("arbgroup:%d failed to update arb token, not found", pRspMember->vgId);
      continue;
    }

    bool updateToken = mndArbIsNeedUpdateTokenByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
    if (updateToken) {
      // A failed push only drops this group's update for the current heartbeat.
      if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
        mError("arbgroup:%d, failed to push newGroup to updateArray, but continue at this heartbeat", pRspMember->vgId);
      }
    }

    mndReleaseArbGroup(pMnode, pGroup);
  }

  int32_t code = mndArbPutBatchUpdateIntoWQ(pMnode, pUpdateArray);

  // Destroy before returning so the failure path no longer leaks the array.
  taosArrayDestroy(pUpdateArray);

  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
1152

1153
// Decide, under the group mutex, whether a check-sync result should change the
// group's isSync flag.
//
// No update is produced when the group has an assigned leader (terrno is set
// to success) or when either member token in the message differs from the
// locally stored one (terrno is set to TSDB_CODE_MND_ARB_TOKEN_MISMATCH).
// Otherwise, if isSync differs from `newIsSync`, *pNewGroup receives a copy of
// the group carrying the new isSync, `code`, and the current time as
// updateTimeMs.
//
// Returns true when *pNewGroup was filled.
bool mndArbIsNeedUpdateSyncStatusByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
                                             bool newIsSync, SArbGroup *pNewGroup, int32_t code) {
  bool needUpdate = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  if (pGroup->assignedLeader.assignedDnodeId != 0) {
    terrno = TSDB_CODE_SUCCESS;
    mInfo("arbgroup:%d, skip to update arb sync, has assigned leader:%d", vgId, pGroup->assignedLeader.assignedDnodeId);
  } else {
    char *local0Token = pGroup->members[0].state.token;
    char *local1Token = pGroup->members[1].state.token;

    // Member 1 is only compared when member 0 already matched.
    bool tokensMatch =
        mndArbCheckToken(local0Token, member0Token) == 0 && mndArbCheckToken(local1Token, member1Token) == 0;

    if (!tokensMatch) {
      terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
      mInfo("arbgroup:0, skip to update arb sync, memberToken mismatch local:[%s][%s], msg:[%s][%s]", local0Token,
            local1Token, member0Token, member1Token);
    } else if (pGroup->isSync != newIsSync) {
      mndArbGroupDupObj(pGroup, pNewGroup);
      pNewGroup->isSync = newIsSync;
      pNewGroup->code = code;
      pNewGroup->updateTimeMs = taosGetTimestampMs();

      mInfo("arbgroup:%d, need to update isSync status, new isSync:%d, timeStamp:%" PRId64, vgId, newIsSync,
            pNewGroup->updateTimeMs);
      needUpdate = true;
    }
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return needUpdate;
}
1189

1190
// Apply a check-sync result to arb group `vgId`: when the result changes the
// group's sync status, queue the updated group for persistence.
//
// Returns an error only when the group cannot be acquired. A failure to queue
// the update is logged and still reported as 0.
static int32_t mndArbUpdateByCheckSync(SMnode *pMnode, int32_t vgId, char *member0Token, char *member1Token,
                                       bool newIsSync, int32_t rsp_code) {
  SArbGroup *pGroup = mndAcquireArbGroup(pMnode, vgId);
  if (pGroup == NULL) {
    mError("arbgroup:%d, failed to update arb sync, not found", vgId);
    int32_t code = -1;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  SArbGroup updated = {0};
  if (mndArbIsNeedUpdateSyncStatusByCheckSync(pGroup, vgId, member0Token, member1Token, newIsSync, &updated,
                                              rsp_code)) {
    int32_t putCode = mndArbPutUpdateArbIntoWQ(pMnode, &updated);
    if (putCode != 0) {
      mError("arbgroup:%d, failed to pullup update arb sync, since %s", vgId, terrstr());
    }
  }

  mndReleaseArbGroup(pMnode, pGroup);
  return 0;
}
1213

1214
// Handler for an arb heartbeat response.
// An empty body is ignored. Otherwise the response is decoded, its arb token
// is compared with the local one, and on a match the member list is applied
// through mndArbUpdateByHeartBeat().
//
// Returns 0 on success, TSDB_CODE_MND_ARB_TOKEN_MISMATCH when the arb token
// differs, or the error of the failing step.
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arbgroup:0, arb hb-rsp contLen is 0");
    return 0;
  }

  SMnode *pMnode = pRsp->info.node;
  int32_t code = -1;

  char localToken[TSDB_ARB_TOKEN_SIZE];
  code = mndGetArbToken(pMnode, localToken);
  if (code != 0) {
    mError("arbgroup:0, failed to get arb token for arb-hb response");
    TAOS_RETURN(code);
  }

  SVArbHeartBeatRsp hbRsp = {0};
  code = tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &hbRsp);
  if (code != 0) {
    mInfo("arbgroup:0, arb hb-rsp des failed, since:%s", tstrerror(pRsp->code));
    TAOS_RETURN(code);
  }

  if (mndArbCheckToken(localToken, hbRsp.arbToken) != 0) {
    mInfo("arbgroup:0, arb hearbeat skip update for dnodeId:%d, arb token mismatch, local:[%s] msg:[%s]",
          hbRsp.dnodeId, localToken, hbRsp.arbToken);
    code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
    goto _OVER;
  }

  // Log level for received heartbeats is selected by tsSyncLogHeartbeat.
  if (!tsSyncLogHeartbeat) {
    mTrace("arbgroup:0, receive arb-hb rsp from dnode %d", hbRsp.dnodeId);
  } else {
    mInfo("arbgroup:0, receive arb-hb rsp from dnode %d", hbRsp.dnodeId);
  }

  TAOS_CHECK_GOTO(mndArbUpdateByHeartBeat(pMnode, hbRsp.dnodeId, hbRsp.hbMembers), NULL, _OVER);
  code = 0;

_OVER:
  tFreeSVArbHeartBeatRsp(&hbRsp);
  return code;
}
1257

1258
// Handler for a vnode-arb-check-sync response.
// An empty body is ignored. Otherwise the response is decoded, its arb token
// is compared with the local one, and on a match the group's sync status is
// updated through mndArbUpdateByCheckSync(); the group is considered in sync
// when the response's errCode is TSDB_CODE_SUCCESS.
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arbgroup:0, arb check-sync-rsp contLen is 0");
    return 0;
  }

  SMnode *pMnode = pRsp->info.node;
  int32_t code = -1;

  char localToken[TSDB_ARB_TOKEN_SIZE];
  code = mndGetArbToken(pMnode, localToken);
  if (code != 0) {
    mError("arbgroup:0, failed to get arb token from vnode-arb-check-sync-rsp");
    TAOS_RETURN(code);
  }

  SVArbCheckSyncRsp checkRsp = {0};
  code = tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &checkRsp);
  if (code != 0) {
    mInfo("arbgroup:0, arb vnode-arb-check-sync-rsp deserialize failed, since:%s", tstrerror(pRsp->code));
    // A response that itself reports a token mismatch is not treated as an error.
    if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) {
      terrno = TSDB_CODE_SUCCESS;
      return 0;
    }
    TAOS_RETURN(code);
  }

  mInfo("arbgroup:%d, vnode-arb-check-sync-rsp received, QID:0x%" PRIx64 ":0x%" PRIx64 ", seqNum:%" PRIx64
        ", errCode:%d",
        checkRsp.vgId, pRsp->info.traceId.rootId, pRsp->info.traceId.msgId, pRsp->info.seqNum, checkRsp.errCode);

  if (mndArbCheckToken(localToken, checkRsp.arbToken) != 0) {
    mError("arbgroup:%d, skip update arb sync for arb token mismatch, local:[%s] msg:[%s]", checkRsp.vgId, localToken,
           checkRsp.arbToken);
    // NOTE(review): `code` is still 0 here, so the function returns 0 for a
    // token mismatch - confirm this is intended.
    terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
    goto _OVER;
  }

  bool inSync = (checkRsp.errCode == TSDB_CODE_SUCCESS);
  code = mndArbUpdateByCheckSync(pMnode, checkRsp.vgId, checkRsp.member0Token, checkRsp.member1Token, inSync,
                                 checkRsp.errCode);
  if (code != 0) {
    mError("arbgroup:%d, failed to update arb sync for since:%s", checkRsp.vgId, terrstr());
    goto _OVER;
  }

  code = 0;

_OVER:
  tFreeSVArbCheckSyncRsp(&checkRsp);
  TAOS_RETURN(code);
}
1308

1309
// Decide, under the group mutex, whether a set-assigned-leader response should
// mark the group's assigned leader as acknowledged.
//
// No update is produced when `memberToken` differs from the stored
// assigned-leader token, when `errcode` is not TSDB_CODE_SUCCESS, or when the
// assignment is already acknowledged. Otherwise *pNewGroup receives a copy of
// the group with isSync cleared and assignAcked set.
//
// Returns true when *pNewGroup was filled.
bool mndArbIsNeedUpdateAssignedBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode,
                                                   SArbGroup *pNewGroup) {
  bool needUpdate = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  if (mndArbCheckToken(pGroup->assignedLeader.token, memberToken) != 0) {
    mError("arbgroup:%d, skip update arb assigned for member token mismatch, local:[%s] msg:[%s]", vgId,
           pGroup->assignedLeader.token, memberToken);
  } else if (errcode != TSDB_CODE_SUCCESS) {
    mError("arbgroup:%d, skip update arb assigned for since:%s", vgId, tstrerror(errcode));
  } else if (!pGroup->assignedLeader.assignAcked) {
    mndArbGroupDupObj(pGroup, pNewGroup);
    pNewGroup->isSync = false;
    pNewGroup->assignedLeader.assignAcked = true;

    mInfo("arbgroup:%d, arb received assigned ack", vgId);
    needUpdate = true;
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return needUpdate;
}
1339

1340
// Handler for an arb set-assigned-leader response.
// An empty body is ignored. Otherwise the response is decoded, its arb token
// is compared with the local one, and when the group's assignment becomes
// acknowledged the updated group is queued for persistence.
//
// Returns 0 on success, TSDB_CODE_MND_ARB_TOKEN_MISMATCH when the arb token
// differs, or the error of the failing step. The acquired group is released on
// every path, including a failed enqueue.
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arbgroup:0, arb set-assigned-rsp contLen is 0");
    return 0;
  }

  int32_t code = -1;

  SMnode *pMnode = pRsp->info.node;

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
    mError("arbgroup:0, failed to get arb token for arb-set-assigned response");
    TAOS_RETURN(code);
  }

  SVArbSetAssignedLeaderRsp setAssignedRsp = {0};
  if ((code = tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp)) != 0) {
    mError("arbgroup:0, arb set-assigned-rsp des failed, since:%s", tstrerror(pRsp->code));
    TAOS_RETURN(code);
  }

  if (mndArbCheckToken(arbToken, setAssignedRsp.arbToken) != 0) {
    mError("arbgroup:%d, skip update arb assigned for arb token mismatch, local:[%s] msg:[%s]", setAssignedRsp.vgId,
           arbToken, setAssignedRsp.arbToken);
    code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
    goto _OVER;
  }

  SArbGroup *pGroup = mndAcquireArbGroup(pMnode, setAssignedRsp.vgId);
  if (!pGroup) {
    mError("arbgroup:%d, failed to set arb assigned for since:%s", setAssignedRsp.vgId, terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  SArbGroup newGroup = {0};
  bool      updateAssigned = mndArbIsNeedUpdateAssignedBySetAssignedLeader(
      pGroup, setAssignedRsp.vgId, setAssignedRsp.memberToken, pRsp->code, &newGroup);
  if (updateAssigned) {
    // `code` carries the enqueue result to the return; it is already 0 when no
    // update is needed.
    if ((code = mndArbPutUpdateArbIntoWQ(pMnode, &newGroup)) != 0) {
      mError("arbgroup:%d, failed to pullup update arb assigned since:%s", setAssignedRsp.vgId, tstrerror(code));
    }
  }

  // Release unconditionally so a failed enqueue does not leak the group reference.
  mndReleaseArbGroup(pMnode, pGroup);

_OVER:
  tFreeSVArbSetAssignedLeaderRsp(&setAssignedRsp);
  return code;
}
1396

1397
static char *formatTimestamp(char *buf, int64_t val, int precision) {
×
1398
  time_t tt;
×
1399
  if (precision == TSDB_TIME_PRECISION_MICRO) {
×
1400
    tt = (time_t)(val / 1000000);
×
1401
  }
1402
  if (precision == TSDB_TIME_PRECISION_NANO) {
×
1403
    tt = (time_t)(val / 1000000000);
×
1404
  } else {
1405
    tt = (time_t)(val / 1000);
×
1406
  }
1407

1408
  struct tm tm;
×
1409
  if (taosLocalTime(&tt, &tm, NULL, 0, NULL) == NULL) {
×
1410
    mError("failed to get local time");
×
1411
    return NULL;
×
1412
  }
1413
  size_t pos = taosStrfTime(buf, 32, "%Y-%m-%d %H:%M:%S", &tm);
×
1414

1415
  if (precision == TSDB_TIME_PRECISION_MICRO) {
×
1416
    sprintf(buf + pos, ".%06d", (int)(val % 1000000));
×
1417
  } else if (precision == TSDB_TIME_PRECISION_NANO) {
×
1418
    sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
×
1419
  } else {
1420
    sprintf(buf + pos, ".%03d", (int)(val % 1000));
×
1421
  }
1422

1423
  return buf;
×
1424
}
1425

1426
// Fill `pBlock` with up to `rows` rows, one per arb group fetched from the sdb.
//
// Iteration state is kept in pShow->pIter, so the function continues from where
// the previous call stopped. Columns are written in this order: db name, vgId,
// one dnodeId per group member, isSync, "<code string>(<update time>)", and
// then assigned dnodeId / assigned token / assign-acked (all three NULL when
// no assigned leader is set).
//
// Arb groups whose vgroup can no longer be acquired are skipped.
//
// Returns the number of rows written by this call and adds the same amount to
// pShow->numOfRows. A column write failure stops the iteration and is logged;
// the rows completed so far are still reported.
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  SArbGroup *pGroup = NULL;
  int32_t    code = 0;
  int32_t    lino = 0;

  // NOTE(review): every RETRIEVE_CHECK_GOTO below jumps to _OVER while
  // pGroup->mutex is still locked by this function. The macro body is not
  // visible here -- confirm its failure path does not leave the mutex held.
  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_ARBGROUP, pShow->pIter, (void **)&pGroup);
    if (pShow->pIter == NULL) break;

    (void)taosThreadMutexLock(&pGroup->mutex);

    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    SVgObj          *pVgObj = sdbAcquire(pSdb, SDB_VGROUP, &pGroup->vgId);
    if (!pVgObj) {
      // No matching vgroup: drop this group from the result and move on.
      (void)taosThreadMutexUnlock(&pGroup->mutex);
      sdbRelease(pSdb, pGroup);
      continue;
    }
    // Copy the name out so the vgroup can be released right away.
    char dbNameInGroup[TSDB_DB_FNAME_LEN];
    tstrncpy(dbNameInGroup, pVgObj->dbName, TSDB_DB_FNAME_LEN);
    sdbRelease(pSdb, pVgObj);

    // column: db name. The copy limit must be the size of `dbname` itself;
    // bounding it by the arb token size would cap the name at the wrong length.
    char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(dbNameInGroup), TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)dbname, false), pGroup, &lino, _OVER);

    // column: vgroup id
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->vgId, false), pGroup, &lino, _OVER);

    // columns: dnode id of each group member
    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      SArbGroupMember *pMember = &pGroup->members[i];
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pMember->info.dnodeId, false), pGroup,
                          &lino, _OVER);
    }

    mInfo("arbgroup:%d, arb group sync:%d, code:%s, update time:%" PRId64, pGroup->vgId, pGroup->isSync,
          tstrerror(pGroup->code), pGroup->updateTimeMs);

    // column: is-sync flag
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->isSync, false), pGroup, &lino, _OVER);

    // column: "<code string>(<update time>)". The result of formatTimestamp is
    // ignored on purpose: on failure bufUpdateTime stays empty.
    char strCheckSyncCode[100] = {0};
    char bufUpdateTime[40] = {0};
    (void)formatTimestamp(bufUpdateTime, pGroup->updateTimeMs, TSDB_TIME_PRECISION_MILLI);
    (void)tsnprintf(strCheckSyncCode, 100, "%s(%s)", tstrerror(pGroup->code), bufUpdateTime);

    char checkSyncCode[100 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(checkSyncCode, strCheckSyncCode, 100 + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)checkSyncCode, false), pGroup, &lino, _OVER);

    if (pGroup->assignedLeader.assignedDnodeId != 0) {
      // column: assigned leader dnode id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(
          colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.assignedDnodeId, false), pGroup,
          &lino, _OVER);

      // column: assigned leader token
      char token[TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE] = {0};
      STR_WITH_MAXSIZE_TO_VARSTR(token, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)token, false), pGroup, &lino, _OVER);

      // column: assign-acked flag
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.assignAcked, false),
                          pGroup, &lino, _OVER);
    } else {
      // No assigned leader: the three assigned-leader columns are NULL.
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);
    }

    (void)taosThreadMutexUnlock(&pGroup->mutex);

    numOfRows++;
    sdbRelease(pSdb, pGroup);
  }

_OVER:
  if (code != 0) mError("arbgroup:0, failed to restrieve arb group at line:%d, since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;

  return numOfRows;
}
1520

1521
// Cancel the SDB_ARBGROUP fetch identified by `pIter` on this mnode's sdb.
static void mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_ARBGROUP);
}
×
1525

1526
// Return what sdbGetSize reports for the SDB_ARBGROUP type on this mnode's sdb.
int32_t mndGetArbGroupSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_ARBGROUP);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc