• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3778

28 Mar 2025 06:51AM UTC coverage: 63.374% (+0.4%) from 62.934%
#3778

push

travis-ci

web-flow
fix(tdb): disable page recycling (#30529)

155771 of 313582 branches covered (49.67%)

Branch coverage included in aggregate %.

241569 of 313390 relevant lines covered (77.08%)

20705705.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.47
/source/dnode/mnode/impl/src/mndArbGroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndArbGroup.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndShow.h"
21
#include "mndTrans.h"
22
#include "mndVgroup.h"
23
#include "mndSync.h"
24

25
#define ARBGROUP_VER_NUMBER   1
26
#define ARBGROUP_RESERVE_SIZE 51
27

28
static SHashObj *arbUpdateHash = NULL;
29

30
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup);
31
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew);
32
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup);
33

34
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew);
35
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index);
36
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup);
37

38
static int32_t mndUpdateArbGroup(SMnode *pMnode, SArbGroup *pNewGroup);
39
static int32_t mndBatchUpdateArbGroup(SMnode *pMnode, SArray *newGroupArray);
40

41
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq);
42
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq);
43
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq);
44
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp);
45
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp);
46
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp);
47
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
48
static void    mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter);
49
static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq);
50

51
static int32_t mndArbCheckToken(const char *token1, const char *token2) {
148✔
52
  if (token1 == NULL || token2 == NULL) return -1;
148!
53
  if (strlen(token1) == 0 || strlen(token2) == 0) return -1;
148!
54
  return strncmp(token1, token2, TSDB_ARB_TOKEN_SIZE);
97✔
55
}
56

57
// Register the arb-group sdb table, message handlers and show handlers on the
// mnode, and create the file-scope arbUpdateHash.
// Returns the result of sdbSetTable(), or terrno when the hash cannot be created.
int32_t mndInitArbGroup(SMnode *pMnode) {
  SSdbTable arbTable = {
      .sdbType = SDB_ARBGROUP,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndArbGroupActionEncode,
      .decodeFp = (SdbDecodeFp)mndArbGroupActionDecode,
      .insertFp = (SdbInsertFp)mndArbGroupActionInsert,
      .updateFp = (SdbUpdateFp)mndArbGroupActionUpdate,
      .deleteFp = (SdbDeleteFp)mndArbGroupActionDelete,
  };

  // timers and internal requests
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_HEARTBEAT_TIMER, mndProcessArbHbTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_CHECK_SYNC_TIMER, mndProcessArbCheckSyncTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP_BATCH, mndProcessArbUpdateGroupBatchReq);
  // responses
  mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_ASSIGN_LEADER, mndProcessAssignLeaderMsg);

  // "show" support
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup);

  arbUpdateHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (arbUpdateHash != NULL) {
    return sdbSetTable(pMnode->pSdb, arbTable);
  }

  int32_t code = terrno;
  TAOS_RETURN(code);
}
88

89
// Release the file-scope arbUpdateHash created by mndInitArbGroup().
// The pointer is reset afterwards so that a repeated cleanup, or a late user of
// arbUpdateHash, sees NULL instead of a dangling pointer.
void mndCleanupArbGroup(SMnode *pMnode) {
  taosHashCleanup(arbUpdateHash);
  arbUpdateHash = NULL;
}
1,938✔
90

91
// Acquire the arb group keyed by vgId from sdb.
// Returns NULL on failure; a generic "object not there" terrno is translated
// into TSDB_CODE_MND_ARBGROUP_NOT_EXIST.
SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) {
  SArbGroup *pFound = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_ARBGROUP_NOT_EXIST;
  }
  return NULL;
}
98

99
// Hand an arb group obtained from sdb back via sdbRelease().
void mndReleaseArbGroup(SMnode *pMnode, SArbGroup *pGroup) { sdbRelease(pMnode->pSdb, pGroup); }
×
103

104
// Fill outGroup from a vgroup object: zero it, then copy dbUid, vgId and the
// dnodeId of each of the first TSDB_ARB_GROUP_MEMBER_NUM vnodes.
// Only vgroups with exactly 2 replicas are accepted; anything else yields
// TSDB_CODE_INVALID_PARA and outGroup is left untouched.
int32_t mndArbGroupInitFromVgObj(SVgObj *pVgObj, SArbGroup *outGroup) {
  if (pVgObj->replica == 2) {
    (void)memset(outGroup, 0, sizeof(SArbGroup));
    outGroup->dbUid = pVgObj->dbUid;
    outGroup->vgId = pVgObj->vgId;

    for (int idx = 0; idx < TSDB_ARB_GROUP_MEMBER_NUM; idx++) {
      outGroup->members[idx].info.dnodeId = pVgObj->vnodeGid[idx].dnodeId;
    }

    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  TAOS_RETURN(TSDB_CODE_INVALID_PARA);
}
118

119
// Serialize an arb group into a freshly allocated sdb raw record.
// Field order (must stay in step with mndArbGroupActionDecode): vgId, dbUid,
// per-member {dnodeId, token}, isSync, assigned leader {dnodeId, token},
// version, assigned leader acked, code, updateTimeMs, reserved bytes.
// Returns NULL (raw freed) when terrno is still non-zero at _OVER.
SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
  // NOTE(review): code/lino are not referenced directly; presumably the SDB_SET_* macros use them — confirm.
  int32_t code = 0;
  int32_t lino = 0;
  // Start pessimistic: terrno is only cleared once every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t  rawSize = sizeof(SArbGroup) + ARBGROUP_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_ARBGROUP, ARBGROUP_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto _OVER;

  int32_t pos = 0;
  SDB_SET_INT32(pRaw, pos, pGroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, pos, pGroup->dbUid, _OVER)
  for (int idx = 0; idx < TSDB_ARB_GROUP_MEMBER_NUM; idx++) {
    SArbGroupMember *pCur = &pGroup->members[idx];
    SDB_SET_INT32(pRaw, pos, pCur->info.dnodeId, _OVER)
    SDB_SET_BINARY(pRaw, pos, pCur->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)
  }
  SDB_SET_INT8(pRaw, pos, pGroup->isSync, _OVER)

  SArbAssignedLeader *pAssigned = &pGroup->assignedLeader;
  SDB_SET_INT32(pRaw, pos, pAssigned->dnodeId, _OVER)
  SDB_SET_BINARY(pRaw, pos, pAssigned->token, TSDB_ARB_TOKEN_SIZE, _OVER)
  SDB_SET_INT64(pRaw, pos, pGroup->version, _OVER)
  SDB_SET_INT8(pRaw, pos, pAssigned->acked, _OVER)
  SDB_SET_INT32(pRaw, pos, pGroup->code, _OVER)
  SDB_SET_INT64(pRaw, pos, pGroup->updateTimeMs, _OVER)

  SDB_SET_RESERVE(pRaw, pos, ARBGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("arbgroup:%d, encode to raw:%p, row:%p", pGroup->vgId, pRaw, pGroup);
    return pRaw;
  }

  mError("arbgroup:%d, failed to encode to raw:%p since %s", pGroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
160

161
// Rebuild an arb group row from an sdb raw record written by
// mndArbGroupActionEncode. Runtime-only state is not stored in the raw record
// and is reset here: heartbeat sequence numbers, last heartbeat time, and the
// mutexInited flag.
// Returns NULL (row freed) when terrno is still non-zero at _OVER.
SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): code/lino are not referenced directly; presumably the SDB_GET_* macros use them — confirm.
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  SSdbRow   *pRow = NULL;
  SArbGroup *pGroup = NULL;

  int8_t softVer = 0;
  if (sdbGetRawSoftVer(pRaw, &softVer) != 0) goto _OVER;

  // Only the current on-disk version is understood.
  if (softVer != ARBGROUP_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SArbGroup));
  if (pRow == NULL) goto _OVER;

  pGroup = sdbGetRowObj(pRow);
  if (pGroup == NULL) goto _OVER;

  int32_t pos = 0;
  SDB_GET_INT32(pRaw, pos, &pGroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, pos, &pGroup->dbUid, _OVER)
  for (int idx = 0; idx < TSDB_ARB_GROUP_MEMBER_NUM; idx++) {
    SArbGroupMember *pCur = &pGroup->members[idx];
    SDB_GET_INT32(pRaw, pos, &pCur->info.dnodeId, _OVER)
    SDB_GET_BINARY(pRaw, pos, pCur->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)

    // heartbeat bookkeeping always restarts from scratch after a decode
    pCur->state.nextHbSeq = 0;
    pCur->state.responsedHbSeq = -1;
    pCur->state.lastHbMs = 0;
  }
  SDB_GET_INT8(pRaw, pos, &pGroup->isSync, _OVER)

  SArbAssignedLeader *pAssigned = &pGroup->assignedLeader;
  SDB_GET_INT32(pRaw, pos, &pAssigned->dnodeId, _OVER)
  SDB_GET_BINARY(pRaw, pos, pAssigned->token, TSDB_ARB_TOKEN_SIZE, _OVER)
  SDB_GET_INT64(pRaw, pos, &pGroup->version, _OVER)
  SDB_GET_INT8(pRaw, pos, &pAssigned->acked, _OVER)
  SDB_GET_INT32(pRaw, pos, &pGroup->code, _OVER)
  SDB_GET_INT64(pRaw, pos, &pGroup->updateTimeMs, _OVER)

  pGroup->mutexInited = false;

  SDB_GET_RESERVE(pRaw, pos, ARBGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("arbgroup:%d, decode from raw:%p, row:%p", pGroup->vgId, pRaw, pGroup);
    return pRow;
  }

  mError("arbgroup:%d, failed to decode from raw:%p since %s", pGroup == NULL ? 0 : pGroup->vgId, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
220

221
// sdb insert hook: make sure the group's mutex exists.
// Returns 0 when the mutex is (or already was) initialized, -1 otherwise.
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup) {
  mTrace("arbgroup:%d, perform insert action, row:%p", pGroup->vgId, pGroup);

  if (pGroup->mutexInited) {
    return 0;
  }
  if (taosThreadMutexInit(&pGroup->mutex, NULL) != 0) {
    return -1;
  }

  pGroup->mutexInited = true;
  return 0;
}
229

230
// sdb delete hook: destroy the group's mutex if it was initialized.
// Always returns 0.
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup) {
  mTrace("arbgroup:%d, perform delete action, row:%p", pGroup->vgId, pGroup);

  if (!pGroup->mutexInited) {
    return 0;
  }

  (void)taosThreadMutexDestroy(&pGroup->mutex);
  pGroup->mutexInited = false;
  return 0;
}
238

239
// sdb update hook: copy the persisted fields of pNew into pOld under pOld's
// mutex, but only when both rows carry the same version (otherwise the update
// is skipped and logged). A successful copy bumps pOld->version by one.
// On the leader, the group's entry is then removed from arbUpdateHash.
// Always returns 0.
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew) {
  mTrace("arbgroup:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);
  (void)taosThreadMutexLock(&pOld->mutex);

  if (pOld->version == pNew->version) {
    for (int idx = 0; idx < TSDB_ARB_GROUP_MEMBER_NUM; idx++) {
      tstrncpy(pOld->members[idx].state.token, pNew->members[idx].state.token, TSDB_ARB_TOKEN_SIZE);
    }
    pOld->isSync = pNew->isSync;
    pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId;
    tstrncpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
    pOld->assignedLeader.acked = pNew->assignedLeader.acked;
    pOld->version++;
    pOld->code = pNew->code;
    pOld->updateTimeMs = pNew->updateTimeMs;

    mInfo(
        "arbgroup:%d, perform update action. members[0].token:%s, members[1].token:%s, isSync:%d, as-dnodeid:%d, "
        "as-token:%s, as-acked:%d, version:%" PRId64,
        pOld->vgId, pOld->members[0].state.token, pOld->members[1].state.token, pOld->isSync,
        pOld->assignedLeader.dnodeId, pOld->assignedLeader.token, pOld->assignedLeader.acked, pOld->version);
  } else {
    mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64
          " new version:%" PRId64,
          pOld->vgId, pOld, pNew, pOld->version, pNew->version);
  }

  (void)taosThreadMutexUnlock(&pOld->mutex);

  // The hash entry is dropped whether or not the copy above happened.
  if (mndIsLeader(pSdb->pMnode) && taosHashRemove(arbUpdateHash, &pOld->vgId, sizeof(int32_t)) != 0) {
    mError("arbgroup:%d, failed to remove from arbUpdateHash", pOld->vgId);
  }
  return 0;
}
277

278
// Encode pGroup, append it to the transaction's redo log, and mark the raw
// record SDB_STATUS_CREATING. Returns 0 on success, otherwise the failing
// step's error code (terrno when encoding fails).
int32_t mndSetCreateArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeErr = terrno;
    TAOS_RETURN(encodeErr);
  }

  int32_t code = mndTransAppendRedolog(pTrans, pRaw);
  if (code != 0) TAOS_RETURN(code);

  code = sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
  if (code != 0) TAOS_RETURN(code);

  return 0;
}
289

290
// Encode pGroup, append it to the transaction's undo log, and mark the raw
// record SDB_STATUS_DROPPED. Returns 0 on success, otherwise the failing
// step's error code (terrno when encoding fails).
int32_t mndSetCreateArbGroupUndoLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeErr = terrno;
    TAOS_RETURN(encodeErr);
  }

  int32_t code = mndTransAppendUndolog(pTrans, pRaw);
  if (code != 0) TAOS_RETURN(code);

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
  if (code != 0) TAOS_RETURN(code);

  return 0;
}
301

302
// Encode pGroup, append it to the transaction's commit log, and mark the raw
// record SDB_STATUS_READY. Returns 0 on success, otherwise the failing step's
// error code (terrno when encoding fails).
int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndArbGroupActionEncode(pGroup);
  if (pCommitRaw == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }
  // The comparison must sit outside the assignment's parentheses; otherwise
  // `code` receives the boolean result (1) instead of the real error code.
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) TAOS_RETURN(code);
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) TAOS_RETURN(code);
  return 0;
}
313

314
// Encode pGroup, append it to the transaction's prepare log, and mark the raw
// record SDB_STATUS_DROPPING. Returns 0 on success, otherwise the failing
// step's error code (terrno when encoding fails).
int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeErr = terrno;
    TAOS_RETURN(encodeErr);
  }

  int32_t code = mndTransAppendPrepareLog(pTrans, pRaw);
  if (code != 0) TAOS_RETURN(code);

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING);
  if (code != 0) TAOS_RETURN(code);

  return 0;
}
325

326
// Encode pGroup, append it to the transaction's redo log, and mark the raw
// record SDB_STATUS_DROPPING. Returns 0 on success, otherwise the failing
// step's error code (terrno when encoding fails).
static int32_t mndSetDropArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeErr = terrno;
    TAOS_RETURN(encodeErr);
  }

  int32_t code = mndTransAppendRedolog(pTrans, pRaw);
  if (code != 0) TAOS_RETURN(code);

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING);
  if (code != 0) TAOS_RETURN(code);

  return 0;
}
337

338
// Encode pGroup, append it to the transaction's commit log, and mark the raw
// record SDB_STATUS_DROPPED. Returns 0 on success, otherwise the failing
// step's error code (terrno when encoding fails).
int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeErr = terrno;
    TAOS_RETURN(encodeErr);
  }

  int32_t code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) TAOS_RETURN(code);

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
  if (code != 0) TAOS_RETURN(code);

  return 0;
}
349

350
static void *mndBuildArbHeartBeatReq(int32_t *pContLen, char *arbToken, int32_t dnodeId, int64_t arbTerm,
56✔
351
                                     SArray *hbMembers) {
352
  SVArbHeartBeatReq req = {0};
56✔
353
  req.dnodeId = dnodeId;
56✔
354
  req.arbToken = arbToken;
56✔
355
  req.arbTerm = arbTerm;
56✔
356
  req.hbMembers = hbMembers;
56✔
357

358
  int32_t contLen = tSerializeSVArbHeartBeatReq(NULL, 0, &req);
56✔
359
  if (contLen <= 0) return NULL;
56!
360

361
  void *pReq = rpcMallocCont(contLen);
56✔
362
  if (pReq == NULL) return NULL;
56!
363

364
  if (tSerializeSVArbHeartBeatReq(pReq, contLen, &req) <= 0) {
56!
365
    rpcFreeCont(pReq);
×
366
    return NULL;
×
367
  }
368
  *pContLen = contLen;
56✔
369
  return pReq;
56✔
370
}
371

372
// Build an arb heartbeat request for pDnode and send it with tmsgSendReq().
// Returns -1 when the request cannot be built or the dnode has no endpoints;
// otherwise the tmsgSendReq() result.
static int32_t mndSendArbHeartBeatReq(SDnodeObj *pDnode, char *arbToken, int64_t arbTerm, SArray *hbMembers) {
  int32_t msgLen = 0;
  void   *pMsg = mndBuildArbHeartBeatReq(&msgLen, arbToken, pDnode->id, arbTerm, hbMembers);
  if (pMsg == NULL) {
    mError("dnodeId:%d, failed to build arb-hb request", pDnode->id);
    return -1;
  }

  SEpSet epSet = mndGetDnodeEpset(pDnode);
  if (epSet.numOfEps == 0) {
    mError("dnodeId:%d, failed to send arb-hb request to dnode since no epSet found", pDnode->id);
    rpcFreeCont(pMsg);  // never handed to the transport, so free it here
    return -1;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_ARB_HEARTBEAT, .pCont = pMsg, .contLen = msgLen};
  int32_t sendCode = tmsgSendReq(&epSet, &rpcMsg);
  if (sendCode == 0) {
    mTrace("dnodeId:%d, send arb-hb request to dnode", pDnode->id);
  } else {
    mError("dnodeId:%d, failed to send arb-hb request to dnode since 0x%x", pDnode->id, sendCode);
  }
  return sendCode;
}
396

397
// Heartbeat timer handler.
// Phase 1: walk every arb group and bucket its members by dnodeId, so that each
//          dnode receives a single request listing all of its (vgId, hbSeq) pairs.
//          Each member's nextHbSeq is consumed even if bucketing fails for it.
// Phase 2: for every bucket, send the heartbeat if the dnode is online, then
//          free the bucket.
// Returns terrno if the temporary hash cannot be created, the mndGetArbToken()
// error if the token cannot be read, and 0 otherwise (per-dnode send failures
// are only logged).
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
  int32_t    code = 0;
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;
  void      *pIter = NULL;

  SHashObj *pDnodeHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pDnodeHash == NULL) {
    code = terrno;
    mError("failed to init dnode hash for arb-hb timer");
    TAOS_RETURN(code);
  }

  // collect member of same dnode
  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    (void)taosThreadMutexLock(&pArbGroup->mutex);

    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      SArbGroupMember *pMember = &pArbGroup->members[i];
      int32_t          dnodeId = pMember->info.dnodeId;
      // Take the sequence number first so it advances even when bucketing fails below.
      SVArbHbReqMember reqMember = {.vgId = pArbGroup->vgId, .hbSeq = pMember->state.nextHbSeq++};

      SArray *hbMembers = NULL;
      void   *pObj = taosHashGet(pDnodeHash, &dnodeId, sizeof(int32_t));
      if (pObj) {
        hbMembers = *(SArray **)pObj;
      } else {
        hbMembers = taosArrayInit(16, sizeof(SVArbHbReqMember));
        if (hbMembers == NULL) {
          mError("dnodeId:%d, failed to alloc hb member array, but continue next at this timer round", dnodeId);
          continue;
        }
        if (taosHashPut(pDnodeHash, &dnodeId, sizeof(int32_t), &hbMembers, POINTER_BYTES) != 0) {
          mError("dnodeId:%d, failed to push hb member inty]o hash, but conitnue next at this timer round", dnodeId);
          // Not reachable through the hash, so nobody else would ever free it.
          taosArrayDestroy(hbMembers);
          continue;
        }
      }

      if (taosArrayPush(hbMembers, &reqMember) == NULL) {
        mError("dnodeId:%d, failed to push hb member, but conitnue next at this timer round", dnodeId);
      }
    }

    (void)taosThreadMutexUnlock(&pArbGroup->mutex);
    sdbRelease(pSdb, pArbGroup);
  }

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
    mError("failed to get arb token for arb-hb timer");
    // Nothing can be sent without a token: free every bucket and bail out.
    pIter = taosHashIterate(pDnodeHash, NULL);
    while (pIter) {
      SArray *hbMembers = *(SArray **)pIter;
      taosArrayDestroy(hbMembers);
      pIter = taosHashIterate(pDnodeHash, pIter);
    }
    taosHashCleanup(pDnodeHash);
    TAOS_RETURN(code);
  }

  int64_t nowMs = taosGetTimestampMs();

  pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pDnodeHash, pIter);
    if (pIter == NULL) break;

    int32_t dnodeId = *(int32_t *)taosHashGetKey(pIter, NULL);
    SArray *hbMembers = *(SArray **)pIter;

    SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
    if (pDnode == NULL) {
      mError("dnodeId:%d, timer failed to acquire dnode", dnodeId);
      taosArrayDestroy(hbMembers);
      continue;
    }

    int64_t mndTerm = mndGetTerm(pMnode);

    // Offline dnodes are skipped silently for this round.
    if (mndIsDnodeOnline(pDnode, nowMs)) {
      int32_t sendCode = mndSendArbHeartBeatReq(pDnode, arbToken, mndTerm, hbMembers);
      if (TSDB_CODE_SUCCESS != sendCode) {
        mError("dnodeId:%d, timer failed to send arb-hb request", dnodeId);
      }
    }

    mndReleaseDnode(pMnode, pDnode);
    taosArrayDestroy(hbMembers);
  }
  taosHashCleanup(pDnodeHash);

  return 0;
}
482

483
static void *mndBuildArbCheckSyncReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
16✔
484
                                     char *member0Token, char *member1Token) {
485
  SVArbCheckSyncReq req = {0};
16✔
486
  req.arbToken = arbToken;
16✔
487
  req.arbTerm = arbTerm;
16✔
488
  req.member0Token = member0Token;
16✔
489
  req.member1Token = member1Token;
16✔
490

491
  int32_t reqLen = tSerializeSVArbCheckSyncReq(NULL, 0, &req);
16✔
492
  int32_t contLen = reqLen + sizeof(SMsgHead);
16✔
493

494
  if (contLen <= 0) return NULL;
16!
495
  SMsgHead *pHead = rpcMallocCont(contLen);
16✔
496
  if (pHead == NULL) return NULL;
16!
497

498
  pHead->contLen = htonl(contLen);
16✔
499
  pHead->vgId = htonl(vgId);
16✔
500
  if (tSerializeSVArbCheckSyncReq((char *)pHead + sizeof(SMsgHead), contLen, &req) <= 0) {
16!
501
    rpcFreeCont(pHead);
×
502
    return NULL;
×
503
  }
504
  *pContLen = contLen;
16✔
505
  return pHead;
16✔
506
}
507

508
// Build a check-sync request for vgId and send it to the vgroup's endpoint set.
// Returns -1 if the request cannot be built; terrno (or -1 when terrno is 0)
// if the vgroup has no endpoints; otherwise the tmsgSendReq() result.
static int32_t mndSendArbCheckSyncReq(SMnode *pMnode, int32_t vgId, char *arbToken, int64_t term, char *member0Token,
                                      char *member1Token) {
  int32_t msgLen = 0;
  void   *pMsg = mndBuildArbCheckSyncReq(&msgLen, vgId, arbToken, term, member0Token, member1Token);
  if (pMsg == NULL) {
    mError("vgId:%d, failed to build check-sync request", vgId);
    return -1;
  }

  SEpSet epSet = mndGetVgroupEpsetById(pMnode, vgId);
  if (epSet.numOfEps == 0) {
    mError("vgId:%d, failed to send check-sync request since no epSet found", vgId);
    rpcFreeCont(pMsg);
    int32_t failCode = -1;
    if (terrno != 0) failCode = terrno;
    TAOS_RETURN(failCode);
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_ARB_CHECK_SYNC, .pCont = pMsg, .contLen = msgLen};
  int32_t sendCode = tmsgSendReq(&epSet, &rpcMsg);
  if (sendCode == 0) {
    mDebug("vgId:%d, send check-sync request", vgId);
  } else {
    mError("vgId:%d, failed to send check-sync request since 0x%x", vgId, sendCode);
  }
  return sendCode;
}
536

537
// True when the member at `index` last sent a heartbeat strictly earlier than
// tsArbSetAssignedTimeoutSec seconds before nowMs.
static bool mndCheckArbMemberHbTimeout(SArbGroup *pArbGroup, int32_t index, int64_t nowMs) {
  int64_t oldestAllowedMs = nowMs - tsArbSetAssignedTimeoutSec * 1000;
  return pArbGroup->members[index].state.lastHbMs < oldestAllowedMs;
}
541

542
static void *mndBuildArbSetAssignedLeaderReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
×
543
                                             char *memberToken, bool force) {
544
  SVArbSetAssignedLeaderReq req = {0};
×
545
  req.arbToken = arbToken;
×
546
  req.arbTerm = arbTerm;
×
547
  req.memberToken = memberToken;
×
548
  if (force) req.force = 1;
×
549

550
  int32_t reqLen = tSerializeSVArbSetAssignedLeaderReq(NULL, 0, &req);
×
551
  int32_t contLen = reqLen + sizeof(SMsgHead);
×
552

553
  if (contLen <= 0) return NULL;
×
554
  SMsgHead *pHead = rpcMallocCont(contLen);
×
555
  if (pHead == NULL) return NULL;
×
556

557
  pHead->contLen = htonl(contLen);
×
558
  pHead->vgId = htonl(vgId);
×
559
  if (tSerializeSVArbSetAssignedLeaderReq((char *)pHead + sizeof(SMsgHead), contLen, &req) <= 0) {
×
560
    rpcFreeCont(pHead);
×
561
    return NULL;
×
562
  }
563
  *pContLen = contLen;
×
564
  return pHead;
×
565
}
566

567
// Build a set-assigned-leader request for vgId and send it to dnodeId.
// Returns terrno (or -1 when terrno is 0) if the request cannot be built or
// the dnode has no endpoints; otherwise the tmsgSendReq() result.
static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, int32_t vgId, char *arbToken,
                                              int64_t term, char *memberToken, bool force) {
  int32_t msgLen = 0;
  void   *pMsg = mndBuildArbSetAssignedLeaderReq(&msgLen, vgId, arbToken, term, memberToken, force);
  if (pMsg == NULL) {
    mError("vgId:%d, failed to build set-assigned request", vgId);
    int32_t buildCode = -1;
    if (terrno != 0) buildCode = terrno;
    TAOS_RETURN(buildCode);
  }

  SEpSet epSet = mndGetDnodeEpsetById(pMnode, dnodeId);
  if (epSet.numOfEps == 0) {
    mError("dnodeId:%d vgId:%d, failed to send arb-set-assigned request to dnode since no epSet found", dnodeId, vgId);
    rpcFreeCont(pMsg);
    int32_t epCode = -1;
    if (terrno != 0) epCode = terrno;
    TAOS_RETURN(epCode);
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_SYNC_SET_ASSIGNED_LEADER, .pCont = pMsg, .contLen = msgLen};
  int32_t sendCode = tmsgSendReq(&epSet, &rpcMsg);
  if (sendCode == 0) {
    mInfo("dnodeId:%d vgId:%d, send arb-set-assigned request to dnode", dnodeId, vgId);
  } else {
    mError("dnodeId:%d vgId:%d, failed to send arb-set-assigned request to dnode since 0x%x", dnodeId, vgId, sendCode);
  }
  return sendCode;
}
596

597
// Decide what the check-sync timer should do for one arb group.
// *pOp is always written; pNewGroup is only written for CHECK_SYNC_UPDATE.
//
//   assigned leader set but not acked         -> CHECK_SYNC_SET_ASSIGNED_LEADER
//   both members timed out                    -> CHECK_SYNC_NONE
//   no member timed out                       -> CHECK_SYNC_CHECK_SYNC if there is no
//                                                assigned leader and the group is not
//                                                in sync, else CHECK_SYNC_NONE
//   exactly one member timed out:
//     assigned leader is a different dnode    -> CHECK_SYNC_NONE
//     group is not in sync                    -> CHECK_SYNC_NONE
//     otherwise                               -> CHECK_SYNC_UPDATE, with pNewGroup a
//                                                copy of the group whose assigned
//                                                leader is the surviving member
void mndArbCheckSync(SArbGroup *pArbGroup, int64_t nowMs, ECheckSyncOp *pOp, SArbGroup *pNewGroup) {
  *pOp = CHECK_SYNC_NONE;

  int32_t vgId = pArbGroup->vgId;
  bool    timeout0 = mndCheckArbMemberHbTimeout(pArbGroup, 0, nowMs);
  bool    timeout1 = mndCheckArbMemberHbTimeout(pArbGroup, 1, nowMs);

  SArbAssignedLeader *pAssigned = &pArbGroup->assignedLeader;
  int32_t             assignedDnodeId = pAssigned->dnodeId;
  bool                hasAssigned = (assignedDnodeId != 0);

  // 1. has assigned && no response => send req
  if (hasAssigned && pAssigned->acked == false) {
    *pOp = CHECK_SYNC_SET_ASSIGNED_LEADER;
    return;
  }

  // 2. both of the two members are timeout => skip
  if (timeout0 && timeout1) {
    return;
  }

  // 3. no member is timeout => check sync
  if (!timeout0 && !timeout1) {
    // only worth asking when nobody is assigned and the group is not in sync
    if (!hasAssigned && !pArbGroup->isSync) {
      *pOp = CHECK_SYNC_CHECK_SYNC;
    }
    return;
  }

  // 4. one of the members is timeout => set assigned leader
  int32_t aliveIndex = timeout0 ? 1 : 0;
  int32_t aliveDnodeId = pArbGroup->members[aliveIndex].info.dnodeId;

  // has assigned leader and dnodeId not match => skip
  if (hasAssigned && assignedDnodeId != aliveDnodeId) {
    mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, assigned leader has been set to dnodeId:%d", vgId,
          aliveDnodeId, assignedDnodeId);
    return;
  }

  // not sync => skip (the log level differs depending on whether the member is already the assigned leader)
  if (pArbGroup->isSync == false) {
    if (assignedDnodeId == aliveDnodeId) {
      mDebug("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId,
             aliveDnodeId);
    } else {
      mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId,
            aliveDnodeId);
    }
    return;
  }

  // is sync && no assigned leader => write to sdb
  mndArbGroupDupObj(pArbGroup, pNewGroup);
  mndArbGroupSetAssignedLeader(pNewGroup, aliveIndex);
  *pOp = CHECK_SYNC_UPDATE;
}
657

658
// Handler for TDMT_MND_ARB_ASSIGN_LEADER.
// For every arb group, pick the first member whose dnode is online and send it
// a forced set-assigned-leader request (empty member token, force = true).
// Groups with no online member are skipped. Send failures are ignored.
// Returns TSDB_CODE_INVALID_MSG if the request cannot be deserialized, the
// error from mndGetArbToken()/mndGetTerm() if those fail, and 0 otherwise.
// The deserialized request is freed on every exit path.
static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  int32_t    code = -1, lino = 0;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;

  SAssignLeaderReq req = {0};
  if (tDeserializeSAssignLeaderReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  mInfo("begin to process assign leader");

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));

  int64_t term = mndGetTerm(pMnode);
  if (term < 0) {
    mError("arb failed to get term since %s", terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    // Leave through _exit so that req is freed.
    goto _exit;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    SArbGroup arbGroupDup = {0};

    // Work on a private copy so the group's mutex is not held while sending.
    (void)taosThreadMutexLock(&pArbGroup->mutex);
    mndArbGroupDupObj(pArbGroup, &arbGroupDup);
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);

    sdbRelease(pSdb, pArbGroup);

    // dnodeId 0 means "no online member found".
    int32_t dnodeId = 0;
    for (int32_t i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      int32_t    memberDnodeId = arbGroupDup.members[i].info.dnodeId;
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, memberDnodeId);
      if (pDnode == NULL) {
        mError("dnodeId:%d, failed to acquire dnode for assign leader", memberDnodeId);
        continue;
      }
      bool isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
      mndReleaseDnode(pMnode, pDnode);
      if (isonline) {
        dnodeId = memberDnodeId;
        break;
      }
    }

    if (dnodeId == 0) {
      mError("vgId:%d, arb skip to send set assigned leader since no member dnode is online", arbGroupDup.vgId);
      continue;
    }

    (void)mndSendArbSetAssignedLeaderReq(pMnode, dnodeId, arbGroupDup.vgId, arbToken, term, "", true);
    mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", arbGroupDup.vgId, dnodeId);
  }

  code = 0;

  // auditRecord(pReq, pMnode->clusterId, "assignLeader", "", "", req.sql, req.sqlLen);

_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to assign leader since %s", tstrerror(code));
  }

  tFreeSAssignLeaderReq(&req);
  TAOS_RETURN(code);
}
724

725
// Check-sync timer handler.
// Skipped entirely for two heartbeat intervals after this mnode's role time.
// Otherwise, for every arb group: take a copy under the group's mutex, let
// mndArbCheckSync() choose an operation, and either send a set-assigned-leader
// request, send a check-sync request, or queue the new group state for a batch
// sdb update. The queued updates are submitted once after the scan.
// Returns the term error when mndGetTerm() fails; every other path returns 0,
// with failures only logged.
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
  int32_t    code = 0, lino = 0;
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;
  void      *pIter = NULL;
  SArray    *pUpdateArray = NULL;

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));

  int64_t term = mndGetTerm(pMnode);
  if (term < 0) {
    mError("arb failed to get term since %s", terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  // Right after a role switch the heartbeat timestamps are not yet meaningful.
  int64_t roleTimeMs = mndGetRoleTimeMs(pMnode);
  int64_t nowMs = taosGetTimestampMs();
  if (nowMs - roleTimeMs < tsArbHeartBeatIntervalSec * 1000 * 2) {
    mInfo("arb skip to check sync since mnd had just switch over, roleTime:%" PRId64 " now:%" PRId64, roleTimeMs,
          nowMs);
    return 0;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    SArbGroup arbGroupDup = {0};

    // Decide on a private copy so the group's mutex is not held while sending.
    (void)taosThreadMutexLock(&pArbGroup->mutex);
    mndArbGroupDupObj(pArbGroup, &arbGroupDup);
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);

    sdbRelease(pSdb, pArbGroup);

    ECheckSyncOp op = CHECK_SYNC_NONE;
    SArbGroup    newGroup = {0};
    mndArbCheckSync(&arbGroupDup, nowMs, &op, &newGroup);

    int32_t             vgId = arbGroupDup.vgId;
    SArbAssignedLeader *pAssgndLeader = &arbGroupDup.assignedLeader;
    int32_t             assgndDnodeId = pAssgndLeader->dnodeId;

    switch (op) {
      case CHECK_SYNC_NONE:
        mTrace("vgId:%d, arb skip to send msg by check sync", vgId);
        break;
      case CHECK_SYNC_SET_ASSIGNED_LEADER:
        (void)mndSendArbSetAssignedLeaderReq(pMnode, assgndDnodeId, vgId, arbToken, term, pAssgndLeader->token, false);
        mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, assgndDnodeId);
        break;
      case CHECK_SYNC_CHECK_SYNC:
        (void)mndSendArbCheckSyncReq(pMnode, vgId, arbToken, term, arbGroupDup.members[0].state.token,
                                     arbGroupDup.members[1].state.token);
        mInfo("vgId:%d, arb send check sync request", vgId);
        break;
      case CHECK_SYNC_UPDATE:
        if (!pUpdateArray) {
          pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
          if (!pUpdateArray) {
            TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
          }
        }

        if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
        break;
      default:
        mError("vgId:%d, arb unknown check sync op:%d", vgId, op);
        break;
    }
  }

  TAOS_CHECK_EXIT(mndBatchUpdateArbGroup(pMnode, pUpdateArray));

_exit:
  if (code != 0) {
    mError("failed to check sync at line %d since %s", lino, terrstr());
  }

  // pIter is only non-NULL here when the scan above was left early; the
  // unfinished iteration must then be cancelled.
  // NOTE(review): sdbCancelFetch is assumed to come from the sdb header already used by this file — confirm.
  if (pIter != NULL) {
    sdbCancelFetch(pSdb, pIter);
  }

  taosArrayDestroy(pUpdateArray);
  return 0;
}
813

814
// Encode a batch arb-group update request into an RPC content buffer.
//
// pContLen    - out: encoded length; written only when a buffer is returned
// updateArray - stored in the request's updateArray field before encoding
//
// Returns the buffer obtained from rpcMallocCont(), or NULL when the size
// query, the allocation or the encoding fails (the buffer is freed with
// rpcFreeCont() in the last case).
static void *mndBuildArbUpdateGroupBatchReq(int32_t *pContLen, SArray *updateArray) {
  SMArbUpdateGroupBatchReq batchReq = {0};
  batchReq.updateArray = updateArray;

  // The result of this first call (NULL buffer) is used as the allocation size.
  int32_t encodedLen = tSerializeSMArbUpdateGroupBatchReq(NULL, 0, &batchReq);
  if (encodedLen <= 0) {
    return NULL;
  }

  SMsgHead *pBuf = rpcMallocCont(encodedLen);
  if (pBuf == NULL) {
    return NULL;
  }

  if (tSerializeSMArbUpdateGroupBatchReq(pBuf, encodedLen, &batchReq) > 0) {
    *pContLen = encodedLen;
    return pBuf;
  }

  rpcFreeCont(pBuf);
  return NULL;
}
830

831
// Populate outGroup from pGroup field by field. The token fields are assigned
// as pointers into pGroup (no string copy), so outGroup must not be used after
// pGroup's storage goes away.
static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup) {
  // scalar fields
  outGroup->vgId = pGroup->vgId;
  outGroup->dbUid = pGroup->dbUid;
  outGroup->isSync = pGroup->isSync;
  outGroup->version = pGroup->version;
  outGroup->code = pGroup->code;
  outGroup->updateTimeMs = pGroup->updateTimeMs;

  // per-member identity and token (pointer only)
  int memberIdx = 0;
  while (memberIdx < TSDB_ARB_GROUP_MEMBER_NUM) {
    outGroup->members[memberIdx].dnodeId = pGroup->members[memberIdx].info.dnodeId;
    outGroup->members[memberIdx].token = pGroup->members[memberIdx].state.token;
    memberIdx++;
  }

  // assigned leader (token is again pointer only)
  outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId;
  outGroup->assignedLeader.token = pGroup->assignedLeader.token;
  outGroup->assignedLeader.acked = pGroup->assignedLeader.acked;
}
12✔
846

847
// Queue a single-group arb update request on the mnode write queue.
//
// If pNewGroup->vgId is already present in arbUpdateHash the call logs and
// returns 0 without sending anything. Otherwise the group is wrapped in a
// one-element batch request, put on WRITE_QUEUE, and - once queued - its vgId
// is put into arbUpdateHash.
//
// Returns 0 on success or skip; -1 when the request could not be built;
// otherwise the non-zero result of tmsgPutToQueue() or taosHashPut().
static int32_t mndUpdateArbGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
  bool inProcess = taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL;
  if (inProcess) {
    mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
    return 0;
  }

  int32_t result = -1;

  SMArbUpdateGroup updGroup = {0};
  mndInitArbUpdateGroup(pNewGroup, &updGroup);

  SArray *pBatch = taosArrayInit(1, sizeof(SMArbUpdateGroup));
  if (taosArrayPush(pBatch, &updGroup) != NULL) {
    int32_t msgLen = 0;
    void   *pMsgBody = mndBuildArbUpdateGroupBatchReq(&msgLen, pBatch);
    if (pMsgBody == NULL) {
      mError("failed to build arb-update-group request");
    } else {
      SRpcMsg rpcMsg = {
          .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pMsgBody, .contLen = msgLen, .info.noResp = true};
      result = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
      if (result == 0) {
        // Only mark the group as in process after the request was queued.
        result = taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
      }
    }
  }

  taosArrayDestroy(pBatch);
  return result;
}
879

880
// Queue one TDMT_MND_ARB_UPDATE_GROUP_BATCH request on the mnode write queue
// for every group in newGroupArray that has no update in process.
//
// A vgId present in arbUpdateHash marks a group as "in process"; such groups
// are logged and skipped. Every group that is added to the outgoing batch has
// its vgId put into arbUpdateHash by this call.
//
// pMnode        - mnode whose msgCb receives the request
// newGroupArray - array of SArbGroup to be sent
//
// Returns 0 when the request was queued or when there was nothing to send;
// -1 when the batch could not be assembled or encoded; otherwise the non-zero
// result of tmsgPutToQueue().
//
// On failure only the markers added by THIS call are removed again. Markers
// that already existed belong to another request that is still in process and
// must stay, otherwise a duplicate update for the same vgId could be queued.
static int32_t mndBatchUpdateArbGroup(SMnode *pMnode, SArray *newGroupArray) {
  int32_t ret = -1;

  size_t  sz = taosArrayGetSize(newGroupArray);
  SArray *pArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup));
  for (size_t i = 0; i < sz; i++) {
    SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
    if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
      mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
      continue;
    }

    SMArbUpdateGroup newGroup = {0};
    mndInitArbUpdateGroup(pNewGroup, &newGroup);

    // Mark first, then record in pArray: every element of pArray is therefore
    // known to own a marker, which is what the rollback below relies on.
    if (taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0) != 0) goto _OVER;
    if (taosArrayPush(pArray, &newGroup) == NULL) {
      // This marker is not recorded in pArray, so it has to be undone here.
      if (taosHashRemove(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != 0) {
        mError("failed to remove vgId:%d from arbUpdateHash", pNewGroup->vgId);
      }
      goto _OVER;
    }
  }

  if (taosArrayGetSize(pArray) == 0) {
    ret = 0;
    goto _OVER;
  }

  int32_t contLen = 0;
  void   *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
  if (!pHead) {
    mError("failed to build arb-update-group request");
    goto _OVER;
  }

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
  ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);

_OVER:
  if (ret != 0) {
    // Roll back exactly the markers this call created (the groups in pArray).
    size_t marked = taosArrayGetSize(pArray);
    for (size_t i = 0; i < marked; i++) {
      SMArbUpdateGroup *pMarked = taosArrayGet(pArray, i);
      if (taosHashRemove(arbUpdateHash, &pMarked->vgId, sizeof(pMarked->vgId)) != 0) {
        mError("failed to remove vgId:%d from arbUpdateHash", pMarked->vgId);
      }
    }
  }

  taosArrayDestroy(pArray);
  return ret;
}
929

930
// Process a batch arb-group update request.
//
// The request is decoded into an SMArbUpdateGroupBatchReq and one transaction
// ("upd-bat-arbgroup") is created for the whole batch. For each entry whose
// vgId still has an SDB_ARBGROUP object, the group id is added to the
// transaction and mndSetCreateArbGroupCommitLogs() is called with a locally
// rebuilt SArbGroup. Entries without an sdb object are skipped and their vgId
// is removed from arbUpdateHash right away.
//
// Returns 0 when the transaction was prepared; otherwise a non-zero code. On
// every failure after decoding, the vgIds of all entries in the request are
// removed from arbUpdateHash so later updates for those groups are not
// blocked by a stale "in process" marker.
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
  int    code = -1;
  size_t sz = 0;

  SMArbUpdateGroupBatchReq req = {0};
  if ((code = tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    mError("arb failed to decode arb-update-group request");
    TAOS_RETURN(code);
  }

  // Take the entry count before anything else can fail, so that the cleanup
  // loop at _OVER covers the whole request on every error path.
  sz = taosArrayGetSize(req.updateArray);

  SMnode *pMnode = pReq->info.node;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "upd-bat-arbgroup");
  if (pTrans == NULL) {
    mError("failed to update arbgroup in create trans, since %s", terrstr());
    // code is still 0 from the successful decode; report the failure instead.
    code = -1;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  for (size_t i = 0; i < sz; i++) {
    SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
    SArbGroup         newGroup = {0};
    newGroup.vgId = pUpdateGroup->vgId;
    newGroup.dbUid = pUpdateGroup->dbUid;
    for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
      newGroup.members[m].info.dnodeId = pUpdateGroup->members[m].dnodeId;
      tstrncpy(newGroup.members[m].state.token, pUpdateGroup->members[m].token, TSDB_ARB_TOKEN_SIZE);
    }

    newGroup.isSync = pUpdateGroup->isSync;
    newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId;
    tstrncpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
    newGroup.assignedLeader.acked = pUpdateGroup->assignedLeader.acked;
    newGroup.version = pUpdateGroup->version;
    newGroup.code = pUpdateGroup->code;
    newGroup.updateTimeMs = pUpdateGroup->updateTimeMs;

    mInfo(
        "trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d], %d, "
        "%" PRId64,
        pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
        newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
        newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked,
        pUpdateGroup->code, pUpdateGroup->updateTimeMs);

    SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
    if (!pOldGroup) {
      mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId);
      if (taosHashRemove(arbUpdateHash, &newGroup.vgId, sizeof(int32_t)) != 0) {
        mError("failed to remove vgId:%d from arbUpdateHash", newGroup.vgId);
      }
      continue;
    }

    mndTransAddArbGroupId(pTrans, newGroup.vgId);

    if ((code = mndSetCreateArbGroupCommitLogs(pTrans, &newGroup)) != 0) {
      mError("failed to update arbgroup in set commit log, vgId:%d, trans:%d, since %s", newGroup.vgId, pTrans->id,
             terrstr());
      // Drop the reference taken by sdbAcquire() before leaving the loop.
      sdbRelease(pMnode->pSdb, pOldGroup);
      goto _OVER;
    }

    mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
          pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
          newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
          newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked);

    sdbRelease(pMnode->pSdb, pOldGroup);
  }

  if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) goto _OVER;
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;

  code = 0;

_OVER:
  if (code != 0) {
    // failed to update arbgroup: clear the in-process marker of every entry
    for (size_t i = 0; i < sz; i++) {
      SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
      if (taosHashRemove(arbUpdateHash, &pUpdateGroup->vgId, sizeof(int32_t)) != 0) {
        mError("failed to remove vgId:%d from arbUpdateHash", pUpdateGroup->vgId);
      }
    }
  }

  mndTransDrop(pTrans);
  tFreeSMArbUpdateGroupBatchReq(&req);
  return code;
}
1019

1020
// Copy the leading part of *pGroup into *pNew: all bytes located before the
// mutexInited field. mutexInited and anything after it in *pNew is untouched.
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew) {
  const size_t copyLen = offsetof(SArbGroup, mutexInited);
  (void)memcpy(pNew, pGroup, copyLen);
}
84✔
1023

1024
// Make members[index] the assigned leader of pGroup: copy its dnodeId and
// token into pGroup->assignedLeader and clear the acked flag.
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index) {
  SArbAssignedLeader *pLeader = &pGroup->assignedLeader;

  pLeader->dnodeId = pGroup->members[index].info.dnodeId;
  tstrncpy(pLeader->token, pGroup->members[index].state.token, TSDB_ARB_TOKEN_SIZE);
  pLeader->acked = false;
}
1✔
1031

1032
// Clear pGroup->assignedLeader: dnodeId becomes 0, the token buffer is zeroed
// over TSDB_ARB_TOKEN_SIZE bytes and acked is set to false.
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
  SArbAssignedLeader *pLeader = &pGroup->assignedLeader;

  pLeader->dnodeId = 0;
  (void)memset(pLeader->token, 0, TSDB_ARB_TOKEN_SIZE);
  pLeader->acked = false;
}
1✔
1037

1038
// Apply one heartbeat response member to pGroup, under pGroup->mutex.
//
// pGroup     - group to check; its member hb state may be updated in place
// pRspMember - heartbeat data (vgId, hbSeq, memberToken) reported by dnodeId
// nowMs      - timestamp stored as the member's lastHbMs
// dnodeId    - dnode that sent the heartbeat
// pNewGroup  - out: filled only when true is returned
//
// Returns true when the reported token differs from the stored one. In that
// case pNewGroup is a duplicate of pGroup with the member's token replaced,
// isSync cleared, and the assigned leader reset if that member was the
// assigned leader. Returns false when the dnode is not a member, the hbSeq is
// not newer than the last one seen, or the token is unchanged.
bool mndCheckArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
                                 SArbGroup *pNewGroup) {
  bool             needUpdate = false;
  SArbGroupMember *pHit = NULL;
  int              slot = 0;

  (void)taosThreadMutexLock(&pGroup->mutex);

  // locate the member slot that belongs to dnodeId
  while (slot < TSDB_ARB_GROUP_MEMBER_NUM) {
    if (pGroup->members[slot].info.dnodeId == dnodeId) {
      pHit = &pGroup->members[slot];
      break;
    }
    slot++;
  }

  if (pHit == NULL) {
    mInfo("dnodeId:%d vgId:%d, arb token update check failed, no obj found", dnodeId, pRspMember->vgId);
  } else if (pHit->state.responsedHbSeq >= pRspMember->hbSeq) {
    // stale heartbeat: nothing is touched
    mInfo("dnodeId:%d vgId:%d, skip arb token update, heart beat seq expired, local:%d msg:%d", dnodeId,
          pRspMember->vgId, pHit->state.responsedHbSeq, pRspMember->hbSeq);
  } else {
    // the hb bookkeeping is refreshed even when the token turns out unchanged
    pHit->state.responsedHbSeq = pRspMember->hbSeq;
    pHit->state.lastHbMs = nowMs;

    if (mndArbCheckToken(pHit->state.token, pRspMember->memberToken) == 0) {
      mDebug("dnodeId:%d vgId:%d, skip arb token update, token matched", dnodeId, pRspMember->vgId);
    } else {
      mndArbGroupDupObj(pGroup, pNewGroup);
      tstrncpy(pNewGroup->members[slot].state.token, pRspMember->memberToken, TSDB_ARB_TOKEN_SIZE);
      pNewGroup->isSync = false;

      bool resetAssigned = (pHit->info.dnodeId == pGroup->assignedLeader.dnodeId);
      if (resetAssigned) {
        mndArbGroupResetAssignedLeader(pNewGroup);
      }

      needUpdate = true;
      mInfo("dnodeId:%d vgId:%d, arb token updating, resetAssigned:%d", dnodeId, pRspMember->vgId, resetAssigned);
    }
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return needUpdate;
}
1093

1094
// Check every heartbeat member reported by dnodeId against its arb group and
// send one batch update for all groups whose token changed.
//
// pMnode      - mnode owning the sdb and the write queue
// dnodeId     - dnode that sent the heartbeat response
// memberArray - array of SVArbHbRspMember from the response
//
// Members whose vgId has no SDB_ARBGROUP object are logged and skipped.
// Returns 0 on success, otherwise the code from mndBatchUpdateArbGroup().
// The temporary update array is destroyed on both paths.
static int32_t mndUpdateArbGroupsByHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) {
  int64_t nowMs = taosGetTimestampMs();
  size_t  size = taosArrayGetSize(memberArray);
  SArray *pUpdateArray = taosArrayInit(size, sizeof(SArbGroup));

  for (size_t i = 0; i < size; i++) {
    SVArbHbRspMember *pRspMember = taosArrayGet(memberArray, i);

    SArbGroup  newGroup = {0};
    SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &pRspMember->vgId);
    if (pGroup == NULL) {
      mInfo("failed to update arb token, vgId:%d not found", pRspMember->vgId);
      continue;
    }

    bool updateToken = mndCheckArbGroupByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
    if (updateToken) {
      if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
        mError("failed to push newGroup to updateArray, but continue at this hearbear");
      }
    }

    sdbRelease(pMnode->pSdb, pGroup);
  }

  // Keep the result in a local so the array is freed before any early return;
  // returning straight from the check macro would leak pUpdateArray on failure.
  int32_t code = mndBatchUpdateArbGroup(pMnode, pUpdateArray);
  taosArrayDestroy(pUpdateArray);
  TAOS_CHECK_RETURN(code);

  return 0;
}
1124

1125
// Decide, under pGroup->mutex, whether a check-sync result changes isSync.
//
// pGroup        - current group state
// vgId          - vgroup id, used for logging
// member0Token  - token of member 0 as carried by the response
// member1Token  - token of member 1 as carried by the response
// newIsSync     - sync state reported by the response
// pNewGroup     - out: filled only when true is returned
// code          - check-sync result code stored into pNewGroup->code
//
// Returns true when isSync differs from newIsSync; pNewGroup is then a
// duplicate of pGroup with isSync, code and updateTimeMs set. Returns false
// (and sets terrno) when the group has an assigned leader (terrno = success)
// or when either token does not match (terrno = token mismatch).
bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
                                  bool newIsSync, SArbGroup *pNewGroup, int32_t code) {
  bool changed = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  if (pGroup->assignedLeader.dnodeId != 0) {
    terrno = TSDB_CODE_SUCCESS;
    mInfo("skip to update arb sync, vgId:%d has assigned leader:%d", vgId, pGroup->assignedLeader.dnodeId);
  } else {
    char *local0Token = pGroup->members[0].state.token;
    char *local1Token = pGroup->members[1].state.token;
    bool  tokensMatch =
        mndArbCheckToken(local0Token, member0Token) == 0 && mndArbCheckToken(local1Token, member1Token) == 0;

    if (!tokensMatch) {
      terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
      mInfo("skip to update arb sync, memberToken mismatch local:[%s][%s], msg:[%s][%s]", local0Token, local1Token,
            member0Token, member1Token);
    } else if (pGroup->isSync != newIsSync) {
      mndArbGroupDupObj(pGroup, pNewGroup);
      pNewGroup->isSync = newIsSync;
      pNewGroup->code = code;
      pNewGroup->updateTimeMs = taosGetTimestampMs();

      mInfo("vgId:%d, arb isSync updating, new isSync:%d, timeStamp:%" PRId64, vgId, newIsSync,
            pNewGroup->updateTimeMs);
      changed = true;
    }
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return changed;
}
1160

1161
// Look up the arb group of vgId and, if the check-sync result changes its
// isSync state, queue an update request for it.
//
// Returns terrno (or -1 when terrno is 0) if the group is not found;
// otherwise 0. A failure to queue the update is only logged.
static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token, char *member1Token, bool newIsSync,
                                int32_t rsp_code) {
  SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
  if (pGroup == NULL) {
    mInfo("failed to update arb sync, vgId:%d not found", vgId);
    int32_t code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  SArbGroup newGroup = {0};
  if (mndUpdateArbGroupByCheckSync(pGroup, vgId, member0Token, member1Token, newIsSync, &newGroup, rsp_code) &&
      mndUpdateArbGroup(pMnode, &newGroup) != 0) {
    mInfo("failed to pullup update arb sync, vgId:%d, since %s", vgId, terrstr());
  }

  sdbRelease(pMnode->pSdb, pGroup);
  return 0;
}
1184

1185
// Handle an arb heartbeat response.
//
// An empty response (contLen == 0) is ignored. Otherwise the local arb token
// is fetched, the response is decoded, and - if its arbToken matches the local
// one - the reported members are applied via mndUpdateArbGroupsByHeartBeat().
//
// Returns 0 on success or empty response, TSDB_CODE_MND_ARB_TOKEN_MISMATCH on
// an arb token mismatch, otherwise the code of the step that failed.
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arb hb-rsp contLen is 0");
    return 0;
  }

  SMnode *pMnode = pRsp->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  char    arbToken[TSDB_ARB_TOKEN_SIZE];

  int32_t code = mndGetArbToken(pMnode, arbToken);
  if (code != 0) {
    mError("failed to get arb token for arb-hb response");
    TAOS_RETURN(code);
  }

  SVArbHeartBeatRsp arbHbRsp = {0};
  code = tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &arbHbRsp);
  if (code != 0) {
    mInfo("arb hb-rsp des failed, since:%s", tstrerror(pRsp->code));
    TAOS_RETURN(code);
  }

  if (mndArbCheckToken(arbToken, arbHbRsp.arbToken) == 0) {
    TAOS_CHECK_GOTO(mndUpdateArbGroupsByHeartBeat(pMnode, arbHbRsp.dnodeId, arbHbRsp.hbMembers), NULL, _OVER);
    code = 0;
  } else {
    mInfo("arb hearbeat skip update for dnodeId:%d, arb token mismatch, local:[%s] msg:[%s]", arbHbRsp.dnodeId,
          arbToken, arbHbRsp.arbToken);
    code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  }

_OVER:
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
  return code;
}
1222

1223
// Handle an arb check-sync response.
//
// An empty response (contLen == 0) is ignored. A response that cannot be
// decoded returns the decode error, except when pRsp->code is
// TSDB_CODE_MND_ARB_TOKEN_MISMATCH, which is reported as success. When the
// response's arbToken does not match the local one, only terrno is set and
// the function still returns the (zero) decode result. Otherwise the sync
// state derived from errCode is applied through mndUpdateArbSync().
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arb check-sync-rsp contLen is 0");
    return 0;
  }

  SMnode *pMnode = pRsp->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  char    arbToken[TSDB_ARB_TOKEN_SIZE];

  int32_t code = mndGetArbToken(pMnode, arbToken);
  if (code != 0) {
    mError("failed to get arb token for arb-check-sync response");
    TAOS_RETURN(code);
  }

  SVArbCheckSyncRsp syncRsp = {0};
  code = tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp);
  if (code != 0) {
    mInfo("arb check-sync-rsp des failed, since:%s", tstrerror(pRsp->code));
    if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) {
      terrno = TSDB_CODE_SUCCESS;
      return 0;
    }
    TAOS_RETURN(code);
  }

  mInfo("vgId:%d, vnode-arb-check-sync-rsp received, errCode:%d", syncRsp.vgId, syncRsp.errCode);
  if (mndArbCheckToken(arbToken, syncRsp.arbToken) != 0) {
    mInfo("skip update arb sync for vgId:%d, arb token mismatch, local:[%s] msg:[%s]", syncRsp.vgId, arbToken,
          syncRsp.arbToken);
    // code stays 0 here; only terrno records the mismatch
    terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  } else {
    bool newIsSync = (syncRsp.errCode == TSDB_CODE_SUCCESS);
    code = mndUpdateArbSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync,
                            syncRsp.errCode);
    if (code != 0) {
      mInfo("failed to update arb sync for vgId:%d, since:%s", syncRsp.vgId, terrstr());
    }
  }

  tFreeSVArbCheckSyncRsp(&syncRsp);
  TAOS_RETURN(code);
}
1271

1272
// Decide, under pGroup->mutex, whether a set-assigned-leader response should
// be recorded as an acknowledgement.
//
// pGroup      - current group state
// vgId        - vgroup id, used for logging
// memberToken - token carried by the response
// errcode     - result code of the response
// pNewGroup   - out: filled only when true is returned
//
// Returns true when the token matches the assigned leader's token, errcode is
// success and the leader was not yet acked; pNewGroup is then a duplicate of
// pGroup with isSync cleared and assignedLeader.acked set. Returns false in
// every other case.
bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode,
                                          SArbGroup *pNewGroup) {
  bool ackRecorded = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  if (mndArbCheckToken(pGroup->assignedLeader.token, memberToken) != 0) {
    mInfo("skip update arb assigned for vgId:%d, member token mismatch, local:[%s] msg:[%s]", vgId,
          pGroup->assignedLeader.token, memberToken);
  } else if (errcode != TSDB_CODE_SUCCESS) {
    mInfo("skip update arb assigned for vgId:%d, since:%s", vgId, tstrerror(errcode));
  } else if (!pGroup->assignedLeader.acked) {
    mndArbGroupDupObj(pGroup, pNewGroup);
    pNewGroup->isSync = false;
    pNewGroup->assignedLeader.acked = true;

    mInfo("vgId:%d, arb received assigned ack", vgId);
    ackRecorded = true;
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return ackRecorded;
}
1302

1303
// Handle an arb set-assigned-leader response.
//
// An empty response (contLen == 0) is ignored. Otherwise the local arb token
// is fetched, the response is decoded and its arbToken compared with the
// local one. On a match the group of the reported vgId is acquired and, if
// the response acknowledges the assigned leader, an update is queued.
//
// Returns 0 on success or empty response, TSDB_CODE_MND_ARB_TOKEN_MISMATCH on
// an arb token mismatch, terrno (or -1) when the group cannot be acquired,
// otherwise the code of the step that failed. The acquired group is released
// on every path.
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arb set-assigned-rsp contLen is 0");
    return 0;
  }

  int32_t code = -1;

  SMnode    *pMnode = pRsp->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pGroup = NULL;  // declared before the first goto so _OVER can release it

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
    mError("failed to get arb token for arb-set-assigned response");
    TAOS_RETURN(code);
  }

  SVArbSetAssignedLeaderRsp setAssignedRsp = {0};
  if ((code = tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp)) != 0) {
    mInfo("arb set-assigned-rsp des failed, since:%s", tstrerror(pRsp->code));
    TAOS_RETURN(code);
  }

  if (mndArbCheckToken(arbToken, setAssignedRsp.arbToken) != 0) {
    mInfo("skip update arb assigned for vgId:%d, arb token mismatch, local:[%s] msg:[%s]", setAssignedRsp.vgId,
          arbToken, setAssignedRsp.arbToken);
    code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
    goto _OVER;
  }

  pGroup = mndAcquireArbGroup(pMnode, setAssignedRsp.vgId);
  if (!pGroup) {
    mError("failed to set arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  SArbGroup newGroup = {0};
  bool updateAssigned = mndUpdateArbGroupBySetAssignedLeader(pGroup, setAssignedRsp.vgId, setAssignedRsp.memberToken,
                                                             pRsp->code, &newGroup);
  if (updateAssigned) {
    if ((code = mndUpdateArbGroup(pMnode, &newGroup)) != 0) {
      mInfo("failed to pullup update arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, tstrerror(code));
      goto _OVER;
    }
  }

  code = 0;

_OVER:
  // NOTE(review): assumes mndAcquireArbGroup() hands out an sdb reference, like
  // the sdbAcquire(..., SDB_ARBGROUP, ...) calls elsewhere in this file - confirm.
  if (pGroup != NULL) {
    sdbRelease(pSdb, pGroup);
  }
  tFreeSVArbSetAssignedLeaderRsp(&setAssignedRsp);
  return code;
}
1357

1358
static char *formatTimestamp(char *buf, int64_t val, int precision) {
×
1359
  time_t tt;
1360
  if (precision == TSDB_TIME_PRECISION_MICRO) {
×
1361
    tt = (time_t)(val / 1000000);
×
1362
  }
1363
  if (precision == TSDB_TIME_PRECISION_NANO) {
×
1364
    tt = (time_t)(val / 1000000000);
×
1365
  } else {
1366
    tt = (time_t)(val / 1000);
×
1367
  }
1368

1369
  struct tm tm;
1370
  if (taosLocalTime(&tt, &tm, NULL, 0, NULL) == NULL) {
×
1371
    mError("failed to get local time");
×
1372
    return NULL;
×
1373
  }
1374
  size_t pos = taosStrfTime(buf, 32, "%Y-%m-%d %H:%M:%S", &tm);
×
1375

1376
  if (precision == TSDB_TIME_PRECISION_MICRO) {
×
1377
    sprintf(buf + pos, ".%06d", (int)(val % 1000000));
×
1378
  } else if (precision == TSDB_TIME_PRECISION_NANO) {
×
1379
    sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
×
1380
  } else {
1381
    sprintf(buf + pos, ".%03d", (int)(val % 1000));
×
1382
  }
1383

1384
  return buf;
×
1385
}
1386

1387
// Fill pBlock with up to `rows` rows describing arb groups.
//
// Groups are fetched from sdb (SDB_ARBGROUP) through pShow->pIter. Per group
// the columns are written in this order: db name, vgId, one dnodeId per
// member, isSync, "<code string>(<update time>)", then assigned leader
// dnodeId / token / acked (three NULLs when no leader is assigned). Groups
// whose vgroup object cannot be acquired are skipped.
//
// Each group is copied under its mutex and the mutex is released before any
// column is written, so an error exit through RETRIEVE_CHECK_GOTO never leaves
// the group's mutex locked.
//
// Returns the number of rows written; pShow->numOfRows is increased by the
// same amount. A column write failure is logged and ends the retrieval.
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  SArbGroup *pGroup = NULL;
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_ARBGROUP, pShow->pIter, (void **)&pGroup);
    if (pShow->pIter == NULL) break;

    // Snapshot the group so the lock is held only for the copy.
    SArbGroup group = {0};
    (void)taosThreadMutexLock(&pGroup->mutex);
    mndArbGroupDupObj(pGroup, &group);
    (void)taosThreadMutexUnlock(&pGroup->mutex);

    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    SVgObj          *pVgObj = sdbAcquire(pSdb, SDB_VGROUP, &group.vgId);
    if (!pVgObj) {
      sdbRelease(pSdb, pGroup);
      continue;
    }
    char dbNameInGroup[TSDB_DB_FNAME_LEN];
    tstrncpy(dbNameInGroup, pVgObj->dbName, TSDB_DB_FNAME_LEN);
    sdbRelease(pSdb, pVgObj);

    // The bound must be the size of dbname itself, not the arb token size.
    char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(dbNameInGroup), TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)dbname, false), pGroup, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&group.vgId, false), pGroup, &lino, _OVER);

    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      SArbGroupMember *pMember = &group.members[i];
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pMember->info.dnodeId, false), pGroup,
                          &lino, _OVER);
    }

    mInfo("vgId:%d, arb group sync:%d, code:%s, update time:%" PRId64, group.vgId, group.isSync,
          tstrerror(group.code), group.updateTimeMs);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&group.isSync, false), pGroup, &lino, _OVER);

    char strCheckSyncCode[100] = {0};
    char bufUpdateTime[40] = {0};
    (void)formatTimestamp(bufUpdateTime, group.updateTimeMs, TSDB_TIME_PRECISION_MILLI);
    (void)tsnprintf(strCheckSyncCode, 100, "%s(%s)", tstrerror(group.code), bufUpdateTime);

    char checkSyncCode[100 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(checkSyncCode, strCheckSyncCode, 100 + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)checkSyncCode, false), pGroup, &lino, _OVER);

    if (group.assignedLeader.dnodeId != 0) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&group.assignedLeader.dnodeId, false),
                          pGroup, &lino, _OVER);

      char token[TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE] = {0};
      STR_WITH_MAXSIZE_TO_VARSTR(token, group.assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)token, false), pGroup, &lino, _OVER);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&group.assignedLeader.acked, false),
                          pGroup, &lino, _OVER);
    } else {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);
    }

    numOfRows++;
    sdbRelease(pSdb, pGroup);
  }

_OVER:
  if (code != 0) mError("failed to restrieve arb group at line:%d, since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;

  return numOfRows;
}
1480

1481
// Cancel an in-progress SDB_ARBGROUP fetch identified by pIter.
static void mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_ARBGROUP);
}
×
1485

1486
// Return what sdbGetSize() reports for the SDB_ARBGROUP table of this mnode.
int32_t mndGetArbGroupSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_ARBGROUP);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc