• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.44
/source/dnode/mnode/impl/src/mndArbGroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndArbGroup.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndShow.h"
21
#include "mndTrans.h"
22
#include "mndVgroup.h"
23

24
#define ARBGROUP_VER_NUMBER   1
25
#define ARBGROUP_RESERVE_SIZE 51
26

27
static SHashObj *arbUpdateHash = NULL;
28

29
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup);
30
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew);
31
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup);
32

33
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew);
34
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index);
35
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup);
36

37
static int32_t mndUpdateArbGroup(SMnode *pMnode, SArbGroup *pNewGroup);
38
static int32_t mndBatchUpdateArbGroup(SMnode *pMnode, SArray *newGroupArray);
39

40
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq);
41
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq);
42
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq);
43
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp);
44
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp);
45
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp);
46
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter);
48
static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq);
49

50
static int32_t mndArbCheckToken(const char *token1, const char *token2) {
176✔
51
  if (token1 == NULL || token2 == NULL) return -1;
176!
52
  if (strlen(token1) == 0 || strlen(token2) == 0) return -1;
176!
53
  return strncmp(token1, token2, TSDB_ARB_TOKEN_SIZE);
121✔
54
}
55

56
// Set up the arbgroup module on this mnode: install message handlers and show handlers,
// create the file-level arbUpdateHash, then register the SDB_ARBGROUP table with sdb.
// Returns terrno when the hash cannot be created, otherwise the result of sdbSetTable().
int32_t mndInitArbGroup(SMnode *pMnode) {
  SSdbTable arbTable = {0};
  arbTable.sdbType = SDB_ARBGROUP;
  arbTable.keyType = SDB_KEY_INT32;
  arbTable.encodeFp = (SdbEncodeFp)mndArbGroupActionEncode;
  arbTable.decodeFp = (SdbDecodeFp)mndArbGroupActionDecode;
  arbTable.insertFp = (SdbInsertFp)mndArbGroupActionInsert;
  arbTable.updateFp = (SdbUpdateFp)mndArbGroupActionUpdate;
  arbTable.deleteFp = (SdbDeleteFp)mndArbGroupActionDelete;

  // timers and requests handled by this module
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_HEARTBEAT_TIMER, mndProcessArbHbTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_CHECK_SYNC_TIMER, mndProcessArbCheckSyncTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP_BATCH, mndProcessArbUpdateGroupBatchReq);
  // responses handled by this module
  mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ARB_ASSIGN_LEADER, mndProcessAssignLeaderMsg);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup);

  arbUpdateHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (arbUpdateHash == NULL) {
    int32_t code = terrno;
    TAOS_RETURN(code);
  }

  return sdbSetTable(pMnode->pSdb, arbTable);
}
87

88
// Release the file-level arbUpdateHash created by mndInitArbGroup().
// The pointer is reset afterwards so that it never dangles between a cleanup and a later init.
void mndCleanupArbGroup(SMnode *pMnode) {
  taosHashCleanup(arbUpdateHash);
  arbUpdateHash = NULL;
}
89

90
// Look up the arbgroup keyed by vgId in sdb and return it.
// On a miss reported as TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
// TSDB_CODE_MND_ARBGROUP_NOT_EXIST; any other terrno is left as is. Returns NULL on failure.
SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) {
  SArbGroup *pFound = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_ARBGROUP_NOT_EXIST;
  }
  return NULL;
}
97

98
// Hand an arbgroup object back to the mnode's sdb via sdbRelease().
void mndReleaseArbGroup(SMnode *pMnode, SArbGroup *pGroup) {
  sdbRelease(pMnode->pSdb, pGroup);
}
102

103
// Fill outGroup from a vgroup object: the group is zeroed, then dbUid, vgId and each member's
// dnodeId are copied from pVgObj.
// Returns TSDB_CODE_INVALID_PARA unless the vgroup has exactly 2 replicas, else TSDB_CODE_SUCCESS.
int32_t mndArbGroupInitFromVgObj(SVgObj *pVgObj, SArbGroup *outGroup) {
  if (pVgObj->replica != 2) {
    TAOS_RETURN(TSDB_CODE_INVALID_PARA);
  }

  (void)memset(outGroup, 0, sizeof(*outGroup));
  outGroup->vgId = pVgObj->vgId;
  outGroup->dbUid = pVgObj->dbUid;

  int idx = 0;
  while (idx < TSDB_ARB_GROUP_MEMBER_NUM) {
    outGroup->members[idx].info.dnodeId = pVgObj->vnodeGid[idx].dnodeId;
    idx++;
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
117

118
// Serialize an arbgroup object into a freshly allocated sdb raw record.
// Write order: vgId, dbUid, {dnodeId, token} for each member, isSync, assigned-leader dnodeId,
// assigned-leader token, version, assigned-leader acked, code, updateTimeMs, and finally
// ARBGROUP_RESERVE_SIZE reserved bytes.
// Returns the raw record, or NULL (with terrno non-zero) when allocation or any write fails.
SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
  // NOTE(review): code/lino are not read in this function; presumably the SDB_SET_* macros
  // reference them by name - confirm before removing or renaming
  int32_t code = 0;
  int32_t lino = 0;

  // start pessimistic: terrno is only cleared once every field has been written
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t  rawSize = sizeof(SArbGroup) + ARBGROUP_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_ARBGROUP, ARBGROUP_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, pGroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pGroup->dbUid, _OVER)

  for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
    SArbGroupMember *pCur = &pGroup->members[m];
    SDB_SET_INT32(pRaw, dataPos, pCur->info.dnodeId, _OVER)
    SDB_SET_BINARY(pRaw, dataPos, pCur->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)
  }

  SDB_SET_INT8(pRaw, dataPos, pGroup->isSync, _OVER)

  SArbAssignedLeader *pAssigned = &pGroup->assignedLeader;
  SDB_SET_INT32(pRaw, dataPos, pAssigned->dnodeId, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pAssigned->token, TSDB_ARB_TOKEN_SIZE, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pAssigned->acked, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pGroup->code, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pGroup->updateTimeMs, _OVER)

  SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno != 0) {
    mError("arbgroup:%d, failed to encode to raw:%p since %s", pGroup->vgId, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("arbgroup:%d, encode to raw:%p, row:%p", pGroup->vgId, pRaw, pGroup);
  return pRaw;
}
159

160
// Rebuild an arbgroup row from an sdb raw record, reading fields in the order
// mndArbGroupActionEncode() writes them.
// Fields that are not persisted are reset: each member's nextHbSeq = 0, responsedHbSeq = -1,
// lastHbMs = 0, and the group's mutexInited = false.
// Returns the new row, or NULL (with terrno non-zero) on version mismatch, allocation failure
// or read failure.
SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): code/lino are not read in this function; presumably the SDB_GET_* macros
  // reference them by name - confirm before removing or renaming
  int32_t code = 0;
  int32_t lino = 0;

  // start pessimistic: terrno is only cleared once every field has been read
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRow   *pRow = NULL;
  SArbGroup *pGroup = NULL;
  int8_t     sver = 0;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver != ARBGROUP_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SArbGroup));
  if (pRow == NULL) goto _OVER;

  pGroup = sdbGetRowObj(pRow);
  if (pGroup == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &pGroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pGroup->dbUid, _OVER)

  for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
    SArbGroupMember *pCur = &pGroup->members[m];
    SDB_GET_INT32(pRaw, dataPos, &pCur->info.dnodeId, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, pCur->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)

    // heartbeat bookkeeping is not stored in the raw record
    pCur->state.nextHbSeq = 0;
    pCur->state.responsedHbSeq = -1;
    pCur->state.lastHbMs = 0;
  }

  SDB_GET_INT8(pRaw, dataPos, &pGroup->isSync, _OVER)

  SArbAssignedLeader *pAssigned = &pGroup->assignedLeader;
  SDB_GET_INT32(pRaw, dataPos, &pAssigned->dnodeId, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pAssigned->token, TSDB_ARB_TOKEN_SIZE, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pAssigned->acked, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pGroup->code, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pGroup->updateTimeMs, _OVER)

  pGroup->mutexInited = false;

  SDB_GET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno != 0) {
    mError("arbgroup:%d, failed to decode from raw:%p since %s", pGroup == NULL ? 0 : pGroup->vgId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("arbgroup:%d, decode from raw:%p, row:%p", pGroup->vgId, pRaw, pGroup);
  return pRow;
}
219

220
// sdb insert callback: make sure the group's mutex is initialized.
// Returns 0 when the mutex is (or already was) initialized, -1 when taosThreadMutexInit() fails.
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup) {
  mTrace("arbgroup:%d, perform insert action, row:%p", pGroup->vgId, pGroup);

  if (pGroup->mutexInited) {
    return 0;
  }

  if (taosThreadMutexInit(&pGroup->mutex, NULL) != 0) {
    return -1;
  }

  pGroup->mutexInited = true;
  return 0;
}
228

229
// sdb delete callback: destroy the group's mutex if it was initialized. Always returns 0.
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup) {
  mTrace("arbgroup:%d, perform delete action, row:%p", pGroup->vgId, pGroup);

  if (!pGroup->mutexInited) {
    return 0;
  }

  (void)taosThreadMutexDestroy(&pGroup->mutex);
  pGroup->mutexInited = false;
  return 0;
}
237

238
// sdb update callback: merge pNew into pOld while holding pOld's mutex.
// The merge happens only when both rows carry the same version; in that case member tokens,
// isSync, the assigned leader, code and updateTimeMs are copied and pOld->version is bumped.
// On a version mismatch the update is skipped and only logged.
// In both cases the group's vgId is removed from arbUpdateHash afterwards. Always returns 0.
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew) {
  mTrace("arbgroup:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);

  (void)taosThreadMutexLock(&pOld->mutex);

  if (pOld->version == pNew->version) {
    for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
      tstrncpy(pOld->members[m].state.token, pNew->members[m].state.token, TSDB_ARB_TOKEN_SIZE);
    }

    SArbAssignedLeader *pDst = &pOld->assignedLeader;
    SArbAssignedLeader *pSrc = &pNew->assignedLeader;

    pOld->isSync = pNew->isSync;
    pDst->dnodeId = pSrc->dnodeId;
    tstrncpy(pDst->token, pSrc->token, TSDB_ARB_TOKEN_SIZE);
    pDst->acked = pSrc->acked;
    pOld->version++;
    pOld->code = pNew->code;
    pOld->updateTimeMs = pNew->updateTimeMs;

    mInfo("arbgroup:%d, perform update action. members[0].token:%s, members[1].token:%s, isSync:%d, "
          "as-dnodeid:%d, as-token:%s, as-acked:%d, version:%" PRId64,
          pOld->vgId, pOld->members[0].state.token, pOld->members[1].state.token, pOld->isSync, pDst->dnodeId,
          pDst->token, pDst->acked, pOld->version);
  } else {
    mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64
          " new version:%" PRId64,
          pOld->vgId, pOld, pNew, pOld->version, pNew->version);
  }

  (void)taosThreadMutexUnlock(&pOld->mutex);

  if (taosHashRemove(arbUpdateHash, &pOld->vgId, sizeof(int32_t)) != 0) {
    mError("arbgroup:%d, failed to remove from arbUpdateHash", pOld->vgId);
  }
  return 0;
}
274

275
// Encode pGroup, append it to the transaction's redo log, then mark the raw record
// SDB_STATUS_CREATING. Returns 0 on success, terrno if encoding fails, or the failing call's code.
int32_t mndSetCreateArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendRedolog(pTrans, pRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
286

287
// Encode pGroup, append it to the transaction's undo log, then mark the raw record
// SDB_STATUS_DROPPED. Returns 0 on success, terrno if encoding fails, or the failing call's code.
int32_t mndSetCreateArbGroupUndoLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendUndolog(pTrans, pRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
298

299
// Encode pGroup, append it to the transaction's commit log, then mark the raw record
// SDB_STATUS_READY. Returns 0 on success, terrno if encoding fails, or the failing call's code.
int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndArbGroupActionEncode(pGroup);
  if (pCommitRaw == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }
  // Parenthesize the assignment before comparing: `code = f() != 0` would store the 0/1
  // comparison result in code and lose the real error code.
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) TAOS_RETURN(code);
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) TAOS_RETURN(code);
  return 0;
}
310

311
// Encode pGroup, append it to the transaction's prepare log, then mark the raw record
// SDB_STATUS_DROPPING. Returns 0 on success, terrno if encoding fails, or the failing call's code.
int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendPrepareLog(pTrans, pRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
322

323
// Encode pGroup, append it to the transaction's redo log, then mark the raw record
// SDB_STATUS_DROPPING. Returns 0 on success, terrno if encoding fails, or the failing call's code.
static int32_t mndSetDropArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendRedolog(pTrans, pRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
334

335
// Encode pGroup, append it to the transaction's commit log, then mark the raw record
// SDB_STATUS_DROPPED. Returns 0 on success, terrno if encoding fails, or the failing call's code.
int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
  SSdbRaw *pRaw = mndArbGroupActionEncode(pGroup);
  if (pRaw == NULL) {
    int32_t encodeCode = terrno;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
346

347
static void *mndBuildArbHeartBeatReq(int32_t *pContLen, char *arbToken, int32_t dnodeId, int64_t arbTerm,
70✔
348
                                     SArray *hbMembers) {
349
  SVArbHeartBeatReq req = {0};
70✔
350
  req.dnodeId = dnodeId;
70✔
351
  req.arbToken = arbToken;
70✔
352
  req.arbTerm = arbTerm;
70✔
353
  req.hbMembers = hbMembers;
70✔
354

355
  int32_t contLen = tSerializeSVArbHeartBeatReq(NULL, 0, &req);
70✔
356
  if (contLen <= 0) return NULL;
70!
357

358
  void *pReq = rpcMallocCont(contLen);
70✔
359
  if (pReq == NULL) return NULL;
70!
360

361
  if (tSerializeSVArbHeartBeatReq(pReq, contLen, &req) <= 0) {
70!
362
    rpcFreeCont(pReq);
×
363
    return NULL;
×
364
  }
365
  *pContLen = contLen;
70✔
366
  return pReq;
70✔
367
}
368

369
// Build an arb heartbeat request for pDnode and send it as TDMT_VND_ARB_HEARTBEAT.
// Returns -1 when the request cannot be built or the dnode has no endpoints, otherwise the
// result of tmsgSendReq().
static int32_t mndSendArbHeartBeatReq(SDnodeObj *pDnode, char *arbToken, int64_t arbTerm, SArray *hbMembers) {
  int32_t bodyLen = 0;
  void   *pBody = mndBuildArbHeartBeatReq(&bodyLen, arbToken, pDnode->id, arbTerm, hbMembers);
  if (pBody == NULL) {
    mError("dnodeId:%d, failed to build arb-hb request", pDnode->id);
    return -1;
  }

  SEpSet epSet = mndGetDnodeEpset(pDnode);
  if (epSet.numOfEps == 0) {
    mError("dnodeId:%d, failed to send arb-hb request to dnode since no epSet found", pDnode->id);
    rpcFreeCont(pBody);  // the message was never handed to the transport, so free it here
    return -1;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_ARB_HEARTBEAT, .pCont = pBody, .contLen = bodyLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code == 0) {
    mTrace("dnodeId:%d, send arb-hb request to dnode", pDnode->id);
  } else {
    mError("dnodeId:%d, failed to send arb-hb request to dnode since 0x%x", pDnode->id, code);
  }
  return code;
}
393

394
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
34,134✔
395
  int32_t    code = 0;
34,134✔
396
  SMnode    *pMnode = pReq->info.node;
34,134✔
397
  SSdb      *pSdb = pMnode->pSdb;
34,134✔
398
  SArbGroup *pArbGroup = NULL;
34,134✔
399
  void      *pIter = NULL;
34,134✔
400

401
  SHashObj *pDnodeHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
34,134✔
402

403
  // collect member of same dnode
404
  while (1) {
405
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
34,178✔
406
    if (pIter == NULL) break;
34,178✔
407

408
    (void)taosThreadMutexLock(&pArbGroup->mutex);
44✔
409

410
    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
132✔
411
      SArbGroupMember *pMember = &pArbGroup->members[i];
88✔
412
      int32_t          dnodeId = pMember->info.dnodeId;
88✔
413
      void            *pObj = taosHashGet(pDnodeHash, &dnodeId, sizeof(int32_t));
88✔
414
      SArray          *hbMembers = NULL;
88✔
415
      if (pObj) {
88✔
416
        hbMembers = *(SArray **)pObj;
18✔
417
      } else {
418
        hbMembers = taosArrayInit(16, sizeof(SVArbHbReqMember));
70✔
419
        if (taosHashPut(pDnodeHash, &dnodeId, sizeof(int32_t), &hbMembers, POINTER_BYTES) != 0) {
70!
420
          mError("dnodeId:%d, failed to push hb member inty]o hash, but conitnue next at this timer round", dnodeId);
×
421
        }
422
      }
423
      SVArbHbReqMember reqMember = {.vgId = pArbGroup->vgId, .hbSeq = pMember->state.nextHbSeq++};
88✔
424
      if (taosArrayPush(hbMembers, &reqMember) == NULL) {
176!
425
        mError("dnodeId:%d, failed to push hb member, but conitnue next at this timer round", dnodeId);
×
426
      }
427
    }
428

429
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);
44✔
430
    sdbRelease(pSdb, pArbGroup);
44✔
431
  }
432

433
  char arbToken[TSDB_ARB_TOKEN_SIZE];
434
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
34,134!
435
    mError("failed to get arb token for arb-hb timer");
×
436
    pIter = taosHashIterate(pDnodeHash, NULL);
×
437
    while (pIter) {
×
438
      SArray *hbMembers = *(SArray **)pIter;
×
439
      taosArrayDestroy(hbMembers);
×
440
      pIter = taosHashIterate(pDnodeHash, pIter);
×
441
    }
442
    taosHashCleanup(pDnodeHash);
×
443
    TAOS_RETURN(code);
×
444
  }
445

446
  int64_t nowMs = taosGetTimestampMs();
34,134✔
447

448
  pIter = NULL;
34,134✔
449
  while (1) {
70✔
450
    pIter = taosHashIterate(pDnodeHash, pIter);
34,204✔
451
    if (pIter == NULL) break;
34,204✔
452

453
    int32_t dnodeId = *(int32_t *)taosHashGetKey(pIter, NULL);
70✔
454
    SArray *hbMembers = *(SArray **)pIter;
70✔
455

456
    SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
70✔
457
    if (pDnode == NULL) {
70!
458
      mError("dnodeId:%d, timer failed to acquire dnode", dnodeId);
×
459
      taosArrayDestroy(hbMembers);
×
460
      continue;
×
461
    }
462

463
    int64_t mndTerm = mndGetTerm(pMnode);
70✔
464

465
    if (mndIsDnodeOnline(pDnode, nowMs)) {
70!
466
      int32_t sendCode = mndSendArbHeartBeatReq(pDnode, arbToken, mndTerm, hbMembers);
70✔
467
      if (TSDB_CODE_SUCCESS != sendCode) {
70!
468
        mError("dnodeId:%d, timer failed to send arb-hb request", dnodeId);
×
469
      }
470
    }
471

472
    mndReleaseDnode(pMnode, pDnode);
70✔
473
    taosArrayDestroy(hbMembers);
70✔
474
  }
475
  taosHashCleanup(pDnodeHash);
34,134✔
476

477
  return 0;
34,134✔
478
}
479

480
static void *mndBuildArbCheckSyncReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
16✔
481
                                     char *member0Token, char *member1Token) {
482
  SVArbCheckSyncReq req = {0};
16✔
483
  req.arbToken = arbToken;
16✔
484
  req.arbTerm = arbTerm;
16✔
485
  req.member0Token = member0Token;
16✔
486
  req.member1Token = member1Token;
16✔
487

488
  int32_t reqLen = tSerializeSVArbCheckSyncReq(NULL, 0, &req);
16✔
489
  int32_t contLen = reqLen + sizeof(SMsgHead);
16✔
490

491
  if (contLen <= 0) return NULL;
16!
492
  SMsgHead *pHead = rpcMallocCont(contLen);
16✔
493
  if (pHead == NULL) return NULL;
16!
494

495
  pHead->contLen = htonl(contLen);
16✔
496
  pHead->vgId = htonl(vgId);
16✔
497
  if (tSerializeSVArbCheckSyncReq((char *)pHead + sizeof(SMsgHead), contLen, &req) <= 0) {
16!
498
    rpcFreeCont(pHead);
×
499
    return NULL;
×
500
  }
501
  *pContLen = contLen;
16✔
502
  return pHead;
16✔
503
}
504

505
// Build a check-sync request for vgId and send it to the vgroup as TDMT_VND_ARB_CHECK_SYNC.
// Returns -1 when the request cannot be built; terrno (or -1 when terrno is 0) when the vgroup
// has no endpoints; otherwise the result of tmsgSendReq().
static int32_t mndSendArbCheckSyncReq(SMnode *pMnode, int32_t vgId, char *arbToken, int64_t term, char *member0Token,
                                      char *member1Token) {
  int32_t msgLen = 0;
  void   *pMsg = mndBuildArbCheckSyncReq(&msgLen, vgId, arbToken, term, member0Token, member1Token);
  if (pMsg == NULL) {
    mError("vgId:%d, failed to build check-sync request", vgId);
    return -1;
  }

  SEpSet epSet = mndGetVgroupEpsetById(pMnode, vgId);
  if (epSet.numOfEps == 0) {
    mError("vgId:%d, failed to send check-sync request since no epSet found", vgId);
    rpcFreeCont(pMsg);  // the message was never handed to the transport, so free it here
    int32_t failCode = -1;
    if (terrno != 0) failCode = terrno;
    TAOS_RETURN(failCode);
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_ARB_CHECK_SYNC, .pCont = pMsg, .contLen = msgLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code == 0) {
    mDebug("vgId:%d, send check-sync request", vgId);
  } else {
    mError("vgId:%d, failed to send check-sync request since 0x%x", vgId, code);
  }
  return code;
}
533

534
// Tell whether the member at `index` has timed out: true when its lastHbMs is older than
// nowMs minus tsArbSetAssignedTimeoutSec seconds.
static bool mndCheckArbMemberHbTimeout(SArbGroup *pArbGroup, int32_t index, int64_t nowMs) {
  int64_t oldestAllowedMs = nowMs - tsArbSetAssignedTimeoutSec * 1000;
  int64_t lastHbMs = pArbGroup->members[index].state.lastHbMs;
  return lastHbMs < oldestAllowedMs;
}
538

539
static void *mndBuildArbSetAssignedLeaderReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
×
540
                                             char *memberToken, bool force) {
541
  SVArbSetAssignedLeaderReq req = {0};
×
542
  req.arbToken = arbToken;
×
543
  req.arbTerm = arbTerm;
×
544
  req.memberToken = memberToken;
×
545
  if (force) req.force = 1;
×
546

547
  int32_t reqLen = tSerializeSVArbSetAssignedLeaderReq(NULL, 0, &req);
×
548
  int32_t contLen = reqLen + sizeof(SMsgHead);
×
549

550
  if (contLen <= 0) return NULL;
×
551
  SMsgHead *pHead = rpcMallocCont(contLen);
×
552
  if (pHead == NULL) return NULL;
×
553

554
  pHead->contLen = htonl(contLen);
×
555
  pHead->vgId = htonl(vgId);
×
556
  if (tSerializeSVArbSetAssignedLeaderReq((char *)pHead + sizeof(SMsgHead), contLen, &req) <= 0) {
×
557
    rpcFreeCont(pHead);
×
558
    return NULL;
×
559
  }
560
  *pContLen = contLen;
×
561
  return pHead;
×
562
}
563

564
// Build a set-assigned-leader request for vgId and send it to dnodeId as
// TDMT_SYNC_SET_ASSIGNED_LEADER.
// Returns terrno (or -1 when terrno is 0) when the request cannot be built or the dnode has no
// endpoints; otherwise the result of tmsgSendReq().
static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, int32_t vgId, char *arbToken,
                                              int64_t term, char *memberToken, bool force) {
  int32_t code = 0;
  int32_t msgLen = 0;
  void   *pMsg = mndBuildArbSetAssignedLeaderReq(&msgLen, vgId, arbToken, term, memberToken, force);
  if (pMsg == NULL) {
    mError("vgId:%d, failed to build set-assigned request", vgId);
    code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  SEpSet epSet = mndGetDnodeEpsetById(pMnode, dnodeId);
  if (epSet.numOfEps == 0) {
    mError("dnodeId:%d vgId:%d, failed to send arb-set-assigned request to dnode since no epSet found", dnodeId, vgId);
    rpcFreeCont(pMsg);  // the message was never handed to the transport, so free it here
    code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_SYNC_SET_ASSIGNED_LEADER, .pCont = pMsg, .contLen = msgLen};
  code = tmsgSendReq(&epSet, &rpcMsg);
  if (code == 0) {
    mInfo("dnodeId:%d vgId:%d, send arb-set-assigned request to dnode", dnodeId, vgId);
  } else {
    mError("dnodeId:%d vgId:%d, failed to send arb-set-assigned request to dnode since 0x%x", dnodeId, vgId, code);
  }
  return code;
}
593

594
// Decide what the check-sync timer should do for one arbgroup snapshot and report it via *pOp:
//   CHECK_SYNC_SET_ASSIGNED_LEADER - an assigned leader exists but has not acked yet
//   CHECK_SYNC_CHECK_SYNC          - no member timed out, no assigned leader, group not in sync
//   CHECK_SYNC_UPDATE              - exactly one member timed out and the group is in sync; pNewGroup
//                                    receives a copy of the group with the other member assigned as leader
//   CHECK_SYNC_NONE                - every other case
// pNewGroup is written only for CHECK_SYNC_UPDATE.
void mndArbCheckSync(SArbGroup *pArbGroup, int64_t nowMs, ECheckSyncOp *pOp, SArbGroup *pNewGroup) {
  *pOp = CHECK_SYNC_NONE;

  int32_t             vgId = pArbGroup->vgId;
  bool                timeout0 = mndCheckArbMemberHbTimeout(pArbGroup, 0, nowMs);
  bool                timeout1 = mndCheckArbMemberHbTimeout(pArbGroup, 1, nowMs);
  SArbAssignedLeader *pAssigned = &pArbGroup->assignedLeader;
  int32_t             assignedDnodeId = pAssigned->dnodeId;
  bool                hasAssigned = (assignedDnodeId != 0);

  // 1. has assigned && no response => send req
  if (hasAssigned && !pAssigned->acked) {
    *pOp = CHECK_SYNC_SET_ASSIGNED_LEADER;
    return;
  }

  // 2. both of the two members are timeout => skip
  if (timeout0 && timeout1) {
    return;
  }

  // 3. no member is timeout => check sync (only without assigned leader and when not in sync)
  if (!timeout0 && !timeout1) {
    if (!hasAssigned && !pArbGroup->isSync) {
      *pOp = CHECK_SYNC_CHECK_SYNC;
    }
    return;
  }

  // 4. one of the members is timeout => the surviving member is the leader candidate
  int32_t          survivorIdx = timeout0 ? 1 : 0;
  SArbGroupMember *pSurvivor = &pArbGroup->members[survivorIdx];
  int32_t          survivorDnodeId = pSurvivor->info.dnodeId;

  // has assigned leader and dnodeId not match => skip
  if (hasAssigned && assignedDnodeId != survivorDnodeId) {
    mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, assigned leader has been set to dnodeId:%d", vgId,
          survivorDnodeId, assignedDnodeId);
    return;
  }

  // not sync => skip; the message is the same either way, only the log level differs
  if (!pArbGroup->isSync) {
    if (assignedDnodeId == survivorDnodeId) {
      mDebug("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId,
             survivorDnodeId);
    } else {
      mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId,
            survivorDnodeId);
    }
    return;
  }

  // is sync && no assigned leader => write to sdb
  mndArbGroupDupObj(pArbGroup, pNewGroup);
  mndArbGroupSetAssignedLeader(pNewGroup, survivorIdx);
  *pOp = CHECK_SYNC_UPDATE;
}
654

655
// Handle an assign-leader request: for every arbgroup, pick the first member whose dnode is
// online and send it a forced set-assigned-leader request with an empty member token.
// Groups with no online member are skipped. Send failures are ignored (best effort).
// Returns TSDB_CODE_INVALID_MSG when the request cannot be deserialized, the failing code when
// the arb token or the term is unavailable, otherwise 0. The deserialized request is released
// on every path.
static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  int32_t    code = -1, lino = 0;
  void      *pIter = NULL;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;

  SAssignLeaderReq req = {0};
  if (tDeserializeSAssignLeaderReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  mInfo("begin to process assign leader");

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));

  int64_t term = mndGetTerm(pMnode);
  if (term < 0) {
    mError("arb failed to get term since %s", terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    // leave through _exit so that req is released
    goto _exit;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    SArbGroup arbGroupDup = {0};

    // work on a snapshot so that the group lock is not held while sending
    (void)taosThreadMutexLock(&pArbGroup->mutex);
    mndArbGroupDupObj(pArbGroup, &arbGroupDup);
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);

    sdbRelease(pSdb, pArbGroup);

    int32_t dnodeId = 0;
    int64_t nowMs = taosGetTimestampMs();
    for (int32_t i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      int32_t    memberDnodeId = arbGroupDup.members[i].info.dnodeId;
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, memberDnodeId);
      if (pDnode == NULL) {
        mError("vgId:%d, arb failed to acquire dnodeId:%d", arbGroupDup.vgId, memberDnodeId);
        continue;
      }
      bool isonline = mndIsDnodeOnline(pDnode, nowMs);
      mndReleaseDnode(pMnode, pDnode);
      if (isonline) {
        dnodeId = memberDnodeId;
        break;
      }
    }

    // dnodeId 0 means that no member was found online: there is nobody to send the request to
    if (dnodeId == 0) {
      mInfo("vgId:%d, arb skip to send set assigned leader since no member dnode is online", arbGroupDup.vgId);
      continue;
    }

    (void)mndSendArbSetAssignedLeaderReq(pMnode, dnodeId, arbGroupDup.vgId, arbToken, term, "", true);
    mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", arbGroupDup.vgId, dnodeId);
  }

  code = 0;

_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to assign leader since %s", tstrerror(code));
  }

  tFreeSAssignLeaderReq(&req);
  TAOS_RETURN(code);
}
721

722
/**
 * Timer handler that walks every arb group and, per group, either sends a
 * set-assigned-leader request, sends a check-sync request, or collects an
 * updated group that is submitted in one batch at the end.
 *
 * @param pReq timer message; pReq->info.node is the mnode.
 * @return the term-lookup error when mndGetTerm() fails; otherwise 0, even if
 *         a later step failed (such failures are only logged).
 */
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
  int32_t    code = 0, lino = 0;
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pArbGroup = NULL;
  void      *pIter = NULL;
  SArray    *pUpdateArray = NULL;

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));

  int64_t term = mndGetTerm(pMnode);
  if (term < 0) {
    mError("arb failed to get term since %s", terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  // Skip the check while less than two heartbeat intervals have passed since the role time.
  int64_t roleTimeMs = mndGetRoleTimeMs(pMnode);
  int64_t nowMs = taosGetTimestampMs();
  if (nowMs - roleTimeMs < tsArbHeartBeatIntervalSec * 1000 * 2) {
    mInfo("arb skip to check sync since mnd had just switch over, roleTime:%" PRId64 " now:%" PRId64, roleTimeMs,
          nowMs);
    return 0;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
    if (pIter == NULL) break;

    // Work on a private copy so the group mutex is held only for the copy.
    SArbGroup arbGroupDup = {0};

    (void)taosThreadMutexLock(&pArbGroup->mutex);
    mndArbGroupDupObj(pArbGroup, &arbGroupDup);
    (void)taosThreadMutexUnlock(&pArbGroup->mutex);

    sdbRelease(pSdb, pArbGroup);

    ECheckSyncOp op = CHECK_SYNC_NONE;
    SArbGroup    newGroup = {0};
    mndArbCheckSync(&arbGroupDup, nowMs, &op, &newGroup);

    int32_t             vgId = arbGroupDup.vgId;
    SArbAssignedLeader *pAssgndLeader = &arbGroupDup.assignedLeader;
    int32_t             assgndDnodeId = pAssgndLeader->dnodeId;

    switch (op) {
      case CHECK_SYNC_NONE:
        mTrace("vgId:%d, arb skip to send msg by check sync", vgId);
        break;
      case CHECK_SYNC_SET_ASSIGNED_LEADER:
        (void)mndSendArbSetAssignedLeaderReq(pMnode, assgndDnodeId, vgId, arbToken, term, pAssgndLeader->token, false);
        mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, assgndDnodeId);
        break;
      case CHECK_SYNC_CHECK_SYNC:
        (void)mndSendArbCheckSyncReq(pMnode, vgId, arbToken, term, arbGroupDup.members[0].state.token,
                                     arbGroupDup.members[1].state.token);
        mInfo("vgId:%d, arb send check sync request", vgId);
        break;
      case CHECK_SYNC_UPDATE:
        // The update array is created lazily, only when the first update shows up.
        if (!pUpdateArray) {
          pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
          if (!pUpdateArray) {
            TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
          }
        }

        if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
        break;
      default:
        mError("vgId:%d, arb unknown check sync op:%d", vgId, op);
        break;
    }
  }

  TAOS_CHECK_EXIT(mndBatchUpdateArbGroup(pMnode, pUpdateArray));

_exit:
  // pIter is non-NULL only when the loop above was left through an error exit;
  // the unfinished iteration must be cancelled in that case.
  if (pIter != NULL) {
    sdbCancelFetchByType(pSdb, pIter, SDB_ARBGROUP);
  }

  if (code != 0) {
    mError("failed to check sync at line %d since %s", lino, tstrerror(code));
  }

  taosArrayDestroy(pUpdateArray);
  return 0;
}
810

811
/**
 * Serialize a batch arb-update-group request into a freshly allocated rpc buffer.
 *
 * @param pContLen    receives the encoded length; written only on success.
 * @param updateArray array placed in the request's updateArray field.
 * @return the rpc buffer holding the encoded request, or NULL on failure.
 */
static void *mndBuildArbUpdateGroupBatchReq(int32_t *pContLen, SArray *updateArray) {
  SMArbUpdateGroupBatchReq batchReq = {0};
  batchReq.updateArray = updateArray;

  // The first call gets no buffer; its result is used as the allocation size.
  int32_t encodedLen = tSerializeSMArbUpdateGroupBatchReq(NULL, 0, &batchReq);
  if (encodedLen <= 0) {
    return NULL;
  }

  SMsgHead *pBuf = rpcMallocCont(encodedLen);
  if (pBuf == NULL) {
    return NULL;
  }

  if (tSerializeSMArbUpdateGroupBatchReq(pBuf, encodedLen, &batchReq) > 0) {
    *pContLen = encodedLen;
    return pBuf;
  }

  rpcFreeCont(pBuf);
  return NULL;
}
827

828
/**
 * Fill an SMArbUpdateGroup from an SArbGroup.
 *
 * Token fields are copied as pointers into pGroup, not duplicated, so
 * outGroup is only usable while pGroup is alive.
 */
static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup) {
  // identity and bookkeeping fields
  outGroup->vgId = pGroup->vgId;
  outGroup->dbUid = pGroup->dbUid;
  outGroup->isSync = pGroup->isSync;
  outGroup->version = pGroup->version;
  outGroup->code = pGroup->code;
  outGroup->updateTimeMs = pGroup->updateTimeMs;

  // members: token is a borrowed pointer
  for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; ++m) {
    outGroup->members[m].dnodeId = pGroup->members[m].info.dnodeId;
    outGroup->members[m].token = pGroup->members[m].state.token;
  }

  // assigned leader: token is a borrowed pointer
  outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId;
  outGroup->assignedLeader.token = pGroup->assignedLeader.token;
  outGroup->assignedLeader.acked = pGroup->assignedLeader.acked;
}
12✔
843

844
/**
 * Queue a single-group arb update on the mnode write queue, unless the group's
 * vgId is already present in arbUpdateHash.
 *
 * @return 0 when skipped or when both the enqueue and the hash insert succeed;
 *         -1 when building the request fails; otherwise the failing call's code.
 */
static int32_t mndUpdateArbGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
  bool inProcess = (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL);
  if (inProcess) {
    mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
    return 0;
  }

  int32_t result = -1;

  SMArbUpdateGroup updateGroup = {0};
  mndInitArbUpdateGroup(pNewGroup, &updateGroup);

  SArray *pBatch = taosArrayInit(1, sizeof(SMArbUpdateGroup));
  if (taosArrayPush(pBatch, &updateGroup) != NULL) {
    int32_t contLen = 0;
    void   *pCont = mndBuildArbUpdateGroupBatchReq(&contLen, pBatch);
    if (pCont == NULL) {
      mError("failed to build arb-update-group request");
    } else {
      SRpcMsg rpcMsg = {
          .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pCont, .contLen = contLen, .info.noResp = true};
      result = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
      if (result == 0) {
        // Mark the vgId as in-process only after the message is queued.
        result = taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
      }
    }
  }

  taosArrayDestroy(pBatch);
  return result;
}
876

877
/**
 * Queue one batch arb-update-group request for every group in newGroupArray
 * whose vgId is not already present in arbUpdateHash.
 *
 * Each group picked up here is marked in arbUpdateHash. If anything fails,
 * only the markers inserted by this call are removed again; markers that were
 * already present when the group was skipped are left untouched.
 *
 * @param pMnode        mnode whose write queue receives the request.
 * @param newGroupArray array of SArbGroup; an empty array is a no-op.
 * @return 0 on success or when there is nothing to send, non-zero on failure.
 */
static int32_t mndBatchUpdateArbGroup(SMnode *pMnode, SArray *newGroupArray) {
  int32_t ret = -1;

  size_t sz = taosArrayGetSize(newGroupArray);
  if (sz == 0) {
    // Nothing to update: avoid allocating a scratch array on every call.
    return 0;
  }

  // Number of leading pArray entries whose vgId was put into arbUpdateHash by this call.
  size_t  numMarked = 0;
  SArray *pArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup));
  for (size_t i = 0; i < sz; i++) {
    SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
    if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
      mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
      continue;
    }

    SMArbUpdateGroup newGroup = {0};
    mndInitArbUpdateGroup(pNewGroup, &newGroup);

    if (taosArrayPush(pArray, &newGroup) == NULL) goto _OVER;
    if (taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0) != 0) goto _OVER;
    numMarked++;
  }

  if (taosArrayGetSize(pArray) == 0) {
    ret = 0;
    goto _OVER;
  }

  int32_t contLen = 0;
  void   *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
  if (!pHead) {
    mError("failed to build arb-update-group request");
    goto _OVER;
  }

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
  ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);

_OVER:
  if (ret != 0) {
    // Roll back only the markers inserted above.
    for (size_t i = 0; i < numMarked; i++) {
      SMArbUpdateGroup *pMarked = taosArrayGet(pArray, i);
      if (taosHashRemove(arbUpdateHash, &pMarked->vgId, sizeof(int32_t)) != 0) {
        mError("failed to remove vgId:%d from arbUpdateHash", pMarked->vgId);
      }
    }
  }

  taosArrayDestroy(pArray);
  return ret;
}
926

927
/**
 * Handle a batch arb-update-group request: decode it, build one transaction
 * carrying a commit log per group that still exists in sdb, and prepare it.
 *
 * On any failure after decoding, every vgId in the request is removed from
 * arbUpdateHash so that later updates for those groups are not blocked.
 *
 * @param pReq request message; pReq->info.node is the mnode.
 * @return 0 on success, otherwise the error code of the failing step.
 */
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
  int    code = -1;
  size_t sz = 0;

  SMArbUpdateGroupBatchReq req = {0};
  if ((code = tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    mError("arb failed to decode arb-update-group request");
    TAOS_RETURN(code);
  }

  // Taken before anything else can fail, so the cleanup loop at _OVER covers the whole request.
  sz = taosArrayGetSize(req.updateArray);

  SMnode *pMnode = pReq->info.node;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "upd-bat-arbgroup");
  if (pTrans == NULL) {
    mError("failed to update arbgroup in create trans, since %s", terrstr());
    // code still holds 0 from the successful decode; report the failure instead.
    code = -1;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  for (size_t i = 0; i < sz; i++) {
    SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
    SArbGroup         newGroup = {0};
    newGroup.vgId = pUpdateGroup->vgId;
    newGroup.dbUid = pUpdateGroup->dbUid;
    for (int m = 0; m < TSDB_ARB_GROUP_MEMBER_NUM; m++) {
      newGroup.members[m].info.dnodeId = pUpdateGroup->members[m].dnodeId;
      tstrncpy(newGroup.members[m].state.token, pUpdateGroup->members[m].token, TSDB_ARB_TOKEN_SIZE);
    }

    newGroup.isSync = pUpdateGroup->isSync;
    newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId;
    tstrncpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
    newGroup.assignedLeader.acked = pUpdateGroup->assignedLeader.acked;
    newGroup.version = pUpdateGroup->version;
    newGroup.code = pUpdateGroup->code;
    newGroup.updateTimeMs = pUpdateGroup->updateTimeMs;

    mInfo(
        "trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d], %d, "
        "%" PRId64,
        pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
        newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
        newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked,
        pUpdateGroup->code, pUpdateGroup->updateTimeMs);

    // The old object is acquired only to confirm the group still exists.
    SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
    if (!pOldGroup) {
      mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId);
      if (taosHashRemove(arbUpdateHash, &newGroup.vgId, sizeof(int32_t)) != 0) {
        mError("failed to remove vgId:%d from arbUpdateHash", newGroup.vgId);
      }
      continue;
    }

    mndTransAddArbGroupId(pTrans, newGroup.vgId);

    if ((code = mndSetCreateArbGroupCommitLogs(pTrans, &newGroup)) != 0) {
      mError("failed to update arbgroup in set commit log, vgId:%d, trans:%d, since %s", newGroup.vgId, pTrans->id,
             terrstr());
      // Drop the reference taken above before leaving the loop.
      sdbRelease(pMnode->pSdb, pOldGroup);
      goto _OVER;
    }

    mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
          pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
          newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
          newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked);

    sdbRelease(pMnode->pSdb, pOldGroup);
  }

  if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) goto _OVER;
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;

  code = 0;

_OVER:
  if (code != 0) {
    // failed to update arbgroup: clear the in-process markers of the whole request
    for (size_t i = 0; i < sz; i++) {
      SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
      if (taosHashRemove(arbUpdateHash, &pUpdateGroup->vgId, sizeof(int32_t)) != 0) {
        mError("failed to remove vgId:%d from arbUpdateHash", pUpdateGroup->vgId);
      }
    }
  }

  mndTransDrop(pTrans);
  tFreeSMArbUpdateGroupBatchReq(&req);
  return code;
}
1016

1017
/**
 * Copy pGroup into pNew, byte for byte, up to (but not including) the
 * mutexInited field; mutexInited and any fields after it are left untouched in pNew.
 */
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew) {
  size_t dupLen = offsetof(SArbGroup, mutexInited);
  (void)memcpy(pNew, pGroup, dupLen);
}
89✔
1020

1021
/**
 * Make members[index] the assigned leader of pGroup: copy its dnodeId and
 * token into assignedLeader and mark the assignment as not yet acked.
 */
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index) {
  SArbAssignedLeader *pLeader = &pGroup->assignedLeader;
  SArbGroupMember    *pSrc = &pGroup->members[index];

  pLeader->acked = false;
  pLeader->dnodeId = pSrc->info.dnodeId;
  tstrncpy(pLeader->token, pSrc->state.token, TSDB_ARB_TOKEN_SIZE);
}
1✔
1028

1029
/**
 * Clear the assigned leader of pGroup: dnodeId becomes 0, the token buffer is
 * zeroed and acked is reset to false.
 */
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
  SArbAssignedLeader *pLeader = &pGroup->assignedLeader;

  (void)memset(pLeader->token, 0, TSDB_ARB_TOKEN_SIZE);
  pLeader->acked = false;
  pLeader->dnodeId = 0;
}
1✔
1034

1035
/**
 * Apply one heartbeat response member to an arb group, under the group mutex.
 *
 * The member matching dnodeId gets its heartbeat sequence and timestamp
 * refreshed when the response is newer than the last one seen. If the
 * reported token differs from the stored one, pNewGroup is filled with a copy
 * of the group carrying the new token and isSync = false; the assigned leader
 * is reset in that copy when the member is the current assigned leader.
 *
 * @return true when pNewGroup was filled and needs to be persisted.
 */
bool mndCheckArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
                                 SArbGroup *pNewGroup) {
  bool needUpdate = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  do {
    // locate the member slot owned by this dnode
    int slot = 0;
    while (slot < TSDB_ARB_GROUP_MEMBER_NUM && pGroup->members[slot].info.dnodeId != dnodeId) {
      slot++;
    }
    if (slot >= TSDB_ARB_GROUP_MEMBER_NUM) {
      mInfo("dnodeId:%d vgId:%d, arb token update check failed, no obj found", dnodeId, pRspMember->vgId);
      break;
    }

    SArbGroupMember *pHit = &pGroup->members[slot];

    // a sequence number that is not newer than the recorded one is ignored
    if (pHit->state.responsedHbSeq >= pRspMember->hbSeq) {
      mInfo("dnodeId:%d vgId:%d, skip arb token update, heart beat seq expired, local:%d msg:%d", dnodeId,
            pRspMember->vgId, pHit->state.responsedHbSeq, pRspMember->hbSeq);
      break;
    }

    // record the heartbeat
    pHit->state.responsedHbSeq = pRspMember->hbSeq;
    pHit->state.lastHbMs = nowMs;

    if (mndArbCheckToken(pHit->state.token, pRspMember->memberToken) == 0) {
      mDebug("dnodeId:%d vgId:%d, skip arb token update, token matched", dnodeId, pRspMember->vgId);
      break;
    }

    // token changed: prepare the updated copy
    mndArbGroupDupObj(pGroup, pNewGroup);
    tstrncpy(pNewGroup->members[slot].state.token, pRspMember->memberToken, TSDB_ARB_TOKEN_SIZE);
    pNewGroup->isSync = false;

    bool resetAssigned = (pHit->info.dnodeId == pGroup->assignedLeader.dnodeId);
    if (resetAssigned) {
      mndArbGroupResetAssignedLeader(pNewGroup);
    }

    needUpdate = true;
    mInfo("dnodeId:%d vgId:%d, arb token updating, resetAssigned:%d", dnodeId, pRspMember->vgId, resetAssigned);
  } while (0);

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return needUpdate;
}
1090

1091
/**
 * Apply all members of one heartbeat response and submit the groups whose
 * token changed as a single batch update.
 *
 * @param pMnode      mnode owning the sdb and the write queue.
 * @param dnodeId     dnode that sent the heartbeat response.
 * @param memberArray array of SVArbHbRspMember.
 * @return 0 on success, otherwise the error from mndBatchUpdateArbGroup().
 */
static int32_t mndUpdateArbGroupsByHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) {
  int64_t nowMs = taosGetTimestampMs();
  size_t  size = taosArrayGetSize(memberArray);
  SArray *pUpdateArray = taosArrayInit(size, sizeof(SArbGroup));

  for (size_t i = 0; i < size; i++) {
    SVArbHbRspMember *pRspMember = taosArrayGet(memberArray, i);

    SArbGroup  newGroup = {0};
    SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &pRspMember->vgId);
    if (pGroup == NULL) {
      mInfo("failed to update arb token, vgId:%d not found", pRspMember->vgId);
      continue;
    }

    bool updateToken = mndCheckArbGroupByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
    if (updateToken) {
      // A failed push is logged and skipped so the other members are still processed.
      if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
        mError("failed to push newGroup to updateArray, but continue at this hearbear");
      }
    }

    sdbRelease(pMnode->pSdb, pGroup);
  }

  // Destroy the scratch array on both the success and the failure path.
  int32_t code = mndBatchUpdateArbGroup(pMnode, pUpdateArray);
  taosArrayDestroy(pUpdateArray);

  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
1121

1122
/**
 * Decide, under the group mutex, whether a check-sync result changes the
 * group's isSync flag.
 *
 * Nothing is changed when the group has an assigned leader (terrno is set to
 * TSDB_CODE_SUCCESS) or when either member token differs from the stored one
 * (terrno is set to TSDB_CODE_MND_ARB_TOKEN_MISMATCH). Otherwise, if isSync
 * differs from newIsSync, pNewGroup receives a copy of the group with the new
 * isSync, the given code and the current time as updateTimeMs.
 *
 * @return true when pNewGroup was filled and needs to be persisted.
 */
bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
                                  bool newIsSync, SArbGroup *pNewGroup, int32_t code) {
  bool changed = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  do {
    if (pGroup->assignedLeader.dnodeId != 0) {
      terrno = TSDB_CODE_SUCCESS;
      mInfo("skip to update arb sync, vgId:%d has assigned leader:%d", vgId, pGroup->assignedLeader.dnodeId);
      break;
    }

    char *stored0 = pGroup->members[0].state.token;
    char *stored1 = pGroup->members[1].state.token;
    bool  tokensMatch = (mndArbCheckToken(stored0, member0Token) == 0) && (mndArbCheckToken(stored1, member1Token) == 0);
    if (!tokensMatch) {
      terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
      mInfo("skip to update arb sync, memberToken mismatch local:[%s][%s], msg:[%s][%s]", stored0, stored1,
            member0Token, member1Token);
      break;
    }

    if (pGroup->isSync == newIsSync) {
      break;
    }

    mndArbGroupDupObj(pGroup, pNewGroup);
    pNewGroup->isSync = newIsSync;
    pNewGroup->code = code;
    pNewGroup->updateTimeMs = taosGetTimestampMs();

    mInfo("vgId:%d, arb isSync updating, new isSync:%d, timeStamp:%" PRId64, vgId, newIsSync, pNewGroup->updateTimeMs);
    changed = true;
  } while (0);

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return changed;
}
1157

1158
/**
 * Look up the arb group of vgId and, if the check-sync result changes its
 * isSync state, queue an update for it.
 *
 * @return terrno (or -1 when terrno is 0) if the group is not found; 0
 *         otherwise. A failure to queue the update is only logged.
 */
static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token, char *member1Token, bool newIsSync,
                                int32_t rsp_code) {
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pGroup = sdbAcquire(pSdb, SDB_ARBGROUP, &vgId);
  if (pGroup == NULL) {
    mInfo("failed to update arb sync, vgId:%d not found", vgId);
    int32_t code = (terrno != 0) ? terrno : -1;
    TAOS_RETURN(code);
  }

  SArbGroup updated = {0};
  if (mndUpdateArbGroupByCheckSync(pGroup, vgId, member0Token, member1Token, newIsSync, &updated, rsp_code) &&
      mndUpdateArbGroup(pMnode, &updated) != 0) {
    mInfo("failed to pullup update arb sync, vgId:%d, since %s", vgId, terrstr());
  }

  sdbRelease(pSdb, pGroup);
  return 0;
}
1181

1182
/**
 * Handle an arb heartbeat response from a dnode.
 *
 * An empty response is ignored. A response whose arb token differs from the
 * local one is rejected with TSDB_CODE_MND_ARB_TOKEN_MISMATCH; otherwise the
 * reported members are applied to their arb groups.
 *
 * @return 0 on success or empty response, otherwise an error code.
 */
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arb hb-rsp contLen is 0");
    return 0;
  }

  SMnode *pMnode = pRsp->info.node;

  char    arbToken[TSDB_ARB_TOKEN_SIZE];
  int32_t code = mndGetArbToken(pMnode, arbToken);
  if (code != 0) {
    mError("failed to get arb token for arb-hb response");
    TAOS_RETURN(code);
  }

  SVArbHeartBeatRsp arbHbRsp = {0};
  code = tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &arbHbRsp);
  if (code != 0) {
    mInfo("arb hb-rsp des failed, since:%s", tstrerror(pRsp->code));
    TAOS_RETURN(code);
  }

  if (mndArbCheckToken(arbToken, arbHbRsp.arbToken) == 0) {
    TAOS_CHECK_GOTO(mndUpdateArbGroupsByHeartBeat(pMnode, arbHbRsp.dnodeId, arbHbRsp.hbMembers), NULL, _OVER);
    code = 0;
  } else {
    mInfo("arb hearbeat skip update for dnodeId:%d, arb token mismatch, local:[%s] msg:[%s]", arbHbRsp.dnodeId,
          arbToken, arbHbRsp.arbToken);
    code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  }

_OVER:
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
  return code;
}
1219

1220
/**
 * Handle a vnode check-sync response.
 *
 * An empty response is ignored. If the payload cannot be decoded and the rpc
 * code is TSDB_CODE_MND_ARB_TOKEN_MISMATCH, the response is dropped with
 * success. When the arb token in the response differs from the local one,
 * terrno is set to TSDB_CODE_MND_ARB_TOKEN_MISMATCH and the update is skipped.
 * Otherwise the group's sync state is updated; isSync is true only when the
 * response's errCode is TSDB_CODE_SUCCESS.
 */
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arb check-sync-rsp contLen is 0");
    return 0;
  }

  SMnode *pMnode = pRsp->info.node;

  char    arbToken[TSDB_ARB_TOKEN_SIZE];
  int32_t code = mndGetArbToken(pMnode, arbToken);
  if (code != 0) {
    mError("failed to get arb token for arb-check-sync response");
    TAOS_RETURN(code);
  }

  SVArbCheckSyncRsp syncRsp = {0};
  code = tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp);
  if (code != 0) {
    mInfo("arb check-sync-rsp des failed, since:%s", tstrerror(pRsp->code));
    if (pRsp->code != TSDB_CODE_MND_ARB_TOKEN_MISMATCH) {
      TAOS_RETURN(code);
    }
    terrno = TSDB_CODE_SUCCESS;
    return 0;
  }

  mInfo("vgId:%d, vnode-arb-check-sync-rsp received, errCode:%d", syncRsp.vgId, syncRsp.errCode);
  if (mndArbCheckToken(arbToken, syncRsp.arbToken) == 0) {
    bool newIsSync = (syncRsp.errCode == TSDB_CODE_SUCCESS);
    code = mndUpdateArbSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync,
                            syncRsp.errCode);
    if (code != 0) {
      mInfo("failed to update arb sync for vgId:%d, since:%s", syncRsp.vgId, terrstr());
    }
  } else {
    // code is left at 0 here; only terrno records the mismatch
    mInfo("skip update arb sync for vgId:%d, arb token mismatch, local:[%s] msg:[%s]", syncRsp.vgId, arbToken,
          syncRsp.arbToken);
    terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  }

  tFreeSVArbCheckSyncRsp(&syncRsp);
  TAOS_RETURN(code);
}
1268

1269
/**
 * Decide, under the group mutex, whether a set-assigned-leader response
 * acknowledges the group's pending assignment.
 *
 * The response is ignored when memberToken differs from the stored assigned
 * leader token or when errcode is not TSDB_CODE_SUCCESS. If the assignment has
 * not been acked yet, pNewGroup receives a copy of the group with
 * isSync = false and assignedLeader.acked = true.
 *
 * @return true when pNewGroup was filled and needs to be persisted.
 */
bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode,
                                          SArbGroup *pNewGroup) {
  bool acked = false;

  (void)taosThreadMutexLock(&pGroup->mutex);

  if (mndArbCheckToken(pGroup->assignedLeader.token, memberToken) != 0) {
    mInfo("skip update arb assigned for vgId:%d, member token mismatch, local:[%s] msg:[%s]", vgId,
          pGroup->assignedLeader.token, memberToken);
  } else if (errcode != TSDB_CODE_SUCCESS) {
    mInfo("skip update arb assigned for vgId:%d, since:%s", vgId, tstrerror(errcode));
  } else if (!pGroup->assignedLeader.acked) {
    mndArbGroupDupObj(pGroup, pNewGroup);
    pNewGroup->isSync = false;
    pNewGroup->assignedLeader.acked = true;

    mInfo("vgId:%d, arb received assigned ack", vgId);
    acked = true;
  }

  (void)taosThreadMutexUnlock(&pGroup->mutex);
  return acked;
}
1299

1300
/**
 * Handle a vnode set-assigned-leader response.
 *
 * An empty response is ignored. A response whose arb token differs from the
 * local one is rejected with TSDB_CODE_MND_ARB_TOKEN_MISMATCH. Otherwise, if
 * the response acknowledges the pending assignment, an update for the group
 * is queued.
 *
 * @return 0 on success or empty response, otherwise an error code.
 */
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
  if (pRsp->contLen == 0) {
    mDebug("arb set-assigned-rsp contLen is 0");
    return 0;
  }

  int32_t code = -1;

  SMnode    *pMnode = pRsp->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SArbGroup *pGroup = NULL;

  char arbToken[TSDB_ARB_TOKEN_SIZE];
  if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
    mError("failed to get arb token for arb-set-assigned response");
    TAOS_RETURN(code);
  }

  SVArbSetAssignedLeaderRsp setAssignedRsp = {0};
  if ((code = tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp)) != 0) {
    mInfo("arb set-assigned-rsp des failed, since:%s", tstrerror(pRsp->code));
    TAOS_RETURN(code);
  }

  if (mndArbCheckToken(arbToken, setAssignedRsp.arbToken) != 0) {
    mInfo("skip update arb assigned for vgId:%d, arb token mismatch, local:[%s] msg:[%s]", setAssignedRsp.vgId,
          arbToken, setAssignedRsp.arbToken);
    code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
    goto _OVER;
  }

  pGroup = mndAcquireArbGroup(pMnode, setAssignedRsp.vgId);
  if (!pGroup) {
    mError("failed to set arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, terrstr());
    code = -1;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  SArbGroup newGroup = {0};
  bool updateAssigned = mndUpdateArbGroupBySetAssignedLeader(pGroup, setAssignedRsp.vgId, setAssignedRsp.memberToken,
                                                             pRsp->code, &newGroup);
  if (updateAssigned) {
    if ((code = mndUpdateArbGroup(pMnode, &newGroup)) != 0) {
      mInfo("failed to pullup update arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, tstrerror(code));
      goto _OVER;
    }
  }

  code = 0;

_OVER:
  // Drop the reference taken by mndAcquireArbGroup() on every exit path.
  // NOTE(review): assumes mndAcquireArbGroup() acquires through sdb, like the
  // sdbAcquire(SDB_ARBGROUP) callers in this file - confirm.
  if (pGroup != NULL) {
    sdbRelease(pSdb, pGroup);
  }
  tFreeSVArbSetAssignedLeaderRsp(&setAssignedRsp);
  return code;
}
1354

1355
static char *formatTimestamp(char *buf, int64_t val, int precision) {
×
1356
  time_t tt;
1357
  if (precision == TSDB_TIME_PRECISION_MICRO) {
×
1358
    tt = (time_t)(val / 1000000);
×
1359
  }
1360
  if (precision == TSDB_TIME_PRECISION_NANO) {
×
1361
    tt = (time_t)(val / 1000000000);
×
1362
  } else {
1363
    tt = (time_t)(val / 1000);
×
1364
  }
1365

1366
  struct tm tm;
1367
  if (taosLocalTime(&tt, &tm, NULL, 0, NULL) == NULL) {
×
1368
    mError("failed to get local time");
×
1369
    return NULL;
×
1370
  }
1371
  size_t pos = taosStrfTime(buf, 32, "%Y-%m-%d %H:%M:%S", &tm);
×
1372

1373
  if (precision == TSDB_TIME_PRECISION_MICRO) {
×
1374
    sprintf(buf + pos, ".%06d", (int)(val % 1000000));
×
1375
  } else if (precision == TSDB_TIME_PRECISION_NANO) {
×
1376
    sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
×
1377
  } else {
1378
    sprintf(buf + pos, ".%03d", (int)(val % 1000));
×
1379
  }
1380

1381
  return buf;
×
1382
}
1383

1384
/**
 * Fill up to `rows` rows of the arb-group show result.
 *
 * Columns per row, in order: db name, vgId, one dnodeId per member, isSync,
 * "check sync code(update time)" text, then assigned leader dnodeId, token
 * and acked flag (the last three are NULL when no leader is assigned).
 * Groups whose vgroup can no longer be found are skipped.
 *
 * @return number of rows written to pBlock.
 */
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  SArbGroup *pGroup = NULL;
  int32_t    code = 0;
  int32_t    lino = 0;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_ARBGROUP, pShow->pIter, (void **)&pGroup);
    if (pShow->pIter == NULL) break;

    // NOTE(review): any RETRIEVE_CHECK_GOTO below that jumps to _OVER leaves
    // this mutex locked, unless the macro unlocks it - confirm and fix there.
    (void)taosThreadMutexLock(&pGroup->mutex);

    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    SVgObj          *pVgObj = sdbAcquire(pSdb, SDB_VGROUP, &pGroup->vgId);
    if (!pVgObj) {
      (void)taosThreadMutexUnlock(&pGroup->mutex);
      sdbRelease(pSdb, pGroup);
      continue;
    }
    char dbNameInGroup[TSDB_DB_FNAME_LEN];
    tstrncpy(dbNameInGroup, pVgObj->dbName, TSDB_DB_FNAME_LEN);
    sdbRelease(pSdb, pVgObj);

    // The size limit must match the dbname buffer, not the arb token size.
    char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(dbNameInGroup), TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)dbname, false), pGroup, &lino, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->vgId, false), pGroup, &lino, _OVER);

    for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
      SArbGroupMember *pMember = &pGroup->members[i];
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pMember->info.dnodeId, false), pGroup,
                          &lino, _OVER);
    }

    mInfo("vgId:%d, arb group sync:%d, code:%s, update time:%" PRId64, pGroup->vgId, pGroup->isSync,
          tstrerror(pGroup->code), pGroup->updateTimeMs);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->isSync, false), pGroup, &lino, _OVER);

    // Text column: "<error string>(<formatted update time>)"
    char strCheckSyncCode[100] = {0};
    char bufUpdateTime[40] = {0};
    (void)formatTimestamp(bufUpdateTime, pGroup->updateTimeMs, TSDB_TIME_PRECISION_MILLI);
    (void)tsnprintf(strCheckSyncCode, 100, "%s(%s)", tstrerror(pGroup->code), bufUpdateTime);

    char checkSyncCode[100 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(checkSyncCode, strCheckSyncCode, 100 + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)checkSyncCode, false), pGroup, &lino, _OVER);

    if (pGroup->assignedLeader.dnodeId != 0) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.dnodeId, false),
                          pGroup, &lino, _OVER);

      char token[TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE] = {0};
      STR_WITH_MAXSIZE_TO_VARSTR(token, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)token, false), pGroup, &lino, _OVER);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.acked, false),
                          pGroup, &lino, _OVER);
    } else {
      // No assigned leader: the three leader columns are NULL.
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetNULL(pColInfo, numOfRows);
    }

    (void)taosThreadMutexUnlock(&pGroup->mutex);

    numOfRows++;
    sdbRelease(pSdb, pGroup);
  }

_OVER:
  if (code != 0) mError("failed to restrieve arb group at line:%d, since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;

  return numOfRows;
}
1477

1478
/** Cancel an in-progress SDB_ARBGROUP iteration identified by pIter. */
static void mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_ARBGROUP);
}
×
1482

1483
/** Return sdbGetSize() for the SDB_ARBGROUP table of this mnode. */
int32_t mndGetArbGroupSize(SMnode *pMnode) {
  return sdbGetSize(pMnode->pSdb, SDB_ARBGROUP);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc