• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4827

28 Oct 2025 01:21AM UTC coverage: 59.754% (+0.02%) from 59.732%
#4827

push

travis-ci

web-flow
test: tidy up cases on main branch(10-27) (#33381)

121341 of 258518 branches covered (46.94%)

Branch coverage included in aggregate %.

193621 of 268583 relevant lines covered (72.09%)

4339204.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.51
/source/dnode/mnode/impl/src/mndArbGroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndArbGroup.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndShow.h"
21
#include "mndTrans.h"
22
#include "mndVgroup.h"
23
#include "mndSync.h"
24

25
#define ARBGROUP_VER_NUMBER   1
26
#define ARBGROUP_RESERVE_SIZE 51
27

28
static SHashObj *arbUpdateHash = NULL;
29

30
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup);
31
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew);
32
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup);
33

34
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew);
35
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index);
36
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup);
37

38
static int32_t mndArbPutUpdateArbIntoWQ(SMnode *pMnode, SArbGroup *pNewGroup);
39
static int32_t mndArbPutBatchUpdateIntoWQ(SMnode *pMnode, SArray *newGroupArray);
40

41
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq);
42
static int32_t mndArbProcessTimer(SRpcMsg *pReq);
43
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq);
44
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp);
45
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp);
46
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp);
47
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
48
static void    mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter);
49
static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq);
50

51
static int32_t mndArbCheckToken(const char *token1, const char *token2) {
389✔
52
  if (token1 == NULL || token2 == NULL) return -1;
389!
53
  if (strlen(token1) == 0 || strlen(token2) == 0) return -1;
389!
54
  return strncmp(token1, token2, TSDB_ARB_TOKEN_SIZE);
321✔
55
}
56

57
/*
 * Initialize the arbitrator-group module.
 *
 * Registers the timer/RPC message handlers and the show handlers, creates the
 * module-level arbUpdateHash (entries are removed in mndArbGroupActionUpdate),
 * and finally registers the SDB_ARBGROUP table callbacks.
 * Returns 0 on success or an error code (also stored in terrno).
 */
int32_t mndInitArbGroup(SMnode *pMnode) {
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_HEARTBEAT_TIMER, mndProcessArbHbTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_CHECK_SYNC_TIMER, mndArbProcessTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP_BATCH, mndProcessArbUpdateGroupBatchReq);
  mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_ASSIGN_LEADER, mndProcessAssignLeaderMsg);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup);

  arbUpdateHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (arbUpdateHash == NULL) {
    int32_t initCode = terrno;
    TAOS_RETURN(initCode);
  }

  SSdbTable arbTable = {
      .sdbType = SDB_ARBGROUP,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndArbGroupActionEncode,
      .decodeFp = (SdbDecodeFp)mndArbGroupActionDecode,
      .insertFp = (SdbInsertFp)mndArbGroupActionInsert,
      .updateFp = (SdbUpdateFp)mndArbGroupActionUpdate,
      .deleteFp = (SdbDeleteFp)mndArbGroupActionDelete,
  };
  return sdbSetTable(pMnode->pSdb, arbTable);
}
88

89
/*
 * Release the module-level arbUpdateHash created by mndInitArbGroup().
 *
 * The pointer is reset to NULL afterwards so that no stale handle to freed
 * memory survives cleanup (mndArbGroupActionUpdate dereferences this global).
 */
void mndCleanupArbGroup(SMnode *pMnode) {
  taosHashCleanup(arbUpdateHash);
  arbUpdateHash = NULL;
}
90

91
/*
 * Acquire (reference) the arbitrator group keyed by vgId from the SDB.
 *
 * Returns the group, or NULL. When the SDB reports TSDB_CODE_SDB_OBJ_NOT_THERE,
 * terrno is translated to TSDB_CODE_MND_ARBGROUP_NOT_EXIST.
 * A non-NULL result must be released with mndReleaseArbGroup().
 */
SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) {
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pFound = sdbAcquire(pSdb, SDB_ARBGROUP, &vgId);
  if (pFound != NULL) {
    return pFound;
  }
  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_ARBGROUP_NOT_EXIST;
  }
  return NULL;
}
98

99
/* Drop a reference previously obtained via mndAcquireArbGroup(). */
void mndReleaseArbGroup(SMnode *pMnode, SArbGroup *pGroup) { sdbRelease(pMnode->pSdb, pGroup); }
103

104
/*
 * Build a zeroed arbitrator group from a vgroup object.
 *
 * Only vgroups with exactly two replicas are accepted; for any other replica
 * count TSDB_CODE_INVALID_PARA is returned and outGroup is left untouched.
 * On success, dbUid, vgId and each member's dnodeId are copied from pVgObj.
 */
int32_t mndArbGroupInitFromVgObj(SVgObj *pVgObj, SArbGroup *outGroup) {
  if (pVgObj->replica == 2) {
    (void)memset(outGroup, 0, sizeof(SArbGroup));
    outGroup->dbUid = pVgObj->dbUid;
    outGroup->vgId = pVgObj->vgId;
    for (int idx = 0; idx < TSDB_ARB_GROUP_MEMBER_NUM; ++idx) {
      outGroup->members[idx].info.dnodeId = pVgObj->vnodeGid[idx].dnodeId;
    }
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }
  TAOS_RETURN(TSDB_CODE_INVALID_PARA);
}
118

119
/*
 * Serialize an arbitrator group into a new SDB raw record (format version
 * ARBGROUP_VER_NUMBER).
 *
 * Field order: vgId, dbUid, {dnodeId, token} per member, isSync,
 * assigned-leader dnodeId, assigned-leader token, version, assigned-leader
 * acked, code, updateTimeMs, then ARBGROUP_RESERVE_SIZE reserved bytes.
 * mndArbGroupActionDecode() must read the fields in the same order.
 *
 * Returns the raw record, or NULL on failure (terrno describes the error).
 */
SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
  int32_t  code = 0;  // kept as in the original; presumably referenced by the SDB_* macros — TODO confirm
  int32_t  lino = 0;
  SSdbRaw *pRaw = NULL;
  int32_t  offset = 0;
  int32_t  rawLen = sizeof(SArbGroup) + ARBGROUP_RESERVE_SIZE;

  // Preset an error so that every early jump to _OVER reports a failure.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  pRaw = sdbAllocRaw(SDB_ARBGROUP, ARBGROUP_VER_NUMBER, rawLen);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, offset, pGroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, offset, pGroup->dbUid, _OVER)
  for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; ++m) {
    SArbGroupMember *pMem = &pGroup->members[m];
    SDB_SET_INT32(pRaw, offset, pMem->info.dnodeId, _OVER)
    SDB_SET_BINARY(pRaw, offset, pMem->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)
  }
  SDB_SET_INT8(pRaw, offset, pGroup->isSync, _OVER)

  SDB_SET_INT32(pRaw, offset, pGroup->assignedLeader.dnodeId, _OVER)
  SDB_SET_BINARY(pRaw, offset, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE, _OVER)
  SDB_SET_INT64(pRaw, offset, pGroup->version, _OVER)
  SDB_SET_INT8(pRaw, offset, pGroup->assignedLeader.acked, _OVER)
  SDB_SET_INT32(pRaw, offset, pGroup->code, _OVER)
  SDB_SET_INT64(pRaw, offset, pGroup->updateTimeMs, _OVER)

  SDB_SET_RESERVE(pRaw, offset, ARBGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("arbgroup:%d, encode to raw:%p, row:%p", pGroup->vgId, pRaw, pGroup);
    return pRaw;
  }
  mError("arbgroup:%d, failed to encode to raw:%p since %s", pGroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
160

161
/*
 * Deserialize an SDB raw record produced by mndArbGroupActionEncode() into a
 * newly allocated SDB row holding an SArbGroup.
 *
 * Records whose soft version differs from ARBGROUP_VER_NUMBER are rejected
 * with TSDB_CODE_SDB_INVALID_DATA_VER. Per-member heartbeat bookkeeping
 * (nextHbSeq, responsedHbSeq, lastHbMs) is not part of the encoded record and
 * is reset here. mutexInited is cleared; the mutex is created later by the
 * insert action.
 *
 * Returns the row, or NULL on failure (terrno describes the error).
 */
SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) {
  int32_t             code = 0;  // kept as in the original; presumably referenced by the SDB_* macros — TODO confirm
  int32_t             lino = 0;
  SSdbRow            *pRow = NULL;
  SArbGroup          *pGroup = NULL;
  SArbAssignedLeader *pLeader = NULL;
  int8_t              sver = 0;
  int32_t             offset = 0;

  // Preset an error so that every early jump to _OVER reports a failure.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (sver != ARBGROUP_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SArbGroup));
  if (pRow == NULL) goto _OVER;
  pGroup = sdbGetRowObj(pRow);
  if (pGroup == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, offset, &pGroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, offset, &pGroup->dbUid, _OVER)
  for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; ++m) {
    SArbGroupMember *pMem = &pGroup->members[m];
    SDB_GET_INT32(pRaw, offset, &pMem->info.dnodeId, _OVER)
    SDB_GET_BINARY(pRaw, offset, pMem->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)

    // Heartbeat state is not encoded; start every member from a clean slate.
    pMem->state.nextHbSeq = 0;
    pMem->state.responsedHbSeq = -1;
    pMem->state.lastHbMs = 0;
  }
  SDB_GET_INT8(pRaw, offset, &pGroup->isSync, _OVER)

  pLeader = &pGroup->assignedLeader;
  SDB_GET_INT32(pRaw, offset, &pLeader->dnodeId, _OVER)
  SDB_GET_BINARY(pRaw, offset, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
  SDB_GET_INT64(pRaw, offset, &pGroup->version, _OVER)
  SDB_GET_INT8(pRaw, offset, &pLeader->acked, _OVER)
  SDB_GET_INT32(pRaw, offset, &pGroup->code, _OVER)
  SDB_GET_INT64(pRaw, offset, &pGroup->updateTimeMs, _OVER)

  pGroup->mutexInited = false;

  SDB_GET_RESERVE(pRaw, offset, ARBGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("arbgroup:%d, decode from raw:%p, row:%p", pGroup->vgId, pRaw, pGroup);
    return pRow;
  }
  mError("arbgroup:%d, failed to decode from raw:%p since %s", pGroup == NULL ? 0 : pGroup->vgId, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
220

221
/*
 * SDB insert callback: lazily create the per-group mutex.
 * Returns 0 when the mutex is (already or newly) initialized, -1 otherwise.
 */
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup) {
  mTrace("arbgroup:%d, perform insert action, row:%p", pGroup->vgId, pGroup);

  if (!pGroup->mutexInited) {
    if (taosThreadMutexInit(&pGroup->mutex, NULL) == 0) {
      pGroup->mutexInited = true;
    }
  }

  if (!pGroup->mutexInited) {
    return -1;
  }
  return 0;
}
229

230
/*
 * SDB delete callback: destroy the per-group mutex if it was created.
 * Always returns 0.
 */
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup) {
  mTrace("arbgroup:%d, perform delete action, row:%p", pGroup->vgId, pGroup);

  if (!pGroup->mutexInited) {
    return 0;
  }
  (void)taosThreadMutexDestroy(&pGroup->mutex);
  pGroup->mutexInited = false;
  return 0;
}
238

239
/*
 * SDB update callback.
 *
 * Under pOld->mutex, copies the persisted state (member tokens, isSync,
 * assigned leader, code, updateTimeMs) from pNew into pOld and bumps
 * pOld->version, but only when both rows carry the same version; a version
 * mismatch is logged and the copy is skipped.
 * Afterwards, when this mnode is the leader, the group's vgId is removed from
 * arbUpdateHash regardless of whether the copy happened.
 * Always returns 0.
 */
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew) {
  mTrace("arbgroup:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);

  (void)taosThreadMutexLock(&pOld->mutex);
  if (pOld->version == pNew->version) {
    for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; ++m) {
      tstrncpy(pOld->members[m].state.token, pNew->members[m].state.token, TSDB_ARB_TOKEN_SIZE);
    }
    pOld->isSync = pNew->isSync;

    SArbAssignedLeader *pDst = &pOld->assignedLeader;
    SArbAssignedLeader *pSrc = &pNew->assignedLeader;
    pDst->dnodeId = pSrc->dnodeId;
    tstrncpy(pDst->token, pSrc->token, TSDB_ARB_TOKEN_SIZE);
    pDst->acked = pSrc->acked;

    pOld->version++;
    pOld->code = pNew->code;
    pOld->updateTimeMs = pNew->updateTimeMs;

    mInfo(
        "arbgroup:%d, perform update action. members[0].token:%s, members[1].token:%s, isSync:%d, as-dnodeid:%d, "
        "as-token:%s, as-acked:%d, version:%" PRId64,
        pOld->vgId, pOld->members[0].state.token, pOld->members[1].state.token, pOld->isSync, pDst->dnodeId,
        pDst->token, pDst->acked, pOld->version);
  } else {
    mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64
          " new version:%" PRId64,
          pOld->vgId, pOld, pNew, pOld->version, pNew->version);
  }
  (void)taosThreadMutexUnlock(&pOld->mutex);

  if (!mndIsLeader(pSdb->pMnode)) {
    return 0;
  }
  mInfo("arbgroup:%d, remove from arb Update Hash", pOld->vgId);
  if (taosHashRemove(arbUpdateHash, &pOld->vgId, sizeof(int32_t)) != 0) {
    mError("arbgroup:%d, failed to remove from arb Update Hash", pOld->vgId);
  }
  return 0;
}
278

279
/*
 * Encode pGroup and append it to the transaction's redo log with status
 * SDB_STATUS_CREATING. Returns 0 or an error code (also stored in terrno).
 */
int32_t mndSetCreateArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendRedolog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
  }
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
290

291
/*
 * Encode pGroup and append it to the transaction's undo log with status
 * SDB_STATUS_DROPPED. Returns 0 or an error code (also stored in terrno).
 */
int32_t mndSetCreateArbGroupUndoLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendUndolog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
  }
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
302

303
/*
 * Encode pGroup and append it to the transaction's commit log with status
 * SDB_STATUS_READY. Returns 0 or an error code (also stored in terrno).
 */
int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndArbGroupActionEncode(pGroup);
  if (pCommitRaw == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }
  // The assignment must be parenthesized before comparing; otherwise `code`
  // receives the boolean result of `!= 0` (1) instead of the real error code.
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) TAOS_RETURN(code);
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) TAOS_RETURN(code);
  return 0;
}
314

315
/*
 * Encode pGroup and append it to the transaction's prepare log with status
 * SDB_STATUS_DROPPING. Returns 0 or an error code (also stored in terrno).
 */
int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendPrepareLog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING);
  }
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
326

327
/*
 * Encode pGroup and append it to the transaction's redo log with status
 * SDB_STATUS_DROPPING. Returns 0 or an error code (also stored in terrno).
 */
static int32_t mndSetDropArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendRedolog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING);
  }
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
338

339
/*
 * Encode pGroup and append it to the transaction's commit log with status
 * SDB_STATUS_DROPPED. Returns 0 or an error code (also stored in terrno).
 */
int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
  }
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
350

351
static void *mndBuildArbHeartBeatReq(int32_t *pContLen, char *arbToken, int32_t dnodeId, int64_t arbTerm,
176✔
352
                                     SArray *hbMembers) {
353
  SVArbHeartBeatReq req = {0};
176✔
354
  req.dnodeId = dnodeId;
176✔
355
  req.arbToken = arbToken;
176✔
356
  req.arbTerm = arbTerm;
176✔
357
  req.hbMembers = hbMembers;
176✔
358

359
  int32_t contLen = tSerializeSVArbHeartBeatReq(NULL, 0, &req);
176✔
360
  if (contLen <= 0) return NULL;
176!
361

362
  void *pReq = rpcMallocCont(contLen);
176✔
363
  if (pReq == NULL) return NULL;
176!
364

365
  if (tSerializeSVArbHeartBeatReq(pReq, contLen, &req) <= 0) {
176!
366
    rpcFreeCont(pReq);
×
367
    return NULL;
×
368
  }
369
  *pContLen = contLen;
176✔
370
  return pReq;
176✔
371
}
372

373
/*
 * Build and send one arbitrator heartbeat request to pDnode.
 *
 * Returns -1 when the request cannot be built or the dnode has no endpoint
 * (the buffer is freed in that case); otherwise returns tmsgSendReq()'s code.
 */
static int32_t mndSendArbHeartBeatReq(SDnodeObj *pDnode, char *arbToken, int64_t arbTerm, SArray *hbMembers) {
  int32_t dnodeId = pDnode->id;
  int32_t contLen = 0;
  void   *pCont = mndBuildArbHeartBeatReq(&contLen, arbToken, dnodeId, arbTerm, hbMembers);
  if (pCont == NULL) {
    mError("arbgroup:0, dnodeId:%d, failed to build arb-hb request", dnodeId);
    return -1;
  }

  SEpSet epSet = mndGetDnodeEpset(pDnode);
  if (epSet.numOfEps == 0) {
    mError("arbgroup:0, dnodeId:%d, failed to send arb-hb request to dnode since no epSet found", dnodeId);
    rpcFreeCont(pCont);
    return -1;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_ARB_HEARTBEAT, .pCont = pCont, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    mError("arbgroup:0, dnodeId:%d, failed to send arb-hb request to dnode since 0x%x", dnodeId, code);
    return code;
  }

  // Heartbeats are frequent; only log at info level when explicitly enabled.
  if (tsSyncLogHeartbeat) {
    mInfo("arbgroup:0, dnodeId:%d, send arb-hb request to dnode", dnodeId);
  } else {
    mTrace("arbgroup:0, dnodeId:%d, send arb-hb request to dnode", dnodeId);
  }
  return code;
}
401

402
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
21,214✔
403
  int32_t    code = 0;
21,214✔
404
  SMnode    *pMnode = pReq->info.node;
21,214✔
405
  SSdb      *pSdb = pMnode->pSdb;
21,214✔
406
  SArbGroup *pArbGroup = NULL;
21,214✔
407
  void      *pIter = NULL;
21,214✔
408

409
  SHashObj *pDnodeHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
21,214✔
410

411
  // collect member of same dnode
412
  while (1) {
413
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
21,311✔
414
    if (pIter == NULL) break;
21,311✔
415

416
    (void)taosThreadMutexLock(&pArbGroup->mutex);
97✔
417

418
    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
291✔
419
      SArbGroupMember *pMember = &pArbGroup->members[i];
194✔
420
      int32_t          dnodeId = pMember->info.dnodeId;
194✔
421
      void            *pObj = taosHashGet(pDnodeHash, &dnodeId, sizeof(int32_t));
194✔
422
      SArray          *hbMembers = NULL;
194✔
423
      if (pObj) {
194✔
424
        hbMembers = *(SArray **)pObj;
18✔
425
      } else {
426
        hbMembers = taosArrayInit(16, sizeof(SVArbHbReqMember));
176✔
427
        if (taosHashPut(pDnodeHash, &dnodeId, sizeof(int32_t), &hbMembers, POINTER_BYTES) != 0) {
176!
428
          mError("arbgroup:0, dnodeId:%d, failed to push hb member inty]o hash, but conitnue next at this timer round",
×
429
                 dnodeId);
430
        }
431
      }
432
      SVArbHbReqMember reqMember = {.vgId = pArbGroup->vgId, .hbSeq = pMember->state.nextHbSeq++};
194✔
433
      if (taosArrayPush(hbMembers, &reqMember) == NULL) {
388!
434
        mError("arbgroup:0, dnodeId:%d, failed to push hb member, but conitnue next at this timer round", dnodeId);
×
435
      }
436
    }
437

438
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);
97✔
439
    sdbRelease(pSdb, pArbGroup);
97✔
440
  }
441

442
  char arbToken[TSDB_ARB_TOKEN_SIZE];
443
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
21,214!
444
    mError("arbgroup:0, failed to get arb token for arb-hb timer");
×
445
    pIter = taosHashIterate(pDnodeHash, NULL);
×
446
    while (pIter) {
×
447
      SArray *hbMembers = *(SArray **)pIter;
×
448
      taosArrayDestroy(hbMembers);
×
449
      pIter = taosHashIterate(pDnodeHash, pIter);
×
450
    }
451
    taosHashCleanup(pDnodeHash);
×
452
    TAOS_RETURN(code);
×
453
  }
454

455
  int64_t nowMs = taosGetTimestampMs();
21,214✔
456

457
  pIter = NULL;
21,214✔
458
  while (1) {
176✔
459
    pIter = taosHashIterate(pDnodeHash, pIter);
21,390✔
460
    if (pIter == NULL) break;
21,390✔
461

462
    int32_t dnodeId = *(int32_t *)taosHashGetKey(pIter, NULL);
176✔
463
    SArray *hbMembers = *(SArray **)pIter;
176✔
464

465
    SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
176✔
466
    if (pDnode == NULL) {
176!
467
      mError("arbgroup:0, dnodeId:%d, timer failed to acquire dnode", dnodeId);
×
468
      taosArrayDestroy(hbMembers);
×
469
      continue;
×
470
    }
471

472
    int64_t mndTerm = mndGetTerm(pMnode);
176✔
473

474
    if (mndIsDnodeOnline(pDnode, nowMs)) {
176!
475
      int32_t sendCode = mndSendArbHeartBeatReq(pDnode, arbToken, mndTerm, hbMembers);
176✔
476
      if (TSDB_CODE_SUCCESS != sendCode) {
176!
477
        mError("arbgroup:0, dnodeId:%d, timer failed to send arb-hb request", dnodeId);
×
478
      }
479
    }
480

481
    mndReleaseDnode(pMnode, pDnode);
176✔
482
    taosArrayDestroy(hbMembers);
176✔
483
  }
484
  taosHashCleanup(pDnodeHash);
21,214✔
485

486
  return 0;
21,214✔
487
}
488

489
static void *mndBuildArbCheckSyncReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
23✔
490
                                     char *member0Token, char *member1Token) {
491
  SVArbCheckSyncReq req = {0};
23✔
492
  req.arbToken = arbToken;
23✔
493
  req.arbTerm = arbTerm;
23✔
494
  req.member0Token = member0Token;
23✔
495
  req.member1Token = member1Token;
23✔
496

497
  int32_t reqLen = tSerializeSVArbCheckSyncReq(NULL, 0, &req);
23✔
498
  int32_t contLen = reqLen + sizeof(SMsgHead);
23✔
499

500
  if (contLen <= 0) return NULL;
23!
501
  SMsgHead *pHead = rpcMallocCont(contLen);
23✔
502
  if (pHead == NULL) return NULL;
23!
503

504
  pHead->contLen = htonl(contLen);
23✔
505
  pHead->vgId = htonl(vgId);
23✔
506
  if (tSerializeSVArbCheckSyncReq((char *)pHead + sizeof(SMsgHead), contLen, &req) <= 0) {
23!
507
    rpcFreeCont(pHead);
×
508
    return NULL;
×
509
  }
510
  *pContLen = contLen;
23✔
511
  return pHead;
23✔
512
}
513

514
/*
 * Build and send a check-sync request to the vgroup identified by vgId, with
 * fresh trace message/root ids.
 *
 * Returns -1 when the request cannot be built. When the vgroup has no
 * endpoint, frees the buffer and returns terrno (or -1 when terrno is 0).
 * Otherwise returns tmsgSendReq()'s code.
 */
static int32_t mndSendArbCheckSyncReq(SMnode *pMnode, int32_t vgId, char *arbToken, int64_t term, char *member0Token,
                                      char *member1Token) {
  int32_t contLen = 0;
  void   *pCont = mndBuildArbCheckSyncReq(&contLen, vgId, arbToken, term, member0Token, member1Token);
  if (pCont == NULL) {
    mError("arbgroup:%d, failed to build check-sync request", vgId);
    return -1;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_ARB_CHECK_SYNC, .pCont = pCont, .contLen = contLen};
  TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
  TRACE_SET_ROOTID(&(rpcMsg.info.traceId), tGenIdPI64());

  SEpSet epSet = mndGetVgroupEpsetById(pMnode, vgId);
  if (epSet.numOfEps == 0) {
    mError("arbgroup:%d, failed to send check-sync request since no epSet found", vgId);
    rpcFreeCont(pCont);
    int32_t err = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(err);
  }

  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code == 0) {
    mDebug("arbgroup:%d, send check-sync request", vgId);
  } else {
    mError("arbgroup:%d, failed to send check-sync request since 0x%x", vgId, code);
  }
  return code;
}
544

545
/*
 * True when member `index` of the group has not recorded a heartbeat within
 * the last tsArbSetAssignedTimeoutMs milliseconds before nowMs.
 */
static bool mndCheckArbMemberHbTimeout(SArbGroup *pArbGroup, int32_t index, int64_t nowMs) {
  int64_t deadlineMs = nowMs - tsArbSetAssignedTimeoutMs;
  return pArbGroup->members[index].state.lastHbMs < deadlineMs;
}
549

550
static void *mndBuildArbSetAssignedLeaderReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
×
551
                                             char *memberToken, bool force) {
552
  SVArbSetAssignedLeaderReq req = {0};
×
553
  req.arbToken = arbToken;
×
554
  req.arbTerm = arbTerm;
×
555
  req.memberToken = memberToken;
×
556
  if (force) req.force = 1;
×
557

558
  int32_t reqLen = tSerializeSVArbSetAssignedLeaderReq(NULL, 0, &req);
×
559
  int32_t contLen = reqLen + sizeof(SMsgHead);
×
560

561
  if (contLen <= 0) return NULL;
×
562
  SMsgHead *pHead = rpcMallocCont(contLen);
×
563
  if (pHead == NULL) return NULL;
×
564

565
  pHead->contLen = htonl(contLen);
×
566
  pHead->vgId = htonl(vgId);
×
567
  if (tSerializeSVArbSetAssignedLeaderReq((char *)pHead + sizeof(SMsgHead), contLen, &req) <= 0) {
×
568
    rpcFreeCont(pHead);
×
569
    return NULL;
×
570
  }
571
  *pContLen = contLen;
×
572
  return pHead;
×
573
}
574

575
/*
 * Build and send a set-assigned-leader request for vgId to dnode dnodeId.
 *
 * When the request cannot be built, or the dnode has no endpoint (the buffer
 * is freed), returns terrno, or -1 when terrno is 0. Otherwise returns
 * tmsgSendReq()'s code.
 */
static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, int32_t vgId, char *arbToken,
                                              int64_t term, char *memberToken, bool force) {
  int32_t contLen = 0;
  void   *pCont = mndBuildArbSetAssignedLeaderReq(&contLen, vgId, arbToken, term, memberToken, force);
  if (pCont == NULL) {
    mError("arbgroup:%d, failed to build set-assigned request", vgId);
    int32_t err = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(err);
  }

  SEpSet epSet = mndGetDnodeEpsetById(pMnode, dnodeId);
  if (epSet.numOfEps == 0) {
    mError("arbgroup:%d, failed to send arb-set-assigned request to dnode:%d since no epSet found", vgId, dnodeId);
    rpcFreeCont(pCont);
    int32_t err = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(err);
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_SYNC_SET_ASSIGNED_LEADER, .pCont = pCont, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code == 0) {
    mInfo("arbgroup:%d, send arb-set-assigned request to dnode:%d", vgId, dnodeId);
  } else {
    mError("arbgroup:%d, failed to send arb-set-assigned request to dnode:%d since 0x%x", vgId, dnodeId, code);
  }
  return code;
}
604

605
/*
 * Decide what the check-sync timer should do for one arbitrator group.
 *
 * Writes the decision to *pOp:
 *   CHECK_SYNC_SET_ASSIGNED_LEADER - a leader is assigned but not yet acked;
 *   CHECK_SYNC_CHECK_SYNC          - the group should be (re)checked for sync;
 *   CHECK_SYNC_UPDATE              - exactly one member timed out and the group
 *                                    is in sync: *pNewGroup is filled with a copy
 *                                    whose assigned leader is the live member;
 *   CHECK_SYNC_NONE                - nothing to do.
 * pNewGroup is only written in the CHECK_SYNC_UPDATE case.
 */
void mndArbCheckSync(SArbGroup *pArbGroup, int64_t nowMs, ECheckSyncOp *pOp, SArbGroup *pNewGroup) {
  *pOp = CHECK_SYNC_NONE;

  int32_t             vgId = pArbGroup->vgId;
  bool                timeout0 = mndCheckArbMemberHbTimeout(pArbGroup, 0, nowMs);
  bool                timeout1 = mndCheckArbMemberHbTimeout(pArbGroup, 1, nowMs);
  SArbAssignedLeader *pAssigned = &pArbGroup->assignedLeader;
  int32_t             assignedDnodeId = pAssigned->dnodeId;
  bool                hasAssigned = (assignedDnodeId != 0);

  // 1. a leader is assigned but has not acknowledged => (re)send the request
  if (hasAssigned && !pAssigned->acked) {
    *pOp = CHECK_SYNC_SET_ASSIGNED_LEADER;
    return;
  }

  // 2. both members timed out => nothing can be done
  if (timeout0 && timeout1) {
    return;
  }

  // 3. both members alive => only check sync when unassigned and not in sync
  if (!timeout0 && !timeout1) {
    if (!hasAssigned && !pArbGroup->isSync) {
      *pOp = CHECK_SYNC_CHECK_SYNC;
    }
    return;
  }

  // 4. exactly one member timed out => the other one is the candidate leader
  int32_t candidate = timeout0 ? 1 : 0;
  int32_t candidateDnodeId = pArbGroup->members[candidate].info.dnodeId;

  if (hasAssigned && assignedDnodeId != candidateDnodeId) {
    mInfo("arbgroup:%d, arb skip to set assigned leader to dnodeId:%d, assigned leader has been set to dnodeId:%d",
          vgId, candidateDnodeId, assignedDnodeId);
    return;
  }

  if (!pArbGroup->isSync) {
    if (assignedDnodeId == candidateDnodeId) {
      mDebug("arbgroup:%d, arb skip to set assigned leader to dnodeId:%d, arb group is not sync", vgId,
             candidateDnodeId);
    } else {
      mInfo("arbgroup:%d, arb skip to set assigned leader to dnodeId:%d, arb group is not sync", vgId,
            candidateDnodeId);
    }
    *pOp = CHECK_SYNC_CHECK_SYNC;
    return;
  }

  // in sync => record the candidate as assigned leader in a copy to persist
  mndArbGroupDupObj(pArbGroup, pNewGroup);
  mndArbGroupSetAssignedLeader(pNewGroup, candidate);
  *pOp = CHECK_SYNC_UPDATE;
}
665

666
/*
 * Handle TDMT_MND_ARB_ASSIGN_LEADER: for every arbitrator group, pick the
 * first member whose dnode is online and send it a forced set-assigned-leader
 * request with an empty member token.
 *
 * Groups with no online member are skipped. Send results are not checked here;
 * mndSendArbSetAssignedLeaderReq() logs its own failures.
 * Returns 0 on success, TSDB_CODE_INVALID_MSG on a malformed request, or the
 * error from fetching the arb token / term. The deserialized request is freed
 * on every path.
 */
static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  int32_t          code = -1, lino = 0;
  void            *pIter = NULL;
  SSdb            *pSdb = pMnode->pSdb;
  SArbGroup       *pArbGroup = NULL;
  SAssignLeaderReq req = {0};

  if (tDeserializeSAssignLeaderReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  mInfo("arbgroup:0, begin to process assign leader");

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));

  int64_t term = mndGetTerm(pMnode);
  if (term < 0) {
    mError("arbgroup:0, arb failed to get term since %s", terrstr());
    code = (terrno != 0) ? terrno : -1;
    goto _exit;  // go through _exit so the deserialized request is released
  }

  while ((pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup)) != NULL) {
    SArbGroup arbGroupDup = {0};

    // Work on a snapshot so the group lock is not held while sending.
    (void)taosThreadMutexLock(&pArbGroup->mutex);
    mndArbGroupDupObj(pArbGroup, &arbGroupDup);
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);

    sdbRelease(pSdb, pArbGroup);

    int64_t nowMs = taosGetTimestampMs();
    int32_t dnodeId = 0;
    for (int32_t i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      int32_t    memberDnodeId = arbGroupDup.members[i].info.dnodeId;
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, memberDnodeId);
      if (pDnode == NULL) {
        // dnode not found: treat as offline rather than passing NULL on
        continue;
      }
      bool isOnline = mndIsDnodeOnline(pDnode, nowMs);
      mndReleaseDnode(pMnode, pDnode);
      if (isOnline) {
        dnodeId = memberDnodeId;
        break;
      }
    }

    if (dnodeId == 0) {
      mError("arbgroup:%d, no online member found, skip to send set assigned leader", arbGroupDup.vgId);
      continue;
    }

    (void)mndSendArbSetAssignedLeaderReq(pMnode, dnodeId, arbGroupDup.vgId, arbToken, term, "", true);
    mInfo("arbgroup:%d, arb send set assigned leader to dnodeId:%d", arbGroupDup.vgId, dnodeId);
  }

  code = 0;

_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("arbgroup:0, failed to assign leader since %s", tstrerror(code));
  }

  tFreeSAssignLeaderReq(&req);
  TAOS_RETURN(code);
}
732

733
/*
 * Periodic arbitrator check (timer message handler).
 *
 * For every arb group a snapshot is taken under the group's mutex and handed to
 * mndArbCheckSync(), which reports the action to take:
 *   - CHECK_SYNC_NONE:                nothing to do;
 *   - CHECK_SYNC_SET_ASSIGNED_LEADER: resend set-assigned-leader to the assigned dnode;
 *   - CHECK_SYNC_CHECK_SYNC:          send a vnode-arb-check-sync request for the vgroup;
 *   - CHECK_SYNC_UPDATE:              collect the new group state; all collected groups are
 *                                     pushed to the write queue as one batch at the end.
 *
 * The check is skipped while the mnode role changed less than two arb heartbeat intervals ago.
 *
 * Returns an error only when the current term cannot be obtained; every other failure is
 * logged and 0 is returned.
 */
static int32_t mndArbProcessTimer(SRpcMsg *pReq) {
  int32_t    code = 0, lino = 0;
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;
  void      *pIter = NULL;
  SArray    *pUpdateArray = NULL;

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));

  int64_t term = mndGetTerm(pMnode);
  if (term < 0) {
    mError("arbgroup:0, arb failed to get term since %s", terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  int64_t roleTimeMs = mndGetRoleTimeMs(pMnode);
  int64_t nowMs = taosGetTimestampMs();
  if (nowMs - roleTimeMs < tsArbHeartBeatIntervalMs * 2) {
    mInfo("arbgroup:0, arb skip to check sync since mnd had just switch over, roleTime:%" PRId64 " now:%" PRId64,
          roleTimeMs, nowMs);
    return 0;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    // work on a private copy so the group mutex is only held while copying
    SArbGroup arbGroupDup = {0};

    (void)taosThreadMutexLock(&pArbGroup->mutex);
    mndArbGroupDupObj(pArbGroup, &arbGroupDup);
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);

    sdbRelease(pSdb, pArbGroup);

    ECheckSyncOp op = CHECK_SYNC_NONE;
    SArbGroup    newGroup = {0};
    mndArbCheckSync(&arbGroupDup, nowMs, &op, &newGroup);

    int32_t             vgId = arbGroupDup.vgId;
    SArbAssignedLeader *pAssgndLeader = &arbGroupDup.assignedLeader;
    int32_t             assgndDnodeId = pAssgndLeader->dnodeId;

    switch (op) {
      case CHECK_SYNC_NONE:
        mTrace("arbgroup:%d, arb skip to send msg by check sync", vgId);
        break;
      case CHECK_SYNC_SET_ASSIGNED_LEADER:
        (void)mndSendArbSetAssignedLeaderReq(pMnode, assgndDnodeId, vgId, arbToken, term, pAssgndLeader->token, false);
        mInfo("arbgroup:%d, arb send set assigned leader to dnodeId:%d", vgId, assgndDnodeId);
        break;
      case CHECK_SYNC_CHECK_SYNC:
        (void)mndSendArbCheckSyncReq(pMnode, vgId, arbToken, term, arbGroupDup.members[0].state.token,
                                     arbGroupDup.members[1].state.token);
        mInfo("arbgroup:%d, send vnode-arb-check-sync request", vgId);
        break;
      case CHECK_SYNC_UPDATE:
        if (!pUpdateArray) {
          pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
          if (!pUpdateArray) {
            TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
          }
        }

        if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
        break;
      default:
        mError("arbgroup:%d, arb unknown check sync op:%d", vgId, op);
        break;
    }
  }

  // the fetch loop only finishes normally once sdbFetch is exhausted, so pIter is NULL here
  TAOS_CHECK_EXIT(mndArbPutBatchUpdateIntoWQ(pMnode, pUpdateArray));

_exit:
  if (pIter != NULL) {
    // an error jumped out of the fetch loop early: release the iterator still held by sdbFetch
    sdbCancelFetchByType(pSdb, pIter, SDB_ARBGROUP);
  }
  if (code != 0) {
    mError("arbgroup:0, failed to check sync at line %d since %s", lino, tstrerror(code));
  }

  taosArrayDestroy(pUpdateArray);
  return 0;
}
821

822
/*
 * Encode an array of SMArbUpdateGroup into a newly allocated RPC body.
 *
 * On success the buffer (allocated with rpcMallocCont) is returned and its length is stored
 * in *pContLen.  On any failure NULL is returned and *pContLen is left untouched.
 */
static void *mndBuildArbUpdateGroupBatchReq(int32_t *pContLen, SArray *updateArray) {
  SMArbUpdateGroupBatchReq batchReq = {.updateArray = updateArray};

  // first pass: size only
  int32_t encodedLen = tSerializeSMArbUpdateGroupBatchReq(NULL, 0, &batchReq);
  if (encodedLen <= 0) {
    return NULL;
  }

  SMsgHead *pBody = rpcMallocCont(encodedLen);
  if (pBody == NULL) {
    return NULL;
  }

  // second pass: encode into the allocated body
  if (tSerializeSMArbUpdateGroupBatchReq(pBody, encodedLen, &batchReq) > 0) {
    *pContLen = encodedLen;
    return pBody;
  }

  rpcFreeCont(pBody);
  return NULL;
}
838

839
/*
 * Fill the wire-level SMArbUpdateGroup from an SArbGroup.
 *
 * Token fields are shallow: they point into pGroup's token buffers instead of copying them,
 * so pGroup must stay alive for as long as outGroup is used.
 */
static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup) {
  outGroup->vgId = pGroup->vgId;
  outGroup->dbUid = pGroup->dbUid;

  for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; ++m) {
    outGroup->members[m].dnodeId = pGroup->members[m].info.dnodeId;
    outGroup->members[m].token = pGroup->members[m].state.token;  // alias, not a copy
  }

  outGroup->isSync = pGroup->isSync;

  outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId;
  outGroup->assignedLeader.token = pGroup->assignedLeader.token;  // alias, not a copy
  outGroup->assignedLeader.acked = pGroup->assignedLeader.acked;

  outGroup->version = pGroup->version;
  outGroup->code = pGroup->code;
  outGroup->updateTimeMs = pGroup->updateTimeMs;
}
57✔
854

855
/*
 * Queue an update of a single arb group to the mnode write queue, as a one-element
 * TDMT_MND_ARB_UPDATE_GROUP_BATCH message.
 *
 * If pNewGroup->vgId is already present in arbUpdateHash (an update is in flight), nothing is
 * queued and 0 is returned.  Otherwise the vgId is registered in arbUpdateHash *before* the
 * message is queued.  This is the same order as mndArbPutBatchUpdateIntoWQ, so the write-queue
 * side can never see the message before the entry exists.  If building or queuing the message
 * fails, the entry is removed again so later updates are not blocked.
 *
 * Returns 0 on success (or skip), otherwise an error code.
 */
static int32_t mndArbPutUpdateArbIntoWQ(SMnode *pMnode, SArbGroup *pNewGroup) {
  if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
    mInfo("arbgroup:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
    return 0;
  }

  int32_t ret = 0;
  bool    inHash = false;  // true once this call has added the vgId to arbUpdateHash

  SMArbUpdateGroup newGroup = {0};
  mndInitArbUpdateGroup(pNewGroup, &newGroup);

  SArray *pArray = taosArrayInit(1, sizeof(SMArbUpdateGroup));
  if (pArray == NULL || taosArrayPush(pArray, &newGroup) == NULL) {
    ret = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  mInfo("arbgroup:%d, put into arb update hash", pNewGroup->vgId);
  if ((ret = taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0)) != 0) goto _OVER;
  inHash = true;

  int32_t contLen = 0;
  void   *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
  if (!pHead) {
    mError("arbgroup:0, failed to build arb-update-group request");
    ret = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
  ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);

_OVER:
  taosArrayDestroy(pArray);
  if (ret != 0) {
    mError("arbgroup:%d, failed to put arb group update into write queue since %s", pNewGroup->vgId, tstrerror(ret));
    // nothing was queued for this vgId, so do not leave it marked as "in process"
    if (inHash && taosHashRemove(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != 0) {
      mError("arbgroup:%d, failed to remove from arb Update Hash", pNewGroup->vgId);
    }
  }
  return ret;
}
891

892
/*
 * Queue a batch update of arb groups to the mnode write queue, as a single
 * TDMT_MND_ARB_UPDATE_GROUP_BATCH message.
 *
 * newGroupArray holds SArbGroup elements and may be NULL or empty, in which case nothing is
 * queued.  Groups whose vgId is already in arbUpdateHash (an update is in flight) are skipped.
 * Every other vgId is registered in arbUpdateHash before the message is queued.
 *
 * On any failure, exactly the vgIds registered by this call are removed again.  Entries owned
 * by other in-flight requests are left untouched, and no entry is left behind without a
 * queued message.
 *
 * Returns 0 on success, otherwise an error code.
 */
static int32_t mndArbPutBatchUpdateIntoWQ(SMnode *pMnode, SArray *newGroupArray) {
  int32_t ret = 0;
  size_t  nInHash = 0;  // leading pArray entries whose vgId this call added to arbUpdateHash

  size_t  sz = taosArrayGetSize(newGroupArray);
  SArray *pArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup));
  if (pArray == NULL) {
    ret = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  for (size_t i = 0; i < sz; i++) {
    SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
    if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
      mInfo("arbgroup:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
      continue;
    }

    SMArbUpdateGroup newGroup = {0};
    mndInitArbUpdateGroup(pNewGroup, &newGroup);

    if (taosArrayPush(pArray, &newGroup) == NULL) {
      ret = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }
    mInfo("arbgroup:%d, put into arb update hash in array", pNewGroup->vgId);
    if ((ret = taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0)) != 0) {
      mError("arbgroup:%d, failed to put into arb update hash since %s", pNewGroup->vgId, tstrerror(ret));
      goto _OVER;
    }
    nInHash++;
  }

  if (taosArrayGetSize(pArray) == 0) {
    ret = 0;
    goto _OVER;
  }

  int32_t contLen = 0;
  void   *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
  if (!pHead) {
    mError("arbgroup:0, failed to build arb-update-group request");
    // must be non-zero: otherwise the vgIds registered above would stay "in process" forever
    ret = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
  ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);

_OVER:
  if (ret != 0) {
    mError("arbgroup:0, failed to put arb group update into write queue since %s", tstrerror(ret));
    for (size_t i = 0; i < nInHash; i++) {
      SMArbUpdateGroup *pQueued = taosArrayGet(pArray, i);
      if (taosHashRemove(arbUpdateHash, &pQueued->vgId, sizeof(pQueued->vgId)) != 0) {
        mError("arbgroup:%d, failed to remove from arb Update Hash", pQueued->vgId);
      }
    }
  }

  taosArrayDestroy(pArray);
  return ret;
}
946

947
/*
 * Write-queue handler for TDMT_MND_ARB_UPDATE_GROUP_BATCH.
 *
 * Decodes the batch, turns every SMArbUpdateGroup into an SArbGroup commit log, and commits
 * them all in one "upd-bat-arbgroup" transaction.  Groups that are no longer in the sdb are
 * skipped, and their vgId is removed from arbUpdateHash.  If the transaction cannot be built,
 * checked for conflicts or prepared, every vgId of the batch is removed from arbUpdateHash.
 *
 * Returns 0 on success, otherwise an error code.
 */
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
  int    code = -1;
  size_t sz = 0;

  SMArbUpdateGroupBatchReq req = {0};
  if ((code = tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    mError("arbgroup:0, arb failed to decode arb-update-group request");
    TAOS_RETURN(code);
  }

  SMnode *pMnode = pReq->info.node;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "upd-bat-arbgroup");
  if (pTrans == NULL) {
    mError("arbgroup:0, failed to create update arbgroup trans, since %s", terrstr());
    tFreeSMArbUpdateGroupBatchReq(&req);
    TAOS_RETURN(terrno);
  }

  sz = taosArrayGetSize(req.updateArray);
  for (size_t i = 0; i < sz; i++) {
    SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
    SArbGroup         newGroup = {0};
    newGroup.vgId = pUpdateGroup->vgId;
    newGroup.dbUid = pUpdateGroup->dbUid;
    for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
      newGroup.members[m].info.dnodeId = pUpdateGroup->members[m].dnodeId;
      tstrncpy(newGroup.members[m].state.token, pUpdateGroup->members[m].token, TSDB_ARB_TOKEN_SIZE);
    }

    newGroup.isSync = pUpdateGroup->isSync;
    newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId;
    tstrncpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
    newGroup.assignedLeader.acked = pUpdateGroup->assignedLeader.acked;
    newGroup.version = pUpdateGroup->version;
    newGroup.code = pUpdateGroup->code;
    newGroup.updateTimeMs = pUpdateGroup->updateTimeMs;

    mInfo(
        "trans:%d, arbgroup:%d, used to update member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d], %d, "
        "%" PRId64,
        pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
        newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
        newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked,
        pUpdateGroup->code, pUpdateGroup->updateTimeMs);

    SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
    if (!pOldGroup) {
      mError("trans:%d, arbgroup:%d, arb skip to update arbgroup, since no obj found", pTrans->id, newGroup.vgId);
      if (taosHashRemove(arbUpdateHash, &newGroup.vgId, sizeof(int32_t)) != 0) {
        mError("trans:%d, arbgroup:%d, failed to remove from arb Update Hash", pTrans->id, newGroup.vgId);
      }
      continue;
    }
    // the existing object is only used as an existence check; release it right away so no
    // error path below can leak the reference
    sdbRelease(pMnode->pSdb, pOldGroup);

    mndTransAddArbGroupId(pTrans, newGroup.vgId);

    if ((code = mndSetCreateArbGroupCommitLogs(pTrans, &newGroup)) != 0) {
      mError("trans:%d, arbgroup:%d, failed to update arbgroup in set commit log since %s", pTrans->id, newGroup.vgId,
             tstrerror(code));
      goto _OVER;
    }

    mInfo("trans:%d, arbgroup:%d, used to update member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
          pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
          newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
          newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked);
  }

  if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) goto _OVER;
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;

  code = 0;

_OVER:
  if (code != 0) {
    // failed to update arbgroup: unblock future updates for every group of this batch
    mError("trans:%d, arbgroup:0, failed to update arbgroup since %s", pTrans->id, tstrerror(code));
    for (size_t i = 0; i < sz; i++) {
      SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
      if (taosHashRemove(arbUpdateHash, &pUpdateGroup->vgId, sizeof(int32_t)) != 0) {
        mError("trans:%d, arbgroup:%d failed to remove from arb Update Hash", pTrans->id, pUpdateGroup->vgId);
      }
    }
  }

  mndTransDrop(pTrans);
  tFreeSMArbUpdateGroupBatchReq(&req);
  return code;
}
1038

1039
/*
 * Copy the leading part of an arb group (every field laid out before `mutexInited`) into pNew.
 * The mutex-related tail of pNew is not written.  The callers visible in this file hold
 * pGroup->mutex while calling it.
 */
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew) {
  const size_t copyBytes = offsetof(SArbGroup, mutexInited);
  (void)memcpy(pNew, pGroup, copyBytes);
}
208✔
1042

1043
/*
 * Make members[index] the assigned leader of pGroup.  Its dnodeId and token are copied, and
 * the assignment is marked as not yet acknowledged.
 */
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index) {
  pGroup->assignedLeader.dnodeId = pGroup->members[index].info.dnodeId;
  tstrncpy(pGroup->assignedLeader.token, pGroup->members[index].state.token, TSDB_ARB_TOKEN_SIZE);
  pGroup->assignedLeader.acked = false;
}
10✔
1050

1051
/*
 * Clear the assigned leader of pGroup: dnodeId 0, zeroed token, not acknowledged.
 */
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
  (void)memset(pGroup->assignedLeader.token, 0, TSDB_ARB_TOKEN_SIZE);
  pGroup->assignedLeader.dnodeId = 0;
  pGroup->assignedLeader.acked = false;
}
3✔
1056

1057
/*
 * Apply one member entry of an arb heartbeat response from dnodeId to pGroup.
 *
 * Under pGroup->mutex:
 *   - the member owned by dnodeId is looked up; an unknown dnode is logged and ignored;
 *   - an entry whose hbSeq is not newer than the recorded one is logged and ignored;
 *   - otherwise hbSeq and nowMs are recorded on the member.  If the reported member token
 *     differs from the stored one, pNewGroup receives a copy of the group with the new token,
 *     isSync cleared, and the assigned leader reset when this member was the assigned leader.
 *
 * Returns true only when pNewGroup was filled and should be persisted.
 */
bool mndArbIsNeedUpdateTokenByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
                                        SArbGroup *pNewGroup) {
  bool updateToken = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  int pos = 0;
  while (pos < TSDB_ARB_GROUP_MEMBER_NUM && pGroup->members[pos].info.dnodeId != dnodeId) {
    pos++;
  }
  SArbGroupMember *pMember = (pos < TSDB_ARB_GROUP_MEMBER_NUM) ? &pGroup->members[pos] : NULL;

  if (pMember == NULL) {
    mError("arbgroup:%d, arb token update check failed, dnodeId:%d not found", pRspMember->vgId, dnodeId);
  } else if (pMember->state.responsedHbSeq >= pRspMember->hbSeq) {
    // stale or duplicated heartbeat
    mError("arbgroup:%d, dnodeId:%d skip arb token update, heart beat seq expired, local:%d msg:%d", pRspMember->vgId,
           dnodeId, pMember->state.responsedHbSeq, pRspMember->hbSeq);
  } else {
    // newer heartbeat: record it whether or not the token changed
    pMember->state.responsedHbSeq = pRspMember->hbSeq;
    pMember->state.lastHbMs = nowMs;

    if (mndArbCheckToken(pMember->state.token, pRspMember->memberToken) == 0) {
      mDebug("arbgroup:%d, dnodeId:%d skip arb token update, token matched", pRspMember->vgId, dnodeId);
    } else {
      mndArbGroupDupObj(pGroup, pNewGroup);
      tstrncpy(pNewGroup->members[pos].state.token, pRspMember->memberToken, TSDB_ARB_TOKEN_SIZE);
      pNewGroup->isSync = false;

      bool resetAssigned = (pMember->info.dnodeId == pGroup->assignedLeader.dnodeId);
      if (resetAssigned) {
        mndArbGroupResetAssignedLeader(pNewGroup);
      }

      updateToken = true;
      mInfo("arbgroup:%d, need to update token, by heartbeat from dnodeId:%d, resetAssigned:%d", pRspMember->vgId,
            dnodeId, resetAssigned);
    }
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return updateToken;
}
1113

1114
/*
 * Apply the member states carried by an arb heartbeat response from dnodeId.
 *
 * Each SVArbHbRspMember in memberArray is applied to its arb group through
 * mndArbIsNeedUpdateTokenByHeartBeat.  Groups that need a token update are collected and
 * queued as one batch with mndArbPutBatchUpdateIntoWQ.  Unknown vgroups and push failures are
 * logged and skipped.
 *
 * Returns 0 on success, otherwise the error from queuing the batch.
 */
static int32_t mndArbUpdateByHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) {
  int64_t nowMs = taosGetTimestampMs();
  size_t  size = taosArrayGetSize(memberArray);
  SArray *pUpdateArray = taosArrayInit(size, sizeof(SArbGroup));

  for (size_t i = 0; i < size; i++) {
    SVArbHbRspMember *pRspMember = taosArrayGet(memberArray, i);

    SArbGroup  newGroup = {0};
    SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &pRspMember->vgId);
    if (pGroup == NULL) {
      mError("arbgroup:%d failed to update arb token, not found", pRspMember->vgId);
      continue;
    }

    // always called, even if the array could not be allocated, so heartbeat bookkeeping is kept
    bool updateToken = mndArbIsNeedUpdateTokenByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
    if (updateToken) {
      if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
        mError("arbgroup:0, failed to push newGroup to updateArray, but continue at this hearbear");
      }
    }

    sdbRelease(pMnode->pSdb, pGroup);
  }

  int32_t code = mndArbPutBatchUpdateIntoWQ(pMnode, pUpdateArray);
  // destroy on every path, including when queuing failed
  taosArrayDestroy(pUpdateArray);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
1144

1145
/*
 * Decide, under pGroup->mutex, whether a check-sync result must be persisted.
 *
 * Nothing is updated, and false is returned, when:
 *   - an assigned leader is set (terrno = TSDB_CODE_SUCCESS), or
 *   - either reported member token differs from the stored one
 *     (terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH), or
 *   - the stored isSync already equals newIsSync.
 * Otherwise pNewGroup receives a copy of the group with isSync = newIsSync, the given code and
 * the current time as updateTimeMs, and true is returned.
 */
bool mndArbIsNeedUpdateSyncStatusByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
                                             bool newIsSync, SArbGroup *pNewGroup, int32_t code) {
  bool updateIsSync = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  char *local0Token = pGroup->members[0].state.token;
  char *local1Token = pGroup->members[1].state.token;

  if (pGroup->assignedLeader.dnodeId != 0) {
    terrno = TSDB_CODE_SUCCESS;
    mInfo("arbgroup:%d, skip to update arb sync, has assigned leader:%d", vgId, pGroup->assignedLeader.dnodeId);
  } else if (mndArbCheckToken(local0Token, member0Token) != 0 || mndArbCheckToken(local1Token, member1Token) != 0) {
    terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
    mInfo("arbgroup:0, skip to update arb sync, memberToken mismatch local:[%s][%s], msg:[%s][%s]", local0Token,
          local1Token, member0Token, member1Token);
  } else if (pGroup->isSync != newIsSync) {
    mndArbGroupDupObj(pGroup, pNewGroup);
    pNewGroup->isSync = newIsSync;
    pNewGroup->code = code;
    pNewGroup->updateTimeMs = taosGetTimestampMs();

    mInfo("arbgroup:%d, need to update isSync status, new isSync:%d, timeStamp:%" PRId64, vgId, newIsSync,
          pNewGroup->updateTimeMs);
    updateIsSync = true;
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return updateIsSync;
}
1181

1182
/*
 * Apply a check-sync result for vgId.  If the group's isSync status must change, the new
 * state is queued through mndArbPutUpdateArbIntoWQ; a queuing failure is only logged.
 *
 * Returns an error only when the arb group cannot be acquired; otherwise 0.
 */
static int32_t mndArbUpdateByCheckSync(SMnode *pMnode, int32_t vgId, char *member0Token, char *member1Token,
                                       bool newIsSync, int32_t rsp_code) {
  SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
  if (pGroup == NULL) {
    mError("arbgroup:%d, failed to update arb sync, not found", vgId);
    int32_t code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  SArbGroup newGroup = {0};
  if (mndArbIsNeedUpdateSyncStatusByCheckSync(pGroup, vgId, member0Token, member1Token, newIsSync, &newGroup,
                                              rsp_code) &&
      mndArbPutUpdateArbIntoWQ(pMnode, &newGroup) != 0) {
    mError("arbgroup:%d, failed to pullup update arb sync, since %s", vgId, terrstr());
  }

  sdbRelease(pMnode->pSdb, pGroup);
  return 0;
}
1205

1206
/*
 * Handle an arb heartbeat response from a dnode.
 *
 * An empty body is ignored.  The response's arb token must match the local one; otherwise it
 * is rejected with TSDB_CODE_MND_ARB_TOKEN_MISMATCH.  Member states are then applied by
 * mndArbUpdateByHeartBeat.  The decoded response is freed on every path after decoding has
 * started, including a failed decode.
 */
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arbgroup:0, arb hb-rsp contLen is 0");
    return 0;
  }

  int32_t code = -1;

  SMnode *pMnode = pRsp->info.node;

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
    mError("arbgroup:0, failed to get arb token for arb-hb response");
    TAOS_RETURN(code);
  }

  SVArbHeartBeatRsp arbHbRsp = {0};
  if ((code = tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &arbHbRsp)) != 0) {
    mInfo("arbgroup:0, arb hb-rsp des failed, since:%s", tstrerror(pRsp->code));
    // the decoder may already have allocated part of the response (e.g. hbMembers)
    goto _OVER;
  }

  if (mndArbCheckToken(arbToken, arbHbRsp.arbToken) != 0) {
    mInfo("arbgroup:0, arb hearbeat skip update for dnodeId:%d, arb token mismatch, local:[%s] msg:[%s]",
          arbHbRsp.dnodeId, arbToken, arbHbRsp.arbToken);
    code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndArbUpdateByHeartBeat(pMnode, arbHbRsp.dnodeId, arbHbRsp.hbMembers), NULL, _OVER);
  code = 0;

_OVER:
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
  return code;
}
1243

1244
/*
 * Handle a vnode-arb-check-sync response.
 *
 * An empty body is ignored.  If decoding fails, a response code of
 * TSDB_CODE_MND_ARB_TOKEN_MISMATCH is treated as success; any other decode failure is
 * returned.  A response carrying a different arb token is logged and dropped, and 0 is still
 * returned.  Otherwise the group's isSync status is updated from errCode through
 * mndArbUpdateByCheckSync.
 */
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arbgroup:0, arb check-sync-rsp contLen is 0");
    return 0;
  }

  int32_t code = -1;

  SMnode *pMnode = pRsp->info.node;

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
    mError("arbgroup:0, failed to get arb token from vnode-arb-check-sync-rsp");
    TAOS_RETURN(code);
  }

  SVArbCheckSyncRsp syncRsp = {0};
  if ((code = tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp)) != 0) {
    mInfo("arbgroup:0, arb vnode-arb-check-sync-rsp deserialize failed, since:%s", tstrerror(pRsp->code));
    // release any fields the decoder allocated before it failed
    tFreeSVArbCheckSyncRsp(&syncRsp);
    if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) {
      terrno = TSDB_CODE_SUCCESS;
      return 0;
    }
    TAOS_RETURN(code);
  }

  mInfo("arbgroup:%d, vnode-arb-check-sync-rsp received, QID:0x%" PRIx64 ":0x%" PRIx64 ", seqNum:%" PRIx64
        ", errCode:%d",
        syncRsp.vgId, pRsp->info.traceId.rootId, pRsp->info.traceId.msgId, pRsp->info.seqNum, syncRsp.errCode);
  if (mndArbCheckToken(arbToken, syncRsp.arbToken) != 0) {
    mError("arbgroup:%d, skip update arb sync for arb token mismatch, local:[%s] msg:[%s]", syncRsp.vgId, arbToken,
           syncRsp.arbToken);
    // NOTE: code is still 0 here, so a stale response is dropped without reporting an error
    terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
    goto _OVER;
  }

  bool newIsSync = (syncRsp.errCode == TSDB_CODE_SUCCESS);
  if ((code = mndArbUpdateByCheckSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync,
                                      syncRsp.errCode)) != 0) {
    mError("arbgroup:%d, failed to update arb sync for since:%s", syncRsp.vgId, tstrerror(code));
    goto _OVER;
  }

  code = 0;

_OVER:
  tFreeSVArbCheckSyncRsp(&syncRsp);
  TAOS_RETURN(code);
}
1294

1295
/*
 * Decide, under pGroup->mutex, whether a set-assigned-leader response acknowledges the
 * current assignment.
 *
 * The response is ignored when memberToken does not match the assigned leader's token, or
 * when errcode is not TSDB_CODE_SUCCESS.  If the assignment is not yet acked, pNewGroup
 * receives a copy of the group with isSync = false and acked = true, and true is returned.
 */
bool mndArbIsNeedUpdateAssignedBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode,
                                                   SArbGroup *pNewGroup) {
  bool updateAssigned = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  if (mndArbCheckToken(pGroup->assignedLeader.token, memberToken) != 0) {
    mError("arbgroup:%d, skip update arb assigned for member token mismatch, local:[%s] msg:[%s]", vgId,
           pGroup->assignedLeader.token, memberToken);
  } else if (errcode != TSDB_CODE_SUCCESS) {
    mError("arbgroup:%d, skip update arb assigned for since:%s", vgId, tstrerror(errcode));
  } else if (!pGroup->assignedLeader.acked) {
    mndArbGroupDupObj(pGroup, pNewGroup);
    pNewGroup->isSync = false;
    pNewGroup->assignedLeader.acked = true;

    mInfo("arbgroup:%d, arb received assigned ack", vgId);
    updateAssigned = true;
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return updateAssigned;
}
1325

1326
/*
 * Handle a set-assigned-leader response.
 *
 * An empty body is ignored.  The response's arb token must match the local one; otherwise it
 * fails with TSDB_CODE_MND_ARB_TOKEN_MISMATCH.  If the response acknowledges the current
 * assignment (see mndArbIsNeedUpdateAssignedBySetAssignedLeader), the acked state is queued
 * through mndArbPutUpdateArbIntoWQ.  The decoded response and the acquired arb group are
 * released on every path.
 */
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arbgroup:0, arb set-assigned-rsp contLen is 0");
    return 0;
  }

  int32_t code = -1;

  SMnode *pMnode = pRsp->info.node;
  SSdb   *pSdb = pMnode->pSdb;

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
    mError("arbgroup:0, failed to get arb token for arb-set-assigned response");
    TAOS_RETURN(code);
  }

  SVArbSetAssignedLeaderRsp setAssignedRsp = {0};
  if ((code = tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp)) != 0) {
    mError("arbgroup:0, arb set-assigned-rsp des failed, since:%s", tstrerror(pRsp->code));
    // free whatever the decoder allocated before it failed
    goto _OVER;
  }

  if (mndArbCheckToken(arbToken, setAssignedRsp.arbToken) != 0) {
    mError("arbgroup:%d, skip update arb assigned for arb token mismatch, local:[%s] msg:[%s]", setAssignedRsp.vgId,
           arbToken, setAssignedRsp.arbToken);
    code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
    goto _OVER;
  }

  SArbGroup *pGroup = mndAcquireArbGroup(pMnode, setAssignedRsp.vgId);
  if (!pGroup) {
    mError("arbgroup:%d, failed to set arb assigned for since:%s", setAssignedRsp.vgId, terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  SArbGroup newGroup = {0};
  bool      updateAssigned = mndArbIsNeedUpdateAssignedBySetAssignedLeader(
      pGroup, setAssignedRsp.vgId, setAssignedRsp.memberToken, pRsp->code, &newGroup);
  // newGroup is a private copy; drop the reference taken by mndAcquireArbGroup
  sdbRelease(pSdb, pGroup);

  if (updateAssigned) {
    if ((code = mndArbPutUpdateArbIntoWQ(pMnode, &newGroup)) != 0) {
      mError("arbgroup:%d, failed to pullup update arb assigned since:%s", setAssignedRsp.vgId, tstrerror(code));
      goto _OVER;
    }
  }

  code = 0;

_OVER:
  tFreeSVArbSetAssignedLeaderRsp(&setAssignedRsp);
  return code;
}
1380

1381
static char *formatTimestamp(char *buf, int64_t val, int precision) {
×
1382
  time_t tt;
1383
  if (precision == TSDB_TIME_PRECISION_MICRO) {
×
1384
    tt = (time_t)(val / 1000000);
×
1385
  }
1386
  if (precision == TSDB_TIME_PRECISION_NANO) {
×
1387
    tt = (time_t)(val / 1000000000);
×
1388
  } else {
1389
    tt = (time_t)(val / 1000);
×
1390
  }
1391

1392
  struct tm tm;
1393
  if (taosLocalTime(&tt, &tm, NULL, 0, NULL) == NULL) {
×
1394
    mError("failed to get local time");
×
1395
    return NULL;
×
1396
  }
1397
  size_t pos = taosStrfTime(buf, 32, "%Y-%m-%d %H:%M:%S", &tm);
×
1398

1399
  if (precision == TSDB_TIME_PRECISION_MICRO) {
×
1400
    sprintf(buf + pos, ".%06d", (int)(val % 1000000));
×
1401
  } else if (precision == TSDB_TIME_PRECISION_NANO) {
×
1402
    sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
×
1403
  } else {
1404
    sprintf(buf + pos, ".%03d", (int)(val % 1000));
×
1405
  }
1406

1407
  return buf;
×
1408
}
1409

1410
/*
 * Retrieve callback for the arb-groups show command.
 *
 * Emits up to `rows` rows, continuing from pShow->pIter.  Each row holds:
 *   - the db name and vgId;
 *   - each member's dnodeId;
 *   - isSync;
 *   - "<code string>(<update time>)";
 *   - the assigned leader's dnodeId, token and acked flag, or NULLs when none is assigned.
 * Groups whose vgroup cannot be acquired are skipped.
 *
 * Each group is copied under its mutex and the mutex is released before any column is
 * written, so an error jump through RETRIEVE_CHECK_GOTO never leaves the mutex locked.
 *
 * Returns the number of rows written to pBlock.
 */
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  SArbGroup *pGroup = NULL;
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_ARBGROUP, pShow->pIter, (void **)&pGroup);
    if (pShow->pIter == NULL) break;

    // snapshot the group; all columns below are filled from this copy
    SArbGroup group = {0};
    (void)taosThreadMutexLock(&pGroup->mutex);
    mndArbGroupDupObj(pGroup, &group);
    (void)taosThreadMutexUnlock(&pGroup->mutex);

    SVgObj *pVgObj = sdbAcquire(pSdb, SDB_VGROUP, &group.vgId);
    if (!pVgObj) {
      sdbRelease(pSdb, pGroup);
      continue;
    }
    char dbNameInGroup[TSDB_DB_FNAME_LEN];
    tstrncpy(dbNameInGroup, pVgObj->dbName, TSDB_DB_FNAME_LEN);
    sdbRelease(pSdb, pVgObj);

    cols = 0;

    // bound the varstr copy by this buffer's own size
    char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(dbNameInGroup), sizeof(dbname));
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)dbname, false), pGroup, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&group.vgId, false), pGroup, &lino, _OVER);

    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&group.members[i].info.dnodeId, false),
                          pGroup, &lino, _OVER);
    }

    mInfo("arbgroup:%d, arb group sync:%d, code:%s, update time:%" PRId64, group.vgId, group.isSync,
          tstrerror(group.code), group.updateTimeMs);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&group.isSync, false), pGroup, &lino, _OVER);

    char strCheckSyncCode[100] = {0};
    char bufUpdateTime[40] = {0};
    (void)formatTimestamp(bufUpdateTime, group.updateTimeMs, TSDB_TIME_PRECISION_MILLI);
    (void)tsnprintf(strCheckSyncCode, sizeof(strCheckSyncCode), "%s(%s)", tstrerror(group.code), bufUpdateTime);

    char checkSyncCode[100 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(checkSyncCode, strCheckSyncCode, sizeof(checkSyncCode));
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)checkSyncCode, false), pGroup, &lino, _OVER);

    if (group.assignedLeader.dnodeId != 0) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&group.assignedLeader.dnodeId, false),
                          pGroup, &lino, _OVER);

      char token[TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE] = {0};
      STR_WITH_MAXSIZE_TO_VARSTR(token, group.assignedLeader.token, sizeof(token));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)token, false), pGroup, &lino, _OVER);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&group.assignedLeader.acked, false),
                          pGroup, &lino, _OVER);
    } else {
      // no assigned leader: dnodeId, token and acked columns are NULL
      for (int k = 0; k < 3; k++) {
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetNULL(pColInfo, numOfRows);
      }
    }

    numOfRows++;
    sdbRelease(pSdb, pGroup);
  }

_OVER:
  if (code != 0) mError("arbgroup:0, failed to restrieve arb group at line:%d, since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;

  return numOfRows;
}
1503

1504
/*
 * Abort an in-progress arb-group iteration, releasing the iterator obtained from sdbFetch.
 */
static void mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_ARBGROUP);
}
×
1508

1509
/*
 * Number of arb-group objects currently stored in the mnode sdb.
 */
int32_t mndGetArbGroupSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_ARBGROUP);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc