• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3558

17 Dec 2024 06:05AM UTC coverage: 59.778% (+1.6%) from 58.204%
#3558

push

travis-ci

web-flow
Merge pull request #29179 from taosdata/merge/mainto3.0

merge: form main to 3.0 branch

132787 of 287595 branches covered (46.17%)

Branch coverage included in aggregate %.

104 of 191 new or added lines in 5 files covered. (54.45%)

6085 existing lines in 168 files now uncovered.

209348 of 284746 relevant lines covered (73.52%)

8164844.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.66
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23

24
struct SStreamTaskIter {
25
  SStreamObj  *pStream;
26
  int32_t      level;
27
  int32_t      ordinalIndex;
28
  int32_t      totalLevel;
29
  SStreamTask *pTask;
30
};
31

32
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId);
33

34
int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter) {
65,539✔
35
  *pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter));
65,539!
36
  if (*pIter == NULL) {
65,538!
37
    return terrno;
×
38
  }
39

40
  (*pIter)->level = -1;
65,538✔
41
  (*pIter)->ordinalIndex = 0;
65,538✔
42
  (*pIter)->pStream = pStream;
65,538✔
43
  (*pIter)->totalLevel = taosArrayGetSize(pStream->tasks);
65,538✔
44
  (*pIter)->pTask = NULL;
65,539✔
45

46
  return 0;
65,539✔
47
}
48

49
bool streamTaskIterNextTask(SStreamTaskIter *pIter) {
302,484✔
50
  if (pIter->level >= pIter->totalLevel) {
302,484!
51
    pIter->pTask = NULL;
×
52
    return false;
×
53
  }
54

55
  if (pIter->level == -1) {
302,484✔
56
    pIter->level += 1;
65,537✔
57
  }
58

59
  while (pIter->level < pIter->totalLevel) {
431,426✔
60
    SArray *pList = taosArrayGetP(pIter->pStream->tasks, pIter->level);
366,192✔
61
    if (pIter->ordinalIndex >= taosArrayGetSize(pList)) {
366,067✔
62
      pIter->level += 1;
128,942✔
63
      pIter->ordinalIndex = 0;
128,942✔
64
      pIter->pTask = NULL;
128,942✔
65
      continue;
128,942✔
66
    }
67

68
    pIter->pTask = taosArrayGetP(pList, pIter->ordinalIndex);
237,859✔
69
    pIter->ordinalIndex += 1;
237,772✔
70
    return true;
237,772✔
71
  }
72

73
  pIter->pTask = NULL;
65,234✔
74
  return false;
65,234✔
75
}
76

77
int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask) {
237,796✔
78
  if (pTask) {
237,796!
79
    *pTask = pIter->pTask;
237,819✔
80
    if (*pTask != NULL) {
237,819!
81
      return TSDB_CODE_SUCCESS;
237,824✔
82
    }
83
  }
84

85
  return TSDB_CODE_INVALID_PARA;
×
86
}
87

88
void destroyStreamTaskIter(SStreamTaskIter *pIter) { taosMemoryFree(pIter); }
65,333!
89

90
static bool checkStatusForEachReplica(SVgObj *pVgroup) {
36,761✔
91
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
76,900✔
92
    if (!pVgroup->vnodeGid[i].syncRestore) {
41,434✔
93
      mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId);
1,289!
94
      return false;
1,289✔
95
    }
96

97
    ESyncState state = pVgroup->vnodeGid[i].syncState;
40,145✔
98
    if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR || state == TAOS_SYNC_STATE_LEARNER ||
40,145!
99
        state == TAOS_SYNC_STATE_CANDIDATE) {
100
      mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups", pVgroup->vgId,
6!
101
            state);
102
      return false;
6✔
103
    }
104
  }
105

106
  return true;
35,466✔
107
}
108

109
/*
 * Append one SNodeEntry (nodeId == SNODE_HANDLE) to pVgroupList for every snode
 * object fetched from the sdb.
 *
 * Returns 0 on success. Returns the failing step's error code when the epset
 * cannot be built or the entry cannot be appended; in that case the fetch is
 * cancelled and the current object is released before returning. A failure to
 * render the epset as a string only affects the debug log and never fails the call.
 */
static int32_t mndAddSnodeInfo(SMnode *pMnode, SArray *pVgroupList) {
  SSnodeObj *pObj = NULL;
  void      *pIter = NULL;
  int32_t    code = 0;

  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
    if (pIter == NULL) {
      break;
    }

    SNodeEntry entry = {.nodeId = SNODE_HANDLE};
    code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
    if (code) {
      // log first: the message reads from pObj, which must not be touched after release
      mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn);
      sdbRelease(pMnode->pSdb, pObj);
      sdbCancelFetch(pMnode->pSdb, pIter);
      return code;
    }

    // the string form is only used for logging, so keep its status out of `code`
    char    buf[256] = {0};
    int32_t strCode = epsetToStr(&entry.epset, buf, tListLen(buf));
    if (strCode != 0) {  // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(strCode));
    }

    void *p = taosArrayPush(pVgroupList, &entry);
    if (p == NULL) {
      code = terrno;
      sdbRelease(pMnode->pSdb, pObj);
      sdbCancelFetch(pMnode->pSdb, pIter);
      mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code));
      return code;
    } else {
      mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
    }

    sdbRelease(pMnode->pSdb, pObj);
  }

  return code;
}
151

152
static int32_t mndCheckMnodeStatus(SMnode* pMnode) {
9,571✔
153
  int32_t    code = 0;
9,571✔
154
  ESdbStatus objStatus;
155
  void      *pIter = NULL;
9,571✔
156
  SMnodeObj *pObj = NULL;
9,571✔
157

158
  while (1) {
159
    pIter = sdbFetchAll(pMnode->pSdb, SDB_MNODE, pIter, (void **)&pObj, &objStatus, true);
19,588✔
160
    if (pIter == NULL) {
19,588✔
161
      break;
9,466✔
162
    }
163

164
    if (pObj->syncState != TAOS_SYNC_STATE_LEADER && pObj->syncState != TAOS_SYNC_STATE_FOLLOWER) {
10,122✔
165
      mDebug("mnode sync state:%d not leader/follower", pObj->syncState);
104!
166
      sdbRelease(pMnode->pSdb, pObj);
104✔
167
      sdbCancelFetch(pMnode->pSdb, pIter);
104✔
168
      return TSDB_CODE_FAILED;
104✔
169
    }
170

171
    if (objStatus != SDB_STATUS_READY) {
10,018✔
172
      mWarn("mnode status:%d not ready", objStatus);
1!
173
      sdbRelease(pMnode->pSdb, pObj);
1✔
174
      sdbCancelFetch(pMnode->pSdb, pIter);
1✔
175
      return TSDB_CODE_FAILED;
1✔
176
    }
177

178
    sdbRelease(pMnode->pSdb, pObj);
10,017✔
179
  }
180

181
  return TSDB_CODE_SUCCESS;
9,466✔
182
}
183

184
/*
 * Walk every vgroup in the sdb, append its SNodeEntry to pVgroupList and fold
 * its readiness into *allReady.
 *
 * *allReady is cleared when a vgroup's replica count differs from the first
 * vgroup seen for the same dbUid, or when checkStatusForEachReplica() rejects
 * it. Once cleared it is not re-evaluated, but the remaining vgroups are still
 * added to the list.
 *
 * Returns 0 on success, or an error code when the helper hash map cannot be
 * created/updated or an entry cannot be appended. A failure to render the epset
 * as a string only affects the debug log and never fails the call.
 */
static int32_t mndCheckAndAddVgroupsInfo(SMnode *pMnode, SArray *pVgroupList, bool* allReady) {
  SSdb     *pSdb = pMnode->pSdb;
  void     *pIter = NULL;
  SVgObj   *pVgroup = NULL;
  int32_t   code = 0;
  SHashObj *pHash = NULL;

  // dbUid -> replica count of the first vgroup seen for that db
  pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    code = terrno;  // save before logging so the value returned is the one reported
    mError("failed to prepare hashmap during take vgroup snapshot, code:%s", tstrerror(code));
    return code;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime};
    entry.epset = mndGetVgroupEpset(pMnode, pVgroup);

    int8_t *pReplica = taosHashGet(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid));
    if (pReplica == NULL) {  // not exist, add it into hash map
      code = taosHashPut(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid), &pVgroup->replica, sizeof(pVgroup->replica));
      if (code) {
        mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code));
        sdbRelease(pSdb, pVgroup);
        sdbCancelFetch(pSdb, pIter);
        goto _end;  // take snapshot failed, and not all ready
      }
    } else {
      if (*pReplica != pVgroup->replica) {
        mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations",
              pVgroup->vgId, pVgroup->replica, *pReplica);
        *allReady = false;  // task snap success, but not all ready
      }
    }

    // if not all ready till now, no need to check the remaining vgroups,
    // but still we need to put the info of the existed vgroups into the snapshot list
    if (*allReady) {
      *allReady = checkStatusForEachReplica(pVgroup);
    }

    // the string form is only used for logging, so keep its status out of `code`;
    // otherwise a formatting failure would be returned and the snapshot discarded
    char    buf[256] = {0};
    int32_t strCode = epsetToStr(&entry.epset, buf, tListLen(buf));
    if (strCode != 0) {  // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(strCode));
    }

    void *p = taosArrayPush(pVgroupList, &entry);
    if (p == NULL) {
      mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId);
      code = terrno;
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      goto _end;
    } else {
      mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
    }

    sdbRelease(pSdb, pVgroup);
  }

_end:
  taosHashCleanup(pHash);
  return code;
}
253

254
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
9,571✔
255
  int32_t   code = 0;
9,571✔
256
  SArray   *pVgroupList = NULL;
9,571✔
257

258
  *pList = NULL;
9,571✔
259
  *allReady = true;
9,571✔
260

261
  pVgroupList = taosArrayInit(4, sizeof(SNodeEntry));
9,571✔
262
  if (pVgroupList == NULL) {
9,571!
263
    mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno));
×
UNCOV
264
    code = terrno;
×
265
    goto _err;
×
266
  }
267

268
  // 1. check for all vnodes status
269
  code = mndCheckAndAddVgroupsInfo(pMnode, pVgroupList, allReady);
9,571✔
270
  if (code) {
9,571!
UNCOV
271
    goto _err;
×
272
  }
273

274
  // 2. add snode info
275
  code = mndAddSnodeInfo(pMnode, pVgroupList);
9,571✔
276
  if (code) {
9,571!
UNCOV
277
    goto _err;
×
278
  }
279

280
  // 3. check for mnode status
281
  code = mndCheckMnodeStatus(pMnode);
9,571✔
282
  if (code != TSDB_CODE_SUCCESS) {
9,571✔
283
    *allReady = false;
105✔
284
  }
285

286
  *pList = pVgroupList;
9,571✔
287
  return code;
9,571✔
288

289
_err:
×
UNCOV
290
  *allReady = false;
×
UNCOV
291
  taosArrayDestroy(pVgroupList);
×
UNCOV
292
  return code;
×
293
}
294

295
int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
9,241✔
296
  void *pIter = NULL;
9,241✔
297
  SSdb *pSdb = pMnode->pSdb;
9,241✔
298
  *pStream = NULL;
9,241✔
299

300
  SStreamObj *p = NULL;
9,241✔
301
  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&p)) != NULL) {
13,986✔
302
    if (p->uid == streamId) {
13,982✔
303
      sdbCancelFetch(pSdb, pIter);
9,237✔
304
      *pStream = p;
9,237✔
305
      return TSDB_CODE_SUCCESS;
9,237✔
306
    }
307
    sdbRelease(pSdb, p);
4,745✔
308
  }
309

310
  return TSDB_CODE_STREAM_TASK_NOT_EXIST;
4✔
311
}
312

UNCOV
313
/*
 * Kill the active transaction identified by transId, if it can be acquired.
 * pDbName is used for logging only. Failures are logged, not returned.
 */
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) {
  STrans *pTrans = mndAcquireTrans(pMnode, transId);
  if (pTrans != NULL) {
    mInfo("kill active transId:%d in Db:%s", transId, pDbName);
    int32_t code = mndKillTrans(pMnode, pTrans);
    if (code) {
      // report while the reference is still held; pTrans must not be read after release
      mError("failed to kill transId:%d, code:%s", pTrans->id, tstrerror(code));
    }
    mndReleaseTrans(pMnode, pTrans);
  } else {
    mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId);
  }
}
×
326

327
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId) {
15,323✔
328
  *hasEpset = false;
15,323✔
329

330
  pEpSet->numOfEps = 0;
15,323✔
331
  if (nodeId == SNODE_HANDLE) {
15,323✔
332
    SSnodeObj *pObj = NULL;
118✔
333
    void      *pIter = NULL;
118✔
334

335
    pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
118✔
336
    if (pIter != NULL) {
118!
337
      int32_t code = addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
118✔
338
      sdbRelease(pMnode->pSdb, pObj);
118✔
339
      sdbCancelFetch(pMnode->pSdb, pIter);
118✔
340
      if (code) {
118!
UNCOV
341
        *hasEpset = false;
×
UNCOV
342
        mError("failed to set epset");
×
343
      } else {
344
        *hasEpset = true;
118✔
345
      }
346
      return code;
118✔
347
    } else {
UNCOV
348
      mError("failed to acquire snode epset");
×
349
      return TSDB_CODE_INVALID_PARA;
×
350
    }
351
  } else {
352
    SVgObj *pVgObj = mndAcquireVgroup(pMnode, nodeId);
15,205✔
353
    if (pVgObj != NULL) {
15,205✔
354
      SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
15,204✔
355
      mndReleaseVgroup(pMnode, pVgObj);
15,204✔
356

357
      epsetAssign(pEpSet, &epset);
15,204✔
358
      *hasEpset = true;
15,204✔
359
      return TSDB_CODE_SUCCESS;
15,204✔
360
    } else {
361
      mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", taskId, nodeId);
1!
362
      return TSDB_CODE_SUCCESS;
1✔
363
    }
364
  }
365
}
366

367
int32_t mndGetStreamTask(STaskId *pId, SStreamObj *pStream, SStreamTask **pTask) {
107✔
368
  *pTask = NULL;
107✔
369

370
  SStreamTask     *p = NULL;
107✔
371
  SStreamTaskIter *pIter = NULL;
107✔
372
  int32_t          code = createStreamTaskIter(pStream, &pIter);
107✔
373
  if (code) {
107!
UNCOV
374
    mError("failed to create stream task iter:%s", pStream->name);
×
UNCOV
375
    return code;
×
376
  }
377

378
  while (streamTaskIterNextTask(pIter)) {
364!
379
    code = streamTaskIterGetCurrent(pIter, &p);
364✔
380
    if (code) {
364!
UNCOV
381
      continue;
×
382
    }
383

384
    if (p->id.taskId == pId->taskId) {
364✔
385
      destroyStreamTaskIter(pIter);
107✔
386
      *pTask = p;
107✔
387
      return 0;
107✔
388
    }
389
  }
390

UNCOV
391
  destroyStreamTaskIter(pIter);
×
UNCOV
392
  return TSDB_CODE_FAILED;
×
393
}
394

395
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
67,476✔
396
  int32_t num = 0;
67,476✔
397
  for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); ++i) {
201,903✔
398
    SArray *pLevel = taosArrayGetP(pStream->tasks, i);
134,425✔
399
    num += taosArrayGetSize(pLevel);
134,424✔
400
  }
401

402
  return num;
67,473✔
403
}
404

405
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
62✔
406
  SSdb   *pSdb = pMnode->pSdb;
62✔
407
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
62✔
408
  if (pDb == NULL) {
62!
UNCOV
409
    TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
×
410
  }
411

412
  int32_t numOfStreams = 0;
62✔
413
  void   *pIter = NULL;
62✔
UNCOV
414
  while (1) {
×
415
    SStreamObj *pStream = NULL;
62✔
416
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
62✔
417
    if (pIter == NULL) break;
62!
418

UNCOV
419
    if (pStream->sourceDbUid == pDb->uid) {
×
UNCOV
420
      numOfStreams++;
×
421
    }
422

UNCOV
423
    sdbRelease(pSdb, pStream);
×
424
  }
425

426
  *pNumOfStreams = numOfStreams;
62✔
427
  mndReleaseDb(pMnode, pDb);
62✔
428
  return 0;
62✔
429
}
430

431
static void freeTaskList(void *param) {
1,488✔
432
  SArray **pList = (SArray **)param;
1,488✔
433
  taosArrayDestroy(*pList);
1,488✔
434
}
1,488✔
435

436
int32_t mndInitExecInfo() {
1,517✔
437
  int32_t code = taosThreadMutexInit(&execInfo.lock, NULL);
1,517✔
438
  if (code) {
1,517!
UNCOV
439
    return code;
×
440
  }
441

442
  _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
1,517✔
443

444
  execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
1,517✔
445
  execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
1,517✔
446
  execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
1,517✔
447
  execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
1,517✔
448
  execInfo.pChkptStreams = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
1,517✔
449
  execInfo.pStreamConsensus = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
1,517✔
450
  execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
1,517✔
451
  execInfo.pKilledChkptTrans = taosArrayInit(4, sizeof(SStreamTaskResetMsg));
1,517✔
452

453
  if (execInfo.pTaskList == NULL || execInfo.pTaskMap == NULL || execInfo.transMgmt.pDBTrans == NULL ||
1,517!
454
      execInfo.pTransferStateStreams == NULL || execInfo.pChkptStreams == NULL || execInfo.pStreamConsensus == NULL ||
1,517!
455
      execInfo.pNodeList == NULL || execInfo.pKilledChkptTrans == NULL) {
1,517!
UNCOV
456
    mError("failed to initialize the stream runtime env, code:%s", tstrerror(terrno));
×
UNCOV
457
    return terrno;
×
458
  }
459

460
  execInfo.role = NODE_ROLE_UNINIT;
1,517✔
461
  execInfo.switchFromFollower = false;
1,517✔
462

463
  taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
1,517✔
464
  taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList);
1,517✔
465
  taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList);
1,517✔
466
  return 0;
1,517✔
467
}
468

469
void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
520✔
470
  SArray *pValidList = taosArrayInit(4, sizeof(SNodeEntry));
520✔
471
  if (pValidList == NULL) {  // not continue
520!
UNCOV
472
    return;
×
473
  }
474

475
  int32_t size = taosArrayGetSize(pNodeSnapshot);
520✔
476
  int32_t oldSize = taosArrayGetSize(execInfo.pNodeList);
520✔
477

478
  for (int32_t i = 0; i < oldSize; ++i) {
3,709✔
479
    SNodeEntry *p = taosArrayGet(execInfo.pNodeList, i);
3,189✔
480
    if (p == NULL) {
3,189!
UNCOV
481
      continue;
×
482
    }
483

484
    for (int32_t j = 0; j < size; ++j) {
13,491✔
485
      SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);
12,505✔
486
      if (pEntry == NULL) {
12,505!
487
        continue;
×
488
      }
489

490
      if (pEntry->nodeId == p->nodeId) {
12,505✔
491
        p->hbTimestamp = pEntry->hbTimestamp;
2,203✔
492

493
        void *px = taosArrayPush(pValidList, p);
2,203✔
494
        if (px == NULL) {
2,203!
UNCOV
495
          mError("failed to put node into list, nodeId:%d", p->nodeId);
×
496
        } else {
497
          mDebug("vgId:%d ts:%" PRId64 " HbMsgId:%d is valid", p->nodeId, p->hbTimestamp, p->lastHbMsgId);
2,203✔
498
        }
499
        break;
2,203✔
500
      }
501
    }
502
  }
503

504
  taosArrayDestroy(execInfo.pNodeList);
520✔
505
  execInfo.pNodeList = pValidList;
520✔
506

507
  mDebug("remain %d valid node entries after clean expired nodes info, prev size:%d",
520✔
508
         (int32_t)taosArrayGetSize(pValidList), oldSize);
509
}
510

511
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
7,042✔
512
  void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
7,042✔
513
  if (p == NULL) {
7,042✔
514
    return TSDB_CODE_SUCCESS;
130✔
515
  }
516

517
  int32_t code = taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
6,912✔
518
  if (code) {
6,912!
UNCOV
519
    return code;
×
520
  }
521

522
  for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
19,924!
523
    STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
19,924✔
524
    if (pId == NULL) {
19,924!
UNCOV
525
      continue;
×
526
    }
527

528
    if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
19,924!
529
      taosArrayRemove(pExecNode->pTaskList, k);
6,912✔
530

531
      int32_t num = taosArrayGetSize(pExecNode->pTaskList);
6,912✔
532
      mInfo("s-task:0x%x removed from buffer, remain:%d in buffer list", (int32_t)pRemovedId->taskId, num);
6,912!
533
      break;
6,912✔
534
    }
535
  }
536

537
  return TSDB_CODE_SUCCESS;
6,912✔
538
}
539

540
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo) {
1,586✔
541
  for (int32_t i = 0; i < taosArrayGetSize(pTaskIds); ++i) {
1,586!
UNCOV
542
    STaskId *pId = taosArrayGet(pTaskIds, i);
×
UNCOV
543
    if (pId == NULL) {
×
UNCOV
544
      continue;
×
545
    }
546

UNCOV
547
    int32_t code = doRemoveTasks(pExecInfo, pId);
×
548
    if (code) {
×
UNCOV
549
      mError("failed to remove task in buffer list, 0x%" PRIx64, pId->taskId);
×
550
    }
551
  }
552
}
1,586✔
553

554
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
1,307✔
555
  SStreamTaskIter *pIter = NULL;
1,307✔
556
  streamMutexLock(&pExecNode->lock);
1,307✔
557

558
  // 1. remove task entries
559
  int32_t code = createStreamTaskIter(pStream, &pIter);
1,307✔
560
  if (code) {
1,307!
UNCOV
561
    streamMutexUnlock(&pExecNode->lock);
×
562
    mError("failed to create stream task iter:%s", pStream->name);
×
UNCOV
563
    return;
×
564
  }
565

566
  while (streamTaskIterNextTask(pIter)) {
8,349✔
567
    SStreamTask *pTask = NULL;
7,042✔
568
    code = streamTaskIterGetCurrent(pIter, &pTask);
7,042✔
569
    if (code) {
7,042!
UNCOV
570
      continue;
×
571
    }
572

573
    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
7,042✔
574
    code = doRemoveTasks(pExecNode, &id);
7,042✔
575
    if (code) {
7,042!
UNCOV
576
      mError("failed to remove task in buffer list, 0x%" PRIx64, id.taskId);
×
577
    }
578
  }
579

580
  if (taosHashGetSize(pExecNode->pTaskMap) != taosArrayGetSize(pExecNode->pTaskList)) {
1,307!
UNCOV
581
    streamMutexUnlock(&pExecNode->lock);
×
UNCOV
582
    destroyStreamTaskIter(pIter);
×
583
    mError("task map size, task list size, not equal");
×
584
    return;
×
585
  }
586

587
  // 2. remove stream entry in consensus hash table and checkpoint-report hash table
588
  code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid);
1,307✔
589
  if (code) {
1,307!
UNCOV
590
    mError("failed to clear consensus checkpointId, code:%s", tstrerror(code));
×
591
  }
592

593
  code = mndClearChkptReportInfo(execInfo.pChkptStreams, pStream->uid);
1,307✔
594
  if (code) {
1,307✔
595
    mError("failed to clear the checkpoint report info, code:%s", tstrerror(code));
390!
596
  }
597

598
  streamMutexUnlock(&pExecNode->lock);
1,307✔
599
  destroyStreamTaskIter(pIter);
1,307✔
600
}
601

602
static bool taskNodeExists(SArray *pList, int32_t nodeId) {
6,007✔
603
  size_t num = taosArrayGetSize(pList);
6,007✔
604

605
  for (int32_t i = 0; i < num; ++i) {
20,321!
606
    SNodeEntry *pEntry = taosArrayGet(pList, i);
20,321✔
607
    if (pEntry == NULL) {
20,321!
UNCOV
608
      continue;
×
609
    }
610

611
    if (pEntry->nodeId == nodeId) {
20,321✔
612
      return true;
6,007✔
613
    }
614
  }
615

UNCOV
616
  return false;
×
617
}
618

619
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
520✔
620
  SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
520✔
621
  if (pRemovedTasks == NULL) {
520!
UNCOV
622
    return terrno;
×
623
  }
624

625
  int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
520✔
626
  for (int32_t i = 0; i < numOfTask; ++i) {
6,650✔
627
    STaskId *pId = taosArrayGet(execInfo.pTaskList, i);
6,130✔
628
    if (pId == NULL) {
6,130!
629
      continue;
×
630
    }
631

632
    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
6,130✔
633
    if (pEntry == NULL) {
6,130!
UNCOV
634
      continue;
×
635
    }
636

637
    if (pEntry->nodeId == SNODE_HANDLE) {
6,130✔
638
      continue;
123✔
639
    }
640

641
    bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
6,007✔
642
    if (!existed) {
6,007!
UNCOV
643
      void *p = taosArrayPush(pRemovedTasks, pId);
×
UNCOV
644
      if (p == NULL) {
×
UNCOV
645
        mError("failed to put task entry into remove list, taskId:0x%" PRIx64, pId->taskId);
×
646
      }
647
    }
648
  }
649

650
  removeTasksInBuf(pRemovedTasks, &execInfo);
520✔
651

652
  mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
520✔
653
         (int32_t)taosArrayGetSize(execInfo.pTaskList));
654

655
  removeExpiredNodeInfo(pNodeSnapshot);
520✔
656

657
  taosArrayDestroy(pRemovedTasks);
520✔
658
  return 0;
520✔
659
}
660

661
/*
 * Scan execInfo.pChkptStreams under execInfo.lock.
 *
 * For each stream with a non-empty report list:
 *   - if the stream object can no longer be found, queue it for removal from
 *     the checkpoint-report table;
 *   - if every task of the stream has reported and no conflicting trans exists,
 *     create the checkpoint-info update trans, clear the report list on
 *     success/in-progress, and stop scanning (one trans per call);
 *   - otherwise just log the progress.
 *
 * Returns terrno if the temporary drop list cannot be allocated, otherwise
 * always TSDB_CODE_SUCCESS (per-stream failures are logged only).
 */
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  void   *pIter = NULL;
  int32_t code = 0;
  SArray *pDropped = taosArrayInit(4, sizeof(int64_t));
  if (pDropped == NULL) {
    return terrno;
  }

  mDebug("start to scan checkpoint report info");
  streamMutexLock(&execInfo.lock);

  while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
    SChkptReportInfo *px = (SChkptReportInfo *)pIter;
    if (taosArrayGetSize(px->pTaskList) == 0) {
      continue;
    }

    STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0);
    if (pInfo == NULL) {
      continue;
    }

    // copy out what we need: pInfo points into px->pTaskList, which may be cleared below
    int64_t streamId = pInfo->streamId;
    int64_t checkpointId = pInfo->checkpointId;

    SStreamObj *pStream = NULL;
    code = mndGetStreamObj(pMnode, streamId, &pStream);
    if (pStream == NULL || code != 0) {
      mDebug("failed to acquire stream:0x%" PRIx64 " remove it from checkpoint-report list", streamId);
      void *p = taosArrayPush(pDropped, &streamId);
      if (p == NULL) {
        mError("failed to put stream into drop list:0x%" PRIx64, streamId);
      }
      continue;
    }

    int32_t total = mndGetNumOfStreamTasks(pStream);
    int32_t existed = (int32_t)taosArrayGetSize(px->pTaskList);

    if (total == existed) {
      mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info",
             pStream->uid, pStream->name, total);

      code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
      if (code == 0) {
        code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList);
        if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {  // remove this entry
          taosArrayClear(px->pTaskList);
          px->reportChkpt = checkpointId;
          mDebug("stream:0x%" PRIx64 " clear checkpoint-report list", streamId);
        } else {
          mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet",
                 streamId);
        }

        // leaving the loop early: drop the stream reference taken by mndGetStreamObj
        // and tell the hash table that this iteration is finished
        sdbRelease(pMnode->pSdb, pStream);
        taosHashCancelIterate(execInfo.pChkptStreams, pIter);
        break;
      } else {
        mDebug("stream:0x%" PRIx64 " active checkpoint trans not finished yet, wait", streamId);
      }
    } else {
      mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", streamId, pStream->name,
             existed, total, total - existed);
    }

    sdbRelease(pMnode->pSdb, pStream);
  }

  // removal is deferred until here so the table is not modified while being iterated
  int32_t size = taosArrayGetSize(pDropped);
  if (size > 0) {
    for (int32_t i = 0; i < size; ++i) {
      int64_t *pStreamId = (int64_t *)taosArrayGet(pDropped, i);
      if (pStreamId == NULL) {
        continue;
      }

      code = taosHashRemove(execInfo.pChkptStreams, pStreamId, sizeof(*pStreamId));
      if (code) {
        mError("failed to remove stream in buf:0x%" PRIx64, *pStreamId);
      }
    }

    int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams);
    mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams);
  }

  streamMutexUnlock(&execInfo.lock);

  taosArrayDestroy(pDropped);
  return TSDB_CODE_SUCCESS;
}
748

749
/*
 * Create, persist and prepare a transaction that sets the consensus
 * checkpointId for a single task of pStream.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the trans has been prepared, or the
 * error code of the first failing step. Except when the trans itself cannot be
 * created, pStream is released via sdbRelease() and the local trans object is
 * dropped on every exit path.
 * NOTE(review): the trans-creation failure path returns without releasing
 * pStream, as in the original code; confirm this matches the caller's
 * ownership expectations.
 */
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId,
                                          int64_t ts) {
  char msg[128] = {0};
  snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId);

  STrans *pTrans = NULL;
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg, &pTrans);
  if (pTrans == NULL || code != 0) {
    // prefer the explicit error code; fall back to terrno only when none was returned
    return (code != 0) ? code : terrno;
  }

  STaskId      id = {.streamId = pStream->uid, .taskId = taskId};
  SStreamTask *pTask = NULL;
  code = mndGetStreamTask(&id, pStream, &pTask);
  if (code) {
    mError("failed to get task:0x%x in stream:%s, failed to create consensus-checkpointId", taskId, pStream->name);
    goto _end;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_CONSEN_NAME, pStream->uid);
  if (code) {
    goto _end;
  }

  code = mndStreamSetChkptIdAction(pMnode, pTrans, pTask, checkpointId, ts);
  if (code != 0) {
    goto _end;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr());
    goto _end;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_end:
  // single exit: every path past trans creation must release the stream and drop the trans
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
802

803
/*
 * Look up the consensus-checkpoint info for streamId in pHash, creating and
 * inserting an empty one (with the given numOfTasks) when absent.
 *
 * On success returns 0 and stores in *pInfo a pointer to the entry as returned
 * by taosHashGet(). On failure returns an error code and leaves *pInfo NULL.
 */
int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo) {
  *pInfo = NULL;

  void *px = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (px != NULL) {
    *pInfo = px;
    return 0;
  }

  SCheckpointConsensusInfo p = {
      .pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry)),
      .numOfTasks = numOfTasks,
      .streamId = streamId,
  };

  if (p.pTaskList == NULL) {
    return terrno;
  }

  int32_t code = taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
  if (code == 0) {
    void *pChkptInfo = (SCheckpointConsensusInfo *)taosHashGet(pHash, &streamId, sizeof(streamId));
    *pInfo = pChkptInfo;
  } else {
    // the entry never made it into the table, so nothing else owns the list
    taosArrayDestroy(p.pTaskList);
    *pInfo = NULL;
  }

  return code;
}
832

833
// no matter existed or not, add the request into info list anyway, since we need to send rsp mannually
834
// discard the msg may lead to the lost of connections.
835
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo) {
129✔
836
  SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs()};
129✔
837
  memcpy(&info.req, pRestoreInfo, sizeof(info.req));
129✔
838

839
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) {
403✔
840
    SCheckpointConsensusEntry *p = taosArrayGet(pInfo->pTaskList, i);
292✔
841
    if (p == NULL) {
292!
UNCOV
842
      continue;
×
843
    }
844

845
    if (p->req.taskId == info.req.taskId) {
292✔
846
      mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update ts %" PRId64
18✔
847
             "->%" PRId64 " total existed:%d",
848
             pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs,
849
             (int32_t)taosArrayGetSize(pInfo->pTaskList));
850
      p->req.startTs = info.req.startTs;
18✔
851
      return;
18✔
852
    }
853
  }
854

855
  void *p = taosArrayPush(pInfo->pTaskList, &info);
111✔
856
  if (p == NULL) {
111!
UNCOV
857
    mError("s-task:0x%x failed to put task into consensus-checkpointId list, code: out of memory", info.req.taskId);
×
858
  } else {
859
    int32_t num = taosArrayGetSize(pInfo->pTaskList);
111✔
860
    mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, stream:0x%" PRIx64
111✔
861
           " waiting tasks:%d",
862
           pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, num);
863
  }
864
}
865

866
// Release the task list of a consensus record and leave the record with a NULL list.
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo) {
  SArray *pList = pInfo->pTaskList;

  pInfo->pTaskList = NULL;
  taosArrayDestroy(pList);
}
23✔
870

871
// Remove the consensus-checkpointId record of |streamId| from |pHash|.
// Returns 0 when the hash is empty or the record was removed, otherwise the code reported by
// taosHashRemove. The count written to the log is the one sampled before the removal.
int64_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) {
  int32_t numOfStreams = taosHashGetSize(pHash);
  if (numOfStreams == 0) {
    return 0;
  }

  int32_t code = taosHashRemove(pHash, &streamId, sizeof(streamId));
  if (code != 0) {
    mError("failed to remove stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, numOfStreams);
  } else {
    mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, numOfStreams);
  }

  return code;
}
887

888
// Remove the checkpoint-report record of |streamId| from |pHash|.
// Returns 0 when the hash is empty or the record was removed, otherwise the code reported by
// taosHashRemove. The count written to the log is the one sampled before the removal.
int64_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId) {
  int32_t numOfStreams = taosHashGetSize(pHash);
  if (numOfStreams == 0) {
    return 0;
  }

  int32_t code = taosHashRemove(pHash, &streamId, sizeof(streamId));
  if (code != 0) {
    mError("failed to remove stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, numOfStreams);
  } else {
    mDebug("drop stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, numOfStreams);
  }

  return code;
}
904

UNCOV
905
// Empty the reported-task list of the checkpoint-report record of |streamId|; the record itself
// stays in |pHash|.
// Returns 0 on success, TSDB_CODE_MND_STREAM_NOT_EXIST when the stream has no record.
int32_t mndResetChkptReportInfo(SHashObj *pHash, int64_t streamId) {
  SChkptReportInfo *pReport = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (pReport == NULL) {
    return TSDB_CODE_MND_STREAM_NOT_EXIST;
  }

  taosArrayClear(pReport->pTaskList);
  mDebug("stream:0x%" PRIx64 " checkpoint-report list cleared, prev report checkpointId:%" PRId64, streamId,
         pReport->reportChkpt);
  return 0;
}
916

917
// Write the display name of the stream's current status into |dst|, which must hold at least
// MND_STREAM_TRIGGER_NAME_SIZE bytes. |dst| is left untouched for an unrecognized status.
static void mndShowStreamStatus(char *dst, SStreamObj *pStream) {
  const char *name = NULL;
  int8_t      status = atomic_load_8(&pStream->status);

  if (status == STREAM_STATUS__NORMAL) {
    name = "ready";
  } else if (status == STREAM_STATUS__STOP) {
    name = "stop";
  } else if (status == STREAM_STATUS__FAILED) {
    name = "failed";
  } else if (status == STREAM_STATUS__RECOVER) {
    name = "recover";
  } else if (status == STREAM_STATUS__PAUSE) {
    name = "paused";
  }

  if (name != NULL) {
    tstrncpy(dst, name, MND_STREAM_TRIGGER_NAME_SIZE);
  }
}
33,071✔
931

932
// Write the display name of the stream's trigger mode into |dst|, which must hold at least
// MND_STREAM_TRIGGER_NAME_SIZE bytes. |dst| is left untouched for an unrecognized trigger.
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
  const char *name = NULL;
  int8_t      trigger = pStream->conf.trigger;

  if (trigger == STREAM_TRIGGER_AT_ONCE) {
    name = "at once";
  } else if (trigger == STREAM_TRIGGER_WINDOW_CLOSE) {
    name = "window close";
  } else if (trigger == STREAM_TRIGGER_MAX_DELAY) {
    name = "max delay";
  } else if (trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
    name = "force window close";
  }

  if (name != NULL) {
    tstrncpy(dst, name, MND_STREAM_TRIGGER_NAME_SIZE);
  }
}
33,106✔
944

945
// Render |id| as a var-string of the form "0x<hex digits>" into |pBuf|.
// The whole buffer (|bufLen| bytes) is zeroed first; the length header is then set to the
// number of hex digits produced by tintToHex plus the two bytes of the "0x" prefix.
// NOTE(review): equivalent to the previous fixed offsets only while VARSTR_HEADER_SIZE is 2.
static void int64ToHexStr(int64_t id, char *pBuf, int32_t bufLen) {
  memset(pBuf, 0, bufLen);

  char *pVal = varDataVal(pBuf);
  pVal[0] = '0';
  pVal[1] = 'x';

  int32_t numOfDigits = tintToHex(id, pVal + 2);
  varDataSetLen(pBuf, numOfDigits + 2);
}
233,971✔
953

954
// Fill row |numOfRows| of |pBlock| with the attributes of |pStream|.
//
// Columns are written in this fixed order: stream name, create time, stream id, fill-history
// stream id, sql, status, source db, target db, target super table, watermark, trigger,
// sink quota, checkpoint interval, checkpoint backup type, history scan idle.
//
// Returns 0 on success. On the first failure the function stops, logs the failing line and
// returns the error code (terrno when a column cannot be fetched from the block).
int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // create time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id
  char buf[128] = {0};
  int64ToHexStr(pStream->uid, buf, tListLen(buf));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, buf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // related fill-history stream id, NULL when there is none
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  if (pStream->hTaskUid != 0) {
    int64ToHexStr(pStream->hTaskUid, buf, tListLen(buf));
    code = colDataSetVal(pColInfo, numOfRows, buf, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, buf, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // sql statement of the stream
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)sql, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char status2[MND_STREAM_TRIGGER_NAME_SIZE] = {0};
  mndShowStreamStatus(status2, pStream);
  STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // source db
  char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sourceDB, mndGetDbStr(pStream->sourceDb), sizeof(sourceDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&sourceDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // target db
  char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(targetDB, mndGetDbStr(pStream->targetDb), sizeof(targetDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // target super table, NULL when the stream has none
  if (pStream->targetSTbName[0] == 0) {
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

    code = colDataSetVal(pColInfo, numOfRows, NULL, true);
  } else {
    char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(targetSTB, mndGetStbStr(pStream->targetSTbName), sizeof(targetSTB));
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

    code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetSTB, false);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // watermark
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // trigger
  char trigger[20 + VARSTR_HEADER_SIZE] = {0};
  char trigger2[MND_STREAM_TRIGGER_NAME_SIZE] = {0};
  mndShowStreamTrigger(trigger2, pStream);
  STR_WITH_MAXSIZE_TO_VARSTR(trigger, trigger2, sizeof(trigger));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // sink_quota, always reported as the constant "0"
  char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0};
  sinkQuota[0] = '0';
  char dstStr[20] = {0};
  STR_TO_VARSTR(dstStr, sinkQuota);
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint interval
  char tmp[20 + VARSTR_HEADER_SIZE] = {0};
  (void)tsnprintf(varDataVal(tmp), sizeof(tmp) - VARSTR_HEADER_SIZE, "%d sec", tsStreamCheckpointInterval);
  varDataSetLen(tmp, strlen(varDataVal(tmp)));

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)tmp, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint backup type, always reported as the constant "none"
  char backup[20 + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(backup, "none");
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)backup, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // history scan idle, always reported as the constant "100a"
  char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
  tstrncpy(scanHistoryIdle, "100a", sizeof(scanHistoryIdle));

  memset(dstStr, 0, tListLen(dstStr));
  STR_TO_VARSTR(dstStr, scanHistoryIdle);
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
  // record the failing line for the last column too, otherwise the log below reports a stale lino
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1105

1106
// Fill row |numOfRows| of |pBlock| with the attributes of stream task |pTask|.
//
// The runtime figures (status, stage, queue usage, throughput, checkpoint info, ...) come from
// the task's entry in execInfo.pTaskMap; static attributes come from |pTask| and |pStream|.
// |precision| is only used to format the processed time window of source tasks whose trigger is
// STREAM_TRIGGER_FORCE_WINDOW_CLOSE.
//
// Returns 0 on success, TSDB_CODE_STREAM_TASK_NOT_EXIST when the task has no status entry, or
// the first error hit while writing a column (terrno when a column cannot be fetched).
int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows,
                              int32_t precision) {
  SColumnInfoData *pColInfo = NULL;
  int32_t          cols = 0;
  int32_t          code = 0;
  int32_t          lino = 0;

  STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};

  STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
  if (pe == NULL) {
    mError("task:0x%" PRIx64 " not exists in any vnodes, streamName:%s, streamId:0x%" PRIx64 " createTs:%" PRId64
           " no valid status/stage info",
           id.taskId, pStream->name, pStream->uid, pStream->createTime);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  // stream name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  char idstr[128] = {0};
  int64ToHexStr(pTask->id.taskId, idstr, tListLen(idstr));
  code = colDataSetVal(pColInfo, numOfRows, idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node type
  char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
  varDataSetLen(nodeType, 5);
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pTask->info.nodeId > 0) {
    memcpy(varDataVal(nodeType), "vnode", 5);
  } else {
    memcpy(varDataVal(nodeType), "snode", 5);
  }
  code = colDataSetVal(pColInfo, numOfRows, nodeType, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node id, non-positive ids are reported as 0
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  int64_t nodeId = TMAX(pTask->info.nodeId, 0);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // level
  char level[20 + VARSTR_HEADER_SIZE] = {0};
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    STR_WITH_SIZE_TO_VARSTR(level, "source", 6);
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    STR_WITH_SIZE_TO_VARSTR(level, "agg", 3);
  } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    STR_WITH_SIZE_TO_VARSTR(level, "sink", 4);
  }

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)level, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status
  char status[20 + VARSTR_HEADER_SIZE] = {0};

  const char *pStatus = streamTaskGetStatusStr(pe->status);
  STR_TO_VARSTR(status, pStatus);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stage
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // input queue
  // buf holds the formatted text, vbuf the var-string (header + text) handed to the column
  char vbuf[40] = {0};
  char buf[38] = {0};
  // a literal percent sign must be written as "%%"; a lone '%' followed by ')' is an invalid
  // conversion specification
  const char *queueInfoStr = "%4.2f MiB (%6.2f%%)";
  snprintf(buf, tListLen(buf), queueInfoStr, pe->inputQUsed, pe->inputRate);
  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // input total, switched to GiB once the value reaches 1024 MiB
  const char *formatTotalMb = "%7.2f MiB";
  const char *formatTotalGb = "%7.2f GiB";
  if (pe->procsTotal < 1024) {
    snprintf(buf, tListLen(buf), formatTotalMb, pe->procsTotal);
  } else {
    snprintf(buf, tListLen(buf), formatTotalGb, pe->procsTotal / 1024);
  }

  memset(vbuf, 0, tListLen(vbuf));
  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // process throughput, switched to MiB/s once the value reaches 1024 KiB/s
  const char *formatKb = "%7.2f KiB/s";
  const char *formatMb = "%7.2f MiB/s";
  if (pe->procsThroughput < 1024) {
    snprintf(buf, tListLen(buf), formatKb, pe->procsThroughput);
  } else {
    snprintf(buf, tListLen(buf), formatMb, pe->procsThroughput / 1024);
  }

  memset(vbuf, 0, tListLen(vbuf));
  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // output total, NULL for sink tasks
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    colDataSetNULL(pColInfo, numOfRows);
  } else {
    (void)tsnprintf(buf, sizeof(buf), formatTotalMb, pe->outputTotal);
    memset(vbuf, 0, tListLen(vbuf));
    STR_TO_VARSTR(vbuf, buf);

    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
    TSDB_CHECK_CODE(code, lino, _end);
  }

  // output throughput, NULL for sink tasks
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    colDataSetNULL(pColInfo, numOfRows);
  } else {
    if (pe->outputThroughput < 1024) {
      snprintf(buf, tListLen(buf), formatKb, pe->outputThroughput);
    } else {
      snprintf(buf, tListLen(buf), formatMb, pe->outputThroughput / 1024);
    }

    memset(vbuf, 0, tListLen(vbuf));
    STR_TO_VARSTR(vbuf, buf);

    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
    TSDB_CHECK_CODE(code, lino, _end);
  }

  // info: sink data size for sink tasks, offset/time-window info for source tasks, empty otherwise
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    const char *sinkStr = "%.2f MiB";
    snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize);
  } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {  // offset info
    if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
      int32_t ret = taosFormatUtcTime(buf, tListLen(buf), pe->processedVer, precision);
      if (ret != 0) {
        mError("failed to format processed timewindow, skey:%" PRId64, pe->processedVer);
        memset(buf, 0, tListLen(buf));
      }
    } else {
      const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
      snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer);
    }
  } else {
    memset(buf, 0, tListLen(buf));
  }

  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start_time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start ver
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointVer, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint time, NULL when no checkpoint time has been recorded
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pe->checkpointInfo.latestTime != 0) {
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestTime, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, 0, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint_id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint version
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestVer, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint size, always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  colDataSetNULL(pColInfo, numOfRows);

  // checkpoint backup status, always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, 0, true);
  TSDB_CHECK_CODE(code, lino, _end);

  // ds_err_info, always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, 0, true);
  TSDB_CHECK_CODE(code, lino, _end);

  // history_task_id, NULL when the task has no related history task
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pe->hTaskId != 0) {
    int64ToHexStr(pe->hTaskId, idstr, tListLen(idstr));
    code = colDataSetVal(pColInfo, numOfRows, idstr, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, 0, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // history_task_status, always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, 0, true);
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens during build task attr result blocks, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1401

1402
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
4,475✔
1403
  const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
4,475✔
1404
  const SEp *p = GET_ACTIVE_EP(pCurrent);
4,475✔
1405

1406
  if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
4,475!
1407
    return false;
4,474✔
1408
  }
1409
  return true;
1✔
1410
}
1411

1412
// Release the update-node list and the db map held by |pInfo|; |pInfo| itself is not freed.
// Both members are reset to NULL afterwards so that the function can safely be called again on
// the same object, e.g. by a caller cleaning up after mndFindChangedNodeInfo already released
// the object on its own error path. A NULL |pInfo| is ignored.
void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo) {
  if (pInfo == NULL) {
    return;
  }

  taosArrayDestroy(pInfo->pUpdateNodeList);
  pInfo->pUpdateNodeList = NULL;

  taosHashCleanup(pInfo->pDBMap);
  pInfo->pDBMap = NULL;
}
1,207✔
1418

1419
// 1. increase the replica does not affect the stream process.
1420
// 2. decreasing the replica may affect the stream task execution in the way that there is one or more running stream
1421
// tasks on the will be removed replica.
1422
// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we
1423
// will handle it as mentioned in 1 & 2 items.
1424
int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
1,207✔
1425
                               SVgroupChangeInfo *pInfo) {
1426
  int32_t code = 0;
1,207✔
1427
  int32_t lino = 0;
1,207✔
1428

1429
  if (pInfo == NULL) {
1,207!
UNCOV
1430
    return TSDB_CODE_INVALID_PARA;
×
1431
  }
1432

1433
  pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)),
1,207✔
1434
  pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
1,207✔
1435

1436
  if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) {
1,207!
UNCOV
1437
    mndDestroyVgroupChangeInfo(pInfo);
×
UNCOV
1438
    TSDB_CHECK_NULL(NULL, code, lino, _err, terrno);
×
1439
  }
1440

1441
  int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
1,207✔
1442
  for (int32_t i = 0; i < numOfNodes; ++i) {
6,387✔
1443
    SNodeEntry *pPrevEntry = taosArrayGet(pPrevNodeList, i);
5,180✔
1444
    if (pPrevEntry == NULL) {
5,180!
UNCOV
1445
      continue;
×
1446
    }
1447

1448
    int32_t num = taosArrayGetSize(pNodeList);
5,180✔
1449
    for (int32_t j = 0; j < num; ++j) {
19,472✔
1450
      SNodeEntry *pCurrent = taosArrayGet(pNodeList, j);
18,785✔
1451
      if (pCurrent == NULL) {
18,785!
UNCOV
1452
        continue;
×
1453
      }
1454

1455
      if (pCurrent->nodeId == pPrevEntry->nodeId) {
18,785✔
1456
        if (pPrevEntry->stageUpdated || isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) {
4,493✔
1457
          const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
19✔
1458

1459
          char buf[256] = {0};
19✔
1460
          code = epsetToStr(&pCurrent->epset, buf, tListLen(buf));  // ignore this error
19✔
1461
          if (code) {
19!
UNCOV
1462
            mError("failed to convert epset string, code:%s", tstrerror(code));
×
UNCOV
1463
            TSDB_CHECK_CODE(code, lino, _err);
×
1464
          }
1465

1466
          mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
19✔
1467
                 pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
1468

1469
          SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId};
19✔
1470
          epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
19✔
1471
          epsetAssign(&updateInfo.newEp, &pCurrent->epset);
19✔
1472

1473
          void *p = taosArrayPush(pInfo->pUpdateNodeList, &updateInfo);
19✔
1474
          TSDB_CHECK_NULL(p, code, lino, _err, terrno);
19!
1475
        }
1476

1477
        // todo handle the snode info
1478
        if (pCurrent->nodeId != SNODE_HANDLE) {
4,493✔
1479
          SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
4,199✔
1480
          code = taosHashPut(pInfo->pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
4,199✔
1481
          mndReleaseVgroup(pMnode, pVgroup);
4,199✔
1482
          TSDB_CHECK_CODE(code, lino, _err);
4,199!
1483
        }
1484

1485
        break;
4,493✔
1486
      }
1487
    }
1488
  }
1489

1490
  return code;
1,207✔
1491

UNCOV
1492
_err:
×
UNCOV
1493
  mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino);
×
UNCOV
1494
  mndDestroyVgroupChangeInfo(pInfo);
×
UNCOV
1495
  return code;
×
1496
}
1497

1498
// Decide whether the nodes hosting stream tasks have changed.
//
// Returns false when no node exists (execInfo.ts is refreshed in that case) or when no change is
// found or the comparison itself fails. Returns true when a node has its stage updated, when not
// all vnodes are ready, or when the vgroup snapshot stored in |ppNodeSnapshot| differs from
// execInfo.pNodeList.
static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) {
  int32_t numOfNodes = extractStreamNodeList(pMnode);
  if (numOfNodes == 0) {
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
    execInfo.ts = taosGetTimestampSec();
    return false;
  }

  for (int32_t idx = 0; idx < numOfNodes; ++idx) {
    SNodeEntry *pEntry = taosArrayGet(execInfo.pNodeList, idx);
    if (pEntry != NULL && pEntry->stageUpdated) {
      mDebug("stream task not ready due to node update detected, checkpoint not issued");
      return true;
    }
  }

  bool    allReady = false;
  int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot);
  if (code) {
    mError("failed to get the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    mWarn("not all vnodes ready, quit from vnodes status check");
    return true;
  }

  SVgroupChangeInfo changeInfo = {0};
  bool              nodeUpdated = false;

  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, *ppNodeSnapshot, &changeInfo);
  if (code == 0) {
    nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
    if (nodeUpdated) {
      mDebug("stream tasks not ready due to node update");
    }
  }

  mndDestroyVgroupChangeInfo(&changeInfo);
  return nodeUpdated;
}
1546

1547
// check if the node update happens or not
1548
bool mndStreamNodeIsUpdated(SMnode *pMnode) {
1,427✔
1549
  SArray *pNodeSnapshot = NULL;
1,427✔
1550

1551
  streamMutexLock(&execInfo.lock);
1,427✔
1552
  bool updated = doCheckForUpdated(pMnode, &pNodeSnapshot);
1,427✔
1553
  streamMutexUnlock(&execInfo.lock);
1,427✔
1554

1555
  taosArrayDestroy(pNodeSnapshot);
1,427✔
1556
  return updated;
1,427✔
1557
}
1558

1559
// Verify that an snode is deployed when the source db has more than one replica.
// Returns TSDB_CODE_SUCCESS when the db has exactly one replica or at least one snode object
// exists in the sdb, TSDB_CODE_SNODE_NOT_DEPLOYED otherwise.
int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) {
  if (pSrcDb->cfg.replications == 1) {
    return TSDB_CODE_SUCCESS;
  }

  SSdb      *pSdb = pMnode->pSdb;
  SSnodeObj *pSnode = NULL;

  // a single fetch is sufficient: only the existence of an snode matters
  void *pIter = sdbFetch(pSdb, SDB_SNODE, NULL, (void **)&pSnode);
  if (pIter != NULL) {
    sdbRelease(pSdb, pSnode);
    sdbCancelFetch(pSdb, pIter);
    return TSDB_CODE_SUCCESS;
  }

  mError("snode not existed when trying to create stream in db with multiple replica");
  return TSDB_CODE_SNODE_NOT_DEPLOYED;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc