• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3653

14 Mar 2025 08:10AM UTC coverage: 22.565% (-41.0%) from 63.596%
#3653

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add basic test for alter table option.

* Fix(keep): memory leak.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stb keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix CI core dump when altering stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

49248 of 302527 branches covered (16.28%)

Branch coverage included in aggregate %.

53 of 99 new or added lines in 12 files covered. (53.54%)

155872 existing lines in 443 files now uncovered.

87359 of 302857 relevant lines covered (28.84%)

570004.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

20.91
/source/dnode/mnode/impl/src/mndArbGroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndArbGroup.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndShow.h"
21
#include "mndTrans.h"
22
#include "mndVgroup.h"
23

24
#define ARBGROUP_VER_NUMBER   1
25
#define ARBGROUP_RESERVE_SIZE 51
26

27
static SHashObj *arbUpdateHash = NULL;
28

29
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup);
30
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew);
31
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup);
32

33
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew);
34
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index);
35
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup);
36

37
static int32_t mndUpdateArbGroup(SMnode *pMnode, SArbGroup *pNewGroup);
38
static int32_t mndBatchUpdateArbGroup(SMnode *pMnode, SArray *newGroupArray);
39

40
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq);
41
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq);
42
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq);
43
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp);
44
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp);
45
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp);
46
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter);
48
static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq);
49

50
static int32_t mndArbCheckToken(const char *token1, const char *token2) {
9✔
51
  if (token1 == NULL || token2 == NULL) return -1;
9!
52
  if (strlen(token1) == 0 || strlen(token2) == 0) return -1;
9!
53
  return strncmp(token1, token2, TSDB_ARB_TOKEN_SIZE);
9✔
54
}
55

56
/*
 * Module init: register the arbitrator-group message handlers and show callbacks,
 * create arbUpdateHash, then register the SDB_ARBGROUP table.
 * Returns terrno if the hash cannot be created, otherwise sdbSetTable()'s result.
 */
int32_t mndInitArbGroup(SMnode *pMnode) {
  // timers and requests handled by the mnode
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_HEARTBEAT_TIMER, mndProcessArbHbTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_CHECK_SYNC_TIMER, mndProcessArbCheckSyncTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP_BATCH, mndProcessArbUpdateGroupBatchReq);
  // responses coming back from vnodes / sync
  mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_ASSIGN_LEADER, mndProcessAssignLeaderMsg);

  // show arbgroups
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup);

  arbUpdateHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (arbUpdateHash == NULL) {
    TAOS_RETURN(terrno);
  }

  SSdbTable table = {
      .sdbType = SDB_ARBGROUP,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndArbGroupActionEncode,
      .decodeFp = (SdbDecodeFp)mndArbGroupActionDecode,
      .insertFp = (SdbInsertFp)mndArbGroupActionInsert,
      .updateFp = (SdbUpdateFp)mndArbGroupActionUpdate,
      .deleteFp = (SdbDeleteFp)mndArbGroupActionDelete,
  };
  return sdbSetTable(pMnode->pSdb, table);
}
87

88
/* Module cleanup: release the arbUpdateHash created by mndInitArbGroup(). */
void mndCleanupArbGroup(SMnode *pMnode) {
  taosHashCleanup(arbUpdateHash);
}
8✔
89

90
/*
 * Look up the arbitrator group of vgId via sdbAcquire().
 * A "not there" miss is reported as TSDB_CODE_MND_ARBGROUP_NOT_EXIST in terrno.
 * A non-NULL result is meant to be handed back through mndReleaseArbGroup().
 */
SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) {
  SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
  bool       missing = (pGroup == NULL) && (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE);
  if (missing) {
    terrno = TSDB_CODE_MND_ARBGROUP_NOT_EXIST;
  }
  return pGroup;
}
97

98
/* Give back an arbitrator group obtained from mndAcquireArbGroup(). */
void mndReleaseArbGroup(SMnode *pMnode, SArbGroup *pGroup) {
  sdbRelease(pMnode->pSdb, pGroup);
}
×
102

UNCOV
103
/*
 * Build a fresh arbitrator group from a vgroup object.
 * outGroup is zeroed, then receives the vgroup's dbUid, vgId and, for each member
 * slot, the dnodeId of the matching vnodeGid[] entry.
 * Returns TSDB_CODE_INVALID_PARA when the vgroup does not have exactly 2 replicas.
 */
int32_t mndArbGroupInitFromVgObj(SVgObj *pVgObj, SArbGroup *outGroup) {
  if (pVgObj->replica != 2) {
    TAOS_RETURN(TSDB_CODE_INVALID_PARA);
  }

  (void)memset(outGroup, 0, sizeof(*outGroup));
  outGroup->vgId = pVgObj->vgId;
  outGroup->dbUid = pVgObj->dbUid;

  SArbGroupMember *pMember = outGroup->members;
  for (int32_t idx = 0; idx < TSDB_ARB_GROUP_MEMBER_NUM; ++idx, ++pMember) {
    pMember->info.dnodeId = pVgObj->vnodeGid[idx].dnodeId;
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
117

118
/*
 * Serialize an arbitrator group into an sdb raw (soft version ARBGROUP_VER_NUMBER).
 *
 * Field order, which must stay in sync with mndArbGroupActionDecode():
 *   vgId, dbUid, { member dnodeId, member token } x TSDB_ARB_GROUP_MEMBER_NUM, isSync,
 *   assigned-leader dnodeId, assigned-leader token, version, assigned-leader acked,
 *   code, updateTimeMs, ARBGROUP_RESERVE_SIZE reserved bytes.
 *
 * Returns the raw on success. On failure it logs, frees the raw and returns NULL
 * with terrno left non-zero.
 */
SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
  // NOTE(review): code/lino look unused but are kept in case the SDB_SET_* macros reference them.
  int32_t code = 0;
  int32_t lino = 0;

  // Pessimistic default; cleared only once every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_ARBGROUP, ARBGROUP_VER_NUMBER, sizeof(SArbGroup) + ARBGROUP_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, pGroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pGroup->dbUid, _OVER)
  for (int32_t m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
    SDB_SET_INT32(pRaw, dataPos, pGroup->members[m].info.dnodeId, _OVER)
    SDB_SET_BINARY(pRaw, dataPos, pGroup->members[m].state.token, TSDB_ARB_TOKEN_SIZE, _OVER)
  }
  SDB_SET_INT8(pRaw, dataPos, pGroup->isSync, _OVER)

  SDB_SET_INT32(pRaw, dataPos, pGroup->assignedLeader.dnodeId, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pGroup->assignedLeader.acked, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pGroup->code, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pGroup->updateTimeMs, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno != 0) {
    mError("arbgroup:%d, failed to encode to raw:%p since %s", pGroup->vgId, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("arbgroup:%d, encode to raw:%p, row:%p", pGroup->vgId, pRaw, pGroup);
  return pRaw;
}
159

160
/*
 * Deserialize an sdb raw produced by mndArbGroupActionEncode() into a new row.
 *
 * Raws with a soft version other than ARBGROUP_VER_NUMBER are rejected with
 * TSDB_CODE_SDB_INVALID_DATA_VER. Per-member heartbeat bookkeeping is not part of
 * the raw and is reset (nextHbSeq 0, responsedHbSeq -1, lastHbMs 0), and
 * mutexInited is cleared.
 *
 * Returns the row on success. On failure it logs, frees the row and returns NULL
 * with terrno left non-zero.
 */
SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): code/lino look unused but are kept in case the SDB_GET_* macros reference them.
  int32_t    code = 0;
  int32_t    lino = 0;
  SSdbRow   *pRow = NULL;
  SArbGroup *pGroup = NULL;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (sver != ARBGROUP_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SArbGroup));
  if (pRow == NULL) goto _OVER;
  pGroup = sdbGetRowObj(pRow);
  if (pGroup == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &pGroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pGroup->dbUid, _OVER)
  for (int32_t m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
    SArbGroupMember *pMember = pGroup->members + m;
    SDB_GET_INT32(pRaw, dataPos, &pMember->info.dnodeId, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, pMember->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)

    // heartbeat state is runtime-only; start from a clean slate
    pMember->state.nextHbSeq = 0;
    pMember->state.responsedHbSeq = -1;
    pMember->state.lastHbMs = 0;
  }
  SDB_GET_INT8(pRaw, dataPos, &pGroup->isSync, _OVER)

  SDB_GET_INT32(pRaw, dataPos, &pGroup->assignedLeader.dnodeId, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pGroup->assignedLeader.acked, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pGroup->code, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pGroup->updateTimeMs, _OVER)

  // the mutex is created later by the sdb insert action
  pGroup->mutexInited = false;

  SDB_GET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno != 0) {
    mError("arbgroup:%d, failed to decode from raw:%p since %s", pGroup == NULL ? 0 : pGroup->vgId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("arbgroup:%d, decode from raw:%p, row:%p", pGroup->vgId, pRaw, pGroup);
  return pRow;
}
219

UNCOV
220
/*
 * sdb insert callback: create the group's mutex if it does not exist yet.
 * Returns 0 when the mutex is (or already was) initialized, -1 if creating it failed.
 */
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup) {
  mTrace("arbgroup:%d, perform insert action, row:%p", pGroup->vgId, pGroup);

  if (pGroup->mutexInited) {
    return 0;
  }
  if (taosThreadMutexInit(&pGroup->mutex, NULL) != 0) {
    return -1;
  }
  pGroup->mutexInited = true;
  return 0;
}
228

UNCOV
229
/* sdb delete callback: destroy the group's mutex if it was created. Always returns 0. */
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup) {
  mTrace("arbgroup:%d, perform delete action, row:%p", pGroup->vgId, pGroup);

  if (!pGroup->mutexInited) {
    return 0;
  }
  (void)taosThreadMutexDestroy(&pGroup->mutex);
  pGroup->mutexInited = false;
  return 0;
}
237

UNCOV
238
/*
 * sdb update callback.
 *
 * Under pOld->mutex, the update is applied only if pOld and pNew carry the same
 * version; otherwise it is skipped and logged. Applying it means:
 *   - copy the member tokens, isSync, the assigned leader (dnodeId, token, acked),
 *     code and updateTimeMs from pNew into pOld;
 *   - bump pOld->version.
 * Either way, pOld's vgId is then removed from arbUpdateHash. Always returns 0.
 */
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew) {
  mTrace("arbgroup:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);
  (void)taosThreadMutexLock(&pOld->mutex);

  if (pOld->version == pNew->version) {
    for (int32_t m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
      tstrncpy(pOld->members[m].state.token, pNew->members[m].state.token, TSDB_ARB_TOKEN_SIZE);
    }
    pOld->isSync = pNew->isSync;

    SArbAssignedLeader       *pDst = &pOld->assignedLeader;
    const SArbAssignedLeader *pSrc = &pNew->assignedLeader;
    pDst->dnodeId = pSrc->dnodeId;
    tstrncpy(pDst->token, pSrc->token, TSDB_ARB_TOKEN_SIZE);
    pDst->acked = pSrc->acked;

    pOld->version++;
    pOld->code = pNew->code;
    pOld->updateTimeMs = pNew->updateTimeMs;

    mInfo(
        "arbgroup:%d, perform update action. members[0].token:%s, members[1].token:%s, isSync:%d, as-dnodeid:%d, "
        "as-token:%s, as-acked:%d, version:%" PRId64,
        pOld->vgId, pOld->members[0].state.token, pOld->members[1].state.token, pOld->isSync,
        pOld->assignedLeader.dnodeId, pOld->assignedLeader.token, pOld->assignedLeader.acked, pOld->version);
  } else {
    mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64
          " new version:%" PRId64,
          pOld->vgId, pOld, pNew, pOld->version, pNew->version);
  }

  (void)taosThreadMutexUnlock(&pOld->mutex);

  if (taosHashRemove(arbUpdateHash, &pOld->vgId, sizeof(int32_t)) != 0) {
    mError("arbgroup:%d, failed to remove from arbUpdateHash", pOld->vgId);
  }
  return 0;
}
274

275
/* Encode pGroup, append it to pTrans's redo log and mark the raw SDB_STATUS_CREATING. */
int32_t mndSetCreateArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
  if (pRedoRaw == NULL) {
    TAOS_RETURN(terrno);
  }

  int32_t code = mndTransAppendRedolog(pTrans, pRedoRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING);
  }
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
286

UNCOV
287
/* Encode pGroup, append it to pTrans's undo log and mark the raw SDB_STATUS_DROPPED. */
int32_t mndSetCreateArbGroupUndoLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pUndoRaw = mndArbGroupActionEncode(pGroup);
  if (pUndoRaw == NULL) {
    TAOS_RETURN(terrno);
  }

  int32_t code = mndTransAppendUndolog(pTrans, pUndoRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  code = sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
298

UNCOV
299
/*
 * Encode pGroup, append it to pTrans's commit log and mark the raw SDB_STATUS_READY.
 * Returns 0 on success, otherwise the failing call's error code.
 */
int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndArbGroupActionEncode(pGroup);
  if (pCommitRaw == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }
  // The assignment must be parenthesized on its own. Otherwise `code` receives the
  // boolean (result != 0), i.e. 1, instead of the actual error code.
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) TAOS_RETURN(code);
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) TAOS_RETURN(code);
  return 0;
}
310

UNCOV
311
/* Encode pGroup, append it to pTrans's prepare log and mark the raw SDB_STATUS_DROPPING. */
int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
  if (pRedoRaw == NULL) {
    TAOS_RETURN(terrno);
  }

  int32_t code = mndTransAppendPrepareLog(pTrans, pRedoRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING);
  }
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
322

323
/* Encode pGroup, append it to pTrans's redo log and mark the raw SDB_STATUS_DROPPING. */
static int32_t mndSetDropArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
  if (pRedoRaw == NULL) {
    TAOS_RETURN(terrno);
  }

  int32_t code = mndTransAppendRedolog(pTrans, pRedoRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
334

UNCOV
335
/* Encode pGroup, append it to pTrans's commit log and mark the raw SDB_STATUS_DROPPED. */
int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pCommitRaw = mndArbGroupActionEncode(pGroup);
  if (pCommitRaw == NULL) {
    TAOS_RETURN(terrno);
  }

  int32_t code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
  }
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
346

UNCOV
347
static void *mndBuildArbHeartBeatReq(int32_t *pContLen, char *arbToken, int32_t dnodeId, int64_t arbTerm,
×
348
                                     SArray *hbMembers) {
UNCOV
349
  SVArbHeartBeatReq req = {0};
×
UNCOV
350
  req.dnodeId = dnodeId;
×
UNCOV
351
  req.arbToken = arbToken;
×
UNCOV
352
  req.arbTerm = arbTerm;
×
UNCOV
353
  req.hbMembers = hbMembers;
×
354

UNCOV
355
  int32_t contLen = tSerializeSVArbHeartBeatReq(NULL, 0, &req);
×
UNCOV
356
  if (contLen <= 0) return NULL;
×
357

UNCOV
358
  void *pReq = rpcMallocCont(contLen);
×
UNCOV
359
  if (pReq == NULL) return NULL;
×
360

UNCOV
361
  if (tSerializeSVArbHeartBeatReq(pReq, contLen, &req) <= 0) {
×
362
    rpcFreeCont(pReq);
×
363
    return NULL;
×
364
  }
UNCOV
365
  *pContLen = contLen;
×
UNCOV
366
  return pReq;
×
367
}
368

UNCOV
369
/*
 * Build and send one TDMT_VND_ARB_HEARTBEAT request to pDnode.
 * Returns -1 if the request cannot be built or the dnode has no endpoint
 * (the buffer is freed in that case), otherwise tmsgSendReq()'s result.
 */
static int32_t mndSendArbHeartBeatReq(SDnodeObj *pDnode, char *arbToken, int64_t arbTerm, SArray *hbMembers) {
  int32_t dnodeId = pDnode->id;
  int32_t contLen = 0;

  void *pHead = mndBuildArbHeartBeatReq(&contLen, arbToken, dnodeId, arbTerm, hbMembers);
  if (pHead == NULL) {
    mError("dnodeId:%d, failed to build arb-hb request", dnodeId);
    return -1;
  }

  SEpSet epSet = mndGetDnodeEpset(pDnode);
  if (epSet.numOfEps == 0) {
    mError("dnodeId:%d, failed to send arb-hb request to dnode since no epSet found", dnodeId);
    rpcFreeCont(pHead);
    return -1;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_ARB_HEARTBEAT, .pCont = pHead, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code == 0) {
    mTrace("dnodeId:%d, send arb-hb request to dnode", dnodeId);
  } else {
    mError("dnodeId:%d, failed to send arb-hb request to dnode since 0x%x", dnodeId, code);
  }
  return code;
}
393

394
/*
 * Arb heartbeat timer handler.
 *
 * Phase 1: walk every arbitrator group in sdb. For each member, append a
 *   SVArbHbReqMember {vgId, hbSeq} to an array keyed by the member's dnodeId.
 *   hbSeq is the member's nextHbSeq, which is then incremented.
 * Phase 2: for every collected dnode that is online, send one heartbeat carrying
 *   all of its members, this mnode's arb token and the current term.
 *
 * The per-dnode arrays are owned by this function and freed on every path.
 * Returns 0 once a round has run (individual send failures are only logged).
 * Returns an error code if the working hash cannot be created or the arb token
 * cannot be read.
 */
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
  int32_t    code = 0;
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;
  void      *pIter = NULL;

  // dnodeId -> SArray* of SVArbHbReqMember
  SHashObj *pDnodeHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pDnodeHash == NULL) {
    code = terrno;
    mError("failed to init dnode hash for arb-hb timer since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  // collect members of the same dnode
  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    (void)taosThreadMutexLock(&pArbGroup->mutex);

    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      SArbGroupMember *pMember = &pArbGroup->members[i];
      int32_t          dnodeId = pMember->info.dnodeId;
      void            *pObj = taosHashGet(pDnodeHash, &dnodeId, sizeof(int32_t));
      SArray          *hbMembers = NULL;
      if (pObj) {
        hbMembers = *(SArray **)pObj;
      } else {
        hbMembers = taosArrayInit(16, sizeof(SVArbHbReqMember));
        if (hbMembers == NULL) {
          mError("dnodeId:%d, failed to init hb member array, but continue next at this timer round", dnodeId);
          continue;
        }
        if (taosHashPut(pDnodeHash, &dnodeId, sizeof(int32_t), &hbMembers, POINTER_BYTES) != 0) {
          mError("dnodeId:%d, failed to push hb member into hash, but continue next at this timer round", dnodeId);
          // The hash does not own this array, so nothing would ever send or free it.
          taosArrayDestroy(hbMembers);
          continue;
        }
      }
      SVArbHbReqMember reqMember = {.vgId = pArbGroup->vgId, .hbSeq = pMember->state.nextHbSeq++};
      if (taosArrayPush(hbMembers, &reqMember) == NULL) {
        mError("dnodeId:%d, failed to push hb member, but continue next at this timer round", dnodeId);
      }
    }

    (void)taosThreadMutexUnlock(&pArbGroup->mutex);
    sdbRelease(pSdb, pArbGroup);
  }

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
    mError("failed to get arb token for arb-hb timer");
    pIter = taosHashIterate(pDnodeHash, NULL);
    while (pIter) {
      SArray *hbMembers = *(SArray **)pIter;
      taosArrayDestroy(hbMembers);
      pIter = taosHashIterate(pDnodeHash, pIter);
    }
    taosHashCleanup(pDnodeHash);
    TAOS_RETURN(code);
  }

  int64_t nowMs = taosGetTimestampMs();

  pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pDnodeHash, pIter);
    if (pIter == NULL) break;

    int32_t dnodeId = *(int32_t *)taosHashGetKey(pIter, NULL);
    SArray *hbMembers = *(SArray **)pIter;

    SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
    if (pDnode == NULL) {
      mError("dnodeId:%d, timer failed to acquire dnode", dnodeId);
      taosArrayDestroy(hbMembers);
      continue;
    }

    int64_t mndTerm = mndGetTerm(pMnode);

    if (mndIsDnodeOnline(pDnode, nowMs)) {
      int32_t sendCode = mndSendArbHeartBeatReq(pDnode, arbToken, mndTerm, hbMembers);
      if (TSDB_CODE_SUCCESS != sendCode) {
        mError("dnodeId:%d, timer failed to send arb-hb request", dnodeId);
      }
    }

    mndReleaseDnode(pMnode, pDnode);
    taosArrayDestroy(hbMembers);
  }
  taosHashCleanup(pDnodeHash);

  return 0;
}
479

UNCOV
480
static void *mndBuildArbCheckSyncReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
×
481
                                     char *member0Token, char *member1Token) {
UNCOV
482
  SVArbCheckSyncReq req = {0};
×
UNCOV
483
  req.arbToken = arbToken;
×
UNCOV
484
  req.arbTerm = arbTerm;
×
UNCOV
485
  req.member0Token = member0Token;
×
UNCOV
486
  req.member1Token = member1Token;
×
487

UNCOV
488
  int32_t reqLen = tSerializeSVArbCheckSyncReq(NULL, 0, &req);
×
UNCOV
489
  int32_t contLen = reqLen + sizeof(SMsgHead);
×
490

UNCOV
491
  if (contLen <= 0) return NULL;
×
UNCOV
492
  SMsgHead *pHead = rpcMallocCont(contLen);
×
UNCOV
493
  if (pHead == NULL) return NULL;
×
494

UNCOV
495
  pHead->contLen = htonl(contLen);
×
UNCOV
496
  pHead->vgId = htonl(vgId);
×
UNCOV
497
  if (tSerializeSVArbCheckSyncReq((char *)pHead + sizeof(SMsgHead), contLen, &req) <= 0) {
×
498
    rpcFreeCont(pHead);
×
499
    return NULL;
×
500
  }
UNCOV
501
  *pContLen = contLen;
×
UNCOV
502
  return pHead;
×
503
}
504

UNCOV
505
/*
 * Build and send a TDMT_VND_ARB_CHECK_SYNC request to the vgroup's endpoints.
 *
 * Returns -1 if the request cannot be built. If the vgroup has no endpoints, the
 * buffer is freed and the result is terrno (or -1 when terrno is 0). Otherwise
 * returns tmsgSendReq()'s result.
 */
static int32_t mndSendArbCheckSyncReq(SMnode *pMnode, int32_t vgId, char *arbToken, int64_t term, char *member0Token,
                                      char *member1Token) {
  int32_t code = 0;
  int32_t contLen = 0;

  void *pHead = mndBuildArbCheckSyncReq(&contLen, vgId, arbToken, term, member0Token, member1Token);
  if (pHead == NULL) {
    mError("vgId:%d, failed to build check-sync request", vgId);
    return -1;
  }

  SEpSet epSet = mndGetVgroupEpsetById(pMnode, vgId);
  if (epSet.numOfEps == 0) {
    mError("vgId:%d, failed to send check-sync request since no epSet found", vgId);
    rpcFreeCont(pHead);
    code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_ARB_CHECK_SYNC, .pCont = pHead, .contLen = contLen};
  code = tmsgSendReq(&epSet, &rpcMsg);
  if (code == 0) {
    mDebug("vgId:%d, send check-sync request", vgId);
  } else {
    mError("vgId:%d, failed to send check-sync request since 0x%x", vgId, code);
  }
  return code;
}
533

534
/*
 * True when member `index` of pArbGroup has not heartbeated within the last
 * tsArbSetAssignedTimeoutSec seconds relative to nowMs.
 */
static bool mndCheckArbMemberHbTimeout(SArbGroup *pArbGroup, int32_t index, int64_t nowMs) {
  int64_t deadlineMs = nowMs - tsArbSetAssignedTimeoutSec * 1000;
  return pArbGroup->members[index].state.lastHbMs < deadlineMs;
}
538

539
static void *mndBuildArbSetAssignedLeaderReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
×
540
                                             char *memberToken, bool force) {
541
  SVArbSetAssignedLeaderReq req = {0};
×
542
  req.arbToken = arbToken;
×
543
  req.arbTerm = arbTerm;
×
544
  req.memberToken = memberToken;
×
545
  if (force) req.force = 1;
×
546

547
  int32_t reqLen = tSerializeSVArbSetAssignedLeaderReq(NULL, 0, &req);
×
548
  int32_t contLen = reqLen + sizeof(SMsgHead);
×
549

550
  if (contLen <= 0) return NULL;
×
551
  SMsgHead *pHead = rpcMallocCont(contLen);
×
552
  if (pHead == NULL) return NULL;
×
553

554
  pHead->contLen = htonl(contLen);
×
555
  pHead->vgId = htonl(vgId);
×
556
  if (tSerializeSVArbSetAssignedLeaderReq((char *)pHead + sizeof(SMsgHead), contLen, &req) <= 0) {
×
557
    rpcFreeCont(pHead);
×
558
    return NULL;
×
559
  }
560
  *pContLen = contLen;
×
561
  return pHead;
×
562
}
563

564
/*
 * Build and send a TDMT_SYNC_SET_ASSIGNED_LEADER request to dnodeId for vgId.
 *
 * If building fails, or the dnode has no endpoint (the buffer is freed in that
 * case), returns terrno (or -1 when terrno is 0). Otherwise returns
 * tmsgSendReq()'s result.
 */
static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, int32_t vgId, char *arbToken,
                                              int64_t term, char *memberToken, bool force) {
  int32_t code = 0;
  int32_t contLen = 0;

  void *pHead = mndBuildArbSetAssignedLeaderReq(&contLen, vgId, arbToken, term, memberToken, force);
  if (pHead == NULL) {
    mError("vgId:%d, failed to build set-assigned request", vgId);
    code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  SEpSet epSet = mndGetDnodeEpsetById(pMnode, dnodeId);
  if (epSet.numOfEps == 0) {
    mError("dnodeId:%d vgId:%d, failed to send arb-set-assigned request to dnode since no epSet found", dnodeId, vgId);
    rpcFreeCont(pHead);
    code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_SYNC_SET_ASSIGNED_LEADER, .pCont = pHead, .contLen = contLen};
  code = tmsgSendReq(&epSet, &rpcMsg);
  if (code == 0) {
    mInfo("dnodeId:%d vgId:%d, send arb-set-assigned request to dnode", dnodeId, vgId);
  } else {
    mError("dnodeId:%d vgId:%d, failed to send arb-set-assigned request to dnode since 0x%x", dnodeId, vgId, code);
  }
  return code;
}
593

594
/*
 * Decide what the check-sync timer should do for one arbitrator group.
 *
 * *pOp is always written:
 *   CHECK_SYNC_SET_ASSIGNED_LEADER - a leader is assigned but has not acked yet;
 *   CHECK_SYNC_CHECK_SYNC          - both members are alive, no leader is assigned
 *                                    and the group is not marked synced;
 *   CHECK_SYNC_UPDATE              - exactly one member timed out and the group is
 *                                    synced. pNewGroup then holds a copy of
 *                                    pArbGroup with the surviving member set as
 *                                    assigned leader;
 *   CHECK_SYNC_NONE                - otherwise.
 */
void mndArbCheckSync(SArbGroup *pArbGroup, int64_t nowMs, ECheckSyncOp *pOp, SArbGroup *pNewGroup) {
  *pOp = CHECK_SYNC_NONE;

  int32_t             vgId = pArbGroup->vgId;
  bool                timeout0 = mndCheckArbMemberHbTimeout(pArbGroup, 0, nowMs);
  bool                timeout1 = mndCheckArbMemberHbTimeout(pArbGroup, 1, nowMs);
  SArbAssignedLeader *pAssignedLeader = &pArbGroup->assignedLeader;
  int32_t             assignedDnodeId = pAssignedLeader->dnodeId;

  // a leader is assigned but has not acknowledged: (re)send the request
  if (assignedDnodeId != 0 && !pAssignedLeader->acked) {
    *pOp = CHECK_SYNC_SET_ASSIGNED_LEADER;
    return;
  }

  // both members timed out -> nothing to do;
  // neither timed out -> only ask for a sync check when unassigned and not synced
  if (timeout0 == timeout1) {
    if (!timeout0 && assignedDnodeId == 0 && !pArbGroup->isSync) {
      *pOp = CHECK_SYNC_CHECK_SYNC;
    }
    return;
  }

  // exactly one member timed out: the other one is the candidate
  int32_t          candidateIndex = timeout0 ? 1 : 0;
  SArbGroupMember *pMember = &pArbGroup->members[candidateIndex];
  int32_t          candidateDnodeId = pMember->info.dnodeId;

  if (assignedDnodeId != 0 && assignedDnodeId != candidateDnodeId) {
    mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, assigned leader has been set to dnodeId:%d", vgId,
          candidateDnodeId, assignedDnodeId);
    return;
  }

  if (!pArbGroup->isSync) {
    if (assignedDnodeId == candidateDnodeId) {
      mDebug("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId, candidateDnodeId);
    } else {
      mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId, candidateDnodeId);
    }
    return;
  }

  // Synced, and either no leader is assigned yet or the candidate already is the
  // (acked) assigned leader: record the candidate as assigned leader in sdb.
  mndArbGroupDupObj(pArbGroup, pNewGroup);
  mndArbGroupSetAssignedLeader(pNewGroup, candidateIndex);
  *pOp = CHECK_SYNC_UPDATE;
}
654

655
/*
 * Handle TDMT_MND_ARB_ASSIGN_LEADER.
 *
 * For every arbitrator group, pick the first member whose dnode is online and send
 * it a forced set-assigned-leader request (empty member token) with this mnode's
 * arb token and term. Send results are not checked.
 *
 * Returns 0 when all groups were visited. Returns TSDB_CODE_INVALID_MSG for an
 * undecodable request, or the arb-token / term lookup error. The decoded request
 * is freed on every path.
 */
static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  int32_t    code = -1, lino = 0;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;

  SAssignLeaderReq req = {0};
  if (tDeserializeSAssignLeaderReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  mInfo("begin to process assign leader");

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));

  int64_t term = mndGetTerm(pMnode);
  if (term < 0) {
    mError("arb failed to get term since %s", terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    // go through _exit so that req is freed
    goto _exit;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    SArbGroup arbGroupDup = {0};

    (void)taosThreadMutexLock(&pArbGroup->mutex);
    mndArbGroupDupObj(pArbGroup, &arbGroupDup);
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);

    sdbRelease(pSdb, pArbGroup);

    int32_t dnodeId = 0;
    for (int32_t i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      int32_t    memberDnodeId = arbGroupDup.members[i].info.dnodeId;
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, memberDnodeId);
      if (pDnode == NULL) {
        // the dnode may have been dropped; do not hand NULL to mndIsDnodeOnline()
        mError("vgId:%d, dnodeId:%d, failed to acquire dnode for assign leader", arbGroupDup.vgId, memberDnodeId);
        continue;
      }
      bool isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
      mndReleaseDnode(pMnode, pDnode);
      if (isonline) {
        dnodeId = memberDnodeId;
        break;
      }
    }

    if (dnodeId == 0) {
      // No online member. Sending to dnodeId 0 would only fail the endpoint lookup,
      // and the success message below would then be misleading.
      mError("vgId:%d, arb skip to send set assigned leader since no member dnode is online", arbGroupDup.vgId);
      continue;
    }

    (void)mndSendArbSetAssignedLeaderReq(pMnode, dnodeId, arbGroupDup.vgId, arbToken, term, "", true);
    mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", arbGroupDup.vgId, dnodeId);
  }

  code = 0;

  // auditRecord(pReq, pMnode->clusterId, "assignLeader", "", "", req.sql, req.sqlLen);

_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to assign leader since %s", tstrerror(code));
  }

  tFreeSAssignLeaderReq(&req);
  TAOS_RETURN(code);
}
721

722
/*
 * Periodic arbitrator check.
 *
 * For every arb group, a snapshot is taken under the group mutex and
 * mndArbCheckSync() decides what to do with it:
 *   - CHECK_SYNC_SET_ASSIGNED_LEADER: send a set-assigned-leader request to the assigned dnode;
 *   - CHECK_SYNC_CHECK_SYNC: send a check-sync request for the group;
 *   - CHECK_SYNC_UPDATE: collect the new group state; all collected groups are pulled up as one batch.
 *
 * The check is skipped while less than two arb heartbeat intervals have passed
 * since this mnode's role time.
 *
 * Returns an error only when the current term cannot be obtained. Other
 * failures are logged and the timer reports success.
 */
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
  int32_t    code = 0, lino = 0;
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;
  void      *pIter = NULL;
  SArray    *pUpdateArray = NULL;

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));

  int64_t term = mndGetTerm(pMnode);
  if (term < 0) {
    mError("arb failed to get term since %s", terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  int64_t roleTimeMs = mndGetRoleTimeMs(pMnode);
  int64_t nowMs = taosGetTimestampMs();
  // widen before multiplying so a large interval setting cannot overflow int
  if (nowMs - roleTimeMs < (int64_t)tsArbHeartBeatIntervalSec * 1000 * 2) {
    mInfo("arb skip to check sync since mnd had just switch over, roleTime:%" PRId64 " now:%" PRId64, roleTimeMs,
          nowMs);
    return 0;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    SArbGroup arbGroupDup = {0};

    (void)taosThreadMutexLock(&pArbGroup->mutex);
    mndArbGroupDupObj(pArbGroup, &arbGroupDup);
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);

    sdbRelease(pSdb, pArbGroup);

    ECheckSyncOp op = CHECK_SYNC_NONE;
    SArbGroup    newGroup = {0};
    mndArbCheckSync(&arbGroupDup, nowMs, &op, &newGroup);

    int32_t             vgId = arbGroupDup.vgId;
    SArbAssignedLeader *pAssgndLeader = &arbGroupDup.assignedLeader;
    int32_t             assgndDnodeId = pAssgndLeader->dnodeId;

    switch (op) {
      case CHECK_SYNC_NONE:
        mTrace("vgId:%d, arb skip to send msg by check sync", vgId);
        break;
      case CHECK_SYNC_SET_ASSIGNED_LEADER:
        (void)mndSendArbSetAssignedLeaderReq(pMnode, assgndDnodeId, vgId, arbToken, term, pAssgndLeader->token, false);
        mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, assgndDnodeId);
        break;
      case CHECK_SYNC_CHECK_SYNC:
        (void)mndSendArbCheckSyncReq(pMnode, vgId, arbToken, term, arbGroupDup.members[0].state.token,
                                     arbGroupDup.members[1].state.token);
        mInfo("vgId:%d, arb send check sync request", vgId);
        break;
      case CHECK_SYNC_UPDATE:
        if (!pUpdateArray) {
          pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
          if (!pUpdateArray) {
            TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
          }
        }

        if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
        break;
      default:
        mError("vgId:%d, arb unknown check sync op:%d", vgId, op);
        break;
    }
  }

  TAOS_CHECK_EXIT(mndBatchUpdateArbGroup(pMnode, pUpdateArray));

_exit:
  if (pIter != NULL) {
    // The fetch loop was left early on error; release the sdb iterator.
    sdbCancelFetchByType(pSdb, pIter, SDB_ARBGROUP);
  }

  if (code != 0) {
    mError("failed to check sync at line %d since %s", lino, tstrerror(code));
  }

  taosArrayDestroy(pUpdateArray);
  return 0;
}
810

UNCOV
811
/*
 * Encode updateArray (SMArbUpdateGroup entries) into a freshly allocated rpc
 * buffer.
 *
 * On success, the buffer is returned and its length is written to *pContLen;
 * the caller owns the buffer. Returns NULL when sizing, allocation, or
 * encoding fails. *pContLen is untouched in that case.
 */
static void *mndBuildArbUpdateGroupBatchReq(int32_t *pContLen, SArray *updateArray) {
  SMArbUpdateGroupBatchReq batchReq = {.updateArray = updateArray};

  // first pass: ask the encoder how many bytes are needed
  int32_t   bufLen = tSerializeSMArbUpdateGroupBatchReq(NULL, 0, &batchReq);
  SMsgHead *pBuf = (bufLen > 0) ? rpcMallocCont(bufLen) : NULL;
  if (pBuf == NULL) {
    return NULL;
  }

  // second pass: encode into the buffer
  int32_t encodedLen = tSerializeSMArbUpdateGroupBatchReq(pBuf, bufLen, &batchReq);
  if (encodedLen <= 0) {
    rpcFreeCont(pBuf);
    return NULL;
  }

  *pContLen = bufLen;
  return pBuf;
}
827

UNCOV
828
/*
 * Fill an SMArbUpdateGroup message entry from an SArbGroup.
 *
 * The token fields are pointers into pGroup, not copies, so pGroup must stay
 * alive until outGroup has been serialized.
 */
static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup) {
  SArbAssignedLeader *pLeader = &pGroup->assignedLeader;

  outGroup->vgId = pGroup->vgId;
  outGroup->dbUid = pGroup->dbUid;

  for (int32_t idx = 0; idx < TSDB_ARB_GROUP_MEMBER_NUM; ++idx) {
    SArbGroupMember *pSrc = &pGroup->members[idx];

    outGroup->members[idx].dnodeId = pSrc->info.dnodeId;
    outGroup->members[idx].token = pSrc->state.token;  // borrowed pointer
  }

  outGroup->isSync = pGroup->isSync;
  outGroup->assignedLeader.dnodeId = pLeader->dnodeId;
  outGroup->assignedLeader.token = pLeader->token;  // borrowed pointer
  outGroup->assignedLeader.acked = pLeader->acked;
  outGroup->version = pGroup->version;
  outGroup->code = pGroup->code;
  outGroup->updateTimeMs = pGroup->updateTimeMs;
}
×
843

UNCOV
844
/*
 * Pull up an asynchronous TDMT_MND_ARB_UPDATE_GROUP_BATCH request for a single
 * arb group.
 *
 * If the group's vgId is already in arbUpdateHash, an update for it is in
 * process and nothing is sent. Otherwise the vgId is marked in arbUpdateHash
 * BEFORE the request is put on the write queue, so the write thread cannot
 * clear the marker before it has been set. If building or queueing the request
 * fails, the marker is removed again.
 *
 * Returns 0 when the request was queued or skipped, non-zero otherwise.
 */
static int32_t mndUpdateArbGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
  int32_t vgId = pNewGroup->vgId;
  if (taosHashGet(arbUpdateHash, &vgId, sizeof(vgId)) != NULL) {
    mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
    return 0;
  }

  int32_t ret = -1;
  bool    marked = false;

  SMArbUpdateGroup newGroup = {0};
  mndInitArbUpdateGroup(pNewGroup, &newGroup);

  SArray *pArray = taosArrayInit(1, sizeof(SMArbUpdateGroup));
  if (pArray == NULL) goto _OVER;
  if (taosArrayPush(pArray, &newGroup) == NULL) goto _OVER;

  if ((ret = taosHashPut(arbUpdateHash, &vgId, sizeof(vgId), NULL, 0)) != 0) goto _OVER;
  marked = true;

  int32_t contLen = 0;
  void   *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
  if (!pHead) {
    mError("failed to build arb-update-group request");
    ret = -1;
    goto _OVER;
  }

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
  ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);

_OVER:
  if (ret != 0 && marked) {
    // The request never reached the write queue; let the next check pull it up again.
    if (taosHashRemove(arbUpdateHash, &vgId, sizeof(vgId)) != 0) {
      mError("failed to remove vgId:%d from arbUpdateHash", vgId);
    }
  }
  taosArrayDestroy(pArray);
  return ret;
}
876

877
/*
 * Pull up one asynchronous TDMT_MND_ARB_UPDATE_GROUP_BATCH request covering
 * the groups in newGroupArray (an SArray of SArbGroup; may be empty).
 *
 * Groups whose vgId is already in arbUpdateHash are skipped, because another
 * update for them is in process. Every group that is included is marked in
 * arbUpdateHash before the request is queued. On failure, only the markers
 * added by this call are removed, so markers owned by other in-flight requests
 * stay intact.
 *
 * Returns 0 when there was nothing to send or the request was queued.
 */
static int32_t mndBatchUpdateArbGroup(SMnode *pMnode, SArray *newGroupArray) {
  int32_t ret = -1;

  size_t  sz = taosArrayGetSize(newGroupArray);
  SArray *pArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup));
  if (pArray == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (size_t i = 0; i < sz; i++) {
    SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
    int32_t    vgId = pNewGroup->vgId;
    if (taosHashGet(arbUpdateHash, &vgId, sizeof(vgId)) != NULL) {
      mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
      continue;
    }

    SMArbUpdateGroup newGroup = {0};
    mndInitArbUpdateGroup(pNewGroup, &newGroup);

    // Mark first, then collect. Every entry of pArray is then known to be marked by this call.
    if (taosHashPut(arbUpdateHash, &vgId, sizeof(vgId), NULL, 0) != 0) goto _OVER;
    if (taosArrayPush(pArray, &newGroup) == NULL) {
      if (taosHashRemove(arbUpdateHash, &vgId, sizeof(vgId)) != 0) {
        mError("failed to remove vgId:%d from arbUpdateHash", vgId);
      }
      goto _OVER;
    }
  }

  if (taosArrayGetSize(pArray) == 0) {
    ret = 0;
    goto _OVER;
  }

  int32_t contLen = 0;
  void   *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
  if (!pHead) {
    mError("failed to build arb-update-group request");
    ret = -1;
    goto _OVER;
  }

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
  ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);

_OVER:
  if (ret != 0) {
    // Undo only what this call marked; skipped vgIds belong to other in-flight requests.
    size_t added = taosArrayGetSize(pArray);
    for (size_t i = 0; i < added; i++) {
      SMArbUpdateGroup *pAdded = taosArrayGet(pArray, i);
      int32_t           vgId = pAdded->vgId;
      if (taosHashRemove(arbUpdateHash, &vgId, sizeof(vgId)) != 0) {
        mError("failed to remove vgId:%d from arbUpdateHash", vgId);
      }
    }
  }

  taosArrayDestroy(pArray);
  return ret;
}
926

UNCOV
927
/*
 * Write-queue handler for TDMT_MND_ARB_UPDATE_GROUP_BATCH.
 *
 * Decodes the batch and writes every group that still exists in sdb through a
 * single transaction. Groups that no longer exist are removed from
 * arbUpdateHash immediately. If the transaction cannot be created, populated,
 * or prepared, every vgId in the batch is removed from arbUpdateHash so later
 * checks can pull up a new update.
 */
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
  int32_t code = -1;
  size_t  sz = 0;
  int32_t numOfUpdates = 0;
  STrans *pTrans = NULL;

  SMArbUpdateGroupBatchReq req = {0};
  if ((code = tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    mError("arb failed to decode arb-update-group request");
    TAOS_RETURN(code);
  }

  // Take the batch size before creating the transaction, so the cleanup at _OVER covers that failure too.
  sz = taosArrayGetSize(req.updateArray);

  SMnode *pMnode = pReq->info.node;
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "upd-bat-arbgroup");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : -1;
    mError("failed to update arbgroup in create trans, since %s", tstrerror(code));
    goto _OVER;
  }

  for (size_t i = 0; i < sz; i++) {
    SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
    SArbGroup         newGroup = {0};
    newGroup.vgId = pUpdateGroup->vgId;
    newGroup.dbUid = pUpdateGroup->dbUid;
    for (int j = 0; j < TSDB_ARB_GROUP_MEMBER_NUM; j++) {
      newGroup.members[j].info.dnodeId = pUpdateGroup->members[j].dnodeId;
      tstrncpy(newGroup.members[j].state.token, pUpdateGroup->members[j].token, TSDB_ARB_TOKEN_SIZE);
    }

    newGroup.isSync = pUpdateGroup->isSync;
    newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId;
    tstrncpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
    newGroup.assignedLeader.acked = pUpdateGroup->assignedLeader.acked;
    newGroup.version = pUpdateGroup->version;
    newGroup.code = pUpdateGroup->code;
    newGroup.updateTimeMs = pUpdateGroup->updateTimeMs;

    mInfo(
        "trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d], %d, "
        "%" PRId64,
        pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
        newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
        newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked,
        pUpdateGroup->code, pUpdateGroup->updateTimeMs);

    SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
    if (!pOldGroup) {
      mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId);
      if (taosHashRemove(arbUpdateHash, &newGroup.vgId, sizeof(int32_t)) != 0) {
        mError("failed to remove vgId:%d from arbUpdateHash", newGroup.vgId);
      }
      continue;
    }

    mndTransAddArbGroupId(pTrans, newGroup.vgId);

    if ((code = mndSetCreateArbGroupCommitLogs(pTrans, &newGroup)) != 0) {
      mError("failed to update arbgroup in set commit log, vgId:%d, trans:%d, since %s", newGroup.vgId, pTrans->id,
             tstrerror(code));
      sdbRelease(pMnode->pSdb, pOldGroup);  // do not leak the reference on the error path
      goto _OVER;
    }
    numOfUpdates++;

    mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
          pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
          newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
          newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked);

    sdbRelease(pMnode->pSdb, pOldGroup);
  }

  if (numOfUpdates == 0) {
    // every group of the batch is gone and was already removed from arbUpdateHash; nothing to commit
    code = 0;
    goto _OVER;
  }

  if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) goto _OVER;
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;

  code = 0;

_OVER:
  if (code != 0) {
    // failed to update arbgroup
    for (size_t i = 0; i < sz; i++) {
      SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
      if (taosHashRemove(arbUpdateHash, &pUpdateGroup->vgId, sizeof(int32_t)) != 0) {
        mError("failed to remove vgId:%d from arbUpdateHash", pUpdateGroup->vgId);
      }
    }
  }

  mndTransDrop(pTrans);
  tFreeSMArbUpdateGroupBatchReq(&req);
  return code;
}
1016

1017
/*
 * Copy every field of pGroup that is laid out before `mutexInited` into pNew.
 * The mutex-related tail of the struct is not copied.
 * All visible callers hold pGroup->mutex around this call.
 */
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew) {
  const size_t copyLen = offsetof(SArbGroup, mutexInited);
  (void)memcpy(pNew, pGroup, copyLen);
}
4✔
1020

1021
/*
 * Make members[index] the assigned leader of pGroup: copy its dnodeId and
 * token, and mark the assignment as not yet acknowledged.
 */
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index) {
  SArbAssignedLeader    *pLeader = &pGroup->assignedLeader;
  const SArbGroupMember *pChosen = pGroup->members + index;

  pLeader->dnodeId = pChosen->info.dnodeId;
  tstrncpy(pLeader->token, pChosen->state.token, TSDB_ARB_TOKEN_SIZE);
  pLeader->acked = false;
}
1✔
1028

1029
/*
 * Clear the assigned leader of pGroup: dnodeId 0, empty token, not acked.
 */
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
  SArbAssignedLeader *pLeader = &pGroup->assignedLeader;

  pLeader->acked = false;
  (void)memset(pLeader->token, 0, TSDB_ARB_TOKEN_SIZE);
  pLeader->dnodeId = 0;
}
1✔
1034

1035
/*
 * Apply one heartbeat-response member from dnodeId to pGroup.
 *
 * Under pGroup->mutex:
 *   - if dnodeId is not a member of the group, or the heartbeat seq is not newer
 *     than the last one seen, nothing changes;
 *   - otherwise the member's responsedHbSeq and lastHbMs are updated in place;
 *   - if the reported member token differs from the stored one, pNewGroup
 *     receives a copy of the group with the new token and isSync = false. The
 *     assigned leader is also reset in the copy when this member is the
 *     assigned leader.
 *
 * Returns true only when pNewGroup was filled and should be persisted.
 */
bool mndCheckArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
                                 SArbGroup *pNewGroup) {
  bool changed = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  int32_t          memberIdx = -1;
  SArbGroupMember *pLocal = NULL;
  for (int32_t k = 0; k < TSDB_ARB_GROUP_MEMBER_NUM && pLocal == NULL; k++) {
    if (pGroup->members[k].info.dnodeId == dnodeId) {
      memberIdx = k;
      pLocal = &pGroup->members[k];
    }
  }

  if (pLocal == NULL) {
    mInfo("dnodeId:%d vgId:%d, arb token update check failed, no obj found", dnodeId, pRspMember->vgId);
  } else if (pLocal->state.responsedHbSeq >= pRspMember->hbSeq) {
    mInfo("dnodeId:%d vgId:%d, skip arb token update, heart beat seq expired, local:%d msg:%d", dnodeId,
          pRspMember->vgId, pLocal->state.responsedHbSeq, pRspMember->hbSeq);
  } else {
    // record the newest heartbeat before comparing tokens, so the copy below carries it too
    pLocal->state.responsedHbSeq = pRspMember->hbSeq;
    pLocal->state.lastHbMs = nowMs;

    if (mndArbCheckToken(pLocal->state.token, pRspMember->memberToken) == 0) {
      mDebug("dnodeId:%d vgId:%d, skip arb token update, token matched", dnodeId, pRspMember->vgId);
    } else {
      mndArbGroupDupObj(pGroup, pNewGroup);
      tstrncpy(pNewGroup->members[memberIdx].state.token, pRspMember->memberToken, TSDB_ARB_TOKEN_SIZE);
      pNewGroup->isSync = false;

      bool resetAssigned = (pLocal->info.dnodeId == pGroup->assignedLeader.dnodeId);
      if (resetAssigned) {
        mndArbGroupResetAssignedLeader(pNewGroup);
      }

      changed = true;
      mInfo("dnodeId:%d vgId:%d, arb token updating, resetAssigned:%d", dnodeId, pRspMember->vgId, resetAssigned);
    }
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return changed;
}
1090

UNCOV
1091
/*
 * Apply a dnode's arb heartbeat response.
 *
 * For each reported member (SVArbHbRspMember), the matching group's heartbeat
 * state is refreshed. Groups whose member token changed are collected and
 * pulled up as one batched update. Unknown vgIds are logged and skipped.
 *
 * Returns the error from pulling up the batch, 0 otherwise.
 */
static int32_t mndUpdateArbGroupsByHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) {
  int64_t nowMs = taosGetTimestampMs();
  size_t  size = taosArrayGetSize(memberArray);
  SArray *pUpdateArray = taosArrayInit(size, sizeof(SArbGroup));

  for (size_t i = 0; i < size; i++) {
    SVArbHbRspMember *pRspMember = taosArrayGet(memberArray, i);

    SArbGroup  newGroup = {0};
    SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &pRspMember->vgId);
    if (pGroup == NULL) {
      mInfo("failed to update arb token, vgId:%d not found", pRspMember->vgId);
      continue;
    }

    bool updateToken = mndCheckArbGroupByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
    if (updateToken) {
      if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
        mError("failed to push newGroup to updateArray, but continue at this hearbear");
      }
    }

    sdbRelease(pMnode->pSdb, pGroup);
  }

  // Destroy the collected groups before checking the result, so the array is not leaked on failure.
  int32_t code = mndBatchUpdateArbGroup(pMnode, pUpdateArray);
  taosArrayDestroy(pUpdateArray);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
1121

1122
/*
 * Decide, under pGroup->mutex, whether a check-sync result changes the group.
 *
 * Nothing changes when:
 *   - the group has an assigned leader (terrno is set to success), or
 *   - either reported member token differs from the stored one (terrno is set
 *     to TSDB_CODE_MND_ARB_TOKEN_MISMATCH).
 * Otherwise, if newIsSync differs from the current isSync, pNewGroup receives a
 * copy with the new isSync, the given code, and the current time as
 * updateTimeMs.
 *
 * Returns true only when pNewGroup was filled.
 */
bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
                                  bool newIsSync, SArbGroup *pNewGroup, int32_t code) {
  bool  changed = false;
  char *local0Token = NULL;
  char *local1Token = NULL;

  (void)taosThreadMutexLock(&pGroup->mutex);

  local0Token = pGroup->members[0].state.token;
  local1Token = pGroup->members[1].state.token;

  if (pGroup->assignedLeader.dnodeId != 0) {
    terrno = TSDB_CODE_SUCCESS;
    mInfo("skip to update arb sync, vgId:%d has assigned leader:%d", vgId, pGroup->assignedLeader.dnodeId);
  } else if (mndArbCheckToken(local0Token, member0Token) != 0 || mndArbCheckToken(local1Token, member1Token) != 0) {
    terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
    mInfo("skip to update arb sync, memberToken mismatch local:[%s][%s], msg:[%s][%s]", local0Token, local1Token,
          member0Token, member1Token);
  } else if (pGroup->isSync != newIsSync) {
    mndArbGroupDupObj(pGroup, pNewGroup);
    pNewGroup->isSync = newIsSync;
    pNewGroup->code = code;
    pNewGroup->updateTimeMs = taosGetTimestampMs();

    mInfo("vgId:%d, arb isSync updating, new isSync:%d, timeStamp:%" PRId64, vgId, newIsSync, pNewGroup->updateTimeMs);
    changed = true;
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return changed;
}
1157

UNCOV
1158
/*
 * Apply a check-sync result for vgId. If it changes the group, an async update
 * is pulled up; a failure to pull it up is only logged.
 *
 * Returns an error only when the arb group for vgId cannot be acquired.
 */
static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token, char *member1Token, bool newIsSync,
                                int32_t rsp_code) {
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pGroup = sdbAcquire(pSdb, SDB_ARBGROUP, &vgId);
  if (pGroup == NULL) {
    mInfo("failed to update arb sync, vgId:%d not found", vgId);
    int32_t code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  SArbGroup newGroup = {0};
  if (mndUpdateArbGroupByCheckSync(pGroup, vgId, member0Token, member1Token, newIsSync, &newGroup, rsp_code) &&
      mndUpdateArbGroup(pMnode, &newGroup) != 0) {
    mInfo("failed to pullup update arb sync, vgId:%d, since %s", vgId, terrstr());
  }

  sdbRelease(pSdb, pGroup);
  return 0;
}
1181

UNCOV
1182
/*
 * Handle an arb heartbeat response from a dnode.
 *
 * An empty body is ignored. The response is dropped with
 * TSDB_CODE_MND_ARB_TOKEN_MISMATCH when its arb token differs from this
 * mnode's token. Otherwise the reported members are applied to their arb
 * groups.
 */
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arb hb-rsp contLen is 0");
    return 0;
  }

  SMnode *pMnode = pRsp->info.node;
  int32_t code = -1;

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  code = mndGetArbToken(pMnode, arbToken);
  if (code != 0) {
    mError("failed to get arb token for arb-hb response");
    TAOS_RETURN(code);
  }

  SVArbHeartBeatRsp arbHbRsp = {0};
  code = tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &arbHbRsp);
  if (code != 0) {
    mInfo("arb hb-rsp des failed, since:%s", tstrerror(pRsp->code));
    TAOS_RETURN(code);
  }

  if (mndArbCheckToken(arbToken, arbHbRsp.arbToken) != 0) {
    mInfo("arb hearbeat skip update for dnodeId:%d, arb token mismatch, local:[%s] msg:[%s]", arbHbRsp.dnodeId,
          arbToken, arbHbRsp.arbToken);
    code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  } else {
    code = mndUpdateArbGroupsByHeartBeat(pMnode, arbHbRsp.dnodeId, arbHbRsp.hbMembers);
  }

  tFreeSVArbHeartBeatRsp(&arbHbRsp);
  return code;
}
1219

UNCOV
1220
/*
 * Handle a vnode's arb check-sync response.
 *
 * An empty body is ignored. If the body cannot be decoded, the handler returns
 * success when the rpc code is TSDB_CODE_MND_ARB_TOKEN_MISMATCH and the decode
 * error otherwise. A response whose arb token differs from this mnode's token
 * is logged and skipped. Otherwise the sync result is applied to the vgroup's
 * arb group.
 */
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arb check-sync-rsp contLen is 0");
    return 0;
  }

  SMnode *pMnode = pRsp->info.node;
  int32_t code = -1;

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  code = mndGetArbToken(pMnode, arbToken);
  if (code != 0) {
    mError("failed to get arb token for arb-check-sync response");
    TAOS_RETURN(code);
  }

  SVArbCheckSyncRsp syncRsp = {0};
  code = tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp);
  if (code != 0) {
    mInfo("arb check-sync-rsp des failed, since:%s", tstrerror(pRsp->code));
    if (pRsp->code != TSDB_CODE_MND_ARB_TOKEN_MISMATCH) {
      TAOS_RETURN(code);
    }
    terrno = TSDB_CODE_SUCCESS;
    return 0;
  }

  mInfo("vgId:%d, vnode-arb-check-sync-rsp received, errCode:%d", syncRsp.vgId, syncRsp.errCode);
  if (mndArbCheckToken(arbToken, syncRsp.arbToken) != 0) {
    mInfo("skip update arb sync for vgId:%d, arb token mismatch, local:[%s] msg:[%s]", syncRsp.vgId, arbToken,
          syncRsp.arbToken);
    // NOTE(review): code is still 0 here, so a stale-token response is reported as success.
    terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  } else {
    bool newIsSync = (syncRsp.errCode == TSDB_CODE_SUCCESS);
    code = mndUpdateArbSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync,
                            syncRsp.errCode);
    if (code != 0) {
      mInfo("failed to update arb sync for vgId:%d, since:%s", syncRsp.vgId, terrstr());
    }
  }

  tFreeSVArbCheckSyncRsp(&syncRsp);
  TAOS_RETURN(code);
}
1268

1269
/*
 * Decide, under pGroup->mutex, whether a set-assigned-leader response
 * acknowledges the current assignment.
 *
 * The response is ignored when its member token differs from the assigned
 * leader's token, or when errcode reports a failure. If the assignment is not
 * yet acked, pNewGroup receives a copy marked acked with isSync = false.
 *
 * Returns true only when pNewGroup was filled.
 */
bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode,
                                          SArbGroup *pNewGroup) {
  bool acked = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  if (mndArbCheckToken(pGroup->assignedLeader.token, memberToken) != 0) {
    mInfo("skip update arb assigned for vgId:%d, member token mismatch, local:[%s] msg:[%s]", vgId,
          pGroup->assignedLeader.token, memberToken);
  } else if (errcode != TSDB_CODE_SUCCESS) {
    mInfo("skip update arb assigned for vgId:%d, since:%s", vgId, tstrerror(errcode));
  } else if (!pGroup->assignedLeader.acked) {
    mndArbGroupDupObj(pGroup, pNewGroup);
    pNewGroup->isSync = false;
    pNewGroup->assignedLeader.acked = true;

    mInfo("vgId:%d, arb received assigned ack", vgId);
    acked = true;
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return acked;
}
1299

1300
/*
 * Handle a vnode's set-assigned-leader response.
 *
 * An empty body is ignored. A response whose arb token differs from this
 * mnode's token fails with TSDB_CODE_MND_ARB_TOKEN_MISMATCH. Otherwise, if the
 * response acknowledges the current assignment, an async update of the arb
 * group is pulled up.
 */
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arb set-assigned-rsp contLen is 0");
    return 0;
  }

  int32_t code = -1;

  SMnode *pMnode = pRsp->info.node;
  SSdb   *pSdb = pMnode->pSdb;

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
    mError("failed to get arb token for arb-set-assigned response");
    TAOS_RETURN(code);
  }

  SVArbSetAssignedLeaderRsp setAssignedRsp = {0};
  if ((code = tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp)) != 0) {
    mInfo("arb set-assigned-rsp des failed, since:%s", tstrerror(pRsp->code));
    TAOS_RETURN(code);
  }

  if (mndArbCheckToken(arbToken, setAssignedRsp.arbToken) != 0) {
    mInfo("skip update arb assigned for vgId:%d, arb token mismatch, local:[%s] msg:[%s]", setAssignedRsp.vgId,
          arbToken, setAssignedRsp.arbToken);
    code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
    goto _OVER;
  }

  SArbGroup *pGroup = mndAcquireArbGroup(pMnode, setAssignedRsp.vgId);
  if (!pGroup) {
    mError("failed to set arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  SArbGroup newGroup = {0};
  bool updateAssigned = mndUpdateArbGroupBySetAssignedLeader(pGroup, setAssignedRsp.vgId, setAssignedRsp.memberToken,
                                                             pRsp->code, &newGroup);
  // newGroup is a private copy, so the acquired reference can be dropped here on every path.
  sdbRelease(pSdb, pGroup);

  if (updateAssigned) {
    if ((code = mndUpdateArbGroup(pMnode, &newGroup)) != 0) {
      mInfo("failed to pullup update arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, tstrerror(code));
      goto _OVER;
    }
  }

  code = 0;

_OVER:
  tFreeSVArbSetAssignedLeaderRsp(&setAssignedRsp);
  return code;
}
1354

1355
static char *formatTimestamp(char *buf, int64_t val, int precision) {
×
1356
  time_t tt;
1357
  if (precision == TSDB_TIME_PRECISION_MICRO) {
×
1358
    tt = (time_t)(val / 1000000);
×
1359
  }
1360
  if (precision == TSDB_TIME_PRECISION_NANO) {
×
1361
    tt = (time_t)(val / 1000000000);
×
1362
  } else {
1363
    tt = (time_t)(val / 1000);
×
1364
  }
1365

1366
  struct tm tm;
1367
  if (taosLocalTime(&tt, &tm, NULL, 0, NULL) == NULL) {
×
1368
    mError("failed to get local time");
×
1369
    return NULL;
×
1370
  }
1371
  size_t pos = taosStrfTime(buf, 32, "%Y-%m-%d %H:%M:%S", &tm);
×
1372

1373
  if (precision == TSDB_TIME_PRECISION_MICRO) {
×
1374
    sprintf(buf + pos, ".%06d", (int)(val % 1000000));
×
1375
  } else if (precision == TSDB_TIME_PRECISION_NANO) {
×
1376
    sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
×
1377
  } else {
1378
    sprintf(buf + pos, ".%03d", (int)(val % 1000));
×
1379
  }
1380

1381
  return buf;
×
1382
}
1383

1384
/*
 * Fill up to `rows` rows of the arb-group system table, resuming from
 * pShow->pIter.
 *
 * Each row contains: db name, vgId, the member dnodeIds, isSync,
 * "<check-sync code>(<update time>)", and the assigned leader's dnodeId, token,
 * and acked flag (all NULL when there is no assigned leader). Groups whose
 * vgroup no longer exists are skipped.
 *
 * Returns the number of rows written and adds it to pShow->numOfRows.
 */
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  SArbGroup *pGroup = NULL;
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_ARBGROUP, pShow->pIter, (void **)&pGroup);
    if (pShow->pIter == NULL) break;

    // Fill the row from a snapshot taken under the group mutex, so the mutex
    // is never left locked when RETRIEVE_CHECK_GOTO jumps to _OVER.
    SArbGroup group = {0};
    (void)taosThreadMutexLock(&pGroup->mutex);
    mndArbGroupDupObj(pGroup, &group);
    (void)taosThreadMutexUnlock(&pGroup->mutex);

    SVgObj *pVgObj = sdbAcquire(pSdb, SDB_VGROUP, &group.vgId);
    if (!pVgObj) {
      sdbRelease(pSdb, pGroup);
      continue;
    }
    char dbNameInGroup[TSDB_DB_FNAME_LEN];
    tstrncpy(dbNameInGroup, pVgObj->dbName, TSDB_DB_FNAME_LEN);
    sdbRelease(pSdb, pVgObj);

    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);

    char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    // bound by the destination buffer, not by the arb token size, so long db names are not truncated
    STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(dbNameInGroup), sizeof(dbname));
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)dbname, false), pGroup, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&group.vgId, false), pGroup, &lino, _OVER);

    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      SArbGroupMember *pMember = &group.members[i];
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pMember->info.dnodeId, false), pGroup,
                          &lino, _OVER);
    }

    mInfo("vgId:%d, arb group sync:%d, code:%s, update time:%" PRId64, group.vgId, group.isSync,
          tstrerror(group.code), group.updateTimeMs);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&group.isSync, false), pGroup, &lino, _OVER);

    char strCheckSyncCode[100] = {0};
    char bufUpdateTime[40] = {0};
    (void)formatTimestamp(bufUpdateTime, group.updateTimeMs, TSDB_TIME_PRECISION_MILLI);
    (void)tsnprintf(strCheckSyncCode, 100, "%s(%s)", tstrerror(group.code), bufUpdateTime);

    char checkSyncCode[100 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(checkSyncCode, strCheckSyncCode, 100 + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)checkSyncCode, false), pGroup, &lino, _OVER);

    if (group.assignedLeader.dnodeId != 0) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&group.assignedLeader.dnodeId, false),
                          pGroup, &lino, _OVER);

      char token[TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE] = {0};
      STR_WITH_MAXSIZE_TO_VARSTR(token, group.assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)token, false), pGroup, &lino, _OVER);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&group.assignedLeader.acked, false),
                          pGroup, &lino, _OVER);
    } else {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);
    }

    numOfRows++;
    sdbRelease(pSdb, pGroup);
  }

_OVER:
  if (code != 0) mError("failed to restrieve arb group at line:%d, since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;

  return numOfRows;
}
1477

1478
/*
 * Abandon an in-progress SDB_ARBGROUP iteration (e.g. when a show query is
 * closed early) by cancelling the fetch on pIter.
 */
static void mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_ARBGROUP);
}
×
1482

1483
/*
 * Return the value sdbGetSize() reports for the SDB_ARBGROUP table of this
 * mnode's sdb.
 */
int32_t mndGetArbGroupSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_ARBGROUP);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc