• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3532

20 Nov 2024 07:11AM UTC coverage: 60.78% (+0.6%) from 60.213%
#3532

push

travis-ci

web-flow
Merge pull request #28823 from taosdata/fix/3.0/TD-32587

fix:[TD-32587]fix stmt segmentation fault

119943 of 252352 branches covered (47.53%)

Branch coverage included in aggregate %.

1 of 4 new or added lines in 1 file covered. (25.0%)

463 existing lines in 99 files now uncovered.

200682 of 275165 relevant lines covered (72.93%)

15642683.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.14
/source/dnode/mnode/impl/src/mndArbGroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndArbGroup.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndShow.h"
21
#include "mndTrans.h"
22
#include "mndVgroup.h"
23

24
#define ARBGROUP_VER_NUMBER   1
25
#define ARBGROUP_RESERVE_SIZE 63
26

27
static SHashObj *arbUpdateHash = NULL;
28

29
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup);
30
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew);
31
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup);
32

33
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew);
34
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index);
35
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup);
36

37
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew);
38
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup);
39
static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArray);
40

41
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq);
42
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq);
43
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq);
44
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp);
45
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp);
46
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp);
47
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
48
static void    mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter);
49

50
static int32_t mndArbCheckToken(const char *token1, const char *token2) {
19✔
51
  if (token1 == NULL || token2 == NULL) return -1;
19!
52
  if (strlen(token1) == 0 || strlen(token2) == 0) return -1;
19!
53
  return strncmp(token1, token2, TSDB_ARB_TOKEN_SIZE);
7✔
54
}
55

56
int32_t mndInitArbGroup(SMnode *pMnode) {
1,979✔
57
  int32_t   code = 0;
1,979✔
58
  SSdbTable table = {
1,979✔
59
      .sdbType = SDB_ARBGROUP,
60
      .keyType = SDB_KEY_INT32,
61
      .encodeFp = (SdbEncodeFp)mndArbGroupActionEncode,
62
      .decodeFp = (SdbDecodeFp)mndArbGroupActionDecode,
63
      .insertFp = (SdbInsertFp)mndArbGroupActionInsert,
64
      .updateFp = (SdbUpdateFp)mndArbGroupActionUpdate,
65
      .deleteFp = (SdbDeleteFp)mndArbGroupActionDelete,
66
  };
67

68
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_HEARTBEAT_TIMER, mndProcessArbHbTimer);
1,979✔
69
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_CHECK_SYNC_TIMER, mndProcessArbCheckSyncTimer);
1,979✔
70
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP_BATCH, mndProcessArbUpdateGroupBatchReq);
1,979✔
71
  mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp);
1,979✔
72
  mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp);
1,979✔
73
  mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp);
1,979✔
74

75
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups);
1,979✔
76
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup);
1,979✔
77

78
  arbUpdateHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,979✔
79
  if (arbUpdateHash == NULL) {
1,979!
80
    code = terrno;
×
81
    TAOS_RETURN(code);
×
82
  }
83

84
  return sdbSetTable(pMnode->pSdb, table);
1,979✔
85
}
86

87
void mndCleanupArbGroup(SMnode *pMnode) { taosHashCleanup(arbUpdateHash); }
1,978✔
88

89
SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) {
×
90
  SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
×
91
  if (pGroup == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
92
    terrno = TSDB_CODE_MND_ARBGROUP_NOT_EXIST;
×
93
  }
94
  return pGroup;
×
95
}
96

97
void mndReleaseArbGroup(SMnode *pMnode, SArbGroup *pGroup) {
×
98
  SSdb *pSdb = pMnode->pSdb;
×
99
  sdbRelease(pSdb, pGroup);
×
100
}
×
101

102
int32_t mndArbGroupInitFromVgObj(SVgObj *pVgObj, SArbGroup *outGroup) {
12✔
103
  if (pVgObj->replica != 2) {
12!
104
    TAOS_RETURN(TSDB_CODE_INVALID_PARA);
×
105
  }
106
  (void)memset(outGroup, 0, sizeof(SArbGroup));
12✔
107
  outGroup->dbUid = pVgObj->dbUid;
12✔
108
  outGroup->vgId = pVgObj->vgId;
12✔
109
  for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
36✔
110
    SArbGroupMember *pMember = &outGroup->members[i];
24✔
111
    pMember->info.dnodeId = pVgObj->vnodeGid[i].dnodeId;
24✔
112
  }
113

114
  TAOS_RETURN(TSDB_CODE_SUCCESS);
12✔
115
}
116

117
SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
30✔
118
  int32_t code = 0;
30✔
119
  int32_t lino = 0;
30✔
120
  terrno = TSDB_CODE_OUT_OF_MEMORY;
30✔
121

122
  int32_t  size = sizeof(SArbGroup) + ARBGROUP_RESERVE_SIZE;
30✔
123
  SSdbRaw *pRaw = sdbAllocRaw(SDB_ARBGROUP, ARBGROUP_VER_NUMBER, size);
30✔
124
  if (pRaw == NULL) goto _OVER;
30!
125

126
  int32_t dataPos = 0;
30✔
127
  SDB_SET_INT32(pRaw, dataPos, pGroup->vgId, _OVER)
30!
128
  SDB_SET_INT64(pRaw, dataPos, pGroup->dbUid, _OVER)
30!
129
  for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
90✔
130
    SArbGroupMember *pMember = &pGroup->members[i];
60✔
131
    SDB_SET_INT32(pRaw, dataPos, pMember->info.dnodeId, _OVER)
60!
132
    SDB_SET_BINARY(pRaw, dataPos, pMember->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)
60!
133
  }
134
  SDB_SET_INT8(pRaw, dataPos, pGroup->isSync, _OVER)
30!
135

136
  SArbAssignedLeader *pLeader = &pGroup->assignedLeader;
30✔
137
  SDB_SET_INT32(pRaw, dataPos, pLeader->dnodeId, _OVER)
30!
138
  SDB_SET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
30!
139
  SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER)
30!
140
  SDB_SET_INT8(pRaw, dataPos, pLeader->acked, _OVER)
30!
141

142
  SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)
30!
143

144
  terrno = 0;
30✔
145

146
_OVER:
30✔
147
  if (terrno != 0) {
30!
148
    mError("arbgroup:%d, failed to encode to raw:%p since %s", pGroup->vgId, pRaw, terrstr());
×
149
    sdbFreeRaw(pRaw);
×
150
    return NULL;
×
151
  }
152

153
  mTrace("arbgroup:%d, encode to raw:%p, row:%p", pGroup->vgId, pRaw, pGroup);
30!
154
  return pRaw;
30✔
155
}
156

157
SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) {
24✔
158
  int32_t code = 0;
24✔
159
  int32_t lino = 0;
24✔
160
  terrno = TSDB_CODE_OUT_OF_MEMORY;
24✔
161
  SSdbRow   *pRow = NULL;
24✔
162
  SArbGroup *pGroup = NULL;
24✔
163

164
  int8_t sver = 0;
24✔
165
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
24!
166

167
  if (sver != ARBGROUP_VER_NUMBER) {
24!
168
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
169
    goto _OVER;
×
170
  }
171

172
  pRow = sdbAllocRow(sizeof(SArbGroup));
24✔
173
  if (pRow == NULL) goto _OVER;
24!
174

175
  pGroup = sdbGetRowObj(pRow);
24✔
176
  if (pGroup == NULL) goto _OVER;
24!
177

178
  int32_t dataPos = 0;
24✔
179
  SDB_GET_INT32(pRaw, dataPos, &pGroup->vgId, _OVER)
24!
180
  SDB_GET_INT64(pRaw, dataPos, &pGroup->dbUid, _OVER)
24!
181
  for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
72✔
182
    SArbGroupMember *pMember = &pGroup->members[i];
48✔
183
    SDB_GET_INT32(pRaw, dataPos, &pMember->info.dnodeId, _OVER)
48!
184
    SDB_GET_BINARY(pRaw, dataPos, pMember->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)
48!
185

186
    pMember->state.nextHbSeq = 0;
48✔
187
    pMember->state.responsedHbSeq = -1;
48✔
188
    pMember->state.lastHbMs = 0;
48✔
189
  }
190
  SDB_GET_INT8(pRaw, dataPos, &pGroup->isSync, _OVER)
24!
191

192
  SArbAssignedLeader *pLeader = &pGroup->assignedLeader;
24✔
193
  SDB_GET_INT32(pRaw, dataPos, &pLeader->dnodeId, _OVER)
24!
194
  SDB_GET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
24!
195
  SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER)
24!
196
  SDB_GET_INT8(pRaw, dataPos, &pLeader->acked, _OVER)
24!
197

198
  pGroup->mutexInited = false;
24✔
199

200
  SDB_GET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)
24!
201

202
  terrno = 0;
24✔
203

204
_OVER:
24✔
205
  if (terrno != 0) {
24!
206
    mError("arbgroup:%d, failed to decode from raw:%p since %s", pGroup == NULL ? 0 : pGroup->vgId, pRaw, terrstr());
×
207
    taosMemoryFreeClear(pRow);
×
208
    return NULL;
×
209
  }
210

211
  mTrace("arbgroup:%d, decode from raw:%p, row:%p", pGroup->vgId, pRaw, pGroup);
24!
212
  return pRow;
24✔
213
}
214

215
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup) {
6✔
216
  mTrace("arbgroup:%d, perform insert action, row:%p", pGroup->vgId, pGroup);
6!
217
  if (!pGroup->mutexInited && (taosThreadMutexInit(&pGroup->mutex, NULL) == 0)) {
6!
218
    pGroup->mutexInited = true;
6✔
219
  }
220

221
  return pGroup->mutexInited ? 0 : -1;
6!
222
}
223

224
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup) {
24✔
225
  mTrace("arbgroup:%d, perform delete action, row:%p", pGroup->vgId, pGroup);
24!
226
  if (pGroup->mutexInited) {
24✔
227
    (void)taosThreadMutexDestroy(&pGroup->mutex);
6✔
228
    pGroup->mutexInited = false;
6✔
229
  }
230
  return 0;
24✔
231
}
232

233
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew) {
12✔
234
  mTrace("arbgroup:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);
12!
235
  (void)taosThreadMutexLock(&pOld->mutex);
12✔
236

237
  if (pOld->version != pNew->version) {
12!
238
    mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64
×
239
          " new version:%" PRId64,
240
          pOld->vgId, pOld, pNew, pOld->version, pNew->version);
241
    goto _OVER;
×
242
  }
243

244
  for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
36✔
245
    tstrncpy(pOld->members[i].state.token, pNew->members[i].state.token, TSDB_ARB_TOKEN_SIZE);
24✔
246
  }
247
  pOld->isSync = pNew->isSync;
12✔
248
  pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId;
12✔
249
  tstrncpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
12✔
250
  pOld->assignedLeader.acked = pNew->assignedLeader.acked;
12✔
251
  pOld->version++;
12✔
252

253
  mInfo(
12!
254
      "arbgroup:%d, perform update action. members[0].token:%s, members[1].token:%s, isSync:%d, as-dnodeid:%d, "
255
      "as-token:%s, as-acked:%d, version:%" PRId64,
256
      pOld->vgId, pOld->members[0].state.token, pOld->members[1].state.token, pOld->isSync,
257
      pOld->assignedLeader.dnodeId, pOld->assignedLeader.token, pOld->assignedLeader.acked, pOld->version);
258

259
_OVER:
×
260
  (void)taosThreadMutexUnlock(&pOld->mutex);
12✔
261

262
  if (taosHashRemove(arbUpdateHash, &pOld->vgId, sizeof(int32_t)) != 0) {
12✔
263
    mError("arbgroup:%d, failed to remove from arbUpdateHash", pOld->vgId);
6!
264
  }
265
  return 0;
12✔
266
}
267

268
int32_t mndSetCreateArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
×
269
  int32_t  code = 0;
×
270
  SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
×
271
  if (pRedoRaw == NULL) {
×
272
    code = terrno;
×
273
    TAOS_RETURN(code);
×
274
  }
275
  if ((code = mndTransAppendRedolog(pTrans, pRedoRaw)) != 0) TAOS_RETURN(code);
×
276
  if ((code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING)) != 0) TAOS_RETURN(code);
×
277
  return 0;
×
278
}
279

280
int32_t mndSetCreateArbGroupUndoLogs(STrans *pTrans, SArbGroup *pGroup) {
6✔
281
  int32_t  code = 0;
6✔
282
  SSdbRaw *pUndoRaw = mndArbGroupActionEncode(pGroup);
6✔
283
  if (pUndoRaw == NULL) {
6!
284
    code = terrno;
×
285
    TAOS_RETURN(code);
×
286
  }
287
  if ((code = mndTransAppendUndolog(pTrans, pUndoRaw)) != 0) TAOS_RETURN(code);
6!
288
  if ((code = sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED)) != 0) TAOS_RETURN(code);
6!
289
  return 0;
6✔
290
}
291

292
int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
12✔
293
  int32_t  code = 0;
12✔
294
  SSdbRaw *pCommitRaw = mndArbGroupActionEncode(pGroup);
12✔
295
  if (pCommitRaw == NULL) {
12!
296
    code = terrno;
×
297
    TAOS_RETURN(code);
×
298
  }
299
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw) != 0)) TAOS_RETURN(code);
12!
300
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) TAOS_RETURN(code);
12!
301
  return 0;
12✔
302
}
303

304
int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup) {
6✔
305
  int32_t  code = 0;
6✔
306
  SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
6✔
307
  if (pRedoRaw == NULL) {
6!
308
    code = terrno;
×
309
    TAOS_RETURN(code);
×
310
  }
311
  if ((code = mndTransAppendPrepareLog(pTrans, pRedoRaw)) != 0) TAOS_RETURN(code);
6!
312
  if ((code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING)) != 0) TAOS_RETURN(code);
6!
313
  return 0;
6✔
314
}
315

316
static int32_t mndSetDropArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
×
317
  int32_t  code = 0;
×
318
  SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
×
319
  if (pRedoRaw == NULL) {
×
320
    code = terrno;
×
321
    TAOS_RETURN(code);
×
322
  }
323
  if ((code = mndTransAppendRedolog(pTrans, pRedoRaw)) != 0) TAOS_RETURN(code);
×
324
  if ((code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING)) != 0) TAOS_RETURN(code);
×
325
  return 0;
×
326
}
327

328
int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
6✔
329
  int32_t  code = 0;
6✔
330
  SSdbRaw *pCommitRaw = mndArbGroupActionEncode(pGroup);
6✔
331
  if (pCommitRaw == NULL) {
6!
332
    code = terrno;
×
333
    TAOS_RETURN(code);
×
334
  }
335
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) TAOS_RETURN(code);
6!
336
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) TAOS_RETURN(code);
6!
337
  return 0;
6✔
338
}
339

340
static void *mndBuildArbHeartBeatReq(int32_t *pContLen, char *arbToken, int32_t dnodeId, int64_t arbTerm,
7✔
341
                                     SArray *hbMembers) {
342
  SVArbHeartBeatReq req = {0};
7✔
343
  req.dnodeId = dnodeId;
7✔
344
  req.arbToken = arbToken;
7✔
345
  req.arbTerm = arbTerm;
7✔
346
  req.hbMembers = hbMembers;
7✔
347

348
  int32_t contLen = tSerializeSVArbHeartBeatReq(NULL, 0, &req);
7✔
349
  if (contLen <= 0) return NULL;
7!
350

351
  void *pReq = rpcMallocCont(contLen);
7✔
352
  if (pReq == NULL) return NULL;
7!
353

354
  if (tSerializeSVArbHeartBeatReq(pReq, contLen, &req) <= 0) {
7!
355
    rpcFreeCont(pReq);
×
356
    return NULL;
×
357
  }
358
  *pContLen = contLen;
7✔
359
  return pReq;
7✔
360
}
361

362
static int32_t mndSendArbHeartBeatReq(SDnodeObj *pDnode, char *arbToken, int64_t arbTerm, SArray *hbMembers) {
7✔
363
  int32_t contLen = 0;
7✔
364
  void   *pHead = mndBuildArbHeartBeatReq(&contLen, arbToken, pDnode->id, arbTerm, hbMembers);
7✔
365
  if (pHead == NULL) {
7!
366
    mError("dnodeId:%d, failed to build arb-hb request", pDnode->id);
×
367
    return -1;
×
368
  }
369
  SRpcMsg rpcMsg = {.msgType = TDMT_VND_ARB_HEARTBEAT, .pCont = pHead, .contLen = contLen};
7✔
370

371
  SEpSet epSet = mndGetDnodeEpset(pDnode);
7✔
372
  if (epSet.numOfEps == 0) {
7!
373
    mError("dnodeId:%d, failed to send arb-hb request to dnode since no epSet found", pDnode->id);
×
374
    rpcFreeCont(pHead);
×
375
    return -1;
×
376
  }
377

378
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
7✔
379
  if (code != 0) {
7!
380
    mError("dnodeId:%d, failed to send arb-hb request to dnode since 0x%x", pDnode->id, code);
×
381
  } else {
382
    mTrace("dnodeId:%d, send arb-hb request to dnode", pDnode->id);
7!
383
  }
384
  return code;
7✔
385
}
386

387
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
12,041✔
388
  int32_t    code = 0;
12,041✔
389
  SMnode    *pMnode = pReq->info.node;
12,041✔
390
  SSdb      *pSdb = pMnode->pSdb;
12,041✔
391
  SArbGroup *pArbGroup = NULL;
12,041✔
392
  void      *pIter = NULL;
12,041✔
393

394
  SHashObj *pDnodeHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
12,041✔
395

396
  // collect member of same dnode
397
  while (1) {
398
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
12,047✔
399
    if (pIter == NULL) break;
12,047✔
400

401
    (void)taosThreadMutexLock(&pArbGroup->mutex);
6✔
402

403
    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
18✔
404
      SArbGroupMember *pMember = &pArbGroup->members[i];
12✔
405
      int32_t          dnodeId = pMember->info.dnodeId;
12✔
406
      void            *pObj = taosHashGet(pDnodeHash, &dnodeId, sizeof(int32_t));
12✔
407
      SArray          *hbMembers = NULL;
12✔
408
      if (pObj) {
12✔
409
        hbMembers = *(SArray **)pObj;
5✔
410
      } else {
411
        hbMembers = taosArrayInit(16, sizeof(SVArbHbReqMember));
7✔
412
        if (taosHashPut(pDnodeHash, &dnodeId, sizeof(int32_t), &hbMembers, POINTER_BYTES) != 0) {
7!
413
          mError("dnodeId:%d, failed to push hb member inty]o hash, but conitnue next at this timer round", dnodeId);
×
414
        }
415
      }
416
      SVArbHbReqMember reqMember = {.vgId = pArbGroup->vgId, .hbSeq = pMember->state.nextHbSeq++};
12✔
417
      if (taosArrayPush(hbMembers, &reqMember) == NULL) {
24!
418
        mError("dnodeId:%d, failed to push hb member, but conitnue next at this timer round", dnodeId);
×
419
      }
420
    }
421

422
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);
6✔
423
    sdbRelease(pSdb, pArbGroup);
6✔
424
  }
425

426
  char arbToken[TSDB_ARB_TOKEN_SIZE];
427
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
12,041!
428
    mError("failed to get arb token for arb-hb timer");
×
429
    pIter = taosHashIterate(pDnodeHash, NULL);
×
430
    while (pIter) {
×
431
      SArray *hbMembers = *(SArray **)pIter;
×
432
      taosArrayDestroy(hbMembers);
×
433
      pIter = taosHashIterate(pDnodeHash, pIter);
×
434
    }
435
    taosHashCleanup(pDnodeHash);
×
436
    TAOS_RETURN(code);
×
437
  }
438

439
  int64_t nowMs = taosGetTimestampMs();
12,041✔
440

441
  pIter = NULL;
12,041✔
442
  while (1) {
7✔
443
    pIter = taosHashIterate(pDnodeHash, pIter);
12,048✔
444
    if (pIter == NULL) break;
12,048✔
445

446
    int32_t dnodeId = *(int32_t *)taosHashGetKey(pIter, NULL);
7✔
447
    SArray *hbMembers = *(SArray **)pIter;
7✔
448

449
    SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
7✔
450
    if (pDnode == NULL) {
7!
451
      mError("dnodeId:%d, timer failed to acquire dnode", dnodeId);
×
452
      taosArrayDestroy(hbMembers);
×
453
      continue;
×
454
    }
455

456
    int64_t mndTerm = mndGetTerm(pMnode);
7✔
457

458
    if (mndIsDnodeOnline(pDnode, nowMs)) {
7!
459
      int32_t sendCode = mndSendArbHeartBeatReq(pDnode, arbToken, mndTerm, hbMembers);
7✔
460
      if (TSDB_CODE_SUCCESS != sendCode) {
7!
461
        mError("dnodeId:%d, timer failed to send arb-hb request", dnodeId);
×
462
      }
463
    }
464

465
    mndReleaseDnode(pMnode, pDnode);
7✔
466
    taosArrayDestroy(hbMembers);
7✔
467
  }
468
  taosHashCleanup(pDnodeHash);
12,041✔
469

470
  return 0;
12,041✔
471
}
472

UNCOV
473
static void *mndBuildArbCheckSyncReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
×
474
                                     char *member0Token, char *member1Token) {
UNCOV
475
  SVArbCheckSyncReq req = {0};
×
UNCOV
476
  req.arbToken = arbToken;
×
UNCOV
477
  req.arbTerm = arbTerm;
×
UNCOV
478
  req.member0Token = member0Token;
×
UNCOV
479
  req.member1Token = member1Token;
×
480

UNCOV
481
  int32_t reqLen = tSerializeSVArbCheckSyncReq(NULL, 0, &req);
×
UNCOV
482
  int32_t contLen = reqLen + sizeof(SMsgHead);
×
483

UNCOV
484
  if (contLen <= 0) return NULL;
×
UNCOV
485
  SMsgHead *pHead = rpcMallocCont(contLen);
×
UNCOV
486
  if (pHead == NULL) return NULL;
×
487

UNCOV
488
  pHead->contLen = htonl(contLen);
×
UNCOV
489
  pHead->vgId = htonl(vgId);
×
UNCOV
490
  if (tSerializeSVArbCheckSyncReq((char *)pHead + sizeof(SMsgHead), contLen, &req) <= 0) {
×
491
    rpcFreeCont(pHead);
×
492
    return NULL;
×
493
  }
UNCOV
494
  *pContLen = contLen;
×
UNCOV
495
  return pHead;
×
496
}
497

UNCOV
498
static int32_t mndSendArbCheckSyncReq(SMnode *pMnode, int32_t vgId, char *arbToken, int64_t term, char *member0Token,
×
499
                                      char *member1Token) {
UNCOV
500
  int32_t code = 0;
×
UNCOV
501
  int32_t contLen = 0;
×
UNCOV
502
  void   *pHead = mndBuildArbCheckSyncReq(&contLen, vgId, arbToken, term, member0Token, member1Token);
×
UNCOV
503
  if (!pHead) {
×
504
    mError("vgId:%d, failed to build check-sync request", vgId);
×
505
    return -1;
×
506
  }
UNCOV
507
  SRpcMsg rpcMsg = {.msgType = TDMT_VND_ARB_CHECK_SYNC, .pCont = pHead, .contLen = contLen};
×
508

UNCOV
509
  SEpSet epSet = mndGetVgroupEpsetById(pMnode, vgId);
×
UNCOV
510
  if (epSet.numOfEps == 0) {
×
511
    mError("vgId:%d, failed to send check-sync request since no epSet found", vgId);
×
512
    rpcFreeCont(pHead);
×
513
    code = -1;
×
514
    if (terrno != 0) code = terrno;
×
515
    TAOS_RETURN(code);
×
516
  }
517

UNCOV
518
  code = tmsgSendReq(&epSet, &rpcMsg);
×
UNCOV
519
  if (code != 0) {
×
520
    mError("vgId:%d, failed to send check-sync request since 0x%x", vgId, code);
×
521
  } else {
UNCOV
522
    mDebug("vgId:%d, send check-sync request", vgId);
×
523
  }
UNCOV
524
  return code;
×
525
}
526

527
static bool mndCheckArbMemberHbTimeout(SArbGroup *pArbGroup, int32_t index, int64_t nowMs) {
10✔
528
  SArbGroupMember *pArbMember = &pArbGroup->members[index];
10✔
529
  return pArbMember->state.lastHbMs < (nowMs - tsArbSetAssignedTimeoutSec * 1000);
10✔
530
}
531

532
static void *mndBuildArbSetAssignedLeaderReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
×
533
                                             char *memberToken) {
534
  SVArbSetAssignedLeaderReq req = {0};
×
535
  req.arbToken = arbToken;
×
536
  req.arbTerm = arbTerm;
×
537
  req.memberToken = memberToken;
×
538

539
  int32_t reqLen = tSerializeSVArbSetAssignedLeaderReq(NULL, 0, &req);
×
540
  int32_t contLen = reqLen + sizeof(SMsgHead);
×
541

542
  if (contLen <= 0) return NULL;
×
543
  SMsgHead *pHead = rpcMallocCont(contLen);
×
544
  if (pHead == NULL) return NULL;
×
545

546
  pHead->contLen = htonl(contLen);
×
547
  pHead->vgId = htonl(vgId);
×
548
  if (tSerializeSVArbSetAssignedLeaderReq((char *)pHead + sizeof(SMsgHead), contLen, &req) <= 0) {
×
549
    rpcFreeCont(pHead);
×
550
    return NULL;
×
551
  }
552
  *pContLen = contLen;
×
553
  return pHead;
×
554
}
555

556
static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, int32_t vgId, char *arbToken,
×
557
                                              int64_t term, char *memberToken) {
558
  int32_t code = 0;
×
559
  int32_t contLen = 0;
×
560
  void   *pHead = mndBuildArbSetAssignedLeaderReq(&contLen, vgId, arbToken, term, memberToken);
×
561
  if (!pHead) {
×
562
    mError("vgId:%d, failed to build set-assigned request", vgId);
×
563
    code = -1;
×
564
    if (terrno != 0) code = terrno;
×
565
    TAOS_RETURN(code);
×
566
  }
567
  SRpcMsg rpcMsg = {.msgType = TDMT_SYNC_SET_ASSIGNED_LEADER, .pCont = pHead, .contLen = contLen};
×
568

569
  SEpSet epSet = mndGetDnodeEpsetById(pMnode, dnodeId);
×
570
  if (epSet.numOfEps == 0) {
×
571
    mError("dnodeId:%d vgId:%d, failed to send arb-set-assigned request to dnode since no epSet found", dnodeId, vgId);
×
572
    rpcFreeCont(pHead);
×
573
    code = -1;
×
574
    if (terrno != 0) code = terrno;
×
575
    TAOS_RETURN(code);
×
576
  }
577
  code = tmsgSendReq(&epSet, &rpcMsg);
×
578
  if (code != 0) {
×
579
    mError("dnodeId:%d vgId:%d, failed to send arb-set-assigned request to dnode since 0x%x", dnodeId, vgId, code);
×
580
  } else {
581
    mInfo("dnodeId:%d vgId:%d, send arb-set-assigned request to dnode", dnodeId, vgId);
×
582
  }
583
  return code;
×
584
}
585

586
void mndArbCheckSync(SArbGroup *pArbGroup, int64_t nowMs, ECheckSyncOp *pOp, SArbGroup *pNewGroup) {
5✔
587
  *pOp = CHECK_SYNC_NONE;
5✔
588
  int32_t code = 0;
5✔
589

590
  int32_t vgId = pArbGroup->vgId;
5✔
591

592
  bool                member0IsTimeout = mndCheckArbMemberHbTimeout(pArbGroup, 0, nowMs);
5✔
593
  bool                member1IsTimeout = mndCheckArbMemberHbTimeout(pArbGroup, 1, nowMs);
5✔
594
  SArbAssignedLeader *pAssignedLeader = &pArbGroup->assignedLeader;
5✔
595
  int32_t             currentAssignedDnodeId = pAssignedLeader->dnodeId;
5✔
596

597
  // 1. has assigned && no response => send req
598
  if (currentAssignedDnodeId != 0 && pAssignedLeader->acked == false) {
5!
599
    *pOp = CHECK_SYNC_SET_ASSIGNED_LEADER;
×
600
    return;
×
601
  }
602

603
  // 2. both of the two members are timeout => skip
604
  if (member0IsTimeout && member1IsTimeout) {
5!
605
    return;
5✔
606
  }
607

608
  // 3. no member is timeout => check sync
UNCOV
609
  if (member0IsTimeout == false && member1IsTimeout == false) {
×
610
    // no assigned leader and not sync
UNCOV
611
    if (currentAssignedDnodeId == 0 && !pArbGroup->isSync) {
×
UNCOV
612
      *pOp = CHECK_SYNC_CHECK_SYNC;
×
613
    }
UNCOV
614
    return;
×
615
  }
616

617
  // 4. one of the members is timeout => set assigned leader
618
  int32_t          candidateIndex = member0IsTimeout ? 1 : 0;
×
619
  SArbGroupMember *pMember = &pArbGroup->members[candidateIndex];
×
620

621
  // has assigned leader and dnodeId not match => skip
622
  if (currentAssignedDnodeId != 0 && currentAssignedDnodeId != pMember->info.dnodeId) {
×
623
    mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, assigned leader has been set to dnodeId:%d", vgId,
×
624
          pMember->info.dnodeId, currentAssignedDnodeId);
625
    return;
×
626
  }
627

628
  // not sync => skip
629
  if (pArbGroup->isSync == false) {
×
630
    if (currentAssignedDnodeId == pMember->info.dnodeId) {
×
631
      mDebug("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId,
×
632
             pMember->info.dnodeId);
633
    } else {
634
      mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId,
×
635
            pMember->info.dnodeId);
636
    }
637
    return;
×
638
  }
639

640
  // is sync && no assigned leader => write to sdb
641
  mndArbGroupDupObj(pArbGroup, pNewGroup);
×
642
  mndArbGroupSetAssignedLeader(pNewGroup, candidateIndex);
×
643
  *pOp = CHECK_SYNC_UPDATE;
×
644
}
645

646
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
5,726✔
647
  int32_t    code = 0, lino = 0;
5,726✔
648
  SMnode    *pMnode = pReq->info.node;
5,726✔
649
  SSdb      *pSdb = pMnode->pSdb;
5,726✔
650
  SArbGroup *pArbGroup = NULL;
5,726✔
651
  void      *pIter = NULL;
5,726✔
652
  SArray    *pUpdateArray = NULL;
5,726✔
653

654
  char arbToken[TSDB_ARB_TOKEN_SIZE];
655
  TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));
5,726!
656

657
  int64_t term = mndGetTerm(pMnode);
5,726✔
658
  if (term < 0) {
5,726!
659
    mError("arb failed to get term since %s", terrstr());
×
660
    code = -1;
×
661
    if (terrno != 0) code = terrno;
×
662
    TAOS_RETURN(code);
×
663
  }
664

665
  int64_t roleTimeMs = mndGetRoleTimeMs(pMnode);
5,726✔
666
  int64_t nowMs = taosGetTimestampMs();
5,726✔
667
  if (nowMs - roleTimeMs < tsArbHeartBeatIntervalSec * 1000 * 2) {
5,726✔
668
    mInfo("arb skip to check sync since mnd had just switch over, roleTime:%" PRId64 " now:%" PRId64, roleTimeMs,
32!
669
          nowMs);
670
    return 0;
32✔
671
  }
672

673
  while (1) {
5✔
674
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
5,699✔
675
    if (pIter == NULL) break;
5,699✔
676

677
    SArbGroup arbGroupDup = {0};
5✔
678

679
    (void)taosThreadMutexLock(&pArbGroup->mutex);
5✔
680
    mndArbGroupDupObj(pArbGroup, &arbGroupDup);
5✔
681
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);
5✔
682

683
    sdbRelease(pSdb, pArbGroup);
5✔
684

685
    ECheckSyncOp op = CHECK_SYNC_NONE;
5✔
686
    SArbGroup    newGroup = {0};
5✔
687
    mndArbCheckSync(&arbGroupDup, nowMs, &op, &newGroup);
5✔
688

689
    int32_t             vgId = arbGroupDup.vgId;
5✔
690
    SArbAssignedLeader *pAssgndLeader = &arbGroupDup.assignedLeader;
5✔
691
    int32_t             assgndDnodeId = pAssgndLeader->dnodeId;
5✔
692

693
    switch (op) {
5!
694
      case CHECK_SYNC_NONE:
5✔
695
        mTrace("vgId:%d, arb skip to send msg by check sync", vgId);
5!
696
        break;
5✔
697
      case CHECK_SYNC_SET_ASSIGNED_LEADER:
×
698
        (void)mndSendArbSetAssignedLeaderReq(pMnode, assgndDnodeId, vgId, arbToken, term, pAssgndLeader->token);
×
699
        mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, assgndDnodeId);
×
700
        break;
×
UNCOV
701
      case CHECK_SYNC_CHECK_SYNC:
×
UNCOV
702
        (void)mndSendArbCheckSyncReq(pMnode, vgId, arbToken, term, arbGroupDup.members[0].state.token,
×
703
                                     arbGroupDup.members[1].state.token);
UNCOV
704
        mInfo("vgId:%d, arb send check sync request", vgId);
×
UNCOV
705
        break;
×
706
      case CHECK_SYNC_UPDATE:
×
707
        if (!pUpdateArray) {
×
708
          pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
×
709
          if (!pUpdateArray) {
×
710
            TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
711
          }
712
        }
713

714
        if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
×
715
          TAOS_CHECK_EXIT(terrno);
×
716
        }
717
        break;
×
718
      default:
×
719
        mError("vgId:%d, arb unknown check sync op:%d", vgId, op);
×
720
        break;
×
721
    }
722
  }
723

724
  TAOS_CHECK_EXIT(mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray));
5,694!
725

726
_exit:
5,694✔
727
  if (code != 0) {
5,694!
728
    mError("failed to check sync at line %d since %s", lino, terrstr());
×
729
  }
730

731
  taosArrayDestroy(pUpdateArray);
5,694✔
732
  return 0;
5,694✔
733
}
734

735
static void *mndBuildArbUpdateGroupBatchReq(int32_t *pContLen, SArray *updateArray) {
4✔
736
  SMArbUpdateGroupBatchReq req = {0};
4✔
737
  req.updateArray = updateArray;
4✔
738

739
  int32_t contLen = tSerializeSMArbUpdateGroupBatchReq(NULL, 0, &req);
4✔
740
  if (contLen <= 0) return NULL;
4!
741
  SMsgHead *pHead = rpcMallocCont(contLen);
4✔
742
  if (pHead == NULL) return NULL;
4!
743

744
  if (tSerializeSMArbUpdateGroupBatchReq(pHead, contLen, &req) <= 0) {
4!
745
    rpcFreeCont(pHead);
×
746
    return NULL;
×
747
  }
748
  *pContLen = contLen;
4✔
749
  return pHead;
4✔
750
}
751

752
static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup) {
6✔
753
  outGroup->vgId = pGroup->vgId;
6✔
754
  outGroup->dbUid = pGroup->dbUid;
6✔
755
  for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
18✔
756
    outGroup->members[i].dnodeId = pGroup->members[i].info.dnodeId;
12✔
757
    outGroup->members[i].token = pGroup->members[i].state.token;  // just copy the pointer
12✔
758
  }
759
  outGroup->isSync = pGroup->isSync;
6✔
760
  outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId;
6✔
761
  outGroup->assignedLeader.token = pGroup->assignedLeader.token;  // just copy the pointer
6✔
762
  outGroup->assignedLeader.acked = pGroup->assignedLeader.acked;
6✔
763
  outGroup->version = pGroup->version;
6✔
764
}
6✔
765

766
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
×
767
  if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
×
768
    mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
×
769
    return 0;
×
770
  }
771

772
  int32_t ret = -1;
×
773

774
  SMArbUpdateGroup newGroup = {0};
×
775
  mndInitArbUpdateGroup(pNewGroup, &newGroup);
×
776

777
  SArray *pArray = taosArrayInit(1, sizeof(SMArbUpdateGroup));
×
778
  if (taosArrayPush(pArray, &newGroup) == NULL) goto _OVER;
×
779

780
  int32_t contLen = 0;
×
781
  void   *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
×
782
  if (!pHead) {
×
783
    mError("failed to build arb-update-group request");
×
784
    goto _OVER;
×
785
  }
786

787
  SRpcMsg rpcMsg = {
×
788
      .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
789
  ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
×
790
  if (ret != 0) goto _OVER;
×
791

792
  if ((ret = taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0)) != 0) goto _OVER;
×
793

794
_OVER:
×
795
  taosArrayDestroy(pArray);
×
796
  return ret;
×
797
}
798

799
static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArray) {
5,701✔
800
  int32_t ret = -1;
5,701✔
801

802
  size_t  sz = taosArrayGetSize(newGroupArray);
5,701✔
803
  SArray *pArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup));
5,701✔
804
  for (size_t i = 0; i < sz; i++) {
5,713✔
805
    SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
12✔
806
    if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
12✔
807
      mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
6!
808
      continue;
6✔
809
    }
810

811
    SMArbUpdateGroup newGroup = {0};
6✔
812
    mndInitArbUpdateGroup(pNewGroup, &newGroup);
6✔
813

814
    if (taosArrayPush(pArray, &newGroup) == NULL) goto _OVER;
6!
815
    if (taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0) != 0) goto _OVER;
6!
816
  }
817

818
  if (taosArrayGetSize(pArray) == 0) {
5,701✔
819
    ret = 0;
5,697✔
820
    goto _OVER;
5,697✔
821
  }
822

823
  int32_t contLen = 0;
4✔
824
  void   *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
4✔
825
  if (!pHead) {
4!
826
    mError("failed to build arb-update-group request");
×
827
    goto _OVER;
×
828
  }
829

830
  SRpcMsg rpcMsg = {
4✔
831
      .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
832
  ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
4✔
833

834
_OVER:
5,701✔
835
  taosArrayDestroy(pArray);
5,701✔
836

837
  if (ret != 0) {
5,701!
838
    for (size_t i = 0; i < sz; i++) {
×
839
      SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
×
840
      if (taosHashRemove(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != 0) {
×
841
        mError("failed to remove vgId:%d from arbUpdateHash", pNewGroup->vgId);
×
842
      }
843
    }
844
  }
845

846
  return ret;
5,701✔
847
}
848

849
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
4✔
850
  int    code = -1;
4✔
851
  size_t sz = 0;
4✔
852

853
  SMArbUpdateGroupBatchReq req = {0};
4✔
854
  if ((code = tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req)) != 0) {
4!
855
    mError("arb failed to decode arb-update-group request");
×
856
    TAOS_RETURN(code);
×
857
  }
858

859
  SMnode *pMnode = pReq->info.node;
4✔
860
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "update-arbgroup");
4✔
861
  if (pTrans == NULL) {
4!
862
    mError("failed to update arbgroup in create trans, since %s", terrstr());
×
863
    goto _OVER;
×
864
  }
865

866
  sz = taosArrayGetSize(req.updateArray);
4✔
867
  for (size_t i = 0; i < sz; i++) {
10✔
868
    SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
6✔
869
    SArbGroup         newGroup = {0};
6✔
870
    newGroup.vgId = pUpdateGroup->vgId;
6✔
871
    newGroup.dbUid = pUpdateGroup->dbUid;
6✔
872
    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
18✔
873
      newGroup.members[i].info.dnodeId = pUpdateGroup->members[i].dnodeId;
12✔
874
      tstrncpy(newGroup.members[i].state.token, pUpdateGroup->members[i].token, TSDB_ARB_TOKEN_SIZE);
12✔
875
    }
876

877
    newGroup.isSync = pUpdateGroup->isSync;
6✔
878
    newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId;
6✔
879
    tstrncpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
6✔
880
    newGroup.assignedLeader.acked = pUpdateGroup->assignedLeader.acked;
6✔
881
    newGroup.version = pUpdateGroup->version;
6✔
882

883
    SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
6✔
884
    if (!pOldGroup) {
6!
885
      mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId);
×
886
      if (taosHashRemove(arbUpdateHash, &newGroup.vgId, sizeof(int32_t)) != 0) {
×
887
        mError("failed to remove vgId:%d from arbUpdateHash", newGroup.vgId);
×
888
      }
889
      continue;
×
890
    }
891

892
    mndTransAddArbGroupId(pTrans, newGroup.vgId);
6✔
893

894
    if ((code = mndSetCreateArbGroupCommitLogs(pTrans, &newGroup)) != 0) {
6!
895
      mError("failed to update arbgroup in set commit log, vgId:%d, trans:%d, since %s", newGroup.vgId, pTrans->id,
×
896
             terrstr());
897
      goto _OVER;
×
898
    }
899

900
    mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
6!
901
          pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
902
          newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
903
          newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked);
904

905
    sdbRelease(pMnode->pSdb, pOldGroup);
6✔
906
  }
907

908
  if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) goto _OVER;
4!
909
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
4!
910

911
  code = 0;
4✔
912

913
_OVER:
4✔
914
  if (code != 0) {
4!
915
    // failed to update arbgroup
916
    for (size_t i = 0; i < sz; i++) {
×
917
      SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
×
918
      if (taosHashRemove(arbUpdateHash, &pUpdateGroup->vgId, sizeof(int32_t)) != 0) {
×
919
        mError("failed to remove vgId:%d from arbUpdateHash", pUpdateGroup->vgId);
×
920
      }
921
    }
922
  }
923

924
  mndTransDrop(pTrans);
4✔
925
  tFreeSMArbUpdateGroupBatchReq(&req);
4✔
926
  return code;
4✔
927
}
928

929
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew) {
17✔
930
  (void)memcpy(pNew, pGroup, offsetof(SArbGroup, mutexInited));
17✔
931
}
17✔
932

933
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index) {
×
934
  SArbGroupMember *pMember = &pGroup->members[index];
×
935

936
  pGroup->assignedLeader.dnodeId = pMember->info.dnodeId;
×
937
  tstrncpy(pGroup->assignedLeader.token, pMember->state.token, TSDB_ARB_TOKEN_SIZE);
×
938
  pGroup->assignedLeader.acked = false;
×
939
}
×
940

941
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
×
942
  pGroup->assignedLeader.dnodeId = 0;
×
943
  (void)memset(pGroup->assignedLeader.token, 0, TSDB_ARB_TOKEN_SIZE);
×
944
  pGroup->assignedLeader.acked = false;
×
945
}
×
946

947
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
×
948
  int32_t code = -1;
×
949
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "update-arbgroup");
×
950
  if (pTrans == NULL) {
×
951
    mError("failed to update arbgroup in create trans, vgId:%d, since %s", pNew->vgId, terrstr());
×
952
    if (terrno != 0) code = terrno;
×
953
    goto _OVER;
×
954
  }
955

956
  mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
×
957
        pTrans->id, pNew->vgId, pNew->members[0].info.dnodeId, pNew->members[0].state.token,
958
        pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId,
959
        pNew->assignedLeader.token, pNew->assignedLeader.acked);
960

961
  mndTransAddArbGroupId(pTrans, pNew->vgId);
×
962
  if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) {
×
963
    goto _OVER;
×
964
  }
965

966
  if ((code = mndSetCreateArbGroupCommitLogs(pTrans, pNew)) != 0) {
×
967
    mError("failed to update arbgroup in set commit log, vgId:%d, since %s", pNew->vgId, tstrerror(code));
×
968
    goto _OVER;
×
969
  }
970

971
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
×
972

973
  code = 0;
×
974

975
_OVER:
×
976
  mndTransDrop(pTrans);
×
977
  return code;
×
978
}
979

980
bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
12✔
981
                                  SArbGroup *pNewGroup) {
982
  bool             updateToken = false;
12✔
983
  SArbGroupMember *pMember = NULL;
12✔
984

985
  (void)taosThreadMutexLock(&pGroup->mutex);
12✔
986

987
  int index = 0;
12✔
988
  for (; index < TSDB_ARB_GROUP_MEMBER_NUM; index++) {
18!
989
    pMember = &pGroup->members[index];
18✔
990
    if (pMember->info.dnodeId == dnodeId) {
18✔
991
      break;
12✔
992
    }
993
    pMember = NULL;
6✔
994
  }
995

996
  if (pMember == NULL) {
12!
997
    mInfo("dnodeId:%d vgId:%d, arb token update check failed, no obj found", dnodeId, pRspMember->vgId);
×
998
    goto _OVER;
×
999
  }
1000

1001
  if (pMember->state.responsedHbSeq >= pRspMember->hbSeq) {
12!
1002
    // skip
1003
    mInfo("dnodeId:%d vgId:%d, skip arb token update, heart beat seq expired, local:%d msg:%d", dnodeId,
×
1004
          pRspMember->vgId, pMember->state.responsedHbSeq, pRspMember->hbSeq);
1005
    goto _OVER;
×
1006
  }
1007

1008
  // update hb state
1009
  pMember->state.responsedHbSeq = pRspMember->hbSeq;
12✔
1010
  pMember->state.lastHbMs = nowMs;
12✔
1011
  if (mndArbCheckToken(pMember->state.token, pRspMember->memberToken) == 0) {
12!
1012
    // skip
1013
    mDebug("dnodeId:%d vgId:%d, skip arb token update, token matched", dnodeId, pRspMember->vgId);
×
1014
    goto _OVER;
×
1015
  }
1016

1017
  // update token
1018
  mndArbGroupDupObj(pGroup, pNewGroup);
12✔
1019
  tstrncpy(pNewGroup->members[index].state.token, pRspMember->memberToken, TSDB_ARB_TOKEN_SIZE);
12✔
1020
  pNewGroup->isSync = false;
12✔
1021

1022
  bool resetAssigned = false;
12✔
1023
  if (pMember->info.dnodeId == pGroup->assignedLeader.dnodeId) {
12!
1024
    mndArbGroupResetAssignedLeader(pNewGroup);
×
1025
    resetAssigned = true;
×
1026
  }
1027

1028
  updateToken = true;
12✔
1029
  mInfo("dnodeId:%d vgId:%d, arb token updating, resetAssigned:%d", dnodeId, pRspMember->vgId, resetAssigned);
12!
1030

1031
_OVER:
×
1032
  (void)taosThreadMutexUnlock(&pGroup->mutex);
12✔
1033
  return updateToken;
12✔
1034
}
1035

1036
static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) {
7✔
1037
  int64_t nowMs = taosGetTimestampMs();
7✔
1038
  size_t  size = taosArrayGetSize(memberArray);
7✔
1039
  SArray *pUpdateArray = taosArrayInit(size, sizeof(SArbGroup));
7✔
1040

1041
  for (size_t i = 0; i < size; i++) {
19✔
1042
    SVArbHbRspMember *pRspMember = taosArrayGet(memberArray, i);
12✔
1043

1044
    SArbGroup  newGroup = {0};
12✔
1045
    SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &pRspMember->vgId);
12✔
1046
    if (pGroup == NULL) {
12!
1047
      mInfo("failed to update arb token, vgId:%d not found", pRspMember->vgId);
×
1048
      continue;
×
1049
    }
1050

1051
    bool updateToken = mndUpdateArbGroupByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
12✔
1052
    if (updateToken) {
12!
1053
      if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
12!
1054
        mError("failed to push newGroup to updateArray, but continue at this hearbear");
×
1055
      }
1056
    }
1057

1058
    sdbRelease(pMnode->pSdb, pGroup);
12✔
1059
  }
1060

1061
  TAOS_CHECK_RETURN(mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray));
7!
1062

1063
  taosArrayDestroy(pUpdateArray);
7✔
1064
  return 0;
7✔
1065
}
1066

1067
bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
×
1068
                                  bool newIsSync, SArbGroup *pNewGroup) {
1069
  bool updateIsSync = false;
×
1070

1071
  (void)taosThreadMutexLock(&pGroup->mutex);
×
1072

1073
  if (pGroup->assignedLeader.dnodeId != 0) {
×
1074
    terrno = TSDB_CODE_SUCCESS;
×
1075
    mInfo("skip to update arb sync, vgId:%d has assigned leader:%d", vgId, pGroup->assignedLeader.dnodeId);
×
1076
    goto _OVER;
×
1077
  }
1078

1079
  char *local0Token = pGroup->members[0].state.token;
×
1080
  char *local1Token = pGroup->members[1].state.token;
×
1081
  if (mndArbCheckToken(local0Token, member0Token) != 0 || mndArbCheckToken(local1Token, member1Token) != 0) {
×
1082
    terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
×
1083
    mInfo("skip to update arb sync, memberToken mismatch local:[%s][%s], msg:[%s][%s]", local0Token, local1Token,
×
1084
          member0Token, member1Token);
1085
    goto _OVER;
×
1086
  }
1087

1088
  if (pGroup->isSync != newIsSync) {
×
1089
    mndArbGroupDupObj(pGroup, pNewGroup);
×
1090
    pNewGroup->isSync = newIsSync;
×
1091

1092
    mInfo("vgId:%d, arb isSync updating, new isSync:%d", vgId, newIsSync);
×
1093
    updateIsSync = true;
×
1094
  }
1095

1096
_OVER:
×
1097
  (void)taosThreadMutexUnlock(&pGroup->mutex);
×
1098
  return updateIsSync;
×
1099
}
1100

1101
static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token, char *member1Token, bool newIsSync) {
×
1102
  int32_t    code = 0;
×
1103
  SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
×
1104
  if (pGroup == NULL) {
×
1105
    mInfo("failed to update arb sync, vgId:%d not found", vgId);
×
1106
    code = -1;
×
1107
    if (terrno != 0) code = terrno;
×
1108
    TAOS_RETURN(code);
×
1109
  }
1110

1111
  SArbGroup newGroup = {0};
×
1112
  bool      updateIsSync = mndUpdateArbGroupByCheckSync(pGroup, vgId, member0Token, member1Token, newIsSync, &newGroup);
×
1113
  if (updateIsSync) {
×
1114
    if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) {
×
1115
      mInfo("failed to pullup update arb sync, vgId:%d, since %s", vgId, terrstr());
×
1116
    }
1117
  }
1118

1119
  sdbRelease(pMnode->pSdb, pGroup);
×
1120
  return 0;
×
1121
}
1122

1123
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
7✔
1124
  if (pRsp->contLen == 0) {
7!
1125
    mDebug("arb hb-rsp contLen is 0");
×
1126
    return 0;
×
1127
  }
1128

1129
  int32_t code = -1;
7✔
1130

1131
  SMnode *pMnode = pRsp->info.node;
7✔
1132
  SSdb   *pSdb = pMnode->pSdb;
7✔
1133

1134
  char arbToken[TSDB_ARB_TOKEN_SIZE];
1135
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
7!
1136
    mError("failed to get arb token for arb-hb response");
×
1137
    TAOS_RETURN(code);
×
1138
  }
1139

1140
  SVArbHeartBeatRsp arbHbRsp = {0};
7✔
1141
  if ((code = tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &arbHbRsp)) != 0) {
7!
1142
    mInfo("arb hb-rsp des failed, since:%s", tstrerror(pRsp->code));
×
1143
    TAOS_RETURN(code);
×
1144
  }
1145

1146
  if (mndArbCheckToken(arbToken, arbHbRsp.arbToken) != 0) {
7!
1147
    mInfo("arb hearbeat skip update for dnodeId:%d, arb token mismatch, local:[%s] msg:[%s]", arbHbRsp.dnodeId,
×
1148
          arbToken, arbHbRsp.arbToken);
1149
    code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
×
1150
    goto _OVER;
×
1151
  }
1152

1153
  TAOS_CHECK_GOTO(mndUpdateArbHeartBeat(pMnode, arbHbRsp.dnodeId, arbHbRsp.hbMembers), NULL, _OVER);
7!
1154
  code = 0;
7✔
1155

1156
_OVER:
7✔
1157
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
7✔
1158
  return code;
7✔
1159
}
1160

UNCOV
1161
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
×
UNCOV
1162
  if (pRsp->contLen == 0) {
×
UNCOV
1163
    mDebug("arb check-sync-rsp contLen is 0");
×
UNCOV
1164
    return 0;
×
1165
  }
1166

1167
  int32_t code = -1;
×
1168

1169
  SMnode *pMnode = pRsp->info.node;
×
1170
  SSdb   *pSdb = pMnode->pSdb;
×
1171

1172
  char arbToken[TSDB_ARB_TOKEN_SIZE];
1173
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
×
1174
    mError("failed to get arb token for arb-check-sync response");
×
1175
    TAOS_RETURN(code);
×
1176
  }
1177

1178
  SVArbCheckSyncRsp syncRsp = {0};
×
1179
  if ((code = tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp)) != 0) {
×
1180
    mInfo("arb check-sync-rsp des failed, since:%s", tstrerror(pRsp->code));
×
1181
    if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) {
×
1182
      terrno = TSDB_CODE_SUCCESS;
×
1183
      return 0;
×
1184
    }
1185
    TAOS_RETURN(code);
×
1186
  }
1187

1188
  if (mndArbCheckToken(arbToken, syncRsp.arbToken) != 0) {
×
1189
    mInfo("skip update arb sync for vgId:%d, arb token mismatch, local:[%s] msg:[%s]", syncRsp.vgId, arbToken,
×
1190
          syncRsp.arbToken);
1191
    terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
×
1192
    goto _OVER;
×
1193
  }
1194

1195
  bool newIsSync = (syncRsp.errCode == TSDB_CODE_SUCCESS);
×
1196
  if ((code = mndUpdateArbSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync)) != 0) {
×
1197
    mInfo("failed to update arb sync for vgId:%d, since:%s", syncRsp.vgId, terrstr());
×
1198
    goto _OVER;
×
1199
  }
1200

1201
  code = 0;
×
1202

1203
_OVER:
×
1204
  tFreeSVArbCheckSyncRsp(&syncRsp);
×
1205
  TAOS_RETURN(code);
×
1206
}
1207

1208
bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode,
×
1209
                                          SArbGroup *pNewGroup) {
1210
  bool updateAssigned = false;
×
1211

1212
  (void)taosThreadMutexLock(&pGroup->mutex);
×
1213
  if (mndArbCheckToken(pGroup->assignedLeader.token, memberToken) != 0) {
×
1214
    mInfo("skip update arb assigned for vgId:%d, member token mismatch, local:[%s] msg:[%s]", vgId,
×
1215
          pGroup->assignedLeader.token, memberToken);
1216
    goto _OVER;
×
1217
  }
1218

1219
  if (errcode != TSDB_CODE_SUCCESS) {
×
1220
    mInfo("skip update arb assigned for vgId:%d, since:%s", vgId, tstrerror(errcode));
×
1221
    goto _OVER;
×
1222
  }
1223

1224
  if (pGroup->assignedLeader.acked == false) {
×
1225
    mndArbGroupDupObj(pGroup, pNewGroup);
×
1226
    pNewGroup->isSync = false;
×
1227
    pNewGroup->assignedLeader.acked = true;
×
1228

1229
    mInfo("vgId:%d, arb received assigned ack", vgId);
×
1230
    updateAssigned = true;
×
1231
    goto _OVER;
×
1232
  }
1233

1234
_OVER:
×
1235
  (void)taosThreadMutexUnlock(&pGroup->mutex);
×
1236
  return updateAssigned;
×
1237
}
1238

1239
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
×
1240
  if (pRsp->contLen == 0) {
×
1241
    mDebug("arb set-assigned-rsp contLen is 0");
×
1242
    return 0;
×
1243
  }
1244

1245
  int32_t code = -1;
×
1246

1247
  SMnode *pMnode = pRsp->info.node;
×
1248
  SSdb   *pSdb = pMnode->pSdb;
×
1249

1250
  char arbToken[TSDB_ARB_TOKEN_SIZE];
1251
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
×
1252
    mError("failed to get arb token for arb-set-assigned response");
×
1253
    TAOS_RETURN(code);
×
1254
  }
1255

1256
  SVArbSetAssignedLeaderRsp setAssignedRsp = {0};
×
1257
  if ((code = tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp)) != 0) {
×
1258
    mInfo("arb set-assigned-rsp des failed, since:%s", tstrerror(pRsp->code));
×
1259
    TAOS_RETURN(code);
×
1260
  }
1261

1262
  if (mndArbCheckToken(arbToken, setAssignedRsp.arbToken) != 0) {
×
1263
    mInfo("skip update arb assigned for vgId:%d, arb token mismatch, local:[%s] msg:[%s]", setAssignedRsp.vgId,
×
1264
          arbToken, setAssignedRsp.arbToken);
1265
    code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
×
1266
    goto _OVER;
×
1267
  }
1268

1269
  SArbGroup *pGroup = mndAcquireArbGroup(pMnode, setAssignedRsp.vgId);
×
1270
  if (!pGroup) {
×
1271
    mError("failed to set arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, terrstr());
×
1272
    code = -1;
×
1273
    if (terrno != 0) code = terrno;
×
1274
    goto _OVER;
×
1275
  }
1276

1277
  SArbGroup newGroup = {0};
×
1278
  bool updateAssigned = mndUpdateArbGroupBySetAssignedLeader(pGroup, setAssignedRsp.vgId, setAssignedRsp.memberToken,
×
1279
                                                             pRsp->code, &newGroup);
1280
  if (updateAssigned) {
×
1281
    if ((code = mndPullupArbUpdateGroup(pMnode, &newGroup)) != 0) {
×
1282
      mInfo("failed to pullup update arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, tstrerror(code));
×
1283
      goto _OVER;
×
1284
    }
1285
  }
1286

1287
  code = 0;
×
1288

1289
_OVER:
×
1290
  tFreeSVArbSetAssignedLeaderRsp(&setAssignedRsp);
×
1291
  return code;
×
1292
}
1293

1294
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
×
1295
  SMnode    *pMnode = pReq->info.node;
×
1296
  SSdb      *pSdb = pMnode->pSdb;
×
1297
  int32_t    numOfRows = 0;
×
1298
  int32_t    cols = 0;
×
1299
  SArbGroup *pGroup = NULL;
×
1300
  int32_t    code = 0;
×
1301
  int32_t    lino = 0;
×
1302

1303
  while (numOfRows < rows) {
×
1304
    pShow->pIter = sdbFetch(pSdb, SDB_ARBGROUP, pShow->pIter, (void **)&pGroup);
×
1305
    if (pShow->pIter == NULL) break;
×
1306

1307
    (void)taosThreadMutexLock(&pGroup->mutex);
×
1308

1309
    cols = 0;
×
1310
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1311
    SVgObj          *pVgObj = sdbAcquire(pSdb, SDB_VGROUP, &pGroup->vgId);
×
1312
    if (!pVgObj) {
×
1313
      (void)taosThreadMutexUnlock(&pGroup->mutex);
×
1314
      sdbRelease(pSdb, pGroup);
×
1315
      continue;
×
1316
    }
1317
    char dbNameInGroup[TSDB_DB_FNAME_LEN];
1318
    strncpy(dbNameInGroup, pVgObj->dbName, TSDB_DB_FNAME_LEN);
×
1319
    sdbRelease(pSdb, pVgObj);
×
1320

1321
    char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
1322
    STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(dbNameInGroup), TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE);
×
1323
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)dbname, false), pGroup, &lino, _OVER);
×
1324

1325
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1326
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->vgId, false), pGroup, &lino, _OVER);
×
1327

1328
    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
×
1329
      SArbGroupMember *pMember = &pGroup->members[i];
×
1330
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1331
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pMember->info.dnodeId, false), pGroup,
×
1332
                          &lino, _OVER);
1333
    }
1334

1335
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1336
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->isSync, false), pGroup, &lino, _OVER);
×
1337

1338
    if (pGroup->assignedLeader.dnodeId != 0) {
×
1339
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1340
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.dnodeId, false),
×
1341
                          pGroup, &lino, _OVER);
1342

1343
      char token[TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE] = {0};
×
1344
      STR_WITH_MAXSIZE_TO_VARSTR(token, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE);
×
1345
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1346
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)token, false), pGroup, &lino, _OVER);
×
1347

1348
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1349
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.acked, false),
×
1350
                          pGroup, &lino, _OVER);
1351
    } else {
1352
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1353
      colDataSetNULL(pColInfo, numOfRows);
×
1354

1355
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1356
      colDataSetNULL(pColInfo, numOfRows);
×
1357

1358
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1359
      colDataSetNULL(pColInfo, numOfRows);
×
1360
    }
1361

1362
    (void)taosThreadMutexUnlock(&pGroup->mutex);
×
1363

1364
    numOfRows++;
×
1365
    sdbRelease(pSdb, pGroup);
×
1366
  }
1367

1368
_OVER:
×
1369
  if (code != 0) mError("failed to restrieve arb group at line:%d, since %s", lino, tstrerror(code));
×
1370
  pShow->numOfRows += numOfRows;
×
1371

1372
  return numOfRows;
×
1373
}
1374

1375
static void mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter) {
×
1376
  SSdb *pSdb = pMnode->pSdb;
×
1377
  sdbCancelFetchByType(pSdb, pIter, SDB_ARBGROUP);
×
1378
}
×
1379

1380
int32_t mndGetArbGroupSize(SMnode *pMnode) {
×
1381
  SSdb *pSdb = pMnode->pSdb;
×
1382
  return sdbGetSize(pSdb, SDB_ARBGROUP);
×
1383
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc