• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3608

12 Feb 2025 05:57AM UTC coverage: 63.066% (+1.4%) from 61.715%
#3608

push

travis-ci

web-flow
Merge pull request #29746 from taosdata/merge/mainto3.02

merge: from main to 3.0 branch

140199 of 286257 branches covered (48.98%)

Branch coverage included in aggregate %.

89 of 161 new or added lines in 18 files covered. (55.28%)

3211 existing lines in 190 files now uncovered.

218998 of 283298 relevant lines covered (77.3%)

5949310.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.62
/source/dnode/mnode/impl/src/mndArbGroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndArbGroup.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndShow.h"
21
#include "mndTrans.h"
22
#include "mndVgroup.h"
23

24
#define ARBGROUP_VER_NUMBER   1
25
#define ARBGROUP_RESERVE_SIZE 51
26

27
static SHashObj *arbUpdateHash = NULL;
28

29
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup);
30
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew);
31
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup);
32

33
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew);
34
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index);
35
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup);
36

37
static int32_t mndUpdateArbGroup(SMnode *pMnode, SArbGroup *pNewGroup);
38
static int32_t mndBatchUpdateArbGroup(SMnode *pMnode, SArray *newGroupArray);
39

40
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq);
41
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq);
42
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq);
43
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp);
44
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp);
45
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp);
46
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter);
48

49
static int32_t mndArbCheckToken(const char *token1, const char *token2) {
75✔
50
  if (token1 == NULL || token2 == NULL) return -1;
75!
51
  if (strlen(token1) == 0 || strlen(token2) == 0) return -1;
75!
52
  return strncmp(token1, token2, TSDB_ARB_TOKEN_SIZE);
54✔
53
}
54

55
/*
 * Initializes the arb-group module on the given mnode:
 *   - registers the timer / request / response message handlers,
 *   - registers the show-retrieve and free-iterator handlers,
 *   - creates the module-level arbUpdateHash,
 *   - registers the SDB_ARBGROUP table with its encode/decode/insert/update/delete hooks.
 *
 * Returns terrno when the hash cannot be created, otherwise the result of sdbSetTable().
 */
int32_t mndInitArbGroup(SMnode *pMnode) {
  // message handlers
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_HEARTBEAT_TIMER, mndProcessArbHbTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_CHECK_SYNC_TIMER, mndProcessArbCheckSyncTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP_BATCH, mndProcessArbUpdateGroupBatchReq);
  mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp);

  // show handlers
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup);

  // hash keyed by an int32 (see mndArbGroupActionUpdate, which removes entries by vgId)
  arbUpdateHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (arbUpdateHash == NULL) {
    int32_t code = terrno;
    TAOS_RETURN(code);
  }

  SSdbTable table = {
      .sdbType = SDB_ARBGROUP,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndArbGroupActionEncode,
      .decodeFp = (SdbDecodeFp)mndArbGroupActionDecode,
      .insertFp = (SdbInsertFp)mndArbGroupActionInsert,
      .updateFp = (SdbUpdateFp)mndArbGroupActionUpdate,
      .deleteFp = (SdbDeleteFp)mndArbGroupActionDelete,
  };
  return sdbSetTable(pMnode->pSdb, table);
}
85

86
/*
 * Releases the module-level arbUpdateHash created by mndInitArbGroup().
 * The global is reset to NULL afterwards so that a stale handle can never be
 * reused or cleaned up a second time.
 */
void mndCleanupArbGroup(SMnode *pMnode) {
  taosHashCleanup(arbUpdateHash);
  arbUpdateHash = NULL;
}
1,783✔
87

UNCOV
88
/*
 * Acquires the arb group keyed by vgId from the SDB.
 *
 * Returns the acquired object, or NULL on failure. When the SDB reports
 * TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
 * TSDB_CODE_MND_ARBGROUP_NOT_EXIST; any other terrno is left untouched.
 */
SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) {
  SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
  if (pGroup != NULL) {
    return pGroup;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_ARBGROUP_NOT_EXIST;
  }
  return NULL;
}
95

UNCOV
96
/*
 * Hands an arb group back to the mnode's SDB via sdbRelease().
 */
void mndReleaseArbGroup(SMnode *pMnode, SArbGroup *pGroup) {
  sdbRelease(pMnode->pSdb, pGroup);
}
×
100

101
/*
 * Fills outGroup from a vgroup object.
 *
 * Only vgroups with exactly two replicas are accepted; anything else yields
 * TSDB_CODE_INVALID_PARA and outGroup is left untouched. On success outGroup
 * is zeroed and then receives dbUid, vgId and the dnodeId of each member.
 */
int32_t mndArbGroupInitFromVgObj(SVgObj *pVgObj, SArbGroup *outGroup) {
  if (pVgObj->replica != 2) {
    TAOS_RETURN(TSDB_CODE_INVALID_PARA);
  }

  (void)memset(outGroup, 0, sizeof(*outGroup));
  outGroup->vgId = pVgObj->vgId;
  outGroup->dbUid = pVgObj->dbUid;

  for (int idx = 0; idx < TSDB_ARB_GROUP_MEMBER_NUM; ++idx) {
    outGroup->members[idx].info.dnodeId = pVgObj->vnodeGid[idx].dnodeId;
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
115

116
/*
 * Serializes an arb group into a newly allocated SDB raw record
 * (type SDB_ARBGROUP, version ARBGROUP_VER_NUMBER).
 *
 * Field order: vgId, dbUid, per member {dnodeId, token}, isSync,
 * assignedLeader.dnodeId, assignedLeader.token, version, assignedLeader.acked,
 * code, updateTimeMs, then ARBGROUP_RESERVE_SIZE reserved bytes.
 * mndArbGroupActionDecode() reads the fields in the same order.
 *
 * Returns the raw record, or NULL with terrno set on failure.
 */
SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
  // NOTE(review): code/lino are not used directly in this function; presumably the
  // SDB_SET_* macros reference them by name - confirm before removing.
  int32_t             code = 0;
  int32_t             lino = 0;
  int32_t             dataPos = 0;
  SArbAssignedLeader *pLeader = &pGroup->assignedLeader;

  // pessimistic default; cleared only after every field has been written
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t  size = sizeof(SArbGroup) + ARBGROUP_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_ARBGROUP, ARBGROUP_VER_NUMBER, size);
  if (pRaw == NULL) {
    goto _OVER;
  }

  SDB_SET_INT32(pRaw, dataPos, pGroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pGroup->dbUid, _OVER)
  for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
    SArbGroupMember *pMember = &pGroup->members[i];
    SDB_SET_INT32(pRaw, dataPos, pMember->info.dnodeId, _OVER)
    SDB_SET_BINARY(pRaw, dataPos, pMember->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)
  }
  SDB_SET_INT8(pRaw, dataPos, pGroup->isSync, _OVER)

  SDB_SET_INT32(pRaw, dataPos, pLeader->dnodeId, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pLeader->acked, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pGroup->code, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pGroup->updateTimeMs, _OVER)

  SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("arbgroup:%d, encode to raw:%p, row:%p", pGroup->vgId, pRaw, pGroup);
    return pRaw;
  }

  mError("arbgroup:%d, failed to encode to raw:%p since %s", pGroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
157

158
/*
 * Rebuilds an arb group row from an SDB raw record.
 *
 * The raw record must carry soft version ARBGROUP_VER_NUMBER, otherwise terrno
 * is set to TSDB_CODE_SDB_INVALID_DATA_VER. Fields are read in the order
 * written by mndArbGroupActionEncode(). Runtime-only member state is reset
 * (nextHbSeq = 0, responsedHbSeq = -1, lastHbMs = 0) and the mutex is marked
 * as not initialized.
 *
 * Returns the new row, or NULL with terrno set on failure.
 */
SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): code/lino are not used directly in this function; presumably the
  // SDB_GET_* macros reference them by name - confirm before removing.
  int32_t    code = 0;
  int32_t    lino = 0;
  SSdbRow   *pRow = NULL;
  SArbGroup *pGroup = NULL;
  int8_t     sver = 0;

  // pessimistic default; cleared only after every field has been read
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto _OVER;
  }
  if (sver != ARBGROUP_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SArbGroup));
  if (pRow == NULL) {
    goto _OVER;
  }

  pGroup = sdbGetRowObj(pRow);
  if (pGroup == NULL) {
    goto _OVER;
  }

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &pGroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pGroup->dbUid, _OVER)
  for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
    SArbGroupMember *pMember = &pGroup->members[i];
    SDB_GET_INT32(pRaw, dataPos, &pMember->info.dnodeId, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, pMember->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)

    // heartbeat bookkeeping is not persisted; start from a clean state
    pMember->state.nextHbSeq = 0;
    pMember->state.responsedHbSeq = -1;
    pMember->state.lastHbMs = 0;
  }
  SDB_GET_INT8(pRaw, dataPos, &pGroup->isSync, _OVER)

  SArbAssignedLeader *pLeader = &pGroup->assignedLeader;
  SDB_GET_INT32(pRaw, dataPos, &pLeader->dnodeId, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pLeader->acked, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pGroup->code, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pGroup->updateTimeMs, _OVER)

  // the mutex is created later by the insert action
  pGroup->mutexInited = false;

  SDB_GET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("arbgroup:%d, decode from raw:%p, row:%p", pGroup->vgId, pRaw, pGroup);
    return pRow;
  }

  mError("arbgroup:%d, failed to decode from raw:%p since %s", pGroup == NULL ? 0 : pGroup->vgId, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
217

218
/*
 * SDB insert hook: makes sure the group's mutex exists.
 *
 * Returns 0 when the mutex is (or already was) initialized, -1 when
 * taosThreadMutexInit() fails.
 */
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup) {
  mTrace("arbgroup:%d, perform insert action, row:%p", pGroup->vgId, pGroup);

  if (!pGroup->mutexInited) {
    if (taosThreadMutexInit(&pGroup->mutex, NULL) == 0) {
      pGroup->mutexInited = true;
    }
  }

  if (pGroup->mutexInited) {
    return 0;
  }
  return -1;
}
226

227
/*
 * SDB delete hook: destroys the group's mutex if it was initialized and
 * clears the mutexInited flag. Always returns 0.
 */
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup) {
  mTrace("arbgroup:%d, perform delete action, row:%p", pGroup->vgId, pGroup);

  if (!pGroup->mutexInited) {
    return 0;
  }

  (void)taosThreadMutexDestroy(&pGroup->mutex);
  pGroup->mutexInited = false;
  return 0;
}
235

236
/*
 * SDB update hook: merges pNew into pOld under pOld's mutex.
 *
 * The update is applied only when both rows carry the same version; in that
 * case the member tokens, isSync, the assigned-leader fields, code and
 * updateTimeMs are copied and pOld->version is incremented. Otherwise the
 * update is skipped and only logged.
 *
 * In both cases the group's vgId is removed from arbUpdateHash afterwards
 * (a failed removal is logged). Always returns 0.
 */
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew) {
  mTrace("arbgroup:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);
  (void)taosThreadMutexLock(&pOld->mutex);

  if (pOld->version == pNew->version) {
    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      tstrncpy(pOld->members[i].state.token, pNew->members[i].state.token, TSDB_ARB_TOKEN_SIZE);
    }
    pOld->isSync = pNew->isSync;

    SArbAssignedLeader *pOldLeader = &pOld->assignedLeader;
    SArbAssignedLeader *pNewLeader = &pNew->assignedLeader;
    pOldLeader->dnodeId = pNewLeader->dnodeId;
    tstrncpy(pOldLeader->token, pNewLeader->token, TSDB_ARB_TOKEN_SIZE);
    pOldLeader->acked = pNewLeader->acked;

    pOld->version++;
    pOld->code = pNew->code;
    pOld->updateTimeMs = pNew->updateTimeMs;

    mInfo(
        "arbgroup:%d, perform update action. members[0].token:%s, members[1].token:%s, isSync:%d, as-dnodeid:%d, "
        "as-token:%s, as-acked:%d, version:%" PRId64,
        pOld->vgId, pOld->members[0].state.token, pOld->members[1].state.token, pOld->isSync,
        pOld->assignedLeader.dnodeId, pOld->assignedLeader.token, pOld->assignedLeader.acked, pOld->version);
  } else {
    mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64
          " new version:%" PRId64,
          pOld->vgId, pOld, pNew, pOld->version, pNew->version);
  }

  (void)taosThreadMutexUnlock(&pOld->mutex);

  if (taosHashRemove(arbUpdateHash, &pOld->vgId, sizeof(int32_t)) != 0) {
    mError("arbgroup:%d, failed to remove from arbUpdateHash", pOld->vgId);
  }
  return 0;
}
272

273
/*
 * Encodes pGroup, appends it to the transaction's redo log and marks the raw
 * record SDB_STATUS_CREATING.
 *
 * Returns 0 on success; terrno when encoding fails; otherwise the failing
 * call's error code.
 */
int32_t mndSetCreateArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
  int32_t code = 0;

  SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
  if (pRedoRaw == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendRedolog(pTrans, pRedoRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  return 0;
}
284

285
/*
 * Encodes pGroup, appends it to the transaction's undo log and marks the raw
 * record SDB_STATUS_DROPPED.
 *
 * Returns 0 on success; terrno when encoding fails; otherwise the failing
 * call's error code.
 */
int32_t mndSetCreateArbGroupUndoLogs(STrans *pTrans, SArbGroup *pGroup) {
  int32_t code = 0;

  SSdbRaw *pUndoRaw = mndArbGroupActionEncode(pGroup);
  if (pUndoRaw == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendUndolog(pTrans, pUndoRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  return 0;
}
296

297
/*
 * Encodes pGroup, appends it to the transaction's commit log and marks the raw
 * record SDB_STATUS_READY.
 *
 * Returns 0 on success; terrno when encoding fails; otherwise the failing
 * call's error code.
 */
int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndArbGroupActionEncode(pGroup);
  if (pCommitRaw == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }
  // The comparison must sit outside the assignment: `code = f() != 0` would store
  // the boolean 1 in `code` and lose the real error code.
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) TAOS_RETURN(code);
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) TAOS_RETURN(code);
  return 0;
}
308

309
/*
 * Encodes pGroup, appends it to the transaction's prepare log and marks the
 * raw record SDB_STATUS_DROPPING.
 *
 * Returns 0 on success; terrno when encoding fails; otherwise the failing
 * call's error code.
 */
int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup) {
  int32_t code = 0;

  SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
  if (pRedoRaw == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendPrepareLog(pTrans, pRedoRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  return 0;
}
320

321
/*
 * Encodes pGroup, appends it to the transaction's redo log and marks the raw
 * record SDB_STATUS_DROPPING.
 *
 * Returns 0 on success; terrno when encoding fails; otherwise the failing
 * call's error code.
 */
static int32_t mndSetDropArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
  int32_t code = 0;

  SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
  if (pRedoRaw == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendRedolog(pTrans, pRedoRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  return 0;
}
332

333
/*
 * Encodes pGroup, appends it to the transaction's commit log and marks the raw
 * record SDB_STATUS_DROPPED.
 *
 * Returns 0 on success; terrno when encoding fails; otherwise the failing
 * call's error code.
 */
int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
  int32_t code = 0;

  SSdbRaw *pCommitRaw = mndArbGroupActionEncode(pGroup);
  if (pCommitRaw == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  return 0;
}
344

345
static void *mndBuildArbHeartBeatReq(int32_t *pContLen, char *arbToken, int32_t dnodeId, int64_t arbTerm,
26✔
346
                                     SArray *hbMembers) {
347
  SVArbHeartBeatReq req = {0};
26✔
348
  req.dnodeId = dnodeId;
26✔
349
  req.arbToken = arbToken;
26✔
350
  req.arbTerm = arbTerm;
26✔
351
  req.hbMembers = hbMembers;
26✔
352

353
  int32_t contLen = tSerializeSVArbHeartBeatReq(NULL, 0, &req);
26✔
354
  if (contLen <= 0) return NULL;
26!
355

356
  void *pReq = rpcMallocCont(contLen);
26✔
357
  if (pReq == NULL) return NULL;
26!
358

359
  if (tSerializeSVArbHeartBeatReq(pReq, contLen, &req) <= 0) {
26!
UNCOV
360
    rpcFreeCont(pReq);
×
UNCOV
361
    return NULL;
×
362
  }
363
  *pContLen = contLen;
26✔
364
  return pReq;
26✔
365
}
366

367
/*
 * Builds an arb heartbeat request for pDnode and sends it as
 * TDMT_VND_ARB_HEARTBEAT to the dnode's endpoint set.
 *
 * Returns -1 when the request cannot be built or the dnode has no endpoints
 * (the buffer is freed in the latter case); otherwise the tmsgSendReq() result.
 */
static int32_t mndSendArbHeartBeatReq(SDnodeObj *pDnode, char *arbToken, int64_t arbTerm, SArray *hbMembers) {
  int32_t contLen = 0;
  void   *pCont = mndBuildArbHeartBeatReq(&contLen, arbToken, pDnode->id, arbTerm, hbMembers);
  if (pCont == NULL) {
    mError("dnodeId:%d, failed to build arb-hb request", pDnode->id);
    return -1;
  }

  SEpSet epSet = mndGetDnodeEpset(pDnode);
  if (epSet.numOfEps == 0) {
    mError("dnodeId:%d, failed to send arb-hb request to dnode since no epSet found", pDnode->id);
    rpcFreeCont(pCont);
    return -1;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_ARB_HEARTBEAT, .pCont = pCont, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code == 0) {
    mTrace("dnodeId:%d, send arb-hb request to dnode", pDnode->id);
  } else {
    mError("dnodeId:%d, failed to send arb-hb request to dnode since 0x%x", pDnode->id, code);
  }
  return code;
}
391

392
/*
 * Timer handler for TDMT_MND_ARB_HEARTBEAT_TIMER.
 *
 * Phase 1 walks every arb group in the SDB and, under the group's mutex,
 * collects one SVArbHbReqMember {vgId, hbSeq} per member into an array keyed
 * by the member's dnodeId (each collected member consumes one nextHbSeq).
 * Phase 2 sends one heartbeat request per dnode that is currently online.
 *
 * Every per-dnode array is destroyed before returning. Returns terrno when the
 * temporary hash cannot be created, the mndGetArbToken() error when the token
 * is unavailable, and 0 otherwise (per-dnode failures are only logged).
 */
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
  int32_t    code = 0;
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;
  void      *pIter = NULL;

  SHashObj *pDnodeHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pDnodeHash == NULL) {
    // without the hash nothing can be collected; bail out before touching the SDB
    code = terrno;
    mError("failed to init dnode hash for arb-hb timer since %s", terrstr());
    TAOS_RETURN(code);
  }

  // collect member of same dnode
  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    (void)taosThreadMutexLock(&pArbGroup->mutex);

    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      SArbGroupMember *pMember = &pArbGroup->members[i];
      int32_t          dnodeId = pMember->info.dnodeId;
      void            *pObj = taosHashGet(pDnodeHash, &dnodeId, sizeof(int32_t));
      SArray          *hbMembers = NULL;
      if (pObj) {
        hbMembers = *(SArray **)pObj;
      } else {
        hbMembers = taosArrayInit(16, sizeof(SVArbHbReqMember));
        if (hbMembers == NULL) {
          mError("dnodeId:%d, failed to alloc hb member array, but continue next at this timer round", dnodeId);
          continue;
        }
        if (taosHashPut(pDnodeHash, &dnodeId, sizeof(int32_t), &hbMembers, POINTER_BYTES) != 0) {
          mError("dnodeId:%d, failed to push hb member into hash, but continue next at this timer round", dnodeId);
          // the array is not reachable through the hash, so it must be released here
          taosArrayDestroy(hbMembers);
          continue;
        }
      }
      SVArbHbReqMember reqMember = {.vgId = pArbGroup->vgId, .hbSeq = pMember->state.nextHbSeq++};
      if (taosArrayPush(hbMembers, &reqMember) == NULL) {
        mError("dnodeId:%d, failed to push hb member, but continue next at this timer round", dnodeId);
      }
    }

    (void)taosThreadMutexUnlock(&pArbGroup->mutex);
    sdbRelease(pSdb, pArbGroup);
  }

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
    mError("failed to get arb token for arb-hb timer");
    // release everything collected in phase 1 before giving up
    pIter = taosHashIterate(pDnodeHash, NULL);
    while (pIter) {
      SArray *hbMembers = *(SArray **)pIter;
      taosArrayDestroy(hbMembers);
      pIter = taosHashIterate(pDnodeHash, pIter);
    }
    taosHashCleanup(pDnodeHash);
    TAOS_RETURN(code);
  }

  int64_t nowMs = taosGetTimestampMs();

  pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pDnodeHash, pIter);
    if (pIter == NULL) break;

    int32_t dnodeId = *(int32_t *)taosHashGetKey(pIter, NULL);
    SArray *hbMembers = *(SArray **)pIter;

    SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
    if (pDnode == NULL) {
      mError("dnodeId:%d, timer failed to acquire dnode", dnodeId);
      taosArrayDestroy(hbMembers);
      continue;
    }

    int64_t mndTerm = mndGetTerm(pMnode);

    // offline dnodes are skipped silently for this round
    if (mndIsDnodeOnline(pDnode, nowMs)) {
      int32_t sendCode = mndSendArbHeartBeatReq(pDnode, arbToken, mndTerm, hbMembers);
      if (TSDB_CODE_SUCCESS != sendCode) {
        mError("dnodeId:%d, timer failed to send arb-hb request", dnodeId);
      }
    }

    mndReleaseDnode(pMnode, pDnode);
    taosArrayDestroy(hbMembers);
  }
  taosHashCleanup(pDnodeHash);

  return 0;
}
477

478
static void *mndBuildArbCheckSyncReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
3✔
479
                                     char *member0Token, char *member1Token) {
480
  SVArbCheckSyncReq req = {0};
3✔
481
  req.arbToken = arbToken;
3✔
482
  req.arbTerm = arbTerm;
3✔
483
  req.member0Token = member0Token;
3✔
484
  req.member1Token = member1Token;
3✔
485

486
  int32_t reqLen = tSerializeSVArbCheckSyncReq(NULL, 0, &req);
3✔
487
  int32_t contLen = reqLen + sizeof(SMsgHead);
3✔
488

489
  if (contLen <= 0) return NULL;
3!
490
  SMsgHead *pHead = rpcMallocCont(contLen);
3✔
491
  if (pHead == NULL) return NULL;
3!
492

493
  pHead->contLen = htonl(contLen);
3✔
494
  pHead->vgId = htonl(vgId);
3✔
495
  if (tSerializeSVArbCheckSyncReq((char *)pHead + sizeof(SMsgHead), contLen, &req) <= 0) {
3!
UNCOV
496
    rpcFreeCont(pHead);
×
UNCOV
497
    return NULL;
×
498
  }
499
  *pContLen = contLen;
3✔
500
  return pHead;
3✔
501
}
502

503
/*
 * Builds an arb check-sync request for vgId and sends it as
 * TDMT_VND_ARB_CHECK_SYNC to the vgroup's endpoint set.
 *
 * Returns -1 when the request cannot be built; terrno (or -1 if terrno is 0)
 * when the vgroup has no endpoints, after freeing the buffer; otherwise the
 * tmsgSendReq() result.
 */
static int32_t mndSendArbCheckSyncReq(SMnode *pMnode, int32_t vgId, char *arbToken, int64_t term, char *member0Token,
                                      char *member1Token) {
  int32_t contLen = 0;
  void   *pCont = mndBuildArbCheckSyncReq(&contLen, vgId, arbToken, term, member0Token, member1Token);
  if (pCont == NULL) {
    mError("vgId:%d, failed to build check-sync request", vgId);
    return -1;
  }

  int32_t code = 0;
  SEpSet  epSet = mndGetVgroupEpsetById(pMnode, vgId);
  if (epSet.numOfEps == 0) {
    mError("vgId:%d, failed to send check-sync request since no epSet found", vgId);
    rpcFreeCont(pCont);
    code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_ARB_CHECK_SYNC, .pCont = pCont, .contLen = contLen};
  code = tmsgSendReq(&epSet, &rpcMsg);
  if (code == 0) {
    mDebug("vgId:%d, send check-sync request", vgId);
  } else {
    mError("vgId:%d, failed to send check-sync request since 0x%x", vgId, code);
  }
  return code;
}
531

532
/*
 * Tells whether the member at `index` has timed out: true when its lastHbMs is
 * older than nowMs minus tsArbSetAssignedTimeoutSec (converted to milliseconds).
 */
static bool mndCheckArbMemberHbTimeout(SArbGroup *pArbGroup, int32_t index, int64_t nowMs) {
  int64_t deadlineMs = nowMs - tsArbSetAssignedTimeoutSec * 1000;
  int64_t lastHbMs = pArbGroup->members[index].state.lastHbMs;
  return lastHbMs < deadlineMs;
}
536

537
static void *mndBuildArbSetAssignedLeaderReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
×
538
                                             char *memberToken) {
539
  SVArbSetAssignedLeaderReq req = {0};
×
540
  req.arbToken = arbToken;
×
UNCOV
541
  req.arbTerm = arbTerm;
×
542
  req.memberToken = memberToken;
×
543

544
  int32_t reqLen = tSerializeSVArbSetAssignedLeaderReq(NULL, 0, &req);
×
UNCOV
545
  int32_t contLen = reqLen + sizeof(SMsgHead);
×
546

547
  if (contLen <= 0) return NULL;
×
548
  SMsgHead *pHead = rpcMallocCont(contLen);
×
549
  if (pHead == NULL) return NULL;
×
550

UNCOV
551
  pHead->contLen = htonl(contLen);
×
552
  pHead->vgId = htonl(vgId);
×
553
  if (tSerializeSVArbSetAssignedLeaderReq((char *)pHead + sizeof(SMsgHead), contLen, &req) <= 0) {
×
UNCOV
554
    rpcFreeCont(pHead);
×
UNCOV
555
    return NULL;
×
556
  }
UNCOV
557
  *pContLen = contLen;
×
558
  return pHead;
×
559
}
560

561
/*
 * Builds an arb set-assigned-leader request for vgId and sends it as
 * TDMT_SYNC_SET_ASSIGNED_LEADER to the endpoint set of dnodeId.
 *
 * Returns terrno (or -1 if terrno is 0) when the request cannot be built or
 * the dnode has no endpoints (the buffer is freed in the latter case);
 * otherwise the tmsgSendReq() result.
 */
static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, int32_t vgId, char *arbToken,
                                              int64_t term, char *memberToken) {
  int32_t code = 0;
  int32_t contLen = 0;

  void *pCont = mndBuildArbSetAssignedLeaderReq(&contLen, vgId, arbToken, term, memberToken);
  if (pCont == NULL) {
    mError("vgId:%d, failed to build set-assigned request", vgId);
    code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  SEpSet epSet = mndGetDnodeEpsetById(pMnode, dnodeId);
  if (epSet.numOfEps == 0) {
    mError("dnodeId:%d vgId:%d, failed to send arb-set-assigned request to dnode since no epSet found", dnodeId, vgId);
    rpcFreeCont(pCont);
    code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_SYNC_SET_ASSIGNED_LEADER, .pCont = pCont, .contLen = contLen};
  code = tmsgSendReq(&epSet, &rpcMsg);
  if (code == 0) {
    mInfo("dnodeId:%d vgId:%d, send arb-set-assigned request to dnode", dnodeId, vgId);
  } else {
    mError("dnodeId:%d vgId:%d, failed to send arb-set-assigned request to dnode since 0x%x", dnodeId, vgId, code);
  }
  return code;
}
590

591
/*
 * Decides which check-sync action an arb group needs at time nowMs.
 *
 * *pOp is always written:
 *   CHECK_SYNC_SET_ASSIGNED_LEADER - a leader is assigned but not yet acked;
 *   CHECK_SYNC_CHECK_SYNC          - no member timed out, no leader is assigned
 *                                    and the group is not in sync;
 *   CHECK_SYNC_UPDATE              - exactly one member timed out, the group is
 *                                    in sync and the surviving member may become
 *                                    the assigned leader; pNewGroup then holds a
 *                                    copy of pArbGroup with that leader set;
 *   CHECK_SYNC_NONE                - every other case.
 *
 * pNewGroup is written only for CHECK_SYNC_UPDATE.
 */
void mndArbCheckSync(SArbGroup *pArbGroup, int64_t nowMs, ECheckSyncOp *pOp, SArbGroup *pNewGroup) {
  *pOp = CHECK_SYNC_NONE;

  int32_t vgId = pArbGroup->vgId;

  bool                member0IsTimeout = mndCheckArbMemberHbTimeout(pArbGroup, 0, nowMs);
  bool                member1IsTimeout = mndCheckArbMemberHbTimeout(pArbGroup, 1, nowMs);
  SArbAssignedLeader *pAssignedLeader = &pArbGroup->assignedLeader;
  int32_t             currentAssignedDnodeId = pAssignedLeader->dnodeId;
  bool                hasAssignedLeader = (currentAssignedDnodeId != 0);

  // 1. has assigned && no response => send req
  if (hasAssignedLeader && !pAssignedLeader->acked) {
    *pOp = CHECK_SYNC_SET_ASSIGNED_LEADER;
    return;
  }

  // 2. both of the two members are timeout => skip
  if (member0IsTimeout && member1IsTimeout) {
    return;
  }

  // 3. no member is timeout => check sync
  if (!member0IsTimeout && !member1IsTimeout) {
    // no assigned leader and not sync
    if (!hasAssignedLeader && !pArbGroup->isSync) {
      *pOp = CHECK_SYNC_CHECK_SYNC;
    }
    return;
  }

  // 4. one of the members is timeout => the other one is the leader candidate
  int32_t candidateIndex = member0IsTimeout ? 1 : 0;
  int32_t candidateDnodeId = pArbGroup->members[candidateIndex].info.dnodeId;

  // has assigned leader and dnodeId not match => skip
  if (hasAssignedLeader && currentAssignedDnodeId != candidateDnodeId) {
    mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, assigned leader has been set to dnodeId:%d", vgId,
          candidateDnodeId, currentAssignedDnodeId);
    return;
  }

  // not sync => skip (logged at debug level when the candidate already is the assigned leader)
  if (!pArbGroup->isSync) {
    if (currentAssignedDnodeId == candidateDnodeId) {
      mDebug("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId,
             candidateDnodeId);
    } else {
      mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId,
            candidateDnodeId);
    }
    return;
  }

  // is sync && no assigned leader => write to sdb
  mndArbGroupDupObj(pArbGroup, pNewGroup);
  mndArbGroupSetAssignedLeader(pNewGroup, candidateIndex);
  *pOp = CHECK_SYNC_UPDATE;
}
651

652
/*
 * Timer handler for TDMT_MND_ARB_CHECK_SYNC_TIMER.
 *
 * Skips the round when the mnode's role time is more recent than two
 * heartbeat intervals. Otherwise every arb group is copied under its mutex,
 * evaluated with mndArbCheckSync(), and the resulting action is carried out:
 * send a set-assigned-leader request, send a check-sync request, or queue the
 * new group for a batch update that is submitted after the loop.
 *
 * Returns the error code when the mnode term cannot be obtained; every other
 * path returns 0, with failures only logged.
 */
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
  int32_t    code = 0, lino = 0;
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;
  void      *pIter = NULL;
  SArray    *pUpdateArray = NULL;

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));

  int64_t term = mndGetTerm(pMnode);
  if (term < 0) {
    mError("arb failed to get term since %s", terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  int64_t roleTimeMs = mndGetRoleTimeMs(pMnode);
  int64_t nowMs = taosGetTimestampMs();
  if (nowMs - roleTimeMs < tsArbHeartBeatIntervalSec * 1000 * 2) {
    mInfo("arb skip to check sync since mnd had just switch over, roleTime:%" PRId64 " now:%" PRId64, roleTimeMs,
          nowMs);
    return 0;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    SArbGroup arbGroupDup = {0};

    // work on a private copy so the group's mutex is held only for the copy
    (void)taosThreadMutexLock(&pArbGroup->mutex);
    mndArbGroupDupObj(pArbGroup, &arbGroupDup);
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);

    sdbRelease(pSdb, pArbGroup);

    ECheckSyncOp op = CHECK_SYNC_NONE;
    SArbGroup    newGroup = {0};
    mndArbCheckSync(&arbGroupDup, nowMs, &op, &newGroup);

    int32_t             vgId = arbGroupDup.vgId;
    SArbAssignedLeader *pAssgndLeader = &arbGroupDup.assignedLeader;
    int32_t             assgndDnodeId = pAssgndLeader->dnodeId;

    switch (op) {
      case CHECK_SYNC_NONE:
        mTrace("vgId:%d, arb skip to send msg by check sync", vgId);
        break;
      case CHECK_SYNC_SET_ASSIGNED_LEADER:
        (void)mndSendArbSetAssignedLeaderReq(pMnode, assgndDnodeId, vgId, arbToken, term, pAssgndLeader->token);
        mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, assgndDnodeId);
        break;
      case CHECK_SYNC_CHECK_SYNC:
        (void)mndSendArbCheckSyncReq(pMnode, vgId, arbToken, term, arbGroupDup.members[0].state.token,
                                     arbGroupDup.members[1].state.token);
        mInfo("vgId:%d, arb send check sync request", vgId);
        break;
      case CHECK_SYNC_UPDATE:
        if (!pUpdateArray) {
          pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
          if (!pUpdateArray) {
            // leaving the loop early: the SDB iterator must be cancelled explicitly
            sdbCancelFetch(pSdb, pIter);
            pIter = NULL;
            TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
          }
        }

        if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
          // save terrno first; the cancel call below may overwrite it
          code = terrno;
          sdbCancelFetch(pSdb, pIter);
          pIter = NULL;
          TAOS_CHECK_EXIT(code != 0 ? code : TSDB_CODE_OUT_OF_MEMORY);
        }
        break;
      default:
        mError("vgId:%d, arb unknown check sync op:%d", vgId, op);
        break;
    }
  }

  TAOS_CHECK_EXIT(mndBatchUpdateArbGroup(pMnode, pUpdateArray));

_exit:
  if (code != 0) {
    mError("failed to check sync at line %d since %s", lino, terrstr());
  }

  taosArrayDestroy(pUpdateArray);
  return 0;
}
740

741
/*
 * Encode a batch arb-group update request into a newly allocated RPC buffer.
 *
 * pContLen    [out] receives the encoded length; written only on success.
 * updateArray array handed to the serializer through the request wrapper.
 *
 * Returns the rpcMallocCont() buffer on success, or NULL when the size probe,
 * the allocation, or the actual encoding fails (the buffer is freed here in
 * the latter case).
 */
static void *mndBuildArbUpdateGroupBatchReq(int32_t *pContLen, SArray *updateArray) {
  SMArbUpdateGroupBatchReq batchReq = {0};
  batchReq.updateArray = updateArray;

  // The result of the call with a NULL buffer is used as the allocation size.
  int32_t encodedLen = tSerializeSMArbUpdateGroupBatchReq(NULL, 0, &batchReq);
  if (encodedLen <= 0) {
    return NULL;
  }

  SMsgHead *pBuf = rpcMallocCont(encodedLen);
  if (pBuf == NULL) {
    return NULL;
  }

  if (tSerializeSMArbUpdateGroupBatchReq(pBuf, encodedLen, &batchReq) > 0) {
    *pContLen = encodedLen;
    return pBuf;
  }

  rpcFreeCont(pBuf);
  return NULL;
}
757

758
/*
 * Fill an SMArbUpdateGroup message struct from an in-memory SArbGroup.
 *
 * Token fields are NOT duplicated: outGroup only borrows pointers into
 * pGroup, so pGroup must stay alive for as long as outGroup is used.
 */
static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup) {
  // scalar bookkeeping fields
  outGroup->vgId = pGroup->vgId;
  outGroup->dbUid = pGroup->dbUid;
  outGroup->isSync = pGroup->isSync;
  outGroup->version = pGroup->version;
  outGroup->code = pGroup->code;
  outGroup->updateTimeMs = pGroup->updateTimeMs;

  // per-member identity and token
  for (int idx = 0; idx < TSDB_ARB_GROUP_MEMBER_NUM; ++idx) {
    SArbGroupMember *pSrc = &pGroup->members[idx];
    outGroup->members[idx].dnodeId = pSrc->info.dnodeId;
    outGroup->members[idx].token = pSrc->state.token;  // borrowed pointer, no copy
  }

  // assigned leader
  SArbAssignedLeader *pLeader = &pGroup->assignedLeader;
  outGroup->assignedLeader.dnodeId = pLeader->dnodeId;
  outGroup->assignedLeader.token = pLeader->token;  // borrowed pointer, no copy
  outGroup->assignedLeader.acked = pLeader->acked;
}
11✔
773

774
/*
 * Queue a single-group arb update onto the mnode write queue.
 *
 * If the group's vgId is already present in arbUpdateHash the request is
 * skipped and 0 is returned. Otherwise a one-element batch request is built,
 * put on WRITE_QUEUE, and the vgId is then recorded in arbUpdateHash.
 *
 * Returns 0 on success or skip, -1 when the request could not be built, or
 * the non-zero result of tmsgPutToQueue()/taosHashPut().
 */
static int32_t mndUpdateArbGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
  if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
    mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
    return 0;
  }

  int32_t result = -1;

  SMArbUpdateGroup updateGroup = {0};
  mndInitArbUpdateGroup(pNewGroup, &updateGroup);

  SArray *pBatch = taosArrayInit(1, sizeof(SMArbUpdateGroup));
  if (taosArrayPush(pBatch, &updateGroup) != NULL) {
    int32_t contLen = 0;
    void   *pCont = mndBuildArbUpdateGroupBatchReq(&contLen, pBatch);
    if (pCont == NULL) {
      mError("failed to build arb-update-group request");
    } else {
      SRpcMsg rpcMsg = {
          .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pCont, .contLen = contLen, .info.noResp = true};
      result = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
      if (result == 0) {
        // mark the group as having an update in flight
        result = taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
      }
    }
  }

  taosArrayDestroy(pBatch);
  return result;
}
806

807
/*
 * Queue one batched arb-group update for every group in newGroupArray that
 * does not already have an update in flight.
 *
 * Groups whose vgId is already in arbUpdateHash are skipped. Every other
 * group is appended to the outgoing batch and its vgId is recorded in
 * arbUpdateHash before the message is put on WRITE_QUEUE.
 *
 * On failure only the vgIds that THIS call recorded are removed again.
 * Entries that were skipped belong to another pending request and must keep
 * their in-flight mark.
 *
 * Returns 0 on success (including "nothing to send"), -1 when building the
 * batch fails, or the non-zero result of tmsgPutToQueue().
 */
static int32_t mndBatchUpdateArbGroup(SMnode *pMnode, SArray *newGroupArray) {
  int32_t ret = -1;
  size_t  numMarked = 0;  // leading entries of pArray whose vgId we put into arbUpdateHash

  size_t  sz = taosArrayGetSize(newGroupArray);
  SArray *pArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup));
  for (size_t i = 0; i < sz; i++) {
    SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
    if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
      mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
      continue;
    }

    SMArbUpdateGroup newGroup = {0};
    mndInitArbUpdateGroup(pNewGroup, &newGroup);

    if (taosArrayPush(pArray, &newGroup) == NULL) goto _OVER;
    if (taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0) != 0) goto _OVER;
    numMarked++;
  }

  if (taosArrayGetSize(pArray) == 0) {
    ret = 0;
    goto _OVER;
  }

  int32_t contLen = 0;
  void   *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
  if (!pHead) {
    mError("failed to build arb-update-group request");
    goto _OVER;
  }

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
  ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);

_OVER:
  if (ret != 0) {
    // Roll back only our own marks; pArray is still alive here on purpose.
    for (size_t i = 0; i < numMarked; i++) {
      SMArbUpdateGroup *pMarked = taosArrayGet(pArray, i);
      if (taosHashRemove(arbUpdateHash, &pMarked->vgId, sizeof(int32_t)) != 0) {
        mError("failed to remove vgId:%d from arbUpdateHash", pMarked->vgId);
      }
    }
  }

  taosArrayDestroy(pArray);
  return ret;
}
856

857
/*
 * Handle a TDMT_MND_ARB_UPDATE_GROUP_BATCH message: decode the batch, create
 * one transaction, and add a commit log for every group that still exists in
 * the SDB.
 *
 * Groups that are no longer present are dropped from arbUpdateHash and
 * skipped. If anything fails after decoding, every vgId of the batch is
 * removed from arbUpdateHash.
 *
 * Returns 0 on success, otherwise a non-zero error code.
 */
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
  int    code = -1;
  size_t sz = 0;

  SMArbUpdateGroupBatchReq req = {0};
  if ((code = tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    mError("arb failed to decode arb-update-group request");
    TAOS_RETURN(code);
  }

  // Taken before anything else can fail, so the _OVER cleanup sees the whole batch.
  sz = taosArrayGetSize(req.updateArray);

  SMnode *pMnode = pReq->info.node;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "upd-bat-arbgroup");
  if (pTrans == NULL) {
    mError("failed to update arbgroup in create trans, since %s", terrstr());
    // code is still 0 from the successful decode; report the failure instead.
    code = -1;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  for (size_t i = 0; i < sz; i++) {
    SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
    SArbGroup         newGroup = {0};
    newGroup.vgId = pUpdateGroup->vgId;
    newGroup.dbUid = pUpdateGroup->dbUid;
    for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
      newGroup.members[m].info.dnodeId = pUpdateGroup->members[m].dnodeId;
      tstrncpy(newGroup.members[m].state.token, pUpdateGroup->members[m].token, TSDB_ARB_TOKEN_SIZE);
    }

    newGroup.isSync = pUpdateGroup->isSync;
    newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId;
    tstrncpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
    newGroup.assignedLeader.acked = pUpdateGroup->assignedLeader.acked;
    newGroup.version = pUpdateGroup->version;
    newGroup.code = pUpdateGroup->code;
    newGroup.updateTimeMs = pUpdateGroup->updateTimeMs;

    mInfo(
        "trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d], %d, "
        "%" PRId64,
        pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
        newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
        newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked,
        pUpdateGroup->code, pUpdateGroup->updateTimeMs);

    // Acquired only to confirm the group still exists.
    SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
    if (!pOldGroup) {
      mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId);
      if (taosHashRemove(arbUpdateHash, &newGroup.vgId, sizeof(int32_t)) != 0) {
        mError("failed to remove vgId:%d from arbUpdateHash", newGroup.vgId);
      }
      continue;
    }

    mndTransAddArbGroupId(pTrans, newGroup.vgId);

    if ((code = mndSetCreateArbGroupCommitLogs(pTrans, &newGroup)) != 0) {
      mError("failed to update arbgroup in set commit log, vgId:%d, trans:%d, since %s", newGroup.vgId, pTrans->id,
             terrstr());
      sdbRelease(pMnode->pSdb, pOldGroup);  // do not leak the reference on the error path
      goto _OVER;
    }

    mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
          pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
          newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
          newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked);

    sdbRelease(pMnode->pSdb, pOldGroup);
  }

  if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) goto _OVER;
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;

  code = 0;

_OVER:
  if (code != 0) {
    // failed to update arbgroup: clear the in-flight mark of every group in the batch
    for (size_t i = 0; i < sz; i++) {
      SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
      if (taosHashRemove(arbUpdateHash, &pUpdateGroup->vgId, sizeof(int32_t)) != 0) {
        mError("failed to remove vgId:%d from arbUpdateHash", pUpdateGroup->vgId);
      }
    }
  }

  mndTransDrop(pTrans);
  tFreeSMArbUpdateGroupBatchReq(&req);
  return code;
}
946

947
/*
 * Copy pGroup into pNew, stopping at the mutexInited field: only the bytes
 * that precede it in SArbGroup are copied.
 */
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew) {
  const size_t copyLen = offsetof(SArbGroup, mutexInited);
  (void)memcpy(pNew, pGroup, copyLen);
}
38✔
950

951
/*
 * Make members[index] the assigned leader of pGroup: copy its dnode id and
 * token into assignedLeader and clear the acked flag.
 */
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index) {
  SArbAssignedLeader *pLeader = &pGroup->assignedLeader;

  pLeader->dnodeId = pGroup->members[index].info.dnodeId;
  tstrncpy(pLeader->token, pGroup->members[index].state.token, TSDB_ARB_TOKEN_SIZE);
  pLeader->acked = false;
}
1✔
958

959
/*
 * Clear the assigned leader of pGroup: dnode id 0, zeroed token, not acked.
 */
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
  SArbAssignedLeader *pLeader = &pGroup->assignedLeader;

  (void)memset(pLeader->token, 0, TSDB_ARB_TOKEN_SIZE);
  pLeader->dnodeId = 0;
  pLeader->acked = false;
}
1✔
964

965
/*
 * Apply one heartbeat response member to pGroup (under the group mutex) and
 * decide whether the group's stored token has to be updated.
 *
 * The member belonging to dnodeId gets its responsedHbSeq/lastHbMs refreshed
 * unless the response's hbSeq is not newer than the stored one. When the
 * reported token differs from the stored token, pNewGroup is filled with a
 * copy of pGroup carrying the new token, isSync = false, and a cleared
 * assigned leader if that member was the assigned leader.
 *
 * Returns true only when pNewGroup was filled and should be persisted.
 */
bool mndCheckArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
                                 SArbGroup *pNewGroup) {
  bool needUpdate = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  // locate the member slot owned by the reporting dnode
  int              slot = 0;
  SArbGroupMember *pFound = NULL;
  while (slot < TSDB_ARB_GROUP_MEMBER_NUM) {
    if (pGroup->members[slot].info.dnodeId == dnodeId) {
      pFound = &pGroup->members[slot];
      break;
    }
    slot++;
  }

  if (pFound == NULL) {
    mInfo("dnodeId:%d vgId:%d, arb token update check failed, no obj found", dnodeId, pRspMember->vgId);
  } else if (pFound->state.responsedHbSeq >= pRspMember->hbSeq) {
    // stale or duplicated response
    mInfo("dnodeId:%d vgId:%d, skip arb token update, heart beat seq expired, local:%d msg:%d", dnodeId,
          pRspMember->vgId, pFound->state.responsedHbSeq, pRspMember->hbSeq);
  } else {
    // record the heartbeat
    pFound->state.responsedHbSeq = pRspMember->hbSeq;
    pFound->state.lastHbMs = nowMs;

    if (mndArbCheckToken(pFound->state.token, pRspMember->memberToken) == 0) {
      mDebug("dnodeId:%d vgId:%d, skip arb token update, token matched", dnodeId, pRspMember->vgId);
    } else {
      // token changed: prepare the replacement group object
      mndArbGroupDupObj(pGroup, pNewGroup);
      tstrncpy(pNewGroup->members[slot].state.token, pRspMember->memberToken, TSDB_ARB_TOKEN_SIZE);
      pNewGroup->isSync = false;

      bool resetAssigned = (pFound->info.dnodeId == pGroup->assignedLeader.dnodeId);
      if (resetAssigned) {
        mndArbGroupResetAssignedLeader(pNewGroup);
      }

      needUpdate = true;
      mInfo("dnodeId:%d vgId:%d, arb token updating, resetAssigned:%d", dnodeId, pRspMember->vgId, resetAssigned);
    }
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return needUpdate;
}
1020

1021
/*
 * Process every member entry of one dnode's heartbeat response and queue a
 * batched update for the groups whose token changed.
 *
 * Entries whose vgId has no arb group are logged and skipped. A failed push
 * into the update array is logged and the loop continues with the next entry.
 *
 * Returns 0 on success, or the error code of mndBatchUpdateArbGroup(). The
 * temporary update array is destroyed on both paths.
 */
static int32_t mndUpdateArbGroupsByHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) {
  int64_t nowMs = taosGetTimestampMs();
  size_t  size = taosArrayGetSize(memberArray);
  SArray *pUpdateArray = taosArrayInit(size, sizeof(SArbGroup));

  for (size_t i = 0; i < size; i++) {
    SVArbHbRspMember *pRspMember = taosArrayGet(memberArray, i);

    SArbGroup  newGroup = {0};
    SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &pRspMember->vgId);
    if (pGroup == NULL) {
      mInfo("failed to update arb token, vgId:%d not found", pRspMember->vgId);
      continue;
    }

    bool updateToken = mndCheckArbGroupByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
    if (updateToken) {
      if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
        mError("failed to push newGroup to updateArray, but continue at this hearbear");
      }
    }

    sdbRelease(pMnode->pSdb, pGroup);
  }

  // Destroy the array before returning so the failure path does not leak it.
  int32_t code = mndBatchUpdateArbGroup(pMnode, pUpdateArray);
  taosArrayDestroy(pUpdateArray);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
1051

1052
/*
 * Decide, under the group mutex, whether a check-sync result changes the
 * group's isSync flag.
 *
 * Nothing is updated when the group has an assigned leader (terrno is set to
 * TSDB_CODE_SUCCESS) or when either reported member token differs from the
 * stored one (terrno is set to TSDB_CODE_MND_ARB_TOKEN_MISMATCH). Otherwise,
 * if isSync differs from newIsSync, pNewGroup is filled with a copy of pGroup
 * carrying the new isSync, the given code and a fresh updateTimeMs.
 *
 * Returns true only when pNewGroup was filled.
 */
bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
                                  bool newIsSync, SArbGroup *pNewGroup, int32_t code) {
  bool changed = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  if (pGroup->assignedLeader.dnodeId != 0) {
    terrno = TSDB_CODE_SUCCESS;
    mInfo("skip to update arb sync, vgId:%d has assigned leader:%d", vgId, pGroup->assignedLeader.dnodeId);
  } else {
    char *local0Token = pGroup->members[0].state.token;
    char *local1Token = pGroup->members[1].state.token;

    // second comparison is only evaluated when the first one matched
    bool tokensMatch =
        (mndArbCheckToken(local0Token, member0Token) == 0) && (mndArbCheckToken(local1Token, member1Token) == 0);

    if (!tokensMatch) {
      terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
      mInfo("skip to update arb sync, memberToken mismatch local:[%s][%s], msg:[%s][%s]", local0Token, local1Token,
            member0Token, member1Token);
    } else if (pGroup->isSync != newIsSync) {
      mndArbGroupDupObj(pGroup, pNewGroup);
      pNewGroup->isSync = newIsSync;
      pNewGroup->code = code;
      pNewGroup->updateTimeMs = taosGetTimestampMs();

      mInfo("vgId:%d, arb isSync updating, new isSync:%d, timeStamp:%" PRId64, vgId, newIsSync,
            pNewGroup->updateTimeMs);
      changed = true;
    }
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return changed;
}
1087

1088
/*
 * Apply a check-sync result to the arb group of vgId and, when the isSync
 * flag changes, queue the update.
 *
 * Returns terrno (or -1 when terrno is 0) if the group cannot be acquired.
 * Otherwise returns 0; a failure to queue the update is only logged.
 */
static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token, char *member1Token, bool newIsSync,
                                int32_t rsp_code) {
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pGroup = sdbAcquire(pSdb, SDB_ARBGROUP, &vgId);
  if (pGroup == NULL) {
    mInfo("failed to update arb sync, vgId:%d not found", vgId);
    int32_t code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  SArbGroup newGroup = {0};
  if (mndUpdateArbGroupByCheckSync(pGroup, vgId, member0Token, member1Token, newIsSync, &newGroup, rsp_code)) {
    // best effort: a failed pull-up is logged, not propagated
    if (mndUpdateArbGroup(pMnode, &newGroup) != 0) {
      mInfo("failed to pullup update arb sync, vgId:%d, since %s", vgId, terrstr());
    }
  }

  sdbRelease(pSdb, pGroup);
  return 0;
}
1111

1112
/*
 * Handle an arb heartbeat response from a dnode.
 *
 * An empty response is ignored (returns 0). The response is decoded, its arb
 * token is compared with the local one, and on a match the per-member state
 * is applied through mndUpdateArbGroupsByHeartBeat().
 *
 * Returns 0 on success, TSDB_CODE_MND_ARB_TOKEN_MISMATCH when the arb token
 * differs, or the error code of the failing step.
 */
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arb hb-rsp contLen is 0");
    return 0;
  }

  int32_t code = -1;
  SMnode *pMnode = pRsp->info.node;

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  code = mndGetArbToken(pMnode, arbToken);
  if (code != 0) {
    mError("failed to get arb token for arb-hb response");
    TAOS_RETURN(code);
  }

  SVArbHeartBeatRsp arbHbRsp = {0};
  code = tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &arbHbRsp);
  if (code != 0) {
    mInfo("arb hb-rsp des failed, since:%s", tstrerror(pRsp->code));
    TAOS_RETURN(code);
  }

  if (mndArbCheckToken(arbToken, arbHbRsp.arbToken) == 0) {
    TAOS_CHECK_GOTO(mndUpdateArbGroupsByHeartBeat(pMnode, arbHbRsp.dnodeId, arbHbRsp.hbMembers), NULL, _OVER);
    code = 0;
  } else {
    mInfo("arb hearbeat skip update for dnodeId:%d, arb token mismatch, local:[%s] msg:[%s]", arbHbRsp.dnodeId,
          arbToken, arbHbRsp.arbToken);
    code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  }

_OVER:
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
  return code;
}
1149

1150
/*
 * Handle a vnode arb check-sync response.
 *
 * An empty response is ignored (returns 0). When decoding fails and the RPC
 * code is TSDB_CODE_MND_ARB_TOKEN_MISMATCH, the failure is treated as success.
 * After a successful decode the arb token is verified and the group's sync
 * state is updated through mndUpdateArbSync().
 */
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arb check-sync-rsp contLen is 0");
    return 0;
  }

  int32_t code = -1;
  SMnode *pMnode = pRsp->info.node;

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  code = mndGetArbToken(pMnode, arbToken);
  if (code != 0) {
    mError("failed to get arb token for arb-check-sync response");
    TAOS_RETURN(code);
  }

  SVArbCheckSyncRsp syncRsp = {0};
  code = tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp);
  if (code != 0) {
    mInfo("arb check-sync-rsp des failed, since:%s", tstrerror(pRsp->code));
    if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) {
      terrno = TSDB_CODE_SUCCESS;
      return 0;
    }
    TAOS_RETURN(code);
  }

  mInfo("vgId:%d, vnode-arb-check-sync-rsp received, errCode:%d", syncRsp.vgId, syncRsp.errCode);

  if (mndArbCheckToken(arbToken, syncRsp.arbToken) != 0) {
    mInfo("skip update arb sync for vgId:%d, arb token mismatch, local:[%s] msg:[%s]", syncRsp.vgId, arbToken,
          syncRsp.arbToken);
    // NOTE(review): code is still 0 here (set by the successful decode), so
    // this path returns 0 via TAOS_RETURN(code) - confirm that is intended.
    terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  } else {
    bool newIsSync = (syncRsp.errCode == TSDB_CODE_SUCCESS);
    code = mndUpdateArbSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync,
                            syncRsp.errCode);
    if (code != 0) {
      mInfo("failed to update arb sync for vgId:%d, since:%s", syncRsp.vgId, terrstr());
    }
  }

  tFreeSVArbCheckSyncRsp(&syncRsp);
  TAOS_RETURN(code);
}
1198

1199
/*
 * Decide, under the group mutex, whether a set-assigned-leader response
 * acknowledges the group's pending assigned leader.
 *
 * The response is ignored when memberToken does not match the stored
 * assigned-leader token, when errcode is not TSDB_CODE_SUCCESS, or when the
 * leader is already acked. Otherwise pNewGroup is filled with a copy of
 * pGroup with isSync = false and assignedLeader.acked = true.
 *
 * Returns true only when pNewGroup was filled.
 */
bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode,
                                          SArbGroup *pNewGroup) {
  bool acked = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  if (mndArbCheckToken(pGroup->assignedLeader.token, memberToken) != 0) {
    mInfo("skip update arb assigned for vgId:%d, member token mismatch, local:[%s] msg:[%s]", vgId,
          pGroup->assignedLeader.token, memberToken);
  } else if (errcode != TSDB_CODE_SUCCESS) {
    mInfo("skip update arb assigned for vgId:%d, since:%s", vgId, tstrerror(errcode));
  } else if (!pGroup->assignedLeader.acked) {
    mndArbGroupDupObj(pGroup, pNewGroup);
    pNewGroup->isSync = false;
    pNewGroup->assignedLeader.acked = true;

    mInfo("vgId:%d, arb received assigned ack", vgId);
    acked = true;
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return acked;
}
1229

UNCOV
1230
/*
 * Handle a vnode response to an arb set-assigned-leader request.
 *
 * An empty response is ignored (returns 0). The response is decoded, its arb
 * token is verified, and if it acknowledges the group's pending assigned
 * leader the updated group is queued through mndUpdateArbGroup().
 *
 * The group obtained from mndAcquireArbGroup() is released on every exit
 * path. This presumes mndAcquireArbGroup() hands out an SDB reference like
 * sdbAcquire() does - TODO confirm against its definition.
 *
 * Returns 0 on success, otherwise a non-zero error code.
 */
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arb set-assigned-rsp contLen is 0");
    return 0;
  }

  int32_t code = -1;

  SMnode    *pMnode = pRsp->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pGroup = NULL;  // declared up front so _OVER can release it

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
    mError("failed to get arb token for arb-set-assigned response");
    TAOS_RETURN(code);
  }

  SVArbSetAssignedLeaderRsp setAssignedRsp = {0};
  if ((code = tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp)) != 0) {
    mInfo("arb set-assigned-rsp des failed, since:%s", tstrerror(pRsp->code));
    TAOS_RETURN(code);
  }

  if (mndArbCheckToken(arbToken, setAssignedRsp.arbToken) != 0) {
    mInfo("skip update arb assigned for vgId:%d, arb token mismatch, local:[%s] msg:[%s]", setAssignedRsp.vgId,
          arbToken, setAssignedRsp.arbToken);
    code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
    goto _OVER;
  }

  pGroup = mndAcquireArbGroup(pMnode, setAssignedRsp.vgId);
  if (!pGroup) {
    mError("failed to set arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  SArbGroup newGroup = {0};
  bool updateAssigned = mndUpdateArbGroupBySetAssignedLeader(pGroup, setAssignedRsp.vgId, setAssignedRsp.memberToken,
                                                             pRsp->code, &newGroup);
  if (updateAssigned) {
    if ((code = mndUpdateArbGroup(pMnode, &newGroup)) != 0) {
      mInfo("failed to pullup update arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, tstrerror(code));
      goto _OVER;
    }
  }

  code = 0;

_OVER:
  if (pGroup != NULL) {
    sdbRelease(pSdb, pGroup);
  }
  tFreeSVArbSetAssignedLeaderRsp(&setAssignedRsp);
  return code;
}
1284

UNCOV
1285
static char *formatTimestamp(char *buf, int64_t val, int precision) {
×
1286
  time_t tt;
1287
  if (precision == TSDB_TIME_PRECISION_MICRO) {
×
UNCOV
1288
    tt = (time_t)(val / 1000000);
×
1289
  }
1290
  if (precision == TSDB_TIME_PRECISION_NANO) {
×
1291
    tt = (time_t)(val / 1000000000);
×
1292
  } else {
UNCOV
1293
    tt = (time_t)(val / 1000);
×
1294
  }
1295

1296
  struct tm tm;
1297
  if (taosLocalTime(&tt, &tm, NULL, 0, NULL) == NULL) {
×
1298
    mError("failed to get local time");
×
1299
    return NULL;
×
1300
  }
1301
  size_t pos = taosStrfTime(buf, 32, "%Y-%m-%d %H:%M:%S", &tm);
×
1302

1303
  if (precision == TSDB_TIME_PRECISION_MICRO) {
×
1304
    sprintf(buf + pos, ".%06d", (int)(val % 1000000));
×
1305
  } else if (precision == TSDB_TIME_PRECISION_NANO) {
×
UNCOV
1306
    sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
×
1307
  } else {
UNCOV
1308
    sprintf(buf + pos, ".%03d", (int)(val % 1000));
×
1309
  }
1310

1311
  return buf;
×
1312
}
1313

1314
/*
 * SHOW retrieve callback for arb groups: fills up to `rows` rows of pBlock,
 * one per arb group fetched from the SDB, resuming from pShow->pIter.
 *
 * Columns written per row, in order: db name, vgId, one dnode id per group
 * member, isSync, "<check-sync error string>(<update time>)", and then the
 * assigned leader's dnode id, token and acked flag (three NULLs when the
 * group has no assigned leader).
 *
 * Groups whose vgroup object cannot be acquired are skipped.
 *
 * Returns the number of rows written; pShow->numOfRows is advanced by the
 * same amount.
 */
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  SArbGroup *pGroup = NULL;
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_ARBGROUP, pShow->pIter, (void **)&pGroup);
    if (pShow->pIter == NULL) break;

    // NOTE(review): no unlock is visible at _OVER, so if RETRIEVE_CHECK_GOTO
    // jumps there the group mutex appears to stay locked - confirm against
    // the macro definition.
    (void)taosThreadMutexLock(&pGroup->mutex);

    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    SVgObj          *pVgObj = sdbAcquire(pSdb, SDB_VGROUP, &pGroup->vgId);
    if (!pVgObj) {
      (void)taosThreadMutexUnlock(&pGroup->mutex);
      sdbRelease(pSdb, pGroup);
      continue;
    }
    // copy the name out so the vgroup object can be released right away
    char dbNameInGroup[TSDB_DB_FNAME_LEN];
    tstrncpy(dbNameInGroup, pVgObj->dbName, TSDB_DB_FNAME_LEN);
    sdbRelease(pSdb, pVgObj);

    // The size limit must match the dbname buffer declared on the line above it.
    char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(dbNameInGroup), TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)dbname, false), pGroup, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->vgId, false), pGroup, &lino, _OVER);

    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      SArbGroupMember *pMember = &pGroup->members[i];
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pMember->info.dnodeId, false), pGroup,
                          &lino, _OVER);
    }

    mInfo("vgId:%d, arb group sync:%d, code:%s, update time:%" PRId64, pGroup->vgId, pGroup->isSync,
          tstrerror(pGroup->code), pGroup->updateTimeMs);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->isSync, false), pGroup, &lino, _OVER);

    // "<error string>(<formatted update time>)"; the time part stays empty
    // when formatTimestamp() fails, because bufUpdateTime is zero-initialized
    char strCheckSyncCode[100] = {0};
    char bufUpdateTime[40] = {0};
    (void)formatTimestamp(bufUpdateTime, pGroup->updateTimeMs, TSDB_TIME_PRECISION_MILLI);
    tsnprintf(strCheckSyncCode, 100, "%s(%s)", tstrerror(pGroup->code), bufUpdateTime);

    char checkSyncCode[100 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(checkSyncCode, strCheckSyncCode, 100 + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)checkSyncCode, false), pGroup, &lino, _OVER);

    if (pGroup->assignedLeader.dnodeId != 0) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.dnodeId, false),
                          pGroup, &lino, _OVER);

      char token[TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE] = {0};
      STR_WITH_MAXSIZE_TO_VARSTR(token, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)token, false), pGroup, &lino, _OVER);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.acked, false),
                          pGroup, &lino, _OVER);
    } else {
      // no assigned leader: the three leader columns are NULL
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);
    }

    (void)taosThreadMutexUnlock(&pGroup->mutex);

    numOfRows++;
    sdbRelease(pSdb, pGroup);
  }

_OVER:
  if (code != 0) mError("failed to restrieve arb group at line:%d, since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;

  return numOfRows;
}
1407

UNCOV
1408
/*
 * Cancel an in-progress SDB_ARBGROUP fetch identified by pIter.
 */
static void mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_ARBGROUP);
}
×
1412

UNCOV
1413
/*
 * Return sdbGetSize() for the SDB_ARBGROUP table of this mnode.
 */
int32_t mndGetArbGroupSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_ARBGROUP);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc