• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4693

29 Aug 2025 06:36AM UTC coverage: 58.007% (-1.1%) from 59.151%
#4693

push

travis-ci

web-flow
fix(gpt): fix race-condition in preparing tmp files (#32800)

132676 of 291873 branches covered (45.46%)

Branch coverage included in aggregate %.

5 of 34 new or added lines in 6 files covered. (14.71%)

4288 existing lines in 199 files now uncovered.

201114 of 283561 relevant lines covered (70.92%)

5433357.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.51
/source/dnode/mnode/impl/src/mndArbGroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndArbGroup.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndShow.h"
21
#include "mndTrans.h"
22
#include "mndVgroup.h"
23
#include "mndSync.h"
24

25
#define ARBGROUP_VER_NUMBER   1
26
#define ARBGROUP_RESERVE_SIZE 51
27

28
static SHashObj *arbUpdateHash = NULL;
29

30
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup);
31
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew);
32
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup);
33

34
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew);
35
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index);
36
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup);
37

38
static int32_t mndArbPutUpdateArbIntoWQ(SMnode *pMnode, SArbGroup *pNewGroup);
39
static int32_t mndArbPutBatchUpdateIntoWQ(SMnode *pMnode, SArray *newGroupArray);
40

41
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq);
42
static int32_t mndArbProcessTimer(SRpcMsg *pReq);
43
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq);
44
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp);
45
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp);
46
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp);
47
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
48
static void    mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter);
49
static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq);
50

51
static int32_t mndArbCheckToken(const char *token1, const char *token2) {
346✔
52
  if (token1 == NULL || token2 == NULL) return -1;
346!
53
  if (strlen(token1) == 0 || strlen(token2) == 0) return -1;
346✔
54
  return strncmp(token1, token2, TSDB_ARB_TOKEN_SIZE);
272✔
55
}
56

57
/*
 * Set up the arbitrator-group module on this mnode:
 *  - registers the message handlers for the arb timers, the batch update request and the
 *    responses to heartbeat / check-sync / set-assigned-leader requests,
 *  - registers the show-retrieve and free-iterator handlers for TSDB_MGMT_TABLE_ARBGROUP,
 *  - creates the file-level arbUpdateHash,
 *  - registers the SDB_ARBGROUP table (encode/decode/insert/update/delete callbacks) with the sdb.
 *
 * Returns the result of sdbSetTable(), or terrno if the hash cannot be created.
 */
int32_t mndInitArbGroup(SMnode *pMnode) {
  SSdbTable arbTable = {
      .sdbType = SDB_ARBGROUP,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndArbGroupActionEncode,
      .decodeFp = (SdbDecodeFp)mndArbGroupActionDecode,
      .insertFp = (SdbInsertFp)mndArbGroupActionInsert,
      .updateFp = (SdbUpdateFp)mndArbGroupActionUpdate,
      .deleteFp = (SdbDeleteFp)mndArbGroupActionDelete,
  };

  // message handlers
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_HEARTBEAT_TIMER, mndProcessArbHbTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_CHECK_SYNC_TIMER, mndArbProcessTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP_BATCH, mndProcessArbUpdateGroupBatchReq);
  mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_ASSIGN_LEADER, mndProcessAssignLeaderMsg);

  // "show" support
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup);

  arbUpdateHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (arbUpdateHash != NULL) {
    return sdbSetTable(pMnode->pSdb, arbTable);
  }

  int32_t code = terrno;
  TAOS_RETURN(code);
}
88

89
/*
 * Release the file-level arbUpdateHash created by mndInitArbGroup.
 *
 * The global is reset to NULL afterwards so that no dangling pointer is left behind if
 * cleanup runs twice or another function in this file touches the hash after cleanup.
 * pMnode is unused.
 */
void mndCleanupArbGroup(SMnode *pMnode) {
  taosHashCleanup(arbUpdateHash);
  arbUpdateHash = NULL;
}
90

91
/*
 * Acquire the arb group keyed by vgId from the sdb.
 *
 * Returns the object returned by sdbAcquire(). When nothing is found and terrno is
 * TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to TSDB_CODE_MND_ARBGROUP_NOT_EXIST.
 * A non-NULL result is handed back to the sdb with mndReleaseArbGroup().
 */
SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) {
  SArbGroup *pFound = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
  if (pFound != NULL) {
    return pFound;
  }

  // translate the generic sdb error into the arb-group specific one
  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_ARBGROUP_NOT_EXIST;
  }
  return pFound;
}
98

99
/* Hand an arb group back to the sdb via sdbRelease(). */
void mndReleaseArbGroup(SMnode *pMnode, SArbGroup *pGroup) { sdbRelease(pMnode->pSdb, pGroup); }
103

104
/*
 * Initialize outGroup from a vgroup object.
 *
 * Only vgroups with exactly two replicas are accepted; anything else yields
 * TSDB_CODE_INVALID_PARA and leaves outGroup untouched. On success outGroup is zeroed and then
 * filled with the vgroup's dbUid, vgId and the dnodeId of each of its vnodes.
 */
int32_t mndArbGroupInitFromVgObj(SVgObj *pVgObj, SArbGroup *outGroup) {
  if (pVgObj->replica != 2) {
    TAOS_RETURN(TSDB_CODE_INVALID_PARA);
  }

  (void)memset(outGroup, 0, sizeof(SArbGroup));
  outGroup->dbUid = pVgObj->dbUid;
  outGroup->vgId = pVgObj->vgId;

  int idx = 0;
  while (idx < TSDB_ARB_GROUP_MEMBER_NUM) {
    outGroup->members[idx].info.dnodeId = pVgObj->vnodeGid[idx].dnodeId;
    idx++;
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
118

119
/*
 * Serialize an arb group into a newly allocated sdb raw record.
 *
 * Field order: vgId, dbUid, per member (dnodeId, token), isSync, assigned-leader dnodeId and
 * token, version, assigned-leader acked flag, code, updateTimeMs, followed by
 * ARBGROUP_RESERVE_SIZE reserved bytes. mndArbGroupActionDecode reads the same order.
 *
 * Returns the raw record, or NULL with terrno left non-zero when allocation or a write fails
 * (the partially written record is freed).
 */
SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t dataPos = 0;

  // terrno is only cleared after the last field has been written, so every early jump to _OVER
  // is seen as a failure
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t  rawLen = sizeof(SArbGroup) + ARBGROUP_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_ARBGROUP, ARBGROUP_VER_NUMBER, rawLen);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pGroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pGroup->dbUid, _OVER)
  for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
    SArbGroupMember *pMbr = &pGroup->members[m];
    SDB_SET_INT32(pRaw, dataPos, pMbr->info.dnodeId, _OVER)
    SDB_SET_BINARY(pRaw, dataPos, pMbr->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)
  }
  SDB_SET_INT8(pRaw, dataPos, pGroup->isSync, _OVER)

  SArbAssignedLeader *pAssigned = &pGroup->assignedLeader;
  SDB_SET_INT32(pRaw, dataPos, pAssigned->dnodeId, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pAssigned->token, TSDB_ARB_TOKEN_SIZE, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pAssigned->acked, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pGroup->code, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pGroup->updateTimeMs, _OVER)

  SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("arbgroup:%d, encode to raw:%p, row:%p", pGroup->vgId, pRaw, pGroup);
    return pRaw;
  }

  mError("arbgroup:%d, failed to encode to raw:%p since %s", pGroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
160

161
/*
 * Rebuild an arb group row from an sdb raw record written by mndArbGroupActionEncode.
 *
 * The record's soft version must equal ARBGROUP_VER_NUMBER, otherwise terrno is set to
 * TSDB_CODE_SDB_INVALID_DATA_VER. Heartbeat bookkeeping is not persisted: for every member
 * nextHbSeq is reset to 0, responsedHbSeq to -1 and lastHbMs to 0, and mutexInited is set to
 * false.
 *
 * Returns the new row, or NULL with terrno left non-zero on any failure (the row is freed).
 */
SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) {
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    dataPos = 0;
  int8_t     sver = 0;
  SSdbRow   *pRow = NULL;
  SArbGroup *pGroup = NULL;

  // stays non-zero until the whole record has been read
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver != ARBGROUP_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SArbGroup));
  if (pRow == NULL) goto _OVER;

  pGroup = sdbGetRowObj(pRow);
  if (pGroup == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pGroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pGroup->dbUid, _OVER)
  for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
    SArbGroupMember *pMbr = &pGroup->members[m];
    SDB_GET_INT32(pRaw, dataPos, &pMbr->info.dnodeId, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, pMbr->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)

    // runtime-only heartbeat state starts from scratch
    pMbr->state.nextHbSeq = 0;
    pMbr->state.responsedHbSeq = -1;
    pMbr->state.lastHbMs = 0;
  }
  SDB_GET_INT8(pRaw, dataPos, &pGroup->isSync, _OVER)

  SArbAssignedLeader *pAssigned = &pGroup->assignedLeader;
  SDB_GET_INT32(pRaw, dataPos, &pAssigned->dnodeId, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pAssigned->token, TSDB_ARB_TOKEN_SIZE, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pAssigned->acked, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pGroup->code, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pGroup->updateTimeMs, _OVER)

  pGroup->mutexInited = false;

  SDB_GET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("arbgroup:%d, decode from raw:%p, row:%p", pGroup->vgId, pRaw, pGroup);
    return pRow;
  }

  mError("arbgroup:%d, failed to decode from raw:%p since %s", pGroup == NULL ? 0 : pGroup->vgId, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
220

221
/*
 * sdb insert callback: make sure the group's mutex exists.
 *
 * Returns 0 when the mutex is (or already was) initialized, -1 when taosThreadMutexInit fails.
 */
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup) {
  mTrace("arbgroup:%d, perform insert action, row:%p", pGroup->vgId, pGroup);

  if (pGroup->mutexInited) {
    return 0;
  }

  if (taosThreadMutexInit(&pGroup->mutex, NULL) != 0) {
    return -1;
  }

  pGroup->mutexInited = true;
  return 0;
}
229

230
/*
 * sdb delete callback: destroy the group's mutex if it was initialized and clear the
 * mutexInited flag. Always returns 0.
 */
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup) {
  mTrace("arbgroup:%d, perform delete action, row:%p", pGroup->vgId, pGroup);

  if (!pGroup->mutexInited) {
    return 0;
  }

  (void)taosThreadMutexDestroy(&pGroup->mutex);
  pGroup->mutexInited = false;
  return 0;
}
238

239
/*
 * sdb update callback: merge pNew into the live object pOld under pOld's mutex.
 *
 * The merge only happens when both objects carry the same version; in that case the member
 * tokens, isSync, the assigned leader (dnodeId, token, acked), code and updateTimeMs are copied
 * and pOld->version is incremented. On a version mismatch the update is logged and skipped.
 *
 * Regardless of the outcome, when this mnode is the leader the group's vgId is removed from
 * arbUpdateHash (a failed removal is only logged). Always returns 0.
 */
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew) {
  mTrace("arbgroup:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);

  (void)taosThreadMutexLock(&pOld->mutex);

  if (pOld->version == pNew->version) {
    for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
      tstrncpy(pOld->members[m].state.token, pNew->members[m].state.token, TSDB_ARB_TOKEN_SIZE);
    }
    pOld->isSync = pNew->isSync;
    pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId;
    tstrncpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
    pOld->assignedLeader.acked = pNew->assignedLeader.acked;
    pOld->version++;
    pOld->code = pNew->code;
    pOld->updateTimeMs = pNew->updateTimeMs;

    mInfo(
        "arbgroup:%d, perform update action. members[0].token:%s, members[1].token:%s, isSync:%d, as-dnodeid:%d, "
        "as-token:%s, as-acked:%d, version:%" PRId64,
        pOld->vgId, pOld->members[0].state.token, pOld->members[1].state.token, pOld->isSync,
        pOld->assignedLeader.dnodeId, pOld->assignedLeader.token, pOld->assignedLeader.acked, pOld->version);
  } else {
    mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64
          " new version:%" PRId64,
          pOld->vgId, pOld, pNew, pOld->version, pNew->version);
  }

  (void)taosThreadMutexUnlock(&pOld->mutex);

  if (!mndIsLeader(pSdb->pMnode)) {
    return 0;
  }

  mInfo("arbgroup:%d, remove from arb Update Hash", pOld->vgId);
  if (taosHashRemove(arbUpdateHash, &pOld->vgId, sizeof(int32_t)) != 0) {
    mError("arbgroup:%d, failed to remove from arb Update Hash", pOld->vgId);
  }
  return 0;
}
278

279
/*
 * Encode pGroup, append it to pTrans as a redo log and mark the raw record SDB_STATUS_CREATING.
 * Returns 0 on success, terrno if encoding fails, or the error code of the failing step.
 */
int32_t mndSetCreateArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendRedolog(pTrans, pRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  return 0;
}
290

291
/*
 * Encode pGroup, append it to pTrans as an undo log and mark the raw record SDB_STATUS_DROPPED.
 * Returns 0 on success, terrno if encoding fails, or the error code of the failing step.
 */
int32_t mndSetCreateArbGroupUndoLogs(STrans *pTrans, SArbGroup *pGroup) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendUndolog(pTrans, pRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  return 0;
}
302

303
/*
 * Encode pGroup, append it to pTrans as a commit log and mark the raw record SDB_STATUS_READY.
 * Returns 0 on success, terrno if encoding fails, or the error code of the failing step.
 */
int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndArbGroupActionEncode(pGroup);
  if (pCommitRaw == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  // the assignment must be fully parenthesized before comparing; otherwise `code` receives the
  // 0/1 result of the comparison and the caller gets 1 instead of the real error code
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) TAOS_RETURN(code);
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) TAOS_RETURN(code);
  return 0;
}
314

315
/*
 * Encode pGroup, append it to pTrans as a prepare log and mark the raw record
 * SDB_STATUS_DROPPING.
 * Returns 0 on success, terrno if encoding fails, or the error code of the failing step.
 */
int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendPrepareLog(pTrans, pRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  return 0;
}
326

327
/*
 * Encode pGroup, append it to pTrans as a redo log and mark the raw record SDB_STATUS_DROPPING.
 * Returns 0 on success, terrno if encoding fails, or the error code of the failing step.
 */
static int32_t mndSetDropArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendRedolog(pTrans, pRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  return 0;
}
338

339
/*
 * Encode pGroup, append it to pTrans as a commit log and mark the raw record SDB_STATUS_DROPPED.
 * Returns 0 on success, terrno if encoding fails, or the error code of the failing step.
 */
int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  return 0;
}
350

351
static void *mndBuildArbHeartBeatReq(int32_t *pContLen, char *arbToken, int32_t dnodeId, int64_t arbTerm,
136✔
352
                                     SArray *hbMembers) {
353
  SVArbHeartBeatReq req = {0};
136✔
354
  req.dnodeId = dnodeId;
136✔
355
  req.arbToken = arbToken;
136✔
356
  req.arbTerm = arbTerm;
136✔
357
  req.hbMembers = hbMembers;
136✔
358

359
  int32_t contLen = tSerializeSVArbHeartBeatReq(NULL, 0, &req);
136✔
360
  if (contLen <= 0) return NULL;
136!
361

362
  void *pReq = rpcMallocCont(contLen);
136✔
363
  if (pReq == NULL) return NULL;
136!
364

365
  if (tSerializeSVArbHeartBeatReq(pReq, contLen, &req) <= 0) {
136!
366
    rpcFreeCont(pReq);
×
367
    return NULL;
×
368
  }
369
  *pContLen = contLen;
136✔
370
  return pReq;
136✔
371
}
372

373
/*
 * Build an arb heartbeat request for pDnode and send it as TDMT_VND_ARB_HEARTBEAT.
 *
 * Returns -1 if the request cannot be built or the dnode has no endpoints (the request buffer
 * is freed in the latter case); otherwise returns the result of tmsgSendReq().
 */
static int32_t mndSendArbHeartBeatReq(SDnodeObj *pDnode, char *arbToken, int64_t arbTerm, SArray *hbMembers) {
  int32_t bodyLen = 0;
  void   *pBody = mndBuildArbHeartBeatReq(&bodyLen, arbToken, pDnode->id, arbTerm, hbMembers);
  if (pBody == NULL) {
    mError("arbgroup:0, dnodeId:%d, failed to build arb-hb request", pDnode->id);
    return -1;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_ARB_HEARTBEAT, .pCont = pBody, .contLen = bodyLen};

  SEpSet epSet = mndGetDnodeEpset(pDnode);
  if (epSet.numOfEps == 0) {
    mError("arbgroup:0, dnodeId:%d, failed to send arb-hb request to dnode since no epSet found", pDnode->id);
    rpcFreeCont(pBody);
    return -1;
  }

  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code == 0) {
    mTrace("arbgroup:0, dnodeId:%d, send arb-hb request to dnode", pDnode->id);
  } else {
    mError("arbgroup:0, dnodeId:%d, failed to send arb-hb request to dnode since 0x%x", pDnode->id, code);
  }
  return code;
}
397

398
/*
 * Timer handler that sends arbitrator heartbeats.
 *
 * Phase 1 walks every arb group in the sdb and, while holding the group's mutex, adds one
 * SVArbHbReqMember (vgId plus the member's next heartbeat sequence) per member to an array
 * keyed by the member's dnodeId, so that each dnode gets a single batched request.
 * Phase 2 sends one heartbeat request to every collected dnode that mndIsDnodeOnline() reports
 * as online, then frees that dnode's array.
 *
 * Problems that affect a single member or dnode are logged and skipped; the timer round goes
 * on. A member whose array cannot be allocated or registered is left out of this round and its
 * heartbeat sequence is not advanced.
 *
 * Returns 0, or an error code when the temporary hash cannot be created or the arb token
 * cannot be obtained.
 */
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
  int32_t    code = 0;
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;
  void      *pIter = NULL;

  SHashObj *pDnodeHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pDnodeHash == NULL) {
    code = terrno;
    mError("arbgroup:0, failed to init dnode hash for arb-hb timer since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  // collect member of same dnode
  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    (void)taosThreadMutexLock(&pArbGroup->mutex);

    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      SArbGroupMember *pMember = &pArbGroup->members[i];
      int32_t          dnodeId = pMember->info.dnodeId;
      void            *pObj = taosHashGet(pDnodeHash, &dnodeId, sizeof(int32_t));
      SArray          *hbMembers = NULL;
      if (pObj) {
        hbMembers = *(SArray **)pObj;
      } else {
        hbMembers = taosArrayInit(16, sizeof(SVArbHbReqMember));
        if (hbMembers == NULL) {
          mError("arbgroup:0, dnodeId:%d, failed to alloc hb member array, but continue next at this timer round",
                 dnodeId);
          continue;
        }
        if (taosHashPut(pDnodeHash, &dnodeId, sizeof(int32_t), &hbMembers, POINTER_BYTES) != 0) {
          mError("arbgroup:0, dnodeId:%d, failed to push hb member inty]o hash, but conitnue next at this timer round",
                 dnodeId);
          // the array is not reachable through the hash, so nobody else would ever free it
          taosArrayDestroy(hbMembers);
          continue;
        }
      }
      SVArbHbReqMember reqMember = {.vgId = pArbGroup->vgId, .hbSeq = pMember->state.nextHbSeq++};
      if (taosArrayPush(hbMembers, &reqMember) == NULL) {
        mError("arbgroup:0, dnodeId:%d, failed to push hb member, but conitnue next at this timer round", dnodeId);
      }
    }

    (void)taosThreadMutexUnlock(&pArbGroup->mutex);
    sdbRelease(pSdb, pArbGroup);
  }

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
    mError("arbgroup:0, failed to get arb token for arb-hb timer");
    // nothing will be sent: free every per-dnode array before dropping the hash
    pIter = taosHashIterate(pDnodeHash, NULL);
    while (pIter) {
      SArray *hbMembers = *(SArray **)pIter;
      taosArrayDestroy(hbMembers);
      pIter = taosHashIterate(pDnodeHash, pIter);
    }
    taosHashCleanup(pDnodeHash);
    TAOS_RETURN(code);
  }

  int64_t nowMs = taosGetTimestampMs();

  pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pDnodeHash, pIter);
    if (pIter == NULL) break;

    int32_t dnodeId = *(int32_t *)taosHashGetKey(pIter, NULL);
    SArray *hbMembers = *(SArray **)pIter;

    SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
    if (pDnode == NULL) {
      mError("arbgroup:0, dnodeId:%d, timer failed to acquire dnode", dnodeId);
      taosArrayDestroy(hbMembers);
      continue;
    }

    int64_t mndTerm = mndGetTerm(pMnode);

    if (mndIsDnodeOnline(pDnode, nowMs)) {
      int32_t sendCode = mndSendArbHeartBeatReq(pDnode, arbToken, mndTerm, hbMembers);
      if (TSDB_CODE_SUCCESS != sendCode) {
        mError("arbgroup:0, dnodeId:%d, timer failed to send arb-hb request", dnodeId);
      }
    }

    mndReleaseDnode(pMnode, pDnode);
    taosArrayDestroy(hbMembers);
  }
  taosHashCleanup(pDnodeHash);

  return 0;
}
484

485
static void *mndBuildArbCheckSyncReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
28✔
486
                                     char *member0Token, char *member1Token) {
487
  SVArbCheckSyncReq req = {0};
28✔
488
  req.arbToken = arbToken;
28✔
489
  req.arbTerm = arbTerm;
28✔
490
  req.member0Token = member0Token;
28✔
491
  req.member1Token = member1Token;
28✔
492

493
  int32_t reqLen = tSerializeSVArbCheckSyncReq(NULL, 0, &req);
28✔
494
  int32_t contLen = reqLen + sizeof(SMsgHead);
28✔
495

496
  if (contLen <= 0) return NULL;
28!
497
  SMsgHead *pHead = rpcMallocCont(contLen);
28✔
498
  if (pHead == NULL) return NULL;
28!
499

500
  pHead->contLen = htonl(contLen);
28✔
501
  pHead->vgId = htonl(vgId);
28✔
502
  if (tSerializeSVArbCheckSyncReq((char *)pHead + sizeof(SMsgHead), contLen, &req) <= 0) {
28!
503
    rpcFreeCont(pHead);
×
504
    return NULL;
×
505
  }
506
  *pContLen = contLen;
28✔
507
  return pHead;
28✔
508
}
509

510
/*
 * Build a check-sync request for vgId and send it as TDMT_VND_ARB_CHECK_SYNC to the vgroup's
 * endpoint set, tagging the message with freshly generated trace ids.
 *
 * Returns -1 if the request cannot be built; terrno (or -1 when terrno is 0) if the vgroup has
 * no endpoints, in which case the request buffer is freed; otherwise the result of
 * tmsgSendReq().
 */
static int32_t mndSendArbCheckSyncReq(SMnode *pMnode, int32_t vgId, char *arbToken, int64_t term, char *member0Token,
                                      char *member1Token) {
  int32_t code = 0;
  int32_t msgLen = 0;
  void   *pMsg = mndBuildArbCheckSyncReq(&msgLen, vgId, arbToken, term, member0Token, member1Token);
  if (pMsg == NULL) {
    mError("arbgroup:%d, failed to build check-sync request", vgId);
    return -1;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_ARB_CHECK_SYNC, .pCont = pMsg, .contLen = msgLen};
  TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
  TRACE_SET_ROOTID(&(rpcMsg.info.traceId), tGenIdPI64());

  SEpSet epSet = mndGetVgroupEpsetById(pMnode, vgId);
  if (epSet.numOfEps == 0) {
    mError("arbgroup:%d, failed to send check-sync request since no epSet found", vgId);
    rpcFreeCont(pMsg);
    code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  code = tmsgSendReq(&epSet, &rpcMsg);
  if (code == 0) {
    mDebug("arbgroup:%d, send check-sync request", vgId);
  } else {
    mError("arbgroup:%d, failed to send check-sync request since 0x%x", vgId, code);
  }
  return code;
}
540

541
/*
 * Tell whether the member at `index` has timed out: true when its lastHbMs is older than
 * nowMs minus tsArbSetAssignedTimeoutSec seconds.
 */
static bool mndCheckArbMemberHbTimeout(SArbGroup *pArbGroup, int32_t index, int64_t nowMs) {
  return (nowMs - tsArbSetAssignedTimeoutSec * 1000) > pArbGroup->members[index].state.lastHbMs;
}
545

546
static void *mndBuildArbSetAssignedLeaderReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
2✔
547
                                             char *memberToken, bool force) {
548
  SVArbSetAssignedLeaderReq req = {0};
2✔
549
  req.arbToken = arbToken;
2✔
550
  req.arbTerm = arbTerm;
2✔
551
  req.memberToken = memberToken;
2✔
552
  if (force) req.force = 1;
2✔
553

554
  int32_t reqLen = tSerializeSVArbSetAssignedLeaderReq(NULL, 0, &req);
2✔
555
  int32_t contLen = reqLen + sizeof(SMsgHead);
2✔
556

557
  if (contLen <= 0) return NULL;
2!
558
  SMsgHead *pHead = rpcMallocCont(contLen);
2✔
559
  if (pHead == NULL) return NULL;
2!
560

561
  pHead->contLen = htonl(contLen);
2✔
562
  pHead->vgId = htonl(vgId);
2✔
563
  if (tSerializeSVArbSetAssignedLeaderReq((char *)pHead + sizeof(SMsgHead), contLen, &req) <= 0) {
2!
564
    rpcFreeCont(pHead);
×
565
    return NULL;
×
566
  }
567
  *pContLen = contLen;
2✔
568
  return pHead;
2✔
569
}
570

571
/*
 * Build a set-assigned-leader request for vgId and send it as TDMT_SYNC_SET_ASSIGNED_LEADER to
 * the endpoint set of dnodeId.
 *
 * Returns terrno (or -1 when terrno is 0) if the request cannot be built or the dnode has no
 * endpoints (the request buffer is freed in the latter case); otherwise the result of
 * tmsgSendReq().
 */
static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, int32_t vgId, char *arbToken,
                                              int64_t term, char *memberToken, bool force) {
  int32_t code = 0;
  int32_t msgLen = 0;
  void   *pMsg = mndBuildArbSetAssignedLeaderReq(&msgLen, vgId, arbToken, term, memberToken, force);
  if (pMsg == NULL) {
    mError("arbgroup:%d, failed to build set-assigned request", vgId);
    code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_SYNC_SET_ASSIGNED_LEADER, .pCont = pMsg, .contLen = msgLen};

  SEpSet epSet = mndGetDnodeEpsetById(pMnode, dnodeId);
  if (epSet.numOfEps == 0) {
    mError("arbgroup:%d, failed to send arb-set-assigned request to dnode:%d since no epSet found", vgId, dnodeId);
    rpcFreeCont(pMsg);
    code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  code = tmsgSendReq(&epSet, &rpcMsg);
  if (code == 0) {
    mInfo("arbgroup:%d, send arb-set-assigned request to dnode:%d", vgId, dnodeId);
  } else {
    mError("arbgroup:%d, failed to send arb-set-assigned request to dnode:%d since 0x%x", vgId, dnodeId, code);
  }
  return code;
}
600

601
/*
 * Decide which arbitration step an arb group needs right now and report it through *pOp.
 *
 *  - CHECK_SYNC_SET_ASSIGNED_LEADER: an assigned leader exists but has not acked yet.
 *  - CHECK_SYNC_NONE: both members timed out; or both are alive and the group either has an
 *    assigned leader or is in sync; or the single alive member differs from the assigned leader.
 *  - CHECK_SYNC_CHECK_SYNC: both members are alive, no leader is assigned and the group is not
 *    in sync; or exactly one member timed out while the group is not in sync.
 *  - CHECK_SYNC_UPDATE: exactly one member timed out and the group is in sync. Only on this
 *    path pNewGroup is written: it receives a copy of pArbGroup via mndArbGroupDupObj() and
 *    then mndArbGroupSetAssignedLeader() is applied for the alive member.
 */
void mndArbCheckSync(SArbGroup *pArbGroup, int64_t nowMs, ECheckSyncOp *pOp, SArbGroup *pNewGroup) {
  *pOp = CHECK_SYNC_NONE;

  int32_t vgId = pArbGroup->vgId;
  bool    member0IsTimeout = mndCheckArbMemberHbTimeout(pArbGroup, 0, nowMs);
  bool    member1IsTimeout = mndCheckArbMemberHbTimeout(pArbGroup, 1, nowMs);
  int32_t assignedDnodeId = pArbGroup->assignedLeader.dnodeId;
  bool    hasAssigned = (assignedDnodeId != 0);

  // 1. assigned leader without ack => the set-assigned request has to be sent
  if (hasAssigned && !pArbGroup->assignedLeader.acked) {
    *pOp = CHECK_SYNC_SET_ASSIGNED_LEADER;
    return;
  }

  // 2. + 3. both members are in the same timeout state
  if (member0IsTimeout == member1IsTimeout) {
    // both timed out => skip; both alive => check sync only if unassigned and not in sync
    if (!member0IsTimeout && !hasAssigned && !pArbGroup->isSync) {
      *pOp = CHECK_SYNC_CHECK_SYNC;
    }
    return;
  }

  // 4. exactly one member timed out => the other one is the candidate
  int32_t candidateIndex = member0IsTimeout ? 1 : 0;
  int32_t candidateDnodeId = pArbGroup->members[candidateIndex].info.dnodeId;

  // a different dnode is already assigned => skip
  if (hasAssigned && assignedDnodeId != candidateDnodeId) {
    mInfo("arbgroup:%d, arb skip to set assigned leader to dnodeId:%d, assigned leader has been set to dnodeId:%d",
          vgId, candidateDnodeId, assignedDnodeId);
    return;
  }

  // not in sync => no assignment, ask for a sync check instead
  if (!pArbGroup->isSync) {
    if (assignedDnodeId == candidateDnodeId) {
      mDebug("arbgroup:%d, arb skip to set assigned leader to dnodeId:%d, arb group is not sync", vgId,
             candidateDnodeId);
    } else {
      mInfo("arbgroup:%d, arb skip to set assigned leader to dnodeId:%d, arb group is not sync", vgId,
            candidateDnodeId);
    }
    *pOp = CHECK_SYNC_CHECK_SYNC;
    return;
  }

  // in sync => prepare the updated group for the caller
  mndArbGroupDupObj(pArbGroup, pNewGroup);
  mndArbGroupSetAssignedLeader(pNewGroup, candidateIndex);
  *pOp = CHECK_SYNC_UPDATE;
}
661

662
/*
 * Handle TDMT_MND_ARB_ASSIGN_LEADER: for every arb group, pick the first member whose dnode is
 * online and send that dnode a forced set-assigned-leader request with an empty member token.
 *
 * Each group is copied under its mutex and released before any dnode lookup or send. Members
 * whose dnode cannot be acquired are treated as offline. Groups without any online member are
 * logged and skipped. A failed send for one group does not stop the loop.
 *
 * Returns 0 after all groups have been visited, TSDB_CODE_INVALID_MSG if the request cannot be
 * deserialized, or the error from fetching the arb token / term. The deserialized request is
 * freed on every path.
 */
static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  int32_t    code = -1, lino = 0;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;

  SAssignLeaderReq req = {0};
  if (tDeserializeSAssignLeaderReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  mInfo("arbgroup:0, begin to process assign leader");

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));

  int64_t term = mndGetTerm(pMnode);
  if (term < 0) {
    mError("arbgroup:0, arb failed to get term since %s", terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    // leave through _exit so that the deserialized request is freed
    goto _exit;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    SArbGroup arbGroupDup = {0};

    (void)taosThreadMutexLock(&pArbGroup->mutex);
    mndArbGroupDupObj(pArbGroup, &arbGroupDup);
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);

    sdbRelease(pSdb, pArbGroup);

    // dnodeId 0 means "no online member found"
    int32_t dnodeId = 0;
    for (int32_t i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      int32_t    memberDnodeId = arbGroupDup.members[i].info.dnodeId;
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, memberDnodeId);
      if (pDnode == NULL) {
        mError("arbgroup:%d, dnodeId:%d, failed to acquire dnode for assign leader", arbGroupDup.vgId, memberDnodeId);
        continue;
      }
      bool isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
      mndReleaseDnode(pMnode, pDnode);
      if (isonline) {
        dnodeId = memberDnodeId;
        break;
      }
    }

    if (dnodeId == 0) {
      mError("arbgroup:%d, arb skip to send set assigned leader since no member dnode is online", arbGroupDup.vgId);
      continue;
    }

    (void)mndSendArbSetAssignedLeaderReq(pMnode, dnodeId, arbGroupDup.vgId, arbToken, term, "", true);
    mInfo("arbgroup:%d, arb send set assigned leader to dnodeId:%d", arbGroupDup.vgId, dnodeId);
  }

  code = 0;

  // auditRecord(pReq, pMnode->clusterId, "assignLeader", "", "", req.sql, req.sqlLen);

_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("arbgroup:0, failed to assign leader since %s", tstrerror(code));
  }

  tFreeSAssignLeaderReq(&req);
  TAOS_RETURN(code);
}
728

729
/*
 * Timer handler: walks every arb group in the sdb, lets mndArbCheckSync()
 * decide what has to be done for it, and either sends the matching request
 * right away or collects the group for one batched update.
 *
 * pReq: timer message; pReq->info.node is the owning SMnode.
 *
 * Returns the error code when the term cannot be fetched; otherwise 0, even
 * when the scan itself failed (the failure is only logged).
 */
static int32_t mndArbProcessTimer(SRpcMsg *pReq) {
  int32_t    code = 0, lino = 0;
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;
  void      *pIter = NULL;
  SArray    *pUpdateArray = NULL;

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));

  int64_t term = mndGetTerm(pMnode);
  if (term < 0) {
    mError("arbgroup:0, arb failed to get term since %s", terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  // Skip the check while the mnode role is younger than two heartbeat intervals.
  int64_t roleTimeMs = mndGetRoleTimeMs(pMnode);
  int64_t nowMs = taosGetTimestampMs();
  if (nowMs - roleTimeMs < tsArbHeartBeatIntervalSec * 1000 * 2) {
    mInfo("arbgroup:0, arb skip to check sync since mnd had just switch over, roleTime:%" PRId64 " now:%" PRId64,
          roleTimeMs, nowMs);
    return 0;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    // Work on a private snapshot so the group mutex is held only for the copy.
    SArbGroup arbGroupDup = {0};

    (void)taosThreadMutexLock(&pArbGroup->mutex);
    mndArbGroupDupObj(pArbGroup, &arbGroupDup);
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);

    sdbRelease(pSdb, pArbGroup);

    ECheckSyncOp op = CHECK_SYNC_NONE;
    SArbGroup    newGroup = {0};
    mndArbCheckSync(&arbGroupDup, nowMs, &op, &newGroup);

    int32_t             vgId = arbGroupDup.vgId;
    SArbAssignedLeader *pAssgndLeader = &arbGroupDup.assignedLeader;
    int32_t             assgndDnodeId = pAssgndLeader->dnodeId;

    switch (op) {
      case CHECK_SYNC_NONE:
        mTrace("arbgroup:%d, arb skip to send msg by check sync", vgId);
        break;
      case CHECK_SYNC_SET_ASSIGNED_LEADER:
        (void)mndSendArbSetAssignedLeaderReq(pMnode, assgndDnodeId, vgId, arbToken, term, pAssgndLeader->token, false);
        mInfo("arbgroup:%d, arb send set assigned leader to dnodeId:%d", vgId, assgndDnodeId);
        break;
      case CHECK_SYNC_CHECK_SYNC:
        (void)mndSendArbCheckSyncReq(pMnode, vgId, arbToken, term, arbGroupDup.members[0].state.token,
                                     arbGroupDup.members[1].state.token);
        mInfo("arbgroup:%d, send vnode-arb-check-sync request", vgId);
        break;
      case CHECK_SYNC_UPDATE:
        // The update array is created lazily, on the first group that needs it.
        if (!pUpdateArray) {
          pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
          if (!pUpdateArray) {
            TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
          }
        }

        if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
        break;
      default:
        mError("arbgroup:%d, arb unknown check sync op:%d", vgId, op);
        break;
    }
  }

  TAOS_CHECK_EXIT(mndArbPutBatchUpdateIntoWQ(pMnode, pUpdateArray));

_exit:
  if (code != 0) {
    // Report the code that actually brought us here; terrno may be unrelated or stale.
    mError("arbgroup:0, failed to check sync at line %d since %s", lino, tstrerror(code));
  }

  taosArrayDestroy(pUpdateArray);
  return 0;
}
817

818
/*
 * Serializes `updateArray` into a freshly allocated rpc content buffer.
 *
 * pContLen:    receives the encoded length; written only on success.
 * updateArray: borrowed, the request struct merely points at it.
 *
 * Returns the buffer from rpcMallocCont(), or NULL when sizing, allocation or
 * encoding fails (the buffer is freed again in the last case).
 */
static void *mndBuildArbUpdateGroupBatchReq(int32_t *pContLen, SArray *updateArray) {
  SMArbUpdateGroupBatchReq batchReq = {0};
  batchReq.updateArray = updateArray;

  // Call without a buffer first; the result is used as the allocation size.
  int32_t encodedLen = tSerializeSMArbUpdateGroupBatchReq(NULL, 0, &batchReq);
  if (encodedLen <= 0) {
    return NULL;
  }

  SMsgHead *pBuf = rpcMallocCont(encodedLen);
  if (pBuf == NULL) {
    return NULL;
  }

  if (tSerializeSMArbUpdateGroupBatchReq(pBuf, encodedLen, &batchReq) > 0) {
    *pContLen = encodedLen;
    return pBuf;
  }

  rpcFreeCont(pBuf);
  return NULL;
}
834

835
/*
 * Fills the wire-format `outGroup` from the in-memory `pGroup`.
 *
 * Token fields are copied as pointers into pGroup's own storage, so `outGroup`
 * is only usable while `pGroup` is.
 */
static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup) {
  // Scalar identity and status fields.
  outGroup->vgId = pGroup->vgId;
  outGroup->dbUid = pGroup->dbUid;
  outGroup->isSync = pGroup->isSync;
  outGroup->version = pGroup->version;
  outGroup->code = pGroup->code;
  outGroup->updateTimeMs = pGroup->updateTimeMs;

  // Per-member dnode id and token (pointer only, no string copy).
  for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; ++m) {
    SArbGroupMember *pSrc = &pGroup->members[m];
    outGroup->members[m].dnodeId = pSrc->info.dnodeId;
    outGroup->members[m].token = pSrc->state.token;
  }

  // Assigned leader (token again by pointer).
  outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId;
  outGroup->assignedLeader.token = pGroup->assignedLeader.token;
  outGroup->assignedLeader.acked = pGroup->assignedLeader.acked;
}
24✔
850

851
/*
 * Queues a single-group arb-update-group request on the write queue.
 *
 * The group's vgId is recorded in arbUpdateHash as an "update in flight"
 * marker; a group that already has a marker is skipped and 0 is returned.
 *
 * The marker is inserted BEFORE the message is queued (the same order
 * mndArbPutBatchUpdateIntoWQ uses), so the queue consumer can never finish
 * before the marker exists. If anything fails after the insert, the marker is
 * removed again.
 *
 * Returns 0 on success or skip, non-zero error code otherwise.
 */
static int32_t mndArbPutUpdateArbIntoWQ(SMnode *pMnode, SArbGroup *pNewGroup) {
  if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
    mInfo("arbgroup:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
    return 0;
  }

  int32_t ret = -1;
  bool    inHash = false;

  SMArbUpdateGroup newGroup = {0};
  mndInitArbUpdateGroup(pNewGroup, &newGroup);

  SArray *pArray = taosArrayInit(1, sizeof(SMArbUpdateGroup));
  if (pArray == NULL || taosArrayPush(pArray, &newGroup) == NULL) {
    ret = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  int32_t contLen = 0;
  void   *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
  if (!pHead) {
    mError("arbgroup:0, failed to build arb-update-group request");
    ret = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  mInfo("arbgroup:%d, put into arb update hash", pNewGroup->vgId);
  if ((ret = taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0)) != 0) {
    // The message was never queued, so the buffer is still ours to free.
    rpcFreeCont(pHead);
    goto _OVER;
  }
  inHash = true;

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
  ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);

_OVER:
  taosArrayDestroy(pArray);
  if (ret != 0) {
    mError("arbgroup:%d, failed to put arb group update into write queue since %s", pNewGroup->vgId, tstrerror(ret));
    if (inHash && taosHashRemove(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != 0) {
      mError("arbgroup:%d, failed to remove from arb Update Hash", pNewGroup->vgId);
    }
  }
  return ret;
}
887

888
/*
 * Queues one batched arb-update-group request for every group in
 * `newGroupArray` that has no update in flight yet.
 *
 * newGroupArray: array of SArbGroup; may be NULL or empty, in which case
 *                nothing is done and 0 is returned.
 *
 * Each group that goes into the batch is first marked in arbUpdateHash. On any
 * failure only the markers inserted by THIS call are removed; markers that
 * belong to other in-flight requests (the groups skipped below) are left alone.
 *
 * Returns 0 on success, non-zero error code otherwise.
 */
static int32_t mndArbPutBatchUpdateIntoWQ(SMnode *pMnode, SArray *newGroupArray) {
  int32_t ret = -1;
  size_t  numInHash = 0;  // leading elements of pArray whose vgId this call put into the hash

  size_t sz = taosArrayGetSize(newGroupArray);
  if (sz == 0) {
    return 0;  // nothing to send; avoids allocating on the common idle path
  }

  SArray *pArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup));
  if (pArray == NULL) {
    ret = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  for (size_t i = 0; i < sz; i++) {
    SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
    if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
      mInfo("arbgroup:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
      continue;
    }

    SMArbUpdateGroup newGroup = {0};
    mndInitArbUpdateGroup(pNewGroup, &newGroup);

    if (taosArrayPush(pArray, &newGroup) == NULL) {
      // ret may still be 0 from an earlier successful hash put; make the failure visible.
      ret = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }
    mInfo("arbgroup:%d, put into arb update hash", pNewGroup->vgId);
    if ((ret = taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0)) != 0) {
      mError("arbgroup:%d, failed to put into arb update hash since %s", pNewGroup->vgId, tstrerror(ret));
      goto _OVER;
    }
    numInHash++;
  }

  if (taosArrayGetSize(pArray) == 0) {
    ret = 0;
    goto _OVER;
  }

  int32_t contLen = 0;
  void   *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
  if (!pHead) {
    mError("arbgroup:0, failed to build arb-update-group request");
    ret = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
  ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);

_OVER:
  if (ret != 0) {
    mError("arbgroup:0, failed to put arb group update into write queue since %s", tstrerror(ret));
    // Roll back our own markers; must happen before pArray is destroyed.
    for (size_t i = 0; i < numInHash; i++) {
      SMArbUpdateGroup *pQueued = taosArrayGet(pArray, i);
      if (taosHashRemove(arbUpdateHash, &pQueued->vgId, sizeof(int32_t)) != 0) {
        mError("arbgroup:%d, failed to remove from arb Update Hash", pQueued->vgId);
      }
    }
  }

  taosArrayDestroy(pArray);
  return ret;
}
942

943
/*
 * Handles a TDMT_MND_ARB_UPDATE_GROUP_BATCH message: decodes the batch,
 * builds one "upd-bat-arbgroup" transaction holding a commit log per group,
 * and prepares it.
 *
 * Groups that no longer exist in the sdb are skipped and their marker is
 * removed from arbUpdateHash. When the transaction cannot be created or set
 * up, the markers of ALL groups in the batch are removed.
 *
 * Returns 0 on success, non-zero error code otherwise.
 */
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
  int    code = -1;
  size_t sz = 0;

  SMArbUpdateGroupBatchReq req = {0};
  if ((code = tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    mError("arbgroup:0, arb failed to decode arb-update-group request");
    TAOS_RETURN(code);
  }

  SMnode *pMnode = pReq->info.node;
  sz = taosArrayGetSize(req.updateArray);

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "upd-bat-arbgroup");
  if (pTrans == NULL) {
    code = terrno;  // capture before the cleanup calls below can overwrite it
    mError("arbgroup:0, failed to create update arbgroup trans, since %s", terrstr());
    // No transaction will run for this batch, so drop the in-flight markers.
    for (size_t i = 0; i < sz; i++) {
      SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
      if (taosHashRemove(arbUpdateHash, &pUpdateGroup->vgId, sizeof(int32_t)) != 0) {
        mError("arbgroup:%d, failed to remove from arb Update Hash", pUpdateGroup->vgId);
      }
    }
    tFreeSMArbUpdateGroupBatchReq(&req);
    TAOS_RETURN(code);
  }

  for (size_t i = 0; i < sz; i++) {
    SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);

    // Rebuild a full SArbGroup from the decoded wire form.
    SArbGroup newGroup = {0};
    newGroup.vgId = pUpdateGroup->vgId;
    newGroup.dbUid = pUpdateGroup->dbUid;
    for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
      newGroup.members[m].info.dnodeId = pUpdateGroup->members[m].dnodeId;
      tstrncpy(newGroup.members[m].state.token, pUpdateGroup->members[m].token, TSDB_ARB_TOKEN_SIZE);
    }

    newGroup.isSync = pUpdateGroup->isSync;
    newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId;
    tstrncpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
    newGroup.assignedLeader.acked = pUpdateGroup->assignedLeader.acked;
    newGroup.version = pUpdateGroup->version;
    newGroup.code = pUpdateGroup->code;
    newGroup.updateTimeMs = pUpdateGroup->updateTimeMs;

    mInfo(
        "trans:%d, arbgroup:%d, used to update member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d], %d, "
        "%" PRId64,
        pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
        newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
        newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked,
        pUpdateGroup->code, pUpdateGroup->updateTimeMs);

    SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
    if (!pOldGroup) {
      mError("trans:%d, arbgroup:%d, arb skip to update arbgroup, since no obj found", pTrans->id, newGroup.vgId);
      if (taosHashRemove(arbUpdateHash, &newGroup.vgId, sizeof(int32_t)) != 0) {
        mError("trans:%d, arbgroup:%d, failed to remove from arb Update Hash", pTrans->id, newGroup.vgId);
      }
      continue;
    }

    mndTransAddArbGroupId(pTrans, newGroup.vgId);

    if ((code = mndSetCreateArbGroupCommitLogs(pTrans, &newGroup)) != 0) {
      mError("trans:%d, arbgroup:%d, failed to update arbgroup in set commit log since %s", pTrans->id, newGroup.vgId,
             tstrerror(code));
      sdbRelease(pMnode->pSdb, pOldGroup);  // do not leak the reference on the error path
      goto _OVER;
    }

    mInfo("trans:%d, arbgroup:%d, used to update member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
          pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
          newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
          newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked);

    sdbRelease(pMnode->pSdb, pOldGroup);
  }

  if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) goto _OVER;
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;

  code = 0;

_OVER:
  if (code != 0) {
    // failed to update arbgroup
    mError("trans:%d, arbgroup:0, failed to update arbgroup since %s", pTrans->id, tstrerror(code));
    for (size_t i = 0; i < sz; i++) {
      SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
      if (taosHashRemove(arbUpdateHash, &pUpdateGroup->vgId, sizeof(int32_t)) != 0) {
        mError("trans:%d, arbgroup:%d failed to remove from arb Update Hash", pTrans->id, pUpdateGroup->vgId);
      }
    }
  }

  mndTransDrop(pTrans);
  tFreeSMArbUpdateGroupBatchReq(&req);
  return code;
}
1034

1035
/*
 * Copies the leading part of `pGroup` into `pNew`: every byte that precedes
 * the `mutexInited` member. `mutexInited` and anything after it are left
 * untouched in `pNew`.
 */
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew) {
  const size_t copyLen = offsetof(SArbGroup, mutexInited);
  (void)memcpy(pNew, pGroup, copyLen);
}
156✔
1038

1039
/*
 * Makes member `index` the assigned leader of `pGroup`: copies its dnode id
 * and token into `assignedLeader` and clears the `acked` flag.
 */
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index) {
  SArbAssignedLeader *pLeader = &pGroup->assignedLeader;
  SArbGroupMember    *pChosen = &pGroup->members[index];

  pLeader->dnodeId = pChosen->info.dnodeId;
  tstrncpy(pLeader->token, pChosen->state.token, TSDB_ARB_TOKEN_SIZE);
  pLeader->acked = false;
}
4✔
1046

1047
/*
 * Clears the assigned leader of `pGroup`: dnode id 0, token zero-filled,
 * `acked` false.
 */
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
  SArbAssignedLeader *pLeader = &pGroup->assignedLeader;

  pLeader->dnodeId = 0;
  (void)memset(pLeader->token, 0, TSDB_ARB_TOKEN_SIZE);
  pLeader->acked = false;
}
3✔
1052

1053
/*
 * Applies one heartbeat response member to `pGroup` and reports whether the
 * stored token of the sending dnode has to be replaced.
 *
 * Under the group mutex:
 *  - the sender is looked up among the group members by `dnodeId`;
 *  - a response whose hbSeq is not newer than the recorded one is ignored;
 *  - otherwise responsedHbSeq / lastHbMs are refreshed in place;
 *  - if the reported token differs, `pNewGroup` is filled with a copy of the
 *    group carrying the new token, isSync = false, and a cleared assigned
 *    leader when the sender was the assigned leader.
 *
 * Returns true only when `pNewGroup` was filled.
 */
bool mndArbIsNeedUpdateTokenByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
                                        SArbGroup *pNewGroup) {
  bool             needUpdate = false;
  SArbGroupMember *pSender = NULL;

  (void)taosThreadMutexLock(&pGroup->mutex);

  // Locate the member slot that belongs to the reporting dnode.
  int slot = 0;
  while (slot < TSDB_ARB_GROUP_MEMBER_NUM && pGroup->members[slot].info.dnodeId != dnodeId) {
    slot++;
  }

  if (slot >= TSDB_ARB_GROUP_MEMBER_NUM) {
    mError("arbgroup:%d, arb token update check failed, dnodeId:%d not found", pRspMember->vgId, dnodeId);
    goto _OVER;
  }
  pSender = &pGroup->members[slot];

  if (pSender->state.responsedHbSeq >= pRspMember->hbSeq) {
    // stale or duplicate response
    mError("arbgroup:%d, dnodeId:%d skip arb token update, heart beat seq expired, local:%d msg:%d", pRspMember->vgId,
           dnodeId, pSender->state.responsedHbSeq, pRspMember->hbSeq);
    goto _OVER;
  }

  // Heartbeat bookkeeping is refreshed whether or not the token changed.
  pSender->state.responsedHbSeq = pRspMember->hbSeq;
  pSender->state.lastHbMs = nowMs;

  if (mndArbCheckToken(pSender->state.token, pRspMember->memberToken) == 0) {
    mDebug("arbgroup:%d, dnodeId:%d skip arb token update, token matched", pRspMember->vgId, dnodeId);
    goto _OVER;
  }

  // Token changed: prepare the replacement object for the caller.
  mndArbGroupDupObj(pGroup, pNewGroup);
  tstrncpy(pNewGroup->members[slot].state.token, pRspMember->memberToken, TSDB_ARB_TOKEN_SIZE);
  pNewGroup->isSync = false;

  bool resetAssigned = (pSender->info.dnodeId == pGroup->assignedLeader.dnodeId);
  if (resetAssigned) {
    mndArbGroupResetAssignedLeader(pNewGroup);
  }

  needUpdate = true;
  mInfo("arbgroup:%d, need to update token, by heartbeat from dnodeId:%d, resetAssigned:%d", pRspMember->vgId, dnodeId,
        resetAssigned);

_OVER:
  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return needUpdate;
}
1109

1110
/*
 * Applies every member entry of one heartbeat response sent by `dnodeId`.
 *
 * For each entry the matching arb group is acquired, checked with
 * mndArbIsNeedUpdateTokenByHeartBeat(), and collected when its token has to
 * change. All collected groups are then queued as a single batch update.
 *
 * Entries whose group does not exist are logged and skipped. A failed push
 * into the collection is logged and the remaining entries are still processed.
 *
 * Returns 0 on success, or the error from queueing the batch.
 */
static int32_t mndArbUpdateByHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) {
  int64_t nowMs = taosGetTimestampMs();
  size_t  size = taosArrayGetSize(memberArray);
  SArray *pUpdateArray = taosArrayInit(size, sizeof(SArbGroup));

  for (size_t i = 0; i < size; i++) {
    SVArbHbRspMember *pRspMember = taosArrayGet(memberArray, i);

    SArbGroup  newGroup = {0};
    SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &pRspMember->vgId);
    if (pGroup == NULL) {
      mError("arbgroup:%d failed to update arb token, not found", pRspMember->vgId);
      continue;
    }

    bool updateToken = mndArbIsNeedUpdateTokenByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
    if (updateToken) {
      if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
        mError("arbgroup:0, failed to push newGroup to updateArray, but continue at this hearbear");
      }
    }

    sdbRelease(pMnode->pSdb, pGroup);
  }

  // Free the collection on every path; an early return here used to leak it.
  int32_t code = mndArbPutBatchUpdateIntoWQ(pMnode, pUpdateArray);
  taosArrayDestroy(pUpdateArray);

  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
1140

1141
/*
 * Decides, under the group mutex, whether a check-sync result should change
 * the group's isSync flag.
 *
 * No update is produced when
 *  - the group has an assigned leader (terrno is set to TSDB_CODE_SUCCESS), or
 *  - either member token in the message differs from the local one
 *    (terrno is set to TSDB_CODE_MND_ARB_TOKEN_MISMATCH), or
 *  - isSync already equals `newIsSync`.
 *
 * Otherwise `pNewGroup` receives a copy of the group with isSync, code and
 * updateTimeMs (current time) replaced, and true is returned.
 */
bool mndArbIsNeedUpdateSyncStatusByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
                                             bool newIsSync, SArbGroup *pNewGroup, int32_t code) {
  bool needUpdate = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  if (pGroup->assignedLeader.dnodeId != 0) {
    terrno = TSDB_CODE_SUCCESS;
    mInfo("arbgroup:%d, skip to update arb sync, has assigned leader:%d", vgId, pGroup->assignedLeader.dnodeId);
    goto _OVER;
  }

  char *local0Token = pGroup->members[0].state.token;
  char *local1Token = pGroup->members[1].state.token;
  bool  tokensMatch =
      (mndArbCheckToken(local0Token, member0Token) == 0) && (mndArbCheckToken(local1Token, member1Token) == 0);
  if (!tokensMatch) {
    terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
    mInfo("arbgroup:0, skip to update arb sync, memberToken mismatch local:[%s][%s], msg:[%s][%s]", local0Token,
          local1Token, member0Token, member1Token);
    goto _OVER;
  }

  if (pGroup->isSync == newIsSync) {
    goto _OVER;  // status unchanged, nothing to write
  }

  mndArbGroupDupObj(pGroup, pNewGroup);
  pNewGroup->isSync = newIsSync;
  pNewGroup->code = code;
  pNewGroup->updateTimeMs = taosGetTimestampMs();

  mInfo("arbgroup:%d, need to update isSync status, new isSync:%d, timeStamp:%" PRId64, vgId, newIsSync,
        pNewGroup->updateTimeMs);
  needUpdate = true;

_OVER:
  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return needUpdate;
}
1177

1178
/*
 * Applies a check-sync result to arb group `vgId`.
 *
 * When the group exists and mndArbIsNeedUpdateSyncStatusByCheckSync() asks for
 * an update, the new group state is queued on the write queue; a queueing
 * failure is logged but not returned.
 *
 * Returns terrno (or -1 when terrno is 0) if the group is not found,
 * otherwise 0.
 */
static int32_t mndArbUpdateByCheckSync(SMnode *pMnode, int32_t vgId, char *member0Token, char *member1Token,
                                       bool newIsSync, int32_t rsp_code) {
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pGroup = sdbAcquire(pSdb, SDB_ARBGROUP, &vgId);
  if (pGroup == NULL) {
    mError("arbgroup:%d, failed to update arb sync, not found", vgId);
    int32_t code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  SArbGroup updated = {0};
  if (mndArbIsNeedUpdateSyncStatusByCheckSync(pGroup, vgId, member0Token, member1Token, newIsSync, &updated,
                                              rsp_code) &&
      mndArbPutUpdateArbIntoWQ(pMnode, &updated) != 0) {
    mError("arbgroup:%d, failed to pullup update arb sync, since %s", vgId, terrstr());
  }

  sdbRelease(pSdb, pGroup);
  return 0;
}
1201

1202
/*
 * Handles an arb heartbeat response.
 *
 * An empty response is ignored. Otherwise the local arb token is fetched, the
 * response is decoded, and - provided the response carries the same arb
 * token - its member list is applied through mndArbUpdateByHeartBeat().
 *
 * Returns 0 on success or for an empty response,
 * TSDB_CODE_MND_ARB_TOKEN_MISMATCH on an arb token mismatch, or the error of
 * the failing step.
 */
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arbgroup:0, arb hb-rsp contLen is 0");
    return 0;
  }

  int32_t code = -1;
  SMnode *pMnode = pRsp->info.node;

  char localToken[TSDB_ARB_TOKEN_SIZE];
  code = mndGetArbToken(pMnode, localToken);
  if (code != 0) {
    mError("arbgroup:0, failed to get arb token for arb-hb response");
    TAOS_RETURN(code);
  }

  SVArbHeartBeatRsp hbRsp = {0};
  code = tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &hbRsp);
  if (code != 0) {
    mInfo("arbgroup:0, arb hb-rsp des failed, since:%s", tstrerror(pRsp->code));
    TAOS_RETURN(code);
  }

  // Only a response carrying our own arb token is applied.
  if (mndArbCheckToken(localToken, hbRsp.arbToken) != 0) {
    mInfo("arbgroup:0, arb hearbeat skip update for dnodeId:%d, arb token mismatch, local:[%s] msg:[%s]",
          hbRsp.dnodeId, localToken, hbRsp.arbToken);
    code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndArbUpdateByHeartBeat(pMnode, hbRsp.dnodeId, hbRsp.hbMembers), NULL, _OVER);
  code = 0;

_OVER:
  tFreeSVArbHeartBeatRsp(&hbRsp);
  return code;
}
1239

1240
/*
 * Handles a vnode-arb-check-sync response.
 *
 * An empty response is ignored. Otherwise the local arb token is fetched, the
 * response is decoded, and - provided the arb token matches - the group's sync
 * status is updated from the response's errCode (success means "in sync").
 *
 * Returns 0 for an empty response and on success, or the error of the failing
 * step. A decode failure returns 0 when pRsp->code is
 * TSDB_CODE_MND_ARB_TOKEN_MISMATCH.
 */
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arbgroup:0, arb check-sync-rsp contLen is 0");
    return 0;
  }

  int32_t code = -1;
  SMnode *pMnode = pRsp->info.node;

  char localToken[TSDB_ARB_TOKEN_SIZE];
  code = mndGetArbToken(pMnode, localToken);
  if (code != 0) {
    mError("arbgroup:0, failed to get arb token from vnode-arb-check-sync-rsp");
    TAOS_RETURN(code);
  }

  SVArbCheckSyncRsp checkRsp = {0};
  code = tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &checkRsp);
  if (code != 0) {
    mInfo("arbgroup:0, arb vnode-arb-check-sync-rsp deserialize failed, since:%s", tstrerror(pRsp->code));
    if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) {
      terrno = TSDB_CODE_SUCCESS;
      return 0;
    }
    TAOS_RETURN(code);
  }

  mInfo("arbgroup:%d, vnode-arb-check-sync-rsp received, QID:0x%" PRIx64 ":0x%" PRIx64 ", seqNum:%" PRIx64
        ", errCode:%d",
        checkRsp.vgId, pRsp->info.traceId.rootId, pRsp->info.traceId.msgId, pRsp->info.seqNum, checkRsp.errCode);

  if (mndArbCheckToken(localToken, checkRsp.arbToken) != 0) {
    mError("arbgroup:%d, skip update arb sync for arb token mismatch, local:[%s] msg:[%s]", checkRsp.vgId, localToken,
           checkRsp.arbToken);
    // Only terrno is set here; `code` is still 0 from the successful decode above.
    terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
    goto _OVER;
  }

  bool inSync = (checkRsp.errCode == TSDB_CODE_SUCCESS);
  code = mndArbUpdateByCheckSync(pMnode, checkRsp.vgId, checkRsp.member0Token, checkRsp.member1Token, inSync,
                                 checkRsp.errCode);
  if (code != 0) {
    mError("arbgroup:%d, failed to update arb sync for since:%s", checkRsp.vgId, terrstr());
    goto _OVER;
  }

  code = 0;

_OVER:
  tFreeSVArbCheckSyncRsp(&checkRsp);
  TAOS_RETURN(code);
}
1290

1291
/*
 * Decides, under the group mutex, whether a set-assigned-leader response
 * should mark the assigned leader as acknowledged.
 *
 * The response is ignored when its member token differs from the stored
 * assigned-leader token, when `errcode` is not TSDB_CODE_SUCCESS, or when the
 * leader is already acked. Otherwise `pNewGroup` receives a copy of the group
 * with isSync = false and assignedLeader.acked = true, and true is returned.
 */
bool mndArbIsNeedUpdateAssignedBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode,
                                                   SArbGroup *pNewGroup) {
  bool needUpdate = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  if (mndArbCheckToken(pGroup->assignedLeader.token, memberToken) != 0) {
    mError("arbgroup:%d, skip update arb assigned for member token mismatch, local:[%s] msg:[%s]", vgId,
           pGroup->assignedLeader.token, memberToken);
  } else if (errcode != TSDB_CODE_SUCCESS) {
    mError("arbgroup:%d, skip update arb assigned for since:%s", vgId, tstrerror(errcode));
  } else if (pGroup->assignedLeader.acked == false) {
    mndArbGroupDupObj(pGroup, pNewGroup);
    pNewGroup->isSync = false;
    pNewGroup->assignedLeader.acked = true;

    mInfo("arbgroup:%d, arb received assigned ack", vgId);
    needUpdate = true;
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return needUpdate;
}
1321

1322
/*
 * Handles an arb set-assigned-leader response.
 *
 * An empty response is ignored. Otherwise the local arb token is fetched, the
 * response is decoded, and - provided the arb token matches and the group
 * exists - an "assigned leader acked" update is queued when
 * mndArbIsNeedUpdateAssignedBySetAssignedLeader() asks for one.
 *
 * Returns 0 on success or for an empty response,
 * TSDB_CODE_MND_ARB_TOKEN_MISMATCH on an arb token mismatch, or the error of
 * the failing step.
 */
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arbgroup:0, arb set-assigned-rsp contLen is 0");
    return 0;
  }

  int32_t code = -1;

  SMnode    *pMnode = pRsp->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pGroup = NULL;  // declared up front so _OVER can release it on every path

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
    mError("arbgroup:0, failed to get arb token for arb-set-assigned response");
    TAOS_RETURN(code);
  }

  SVArbSetAssignedLeaderRsp setAssignedRsp = {0};
  if ((code = tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp)) != 0) {
    mError("arbgroup:0, arb set-assigned-rsp des failed, since:%s", tstrerror(pRsp->code));
    TAOS_RETURN(code);
  }

  if (mndArbCheckToken(arbToken, setAssignedRsp.arbToken) != 0) {
    mError("arbgroup:%d, skip update arb assigned for arb token mismatch, local:[%s] msg:[%s]", setAssignedRsp.vgId,
           arbToken, setAssignedRsp.arbToken);
    code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
    goto _OVER;
  }

  pGroup = mndAcquireArbGroup(pMnode, setAssignedRsp.vgId);
  if (!pGroup) {
    mError("arbgroup:%d, failed to set arb assigned for since:%s", setAssignedRsp.vgId, terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  SArbGroup newGroup = {0};
  bool      updateAssigned = mndArbIsNeedUpdateAssignedBySetAssignedLeader(
      pGroup, setAssignedRsp.vgId, setAssignedRsp.memberToken, pRsp->code, &newGroup);
  if (updateAssigned) {
    if ((code = mndArbPutUpdateArbIntoWQ(pMnode, &newGroup)) != 0) {
      mError("arbgroup:%d, failed to pullup update arb assigned since:%s", setAssignedRsp.vgId, tstrerror(code));
      goto _OVER;
    }
  }

  code = 0;

_OVER:
  // Drop the reference taken above; it used to be leaked on every path.
  // NOTE(review): assumes mndAcquireArbGroup() wraps sdbAcquire(), as the other
  // SDB_ARBGROUP lookups in this file use sdbAcquire()/sdbRelease() - confirm.
  if (pGroup != NULL) {
    sdbRelease(pSdb, pGroup);
  }
  tFreeSVArbSetAssignedLeaderRsp(&setAssignedRsp);
  return code;
}
1376

1377
static char *formatTimestamp(char *buf, int64_t val, int precision) {
18✔
1378
  time_t tt;
1379
  if (precision == TSDB_TIME_PRECISION_MICRO) {
18!
1380
    tt = (time_t)(val / 1000000);
×
1381
  }
1382
  if (precision == TSDB_TIME_PRECISION_NANO) {
18!
1383
    tt = (time_t)(val / 1000000000);
×
1384
  } else {
1385
    tt = (time_t)(val / 1000);
18✔
1386
  }
1387

1388
  struct tm tm;
1389
  if (taosLocalTime(&tt, &tm, NULL, 0, NULL) == NULL) {
18!
1390
    mError("failed to get local time");
×
1391
    return NULL;
×
1392
  }
1393
  size_t pos = taosStrfTime(buf, 32, "%Y-%m-%d %H:%M:%S", &tm);
18✔
1394

1395
  if (precision == TSDB_TIME_PRECISION_MICRO) {
18!
1396
    sprintf(buf + pos, ".%06d", (int)(val % 1000000));
×
1397
  } else if (precision == TSDB_TIME_PRECISION_NANO) {
18!
1398
    sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
×
1399
  } else {
1400
    sprintf(buf + pos, ".%03d", (int)(val % 1000));
18✔
1401
  }
1402

1403
  return buf;
18✔
1404
}
1405

1406
/*
 * Show-retrieve callback for arb groups: writes up to `rows` rows into
 * `pBlock`, one per arb group, continuing from pShow->pIter.
 *
 * Columns, in order: db name, vgId, one dnode id per member, isSync,
 * "check-sync code(update time)", then assigned leader dnode id / token /
 * acked (all three NULL when there is no assigned leader).
 *
 * Groups whose vgroup no longer exists are skipped.
 *
 * Returns the number of rows written; pShow->numOfRows is advanced by it.
 */
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  SArbGroup *pGroup = NULL;
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_ARBGROUP, pShow->pIter, (void **)&pGroup);
    if (pShow->pIter == NULL) break;

    // NOTE(review): every RETRIEVE_CHECK_GOTO below can jump to _OVER while this
    // mutex is still held - confirm the macro (or a caller) unlocks it.
    (void)taosThreadMutexLock(&pGroup->mutex);

    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    SVgObj          *pVgObj = sdbAcquire(pSdb, SDB_VGROUP, &pGroup->vgId);
    if (!pVgObj) {
      (void)taosThreadMutexUnlock(&pGroup->mutex);
      sdbRelease(pSdb, pGroup);
      continue;
    }
    char dbNameInGroup[TSDB_DB_FNAME_LEN];
    tstrncpy(dbNameInGroup, pVgObj->dbName, TSDB_DB_FNAME_LEN);
    sdbRelease(pSdb, pVgObj);

    // The size limit must match the dbname buffer, not the arb token size.
    char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(dbNameInGroup), TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)dbname, false), pGroup, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->vgId, false), pGroup, &lino, _OVER);

    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      SArbGroupMember *pMember = &pGroup->members[i];
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pMember->info.dnodeId, false), pGroup,
                          &lino, _OVER);
    }

    mInfo("arbgroup:%d, arb group sync:%d, code:%s, update time:%" PRId64, pGroup->vgId, pGroup->isSync,
          tstrerror(pGroup->code), pGroup->updateTimeMs);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->isSync, false), pGroup, &lino, _OVER);

    // Check-sync column: "<error string>(<formatted update time>)".
    char strCheckSyncCode[100] = {0};
    char bufUpdateTime[40] = {0};
    (void)formatTimestamp(bufUpdateTime, pGroup->updateTimeMs, TSDB_TIME_PRECISION_MILLI);
    (void)tsnprintf(strCheckSyncCode, 100, "%s(%s)", tstrerror(pGroup->code), bufUpdateTime);

    char checkSyncCode[100 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(checkSyncCode, strCheckSyncCode, 100 + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)checkSyncCode, false), pGroup, &lino, _OVER);

    if (pGroup->assignedLeader.dnodeId != 0) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.dnodeId, false),
                          pGroup, &lino, _OVER);

      char token[TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE] = {0};
      STR_WITH_MAXSIZE_TO_VARSTR(token, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)token, false), pGroup, &lino, _OVER);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.acked, false),
                          pGroup, &lino, _OVER);
    } else {
      // No assigned leader: the three leader columns are NULL.
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);
    }

    (void)taosThreadMutexUnlock(&pGroup->mutex);

    numOfRows++;
    sdbRelease(pSdb, pGroup);
  }

_OVER:
  if (code != 0) mError("arbgroup:0, failed to restrieve arb group at line:%d, since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;

  return numOfRows;
}
1499

1500
/*
 * Cancels an in-progress SDB_ARBGROUP fetch identified by `pIter`.
 */
static void mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_ARBGROUP);
}
×
1504

1505
/*
 * Returns sdbGetSize() for the SDB_ARBGROUP table of this mnode's sdb.
 */
int32_t mndGetArbGroupSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_ARBGROUP);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc