• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3903

24 Apr 2025 11:36AM UTC coverage: 55.307% (+0.09%) from 55.213%
#3903

push

travis-ci

happyguoxy
Sync branches at 2025-04-24 19:35

175024 of 316459 relevant lines covered (55.31%)

1151858.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.14
/source/libs/stream/src/streamHb.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "streamInt.h"
18
#include "tmisce.h"
19
#include "tref.h"
20
#include "tstream.h"
21
#include "ttimer.h"
22
#include "wal.h"
23

24
int32_t streamMetaRefPool = 0;
25

26
struct SMetaHbInfo {
27
  tmr_h        hbTmr;
28
  int32_t      tickCounter;
29
  int32_t      hbCount;
30
  int64_t      hbStart;
31
  int64_t      msgSendTs;
32
  SStreamHbMsg hbMsg;
33
};
34

35
// Count one more timer tick; returns true (and restarts the count from zero) once
// META_HB_SEND_IDLE_COUNTER ticks have been accumulated, false otherwise.
static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
  pInfo->tickCounter += 1;

  if (pInfo->tickCounter < META_HB_SEND_IDLE_COUNTER) {
    return false;
  }

  // enough ticks elapsed, start counting again
  pInfo->tickCounter = 0;
  return true;
}
42

43
// Return true if the nodeId of pTaskEpset is already recorded in pMsg->pUpdateNodes.
static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) {
  int32_t total = taosArrayGetSize(pMsg->pUpdateNodes);
  int32_t idx = 0;

  while (idx < total) {
    // the update list stores plain int32_t node ids
    int32_t* pNodeId = (int32_t*)taosArrayGet(pMsg->pUpdateNodes, idx);
    if (*pNodeId == pTaskEpset->nodeId) {
      return true;
    }
    idx += 1;
  }

  return false;
}
52

53
// Move the node ids queued in pTask->outputInfo.pNodeEpsetUpdateList into pMsg->pUpdateNodes, skipping ids that are
// already present in the message. The task's list is cleared afterwards, whether or not every id could be added.
// The whole operation runs while holding pTask->lock.
static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
  SStreamMeta* pMeta = pTask->pMeta;

  streamMutexLock(&pTask->lock);

  int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList);
  for (int32_t j = 0; j < num; ++j) {
    SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, j);

    if (existInHbMsg(pMsg, pTaskEpset)) {
      continue;
    }

    void* p = taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId);
    if (p == NULL) {
      // nothing was appended, so do not report the node as added below
      stError("failed to set the updateNode info in hbMsg, vgId:%d", pMeta->vgId);
      continue;
    }

    stDebug("vgId:%d nodeId:%d added into hbMsg update list, total:%d", pMeta->vgId, pTaskEpset->nodeId,
            (int32_t)taosArrayGetSize(pMsg->pUpdateNodes));
  }

  taosArrayClear(pTask->outputInfo.pNodeEpsetUpdateList);
  streamMutexUnlock(&pTask->lock);
}
131✔
77

78
// Fill the progress fields of pEntry for a source task; tasks of any other level are left untouched.
//  - force-window-close trigger: processedVer is taken from the start key of the latest forced window
//  - otherwise, when a wal reader exists: processedVer is (current reader version - 1), replaced by the
//    checkpoint's processedVer if that value is negative, and verRange is filled from the wal reader
static void setProcessProgress(SStreamTask* pTask, STaskStatusEntry* pEntry) {
  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    return;
  }

  if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
    pEntry->processedVer = pTask->status.latestForceWindow.skey;
    return;
  }

  if (pTask->exec.pWalReader == NULL) {
    return;
  }

  pEntry->processedVer = walReaderGetCurrentVer(pTask->exec.pWalReader) - 1;
  if (pEntry->processedVer < 0) {
    pEntry->processedVer = pTask->chkInfo.processedVer;
  }

  walReaderValidVersionRange(pTask->exec.pWalReader, &pEntry->verRange.minVer, &pEntry->verRange.maxVer);
}
96

97
// Encode pMsg into a buffer obtained from rpcMallocCont() and send it to pEpset as a TDMT_MND_STREAM_HEARTBEAT
// request.
//
// Returns TSDB_CODE_FAILED if the size computation, the buffer allocation or the encoding fails; otherwise returns
// whatever tmsgSendReq() returns.
static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* pEpset) {
  int32_t code = 0;
  int32_t tlen = 0;

  // tlen receives the length that is used below to size the rpc buffer
  tEncodeSize(tEncodeStreamHbMsg, pMsg, tlen, code);
  if (code < 0) {
    stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
    return TSDB_CODE_FAILED;
  }

  void* buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    // allocation failure, not an encoding failure: report it as such
    stError("vgId:%d failed to alloc rpc buffer for stream hb msg, size:%d, code:%s", pMeta->vgId, tlen,
            tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return TSDB_CODE_FAILED;
  }

  SEncoder encoder;
  tEncoderInit(&encoder, buf, tlen);
  if ((code = tEncodeStreamHbMsg(&encoder, pMsg)) < 0) {
    rpcFreeCont(buf);
    tEncoderClear(&encoder);
    stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
    return TSDB_CODE_FAILED;
  }
  tEncoderClear(&encoder);

  stDebug("vgId:%d send hb to mnode, numOfTasks:%d msgId:%d", pMeta->vgId, pMsg->numOfTasks, pMsg->msgId);

  SRpcMsg msg = {0};
  initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
  return tmsgSendReq(pEpset, &msg);
}
129

130
// Copy into pEpSet the mnode epset of the first task in pMeta->pTaskList that can be acquired and whose
// info.fillHistory is not 1. Returns TSDB_CODE_SUCCESS when such a task is found, TSDB_CODE_FAILED otherwise
// (pEpSet is not written in that case).
static int32_t streamTaskGetMndEpset(SStreamMeta* pMeta, SEpSet* pEpSet) {
  int32_t total = streamMetaGetNumOfTasks(pMeta);

  for (int32_t idx = 0; idx < total; ++idx) {
    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, idx);
    STaskId        key = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
    SStreamTask*   pTask = NULL;

    if (streamMetaAcquireTaskUnsafe(pMeta, &key, &pTask) != 0) {
      continue;
    }

    bool usable = (pTask->info.fillHistory != 1);
    if (usable) {
      epsetAssign(pEpSet, &pTask->info.mnodeEpset);
    }

    streamMetaReleaseTask(pMeta, pTask);

    if (usable) {
      return TSDB_CODE_SUCCESS;
    }
  }

  return TSDB_CODE_FAILED;
}
154

155
// Overwrite the mnode epset of every task in pMeta->pTaskList that can be acquired with pEpSet, logging the old and
// the new epset for each of them. Tasks that cannot be acquired are reported and skipped.
static void streamTaskUpdateMndEpset(SStreamMeta* pMeta, SEpSet* pEpSet) {
  int32_t total = streamMetaGetNumOfTasks(pMeta);

  for (int32_t idx = 0; idx < total; ++idx) {
    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, idx);
    STaskId        key = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
    SStreamTask*   pTask = NULL;

    int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &key, &pTask);
    if (code != 0) {
      stError("vgId:%d s-task:0x%x failed to acquire it for updating mnode epset, code:%s", pMeta->vgId,
              pTaskId->taskId, tstrerror(code));
      continue;
    }

    // the two strings below are only used in the log, so a conversion failure is reported and otherwise ignored
    char oldStr[256] = {0};
    char newStr[256] = {0};

    int32_t ret = epsetToStr(&pTask->info.mnodeEpset, oldStr, tListLen(oldStr));
    if (ret != 0) {
      stError("failed to convert epset to str, code:%s", tstrerror(ret));
    }

    ret = epsetToStr(pEpSet, newStr, tListLen(newStr));
    if (ret != 0) {
      stError("failed to convert epset to str, code:%s", tstrerror(ret));
    }

    epsetAssign(&pTask->info.mnodeEpset, pEpSet);
    stInfo("s-task:0x%x update mnd epset, from %s to %s", pTaskId->taskId, oldStr, newStr);

    streamMetaReleaseTask(pMeta, pTask);
  }

  stDebug("vgId:%d update mnd epset for %d tasks completed", pMeta->vgId, total);
}
×
190

191
// NOTE: this task should be executed within the SStreamMeta lock region.
192
int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
983✔
193
  SEpSet       epset = {0};
983✔
194
  bool         hasMnodeEpset = false;
983✔
195
  int32_t      numOfTasks = streamMetaGetNumOfTasks(pMeta);
983✔
196
  SMetaHbInfo* pInfo = pMeta->pHbInfo;
983✔
197
  int32_t      code = 0;
983✔
198
  bool         setReqCheckpointId = false;
983✔
199

200
  // not recv the hb msg rsp yet, send current hb msg again
201
  if (pInfo->msgSendTs > 0) {
983✔
202
    stDebug("vgId:%d hbMsg rsp not recv, send current hbMsg, msgId:%d, total:%d again", pMeta->vgId, pInfo->hbMsg.msgId,
3✔
203
            pInfo->hbCount);
204

205
    code = streamTaskGetMndEpset(pMeta, &epset);
3✔
206
    if (code != 0) {
3✔
207
      stError("vgId:%d failed to get the mnode epset, not retrying sending hbMsg, msgId:%d", pMeta->vgId,
×
208
              pInfo->hbMsg.msgId);
209
      return code;
×
210
    }
211

212
    pInfo->msgSendTs = taosGetTimestampMs();
3✔
213
    return doSendHbMsgInfo(&pInfo->hbMsg, pMeta, &epset);
3✔
214
  }
215

216
  SStreamHbMsg* pMsg = &pInfo->hbMsg;
980✔
217
  pMsg->vgId = pMeta->vgId;
980✔
218
  pMsg->msgId = pMeta->pHbInfo->hbCount;
980✔
219
  pMsg->ts = taosGetTimestampMs();
980✔
220

221
  stDebug("vgId:%d build stream hbMsg, leader:%d HbMsgId:%d, HbMsgTs:%" PRId64, pMeta->vgId,
980✔
222
          (pMeta->role == NODE_ROLE_LEADER), pMsg->msgId, pMsg->ts);
223

224
  pMsg->pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
980✔
225
  pMsg->pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
980✔
226

227
  if (pMsg->pTaskStatus == NULL || pMsg->pUpdateNodes == NULL) {
980✔
228
    return terrno;
×
229
  }
230

231
  for (int32_t i = 0; i < numOfTasks; ++i) {
1,123✔
232
    SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
143✔
233

234
    STaskId      id = {.streamId = pId->streamId, .taskId = pId->taskId};
143✔
235
    SStreamTask* pTask = NULL;
143✔
236
    code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
143✔
237
    if (code != 0) {
143✔
238
      continue;
12✔
239
    }
240

241
    // not report the status of fill-history task
242
    if (pTask->info.fillHistory != STREAM_NORMAL_TASK) {
143✔
243
      streamMetaReleaseTask(pMeta, pTask);
12✔
244
      continue;
12✔
245
    }
246

247
    // todo: this lock may be blocked by lock in streamMetaStartOneTask function, which may lock a very long time when
248
    // trying to load remote checkpoint data
249
    streamMutexLock(&pTask->lock);
131✔
250
    STaskStatusEntry entry = streamTaskGetStatusEntry(pTask);
131✔
251
    streamMutexUnlock(&pTask->lock);
131✔
252

253
    entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
131✔
254
    if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
131✔
255
      entry.sinkQuota = pTask->outputInfo.pTokenBucket->quotaRate;
49✔
256
      entry.sinkDataSize = SIZE_IN_MiB(pTask->execInfo.sink.dataSize);
49✔
257
    }
258

259
    SActiveCheckpointInfo* p = pTask->chkInfo.pActiveInfo;
131✔
260
    if (p->activeId != 0) {
131✔
261
      entry.checkpointInfo.failed = (p->failedId >= p->activeId) ? 1 : 0;
8✔
262
      entry.checkpointInfo.activeId = p->activeId;
8✔
263
      entry.checkpointInfo.activeTransId = p->transId;
8✔
264

265
      if (entry.checkpointInfo.failed) {
8✔
266
        stInfo("s-task:%s set kill checkpoint trans in hbMsg, transId:%d, clear the active checkpointInfo",
×
267
               pTask->id.idStr, p->transId);
268

269
        streamMutexLock(&pTask->lock);
×
270
        streamTaskClearCheckInfo(pTask, true);
×
271
        streamMutexUnlock(&pTask->lock);
×
272
      }
273
    }
274

275
    streamMutexLock(&pTask->lock);
131✔
276
    entry.checkpointInfo.consensusChkptId = streamTaskCheckIfReqConsenChkptId(pTask, pMsg->ts);
131✔
277
    if (entry.checkpointInfo.consensusChkptId) {
131✔
278
      entry.checkpointInfo.consensusTs = pTask->status.consenChkptInfo.statusTs;
8✔
279
      setReqCheckpointId = true;
8✔
280
    }
281
    streamMutexUnlock(&pTask->lock);
131✔
282

283
    setProcessProgress(pTask, &entry);
131✔
284
    addUpdateNodeIntoHbMsg(pTask, pMsg);
131✔
285

286
    p = taosArrayPush(pMsg->pTaskStatus, &entry);
131✔
287
    if (p == NULL) {
131✔
288
      stError("failed to add taskInfo:0x%x in hbMsg, vgId:%d", pTask->id.taskId, pMeta->vgId);
×
289
    }
290

291
    if (!hasMnodeEpset) {
131✔
292
      epsetAssign(&epset, &pTask->info.mnodeEpset);
74✔
293
      hasMnodeEpset = true;
74✔
294
    }
295

296
    streamMetaReleaseTask(pMeta, pTask);
131✔
297
  }
298

299
  if (setReqCheckpointId) {
980✔
300
    if (pMeta->startInfo.curStage != START_MARK_REQ_CHKPID) {
5✔
301
      stError("vgId:%d internal unknown error, current stage is:%d expected:%d", pMeta->vgId, pMeta->startInfo.curStage,
×
302
              START_MARK_REQ_CHKPID);
303
    }
304

305
    pMeta->startInfo.curStage = START_WAIT_FOR_CHKPTID;
5✔
306
    SStartTaskStageInfo info = {.stage = pMeta->startInfo.curStage, .ts = pMsg->ts};
5✔
307
    taosArrayPush(pMeta->startInfo.pStagesList, &info);
5✔
308

309
    stDebug("vgId:%d mark_req stage -> wait_for_chkptId stage, reqTs:%" PRId64 " , numOfStageHist:%d", pMeta->vgId,
5✔
310
            info.ts, (int32_t)taosArrayGetSize(pMeta->startInfo.pStagesList));
311
  }
312

313
  pMsg->numOfTasks = taosArrayGetSize(pMsg->pTaskStatus);
980✔
314

315
  if (hasMnodeEpset) {
980✔
316
    pInfo->msgSendTs = taosGetTimestampMs();
74✔
317
    code = doSendHbMsgInfo(pMsg, pMeta, &epset);
74✔
318
  } else {
319
    stDebug("vgId:%d no tasks or no mnd epset, not send stream hb to mnode", pMeta->vgId);
906✔
320
    tCleanupStreamHbMsg(&pInfo->hbMsg);
906✔
321
    pInfo->msgSendTs = -1;
906✔
322
  }
323

324
  return code;
980✔
325
}
326

327
void streamMetaHbToMnode(void* param, void* tmrId) {
27,405✔
328
  int64_t rid = *(int64_t*)param;
27,405✔
329
  int32_t code = 0;
27,405✔
330
  int32_t vgId = 0;
27,405✔
331
  int32_t role = 0;
27,405✔
332

333
  SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, rid);
27,405✔
334
  if (pMeta == NULL) {
27,405✔
335
    stError("invalid meta rid:%" PRId64 " failed to acquired stream-meta", rid);
×
336
    return;
×
337
  }
338

339
  vgId = pMeta->vgId;
27,405✔
340
  role = pMeta->role;
27,405✔
341

342
  // need to stop, stop now
343
  if (pMeta->closeFlag) {
27,405✔
344
    pMeta->pHbInfo->hbStart = 0;
231✔
345
    code = taosReleaseRef(streamMetaRefPool, rid);
231✔
346
    if (code == TSDB_CODE_SUCCESS) {
231✔
347
      stInfo("vgId:%d jump out of meta timer since closed", vgId);
231✔
348
    } else {
349
      stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
×
350
    }
351
    return;
231✔
352
  }
353

354
  // not leader not send msg
355
  if (pMeta->role != NODE_ROLE_LEADER) {
27,174✔
356
    pMeta->pHbInfo->hbStart = 0;
126✔
357
    code = taosReleaseRef(streamMetaRefPool, rid);
126✔
358
    if (code == TSDB_CODE_SUCCESS) {
126✔
359
      stInfo("vgId:%d role:%d not leader not send hb to mnode", vgId, role);
126✔
360
    } else {
361
      stError("vgId:%d role:%d not leader not send hb to mnode, failed to release meta rid:%" PRId64, vgId, role, rid);
×
362
    }
363
    return;
126✔
364
  }
365

366
  if (!waitForEnoughDuration(pMeta->pHbInfo)) {
27,048✔
367
    streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId,
26,065✔
368
                   "meta-hb-tmr");
369

370
    code = taosReleaseRef(streamMetaRefPool, rid);
26,065✔
371
    if (code) {
26,065✔
372
      stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
×
373
    }
374
    return;
26,065✔
375
  }
376

377
  // set the hb start time
378
  if (pMeta->pHbInfo->hbStart == 0) {
983✔
379
    pMeta->pHbInfo->hbStart = taosGetTimestampMs();
304✔
380
  }
381

382
  // NOTE: stream task in restart procedure. not generate the hb now, try to acquire the lock may cause stuck this timer.
383
  int32_t count = 30;
983✔
384
  bool    send = false;
983✔
385
  while ((--count) >= 0) {
983✔
386
    int32_t ret = streamMetaTryRlock(pMeta);
983✔
387
    if (ret != 0) {
983✔
388
      taosMsleep(10);
×
389
    } else {
390
      send = true;
983✔
391
      code = streamMetaSendHbHelper(pMeta);
983✔
392
      streamMetaRUnLock(pMeta);
983✔
393
      break;
983✔
394
    }
395
  }
396

397
  if (!send) {
983✔
398
    stError("vgId:%d failed to send hbMsg to mnode due to acquire lock failure, retry again in 5s", pMeta->vgId);
×
399
  }
400
  if (code) {
983✔
401
    stError("vgId:%d failed to send hbMsg to mnode, retry in 5, code:%s", pMeta->vgId, tstrerror(code));
5✔
402
  }
403

404
  streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
983✔
405
                 "meta-hb-tmr");
406

407
  code = taosReleaseRef(streamMetaRefPool, rid);
983✔
408
  if (code) {
983✔
409
    stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
×
410
  }
411
}
412

413
// Allocate a zero-filled SMetaHbInfo, mark it as having no hb message in flight (msgSendTs = -1), hand it back
// through pRes and arm the hb timer with pRid as the callback parameter.
// Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails (*pRes is NULL in that case).
int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes) {
  *pRes = NULL;

  SMetaHbInfo* pHb = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
  if (pHb == NULL) {
    return terrno;
  }

  pHb->hbCount = 0;
  pHb->tickCounter = 0;
  pHb->msgSendTs = -1;

  *pRes = pHb;

  streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer, &pHb->hbTmr, 0, "stream-hb");
  return TSDB_CODE_SUCCESS;
}
429

430
// Release everything owned by pInfo: the cached hb message, the hb timer (if any) and pInfo itself.
// A NULL pInfo is ignored.
void destroyMetaHbInfo(SMetaHbInfo* pInfo) {
  if (pInfo == NULL) {
    return;
  }

  tCleanupStreamHbMsg(&pInfo->hbMsg);

  if (pInfo->hbTmr != NULL) {
    streamTmrStop(pInfo->hbTmr);
    pInfo->hbTmr = NULL;
  }

  taosMemoryFree(pInfo);
}
279✔
442

443
// Give the hb timer callback time to stop: on a leader this sleeps for 3 * META_HB_CHECK_INTERVAL,
// on any other role it returns immediately.
void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) {
  if (pMeta->role != NODE_ROLE_LEADER) {
    return;
  }

  // wait for the stream meta hb function stopping
  taosMsleep(3 * META_HB_CHECK_INTERVAL);
  stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
}
463✔
450

451
// Report the hb start timestamp and the hb counter of pInfo through pStartTs/pSendCount.
// Both outputs are set to 0 when pInfo is NULL.
void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount) {
  *pStartTs = 0;
  *pSendCount = 0;

  if (pInfo != NULL) {
    *pStartTs = pInfo->hbStart;
    *pSendCount = pInfo->hbCount;
  }
}
462

463
// Handle the rsp of a stream hb message while holding the meta write lock.
//
// A rsp whose msgId differs from the current hbCount is treated as expired and dropped. For the expected rsp the
// cached hb message is released, hbCount is increased, msgSendTs is reset to -1, and the mnode epset of all tasks
// is replaced by the one carried in the rsp if it differs from the epset currently recorded in the tasks.
// Always returns TSDB_CODE_SUCCESS.
int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp) {
  SMetaHbInfo* pHb = pMeta->pHbInfo;

  stDebug("vgId:%d process hbMsg rsp, msgId:%d rsp confirmed", pMeta->vgId, pRsp->msgId);
  streamMetaWLock(pMeta);

  if (pRsp->msgId != pHb->hbCount) {
    stWarn("vgId:%d recv expired hb rsp, msgId:%d, discarded", pMeta->vgId, pRsp->msgId);
    streamMetaWUnLock(pMeta);
    return TSDB_CODE_SUCCESS;
  }

  // current waiting rsp recved
  tCleanupStreamHbMsg(&pHb->hbMsg);
  stDebug("vgId:%d hbMsg msgId:%d sendTs:%" PRId64 " recved confirmed", pMeta->vgId, pRsp->msgId, pHb->msgSendTs);

  pHb->hbCount += 1;
  pHb->msgSendTs = -1;

  SEpSet  curEpset = {0};
  int32_t code = streamTaskGetMndEpset(pMeta, &curEpset);
  bool    changed = !isEpsetEqual(&pRsp->mndEpset, &curEpset);

  if (changed && (code == 0)) {
    // we need to update the mnode epset for each tasks
    stInfo("vgId:%d mnode epset updated, update mnode epset for all tasks", pMeta->vgId);
    streamTaskUpdateMndEpset(pMeta, &pRsp->mndEpset);
  }

  streamMetaWUnLock(pMeta);
  return TSDB_CODE_SUCCESS;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc