• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.7
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23

24
struct SStreamTaskIter {
25
  SStreamObj  *pStream;
26
  int32_t      level;
27
  int32_t      ordinalIndex;
28
  int32_t      totalLevel;
29
  SStreamTask *pTask;
30
};
31

32
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId);
33

34
// Allocate a task iterator for pStream, positioned before the first task.
//   pStream: stream to traverse; its pTaskList size is captured as the number of levels
//   pIter:   receives the new iterator, or NULL when the allocation fails
// Returns 0 on success, terrno when taosMemoryCalloc() fails.
int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter) {
  SStreamTaskIter *pNew = taosMemoryCalloc(1, sizeof(SStreamTaskIter));
  *pIter = pNew;
  if (pNew == NULL) {
    return terrno;
  }

  pNew->level = -1;  // -1 marks "not started yet"
  pNew->ordinalIndex = 0;
  pNew->pStream = pStream;
  pNew->totalLevel = taosArrayGetSize(pStream->pTaskList);
  pNew->pTask = NULL;
  return 0;
}
48

49
// Advance the iterator to the next task, moving on to the following level whenever the
// current level has no more tasks.
// Returns true when pIter->pTask points at a task; returns false (with pIter->pTask set to
// NULL) once every level has been consumed.
bool streamTaskIterNextTask(SStreamTaskIter *pIter) {
  if (pIter->level < pIter->totalLevel) {
    if (pIter->level == -1) {  // first call: start from level 0
      pIter->level = 0;
    }

    // each pass of the loop inspects one level; the step expression moves to the next level
    for (; pIter->level < pIter->totalLevel; pIter->level += 1, pIter->ordinalIndex = 0) {
      SArray *pLevelTasks = taosArrayGetP(pIter->pStream->pTaskList, pIter->level);
      if (pIter->ordinalIndex < taosArrayGetSize(pLevelTasks)) {
        pIter->pTask = taosArrayGetP(pLevelTasks, pIter->ordinalIndex);
        pIter->ordinalIndex += 1;
        return true;
      }
    }
  }

  pIter->pTask = NULL;
  return false;
}
76

77
// Copy the iterator's current task into *pTask.
// Returns TSDB_CODE_SUCCESS when a task is available; TSDB_CODE_INVALID_PARA when pTask is
// NULL or the iterator currently holds no task (in the latter case *pTask is set to NULL).
int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask) {
  if (pTask == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  *pTask = pIter->pTask;
  return (*pTask == NULL) ? TSDB_CODE_INVALID_PARA : TSDB_CODE_SUCCESS;
}
87

88
// Free the iterator itself; the stream and the tasks it points at are left untouched.
void destroyStreamTaskIter(SStreamTaskIter *pIter) {
  taosMemoryFree(pIter);
}
109,624!
89

90
// Check every replica of a vgroup and report whether all of them are usable.
// A replica is rejected when it is not sync-restored, or when its sync state is
// OFFLINE, ERROR, LEARNER or CANDIDATE. Returns false at the first rejected replica.
static bool checkStatusForEachReplica(SVgObj *pVgroup) {
  int32_t idx = 0;

  while (idx < pVgroup->replica) {
    if (!pVgroup->vnodeGid[idx].syncRestore) {
      mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId);
      return false;
    }

    ESyncState state = pVgroup->vnodeGid[idx].syncState;
    bool       unusable = (state == TAOS_SYNC_STATE_OFFLINE) || (state == TAOS_SYNC_STATE_ERROR) ||
                    (state == TAOS_SYNC_STATE_LEARNER) || (state == TAOS_SYNC_STATE_CANDIDATE);
    if (unusable) {
      mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups", pVgroup->vgId,
            state);
      return false;
    }

    idx += 1;
  }

  return true;
}
108

109
// Append one SNodeEntry (nodeId == SNODE_HANDLE) to pVgroupList for every snode found in the sdb.
//   pMnode:      mnode whose sdb is scanned for SDB_SNODE objects
//   pVgroupList: array of SNodeEntry that receives the new entries
// Returns 0 on success, or the error code of the failing epset/array operation. On failure the
// current object is released and the fetch is cancelled before returning.
static int32_t mndAddSnodeInfo(SMnode *pMnode, SArray *pVgroupList) {
  SSnodeObj *pObj = NULL;
  void      *pIter = NULL;
  int32_t    code = 0;

  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
    if (pIter == NULL) {
      break;
    }

    SNodeEntry entry = {.nodeId = SNODE_HANDLE};
    code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
    if (code) {
      // log first: pObj must not be dereferenced once it has been handed back to the sdb
      mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn);
      sdbRelease(pMnode->pSdb, pObj);
      sdbCancelFetch(pMnode->pSdb, pIter);
      return code;
    }

    char buf[256] = {0};
    code = epsetToStr(&entry.epset, buf, tListLen(buf));
    if (code != 0) {  // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(code));
      // buf is only used for the debug log below, so this failure must not become the
      // return value when it happens on the last snode
      code = 0;
    }

    void *p = taosArrayPush(pVgroupList, &entry);
    if (p == NULL) {
      code = terrno;
      sdbRelease(pMnode->pSdb, pObj);
      sdbCancelFetch(pMnode->pSdb, pIter);
      mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code));
      return code;
    } else {
      mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
    }

    sdbRelease(pMnode->pSdb, pObj);
  }

  return code;
}
151

152
// Walk all SDB_MNODE objects and verify that each one is a leader or follower and that its
// sdb object status is SDB_STATUS_READY.
// Returns TSDB_CODE_SUCCESS when every mnode passes, TSDB_CODE_FAILED at the first one that
// does not (the object is released and the fetch cancelled before returning).
static int32_t mndCheckMnodeStatus(SMnode* pMnode) {
  SMnodeObj *pObj = NULL;
  void      *pCursor = NULL;
  ESdbStatus objStatus;

  for (;;) {
    pCursor = sdbFetchAll(pMnode->pSdb, SDB_MNODE, pCursor, (void **)&pObj, &objStatus, true);
    if (pCursor == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    bool usable = true;
    if (!(pObj->syncState == TAOS_SYNC_STATE_LEADER || pObj->syncState == TAOS_SYNC_STATE_FOLLOWER)) {
      mDebug("mnode sync state:%d not leader/follower", pObj->syncState);
      usable = false;
    } else if (objStatus != SDB_STATUS_READY) {
      mWarn("mnode status:%d not ready", objStatus);
      usable = false;
    }

    sdbRelease(pMnode->pSdb, pObj);
    if (!usable) {
      sdbCancelFetch(pMnode->pSdb, pCursor);
      return TSDB_CODE_FAILED;
    }
  }
}
183

184
// Append one SNodeEntry per vgroup to pVgroupList and evaluate whether all vgroups are ready.
//   pMnode:      mnode whose sdb is scanned for SDB_VGROUP objects
//   pVgroupList: array of SNodeEntry that receives the entries
//   allReady:    in/out flag; cleared when vgroups of the same db disagree on the replica
//                count or when checkStatusForEachReplica() rejects a vgroup. Once it is false
//                the remaining vgroups are still added but no longer status-checked.
// Returns 0 on success, or the error code of the failing hash/array operation.
static int32_t mndCheckAndAddVgroupsInfo(SMnode *pMnode, SArray *pVgroupList, bool* allReady) {
  SSdb     *pSdb = pMnode->pSdb;
  void     *pIter = NULL;
  SVgObj   *pVgroup = NULL;
  int32_t   code = 0;
  SHashObj *pHash = NULL;

  // dbUid -> replica count of the first vgroup seen for that db
  pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    code = terrno;  // capture before logging, so the returned value is the one that was reported
    mError("failed to prepare hashmap during take vgroup snapshot, code:%s", tstrerror(code));
    return code;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime};
    entry.epset = mndGetVgroupEpset(pMnode, pVgroup);

    int8_t *pReplica = taosHashGet(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid));
    if (pReplica == NULL) {  // not exist, add it into hash map
      code = taosHashPut(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid), &pVgroup->replica, sizeof(pVgroup->replica));
      if (code) {
        mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code));
        sdbRelease(pSdb, pVgroup);
        sdbCancelFetch(pSdb, pIter);
        goto _end;  // take snapshot failed, and not all ready
      }
    } else {
      if (*pReplica != pVgroup->replica) {
        mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations",
              pVgroup->vgId, pVgroup->replica, *pReplica);
        *allReady = false;  // task snap success, but not all ready
      }
    }

    // if not all ready till now, no need to check the remaining vgroups,
    // but still we need to put the info of the existed vgroups into the snapshot list
    if (*allReady) {
      *allReady = checkStatusForEachReplica(pVgroup);
    }

    char buf[256] = {0};
    code = epsetToStr(&entry.epset, buf, tListLen(buf));
    if (code != 0) {  // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(code));
      // buf only feeds the debug log below; do not let this leak into the return value
      code = 0;
    }

    void *p = taosArrayPush(pVgroupList, &entry);
    if (p == NULL) {
      code = terrno;  // read terrno before any further call can overwrite it
      mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId);
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      goto _end;
    } else {
      mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
    }

    sdbRelease(pSdb, pVgroup);
  }

_end:
  taosHashCleanup(pHash);
  return code;
}
253

254
// Build a snapshot list of SNodeEntry for all vgroups and snodes, and report readiness.
//   allReady: set to true on entry; cleared when the vgroup checks, the mnode check, or the
//             snapshot construction itself fails
//   pList:    set to NULL on entry; receives the entry list unless building the list failed.
//             Note that the list is also handed out when only the mnode status check fails,
//             in which case the (non-zero) code of that check is returned together with it.
// Returns 0 on success, otherwise the code of the step that failed.
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
  int32_t code = 0;

  *pList = NULL;
  *allReady = true;

  SArray *pEntries = taosArrayInit(4, sizeof(SNodeEntry));
  bool    buildFailed = (pEntries == NULL);

  if (buildFailed) {
    mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno));
    code = terrno;
  } else {
    // 1. vnode status check and vgroup entries, 2. snode entries
    code = mndCheckAndAddVgroupsInfo(pMnode, pEntries, allReady);
    if (code == 0) {
      code = mndAddSnodeInfo(pMnode, pEntries);
    }
    buildFailed = (code != 0);
  }

  if (buildFailed) {
    *allReady = false;
    taosArrayDestroy(pEntries);
    return code;
  }

  // 3. mnode status check; a failure here only clears the ready flag
  code = mndCheckMnodeStatus(pMnode);
  if (code != TSDB_CODE_SUCCESS) {
    *allReady = false;
  }

  *pList = pEntries;
  return code;
}
294

295
// Look up the stream object whose uid equals streamId by scanning all SDB_STREAM objects.
//   pStream: set to NULL on entry; receives the matching object when one is found. The match
//            is NOT passed to sdbRelease() here, so releasing it is left to the caller.
// Returns TSDB_CODE_SUCCESS when found, TSDB_CODE_STREAM_TASK_NOT_EXIST otherwise.
int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pCandidate = NULL;
  void       *pCursor = NULL;

  *pStream = NULL;

  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_STREAM, pCursor, (void **)&pCandidate);
    if (pCursor == NULL) {
      return TSDB_CODE_STREAM_TASK_NOT_EXIST;
    }

    if (pCandidate->uid != streamId) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    sdbCancelFetch(pSdb, pCursor);
    *pStream = pCandidate;
    return TSDB_CODE_SUCCESS;
  }
}
312

313
// Kill the transaction identified by transId, logging the outcome.
//   transId: id of the transaction to acquire and kill
//   pDbName: database name, used only in log messages
// Nothing is returned; a failure to acquire or to kill the transaction is only logged.
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) {
  STrans *pTrans = mndAcquireTrans(pMnode, transId);
  if (pTrans == NULL) {
    mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId);
    return;
  }

  mInfo("kill active transId:%d in Db:%s", transId, pDbName);
  int32_t code = mndKillTrans(pMnode, pTrans);
  if (code) {
    // report before dropping the reference: pTrans must not be read after mndReleaseTrans()
    mError("failed to kill transId:%d, code:%s", pTrans->id, tstrerror(code));
  }

  mndReleaseTrans(pMnode, pTrans);
}
×
326

327
// Fill pEpSet with the endpoint set of the node that hosts a task.
//   pEpSet:   reset to zero endpoints on entry, then filled when the node is found
//   hasEpset: set to true only when pEpSet has been filled
//   taskId:   used only in the log message for a task whose vgroup no longer exists
//   nodeId:   SNODE_HANDLE selects the first snode returned by the sdb; any other value is
//             treated as a vgroup id
// Returns TSDB_CODE_SUCCESS for the vgroup path (even when the vgroup is missing, in which
// case *hasEpset stays false), TSDB_CODE_INVALID_PARA when no snode can be fetched, or the
// code returned by addEpIntoEpSet() for the snode path.
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId) {
  *hasEpset = false;
  pEpSet->numOfEps = 0;

  if (nodeId != SNODE_HANDLE) {
    SVgObj *pVg = mndAcquireVgroup(pMnode, nodeId);
    if (pVg == NULL) {
      mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", taskId, nodeId);
      return TSDB_CODE_SUCCESS;
    }

    SEpSet vgEpset = mndGetVgroupEpset(pMnode, pVg);
    mndReleaseVgroup(pMnode, pVg);

    epsetAssign(pEpSet, &vgEpset);
    *hasEpset = true;
    return TSDB_CODE_SUCCESS;
  }

  SSnodeObj *pSnode = NULL;
  void      *pCursor = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void **)&pSnode);
  if (pCursor == NULL) {
    mError("failed to acquire snode epset");
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = addEpIntoEpSet(pEpSet, pSnode->pDnode->fqdn, pSnode->pDnode->port);
  sdbRelease(pMnode->pSdb, pSnode);
  sdbCancelFetch(pMnode->pSdb, pCursor);

  if (code) {
    mError("failed to set epset");
  }
  *hasEpset = (code == 0);
  return code;
}
366

367
// Find the task of pStream whose taskId equals pId->taskId (only the taskId field is compared).
//   pTask: set to NULL on entry; receives the matching task when found
// Returns 0 when found, TSDB_CODE_FAILED when no task matches, or the code returned by
// createStreamTaskIter() when the iterator cannot be created.
int32_t mndGetStreamTask(STaskId *pId, SStreamObj *pStream, SStreamTask **pTask) {
  *pTask = NULL;

  SStreamTaskIter *pWalker = NULL;
  int32_t          code = createStreamTaskIter(pStream, &pWalker);
  if (code) {
    mError("failed to create stream task iter:%s", pStream->name);
    return code;
  }

  int32_t      result = TSDB_CODE_FAILED;
  SStreamTask *pCurrent = NULL;

  while (streamTaskIterNextTask(pWalker)) {
    if (streamTaskIterGetCurrent(pWalker, &pCurrent) != 0) {
      continue;
    }

    if (pCurrent->id.taskId == pId->taskId) {
      *pTask = pCurrent;
      result = 0;
      break;
    }
  }

  destroyStreamTaskIter(pWalker);
  return result;
}
394

395
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
78,029✔
396
  int32_t num = 0;
78,029✔
397
  for (int32_t i = 0; i < taosArrayGetSize(pStream->pTaskList); ++i) {
232,473✔
398
    SArray *pLevel = taosArrayGetP(pStream->pTaskList, i);
154,399✔
399
    num += taosArrayGetSize(pLevel);
154,461✔
400
  }
401

402
  return num;
77,997✔
403
}
404

405
// Count the streams whose sourceDbUid equals the uid of the database named dbName.
//   pNumOfStreams: receives the count; left unmodified when the database cannot be acquired
// Returns 0 on success, TSDB_CODE_MND_DB_NOT_SELECTED (via TAOS_RETURN) when the database
// cannot be acquired.
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
  SSdb   *pSdb = pMnode->pSdb;
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  int32_t matched = 0;
  void   *pCursor = NULL;

  do {
    SStreamObj *pStream = NULL;
    pCursor = sdbFetch(pSdb, SDB_STREAM, pCursor, (void **)&pStream);
    if (pCursor != NULL) {
      if (pStream->sourceDbUid == pDb->uid) {
        matched += 1;
      }
      sdbRelease(pSdb, pStream);
    }
  } while (pCursor != NULL);

  *pNumOfStreams = matched;
  mndReleaseDb(pMnode, pDb);
  return 0;
}
430

431
static void freeTaskList(void *param) {
1,574✔
432
  SArray **pList = (SArray **)param;
1,574✔
433
  taosArrayDestroy(*pList);
1,574✔
434
}
1,574✔
435

436
int32_t mndInitExecInfo() {
1,749✔
437
  int32_t code = taosThreadMutexInit(&execInfo.lock, NULL);
1,749✔
438
  if (code) {
1,749!
439
    return code;
×
440
  }
441

442
  _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
1,749✔
443

444
  execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
1,749✔
445
  execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
1,749✔
446
  execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
1,749✔
447
  execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
1,749✔
448
  execInfo.pChkptStreams = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
1,749✔
449
  execInfo.pStreamConsensus = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
1,749✔
450
  execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
1,749✔
451
  execInfo.pKilledChkptTrans = taosArrayInit(4, sizeof(SStreamTaskResetMsg));
1,749✔
452

453
  if (execInfo.pTaskList == NULL || execInfo.pTaskMap == NULL || execInfo.transMgmt.pDBTrans == NULL ||
1,749!
454
      execInfo.pTransferStateStreams == NULL || execInfo.pChkptStreams == NULL || execInfo.pStreamConsensus == NULL ||
1,749!
455
      execInfo.pNodeList == NULL || execInfo.pKilledChkptTrans == NULL) {
1,749!
456
    mError("failed to initialize the stream runtime env, code:%s", tstrerror(terrno));
×
457
    return terrno;
×
458
  }
459

460
  execInfo.role = NODE_ROLE_UNINIT;
1,749✔
461
  execInfo.switchFromFollower = false;
1,749✔
462

463
  taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
1,749✔
464
  taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList);
1,749✔
465
  taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList);
1,749✔
466
  return 0;
1,749✔
467
}
468

469
void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
1,601✔
470
  SArray *pValidList = taosArrayInit(4, sizeof(SNodeEntry));
1,601✔
471
  if (pValidList == NULL) {  // not continue
1,601!
472
    return;
×
473
  }
474

475
  int32_t size = taosArrayGetSize(pNodeSnapshot);
1,601✔
476
  int32_t oldSize = taosArrayGetSize(execInfo.pNodeList);
1,601✔
477

478
  for (int32_t i = 0; i < oldSize; ++i) {
10,842✔
479
    SNodeEntry *p = taosArrayGet(execInfo.pNodeList, i);
9,241✔
480
    if (p == NULL) {
9,241!
481
      continue;
×
482
    }
483

484
    for (int32_t j = 0; j < size; ++j) {
125,621✔
485
      SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);
124,568✔
486
      if (pEntry == NULL) {
124,568!
487
        continue;
×
488
      }
489

490
      if (pEntry->nodeId == p->nodeId) {
124,568✔
491
        p->hbTimestamp = pEntry->hbTimestamp;
8,188✔
492

493
        void *px = taosArrayPush(pValidList, p);
8,188✔
494
        if (px == NULL) {
8,188!
495
          mError("failed to put node into list, nodeId:%d", p->nodeId);
×
496
        } else {
497
          mDebug("vgId:%d ts:%" PRId64 " HbMsgId:%d is valid", p->nodeId, p->hbTimestamp, p->lastHbMsgId);
8,188✔
498
        }
499
        break;
8,188✔
500
      }
501
    }
502
  }
503

504
  taosArrayDestroy(execInfo.pNodeList);
1,601✔
505
  execInfo.pNodeList = pValidList;
1,601✔
506

507
  mDebug("remain %d valid node entries after clean expired nodes info, prev size:%d",
1,601✔
508
         (int32_t)taosArrayGetSize(pValidList), oldSize);
509
}
510

511
// Remove one task from both the task map and the task list of pExecNode.
// Returns TSDB_CODE_SUCCESS when the task is not in the map (nothing to do) or after the
// removal; returns the taosHashRemove() code when removing from the map fails, in which case
// the list is left untouched.
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
  if (taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)) == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
  if (code) {
    return code;
  }

  SArray *pIds = pExecNode->pTaskList;
  for (int32_t pos = 0; pos < taosArrayGetSize(pIds); ++pos) {
    STaskId *pCur = taosArrayGet(pIds, pos);
    if (pCur == NULL || pCur->taskId != pRemovedId->taskId || pCur->streamId != pRemovedId->streamId) {
      continue;
    }

    // only the first matching list entry is removed
    taosArrayRemove(pIds, pos);

    int32_t remain = taosArrayGetSize(pIds);
    mInfo("s-task:0x%x removed from buffer, remain:%d in buffer list", (int32_t)pRemovedId->taskId, remain);
    break;
  }

  return TSDB_CODE_SUCCESS;
}
539

540
// Remove every task id listed in pTaskIds from pExecInfo via doRemoveTasks().
// A failed removal is logged and the remaining ids are still processed.
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo) {
  int32_t idx = 0;

  while (idx < taosArrayGetSize(pTaskIds)) {
    STaskId *pCur = taosArrayGet(pTaskIds, idx);
    idx += 1;

    if (pCur != NULL) {
      if (doRemoveTasks(pExecInfo, pCur) != 0) {
        mError("failed to remove task in buffer list, 0x%" PRIx64, pCur->taskId);
      }
    }
  }
}
1,601✔
553

554
// Remove all tasks of pStream from pExecNode, then clear the stream's entries in the
// consensus-checkpoint and checkpoint-report tables of the global execInfo.
// The whole operation runs with pExecNode->lock held. When the task map and the task list end
// up with different sizes, the two clear steps are skipped and an error is logged.
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
  SStreamTaskIter *pWalker = NULL;

  streamMutexLock(&pExecNode->lock);

  // 1. remove task entries
  int32_t code = createStreamTaskIter(pStream, &pWalker);
  if (code) {
    streamMutexUnlock(&pExecNode->lock);
    mError("failed to create stream task iter:%s", pStream->name);
    return;
  }

  while (streamTaskIterNextTask(pWalker)) {
    SStreamTask *pCurrent = NULL;
    if (streamTaskIterGetCurrent(pWalker, &pCurrent) != 0) {
      continue;
    }

    STaskId id = {.streamId = pCurrent->id.streamId, .taskId = pCurrent->id.taskId};
    if (doRemoveTasks(pExecNode, &id) != 0) {
      mError("failed to remove task in buffer list, 0x%" PRIx64, id.taskId);
    }
  }

  bool inSync = (taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
  if (inSync) {
    // 2. remove stream entry in consensus hash table and checkpoint-report hash table
    code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid);
    if (code) {
      mError("failed to clear consensus checkpointId, code:%s", tstrerror(code));
    }

    code = mndClearChkptReportInfo(execInfo.pChkptStreams, pStream->uid);
    if (code) {
      mError("failed to clear the checkpoint report info, code:%s", tstrerror(code));
    }
  }

  streamMutexUnlock(&pExecNode->lock);
  destroyStreamTaskIter(pWalker);

  if (!inSync) {
    mError("task map size, task list size, not equal");
  }
}
601

602
// Return true when pList (an array of SNodeEntry) contains an entry with the given nodeId.
static bool taskNodeExists(SArray *pList, int32_t nodeId) {
  size_t total = taosArrayGetSize(pList);
  bool   found = false;

  for (int32_t idx = 0; idx < total && !found; ++idx) {
    SNodeEntry *pCur = taosArrayGet(pList, idx);
    found = (pCur != NULL) && (pCur->nodeId == nodeId);
  }

  return found;
}
618

619
// Drop from the global execInfo every task whose node is absent from pNodeSnapshot, then
// prune execInfo.pNodeList against the same snapshot. Tasks on SNODE_HANDLE are never dropped.
// Returns 0 on success, terrno when the temporary id list cannot be allocated.
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
  SArray *pStale = taosArrayInit(4, sizeof(STaskId));
  if (pStale == NULL) {
    return terrno;
  }

  // collect first, remove afterwards: removal shrinks execInfo.pTaskList
  int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
  for (int32_t idx = 0; idx < numOfTask; ++idx) {
    STaskId *pId = taosArrayGet(execInfo.pTaskList, idx);
    if (pId == NULL) {
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
    if (pEntry == NULL || pEntry->nodeId == SNODE_HANDLE) {
      continue;
    }

    if (taskNodeExists(pNodeSnapshot, pEntry->nodeId)) {
      continue;
    }

    if (taosArrayPush(pStale, pId) == NULL) {
      mError("failed to put task entry into remove list, taskId:0x%" PRIx64, pId->taskId);
    }
  }

  removeTasksInBuf(pStale, &execInfo);

  mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pStale),
         (int32_t)taosArrayGetSize(execInfo.pTaskList));

  removeExpiredNodeInfo(pNodeSnapshot);

  taosArrayDestroy(pStale);
  return 0;
}
660

661
// Decide whether the checkpoint-report list of a stream is complete and consistent.
//   pReportInfo: report info of one stream; its pTaskList holds STaskChkptInfo entries
//   numOfTasks:  number of tasks the stream is expected to have
//   pName:       stream name, used only for logging
// Returns TSDB_CODE_SUCCESS when all tasks reported the same checkpointId and the report matches
// the active transaction (or, without an active transaction, the task's active checkpointId).
// Returns -1 in every other case, meaning "check again later".
static int32_t allTasksSendChkptReport(SChkptReportInfo* pReportInfo, int32_t numOfTasks, const char* pName) {
  int64_t checkpointId = -1;
  int32_t transId = -1;
  int32_t taskId = -1;

  int32_t existed = (int32_t)taosArrayGetSize(pReportInfo->pTaskList);
  if (existed != numOfTasks) {
    mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pReportInfo->streamId, pName,
           existed, numOfTasks, numOfTasks - existed);
    return -1;
  }

  // acquire current active checkpointId, and do cross-check checkpointId info in exec.pTaskList
  for(int32_t i = 0; i < numOfTasks; ++i) {
    STaskChkptInfo *pInfo = taosArrayGet(pReportInfo->pTaskList, i);
    if (pInfo == NULL) {
      continue;
    }

    if (checkpointId == -1) {
      checkpointId = pInfo->checkpointId;
      transId = pInfo->transId;
      taskId = pInfo->taskId;
    } else if (checkpointId != pInfo->checkpointId) {
      mError("stream:0x%" PRIx64
             " checkpointId in checkpoint-report list are not identical, type 1 taskId:0x%x checkpointId:%" PRId64
             ", type 2 taskId:0x%x checkpointId:%" PRId64,
             pReportInfo->streamId, taskId, checkpointId, pInfo->taskId, pInfo->checkpointId);
      return -1;
    }
  }

  // check for the correct checkpointId for current task info in STaskChkptInfo
  STaskChkptInfo  *p = taosArrayGet(pReportInfo->pTaskList, 0);
  if (p == NULL) {
    // an empty list (numOfTasks == 0) gives nothing to cross-check; do not dereference NULL
    mWarn("stream:0x%" PRIx64 " %s checkpoint-report list is empty, recheck next time", pReportInfo->streamId, pName);
    return -1;
  }

  STaskId id = {.streamId = p->streamId, .taskId = p->taskId};
  STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));

  // cross-check failed, there must be something unknown wrong
  SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, &id.streamId, sizeof(id.streamId));
  if (pTransInfo == NULL) {
    mWarn("stream:0x%" PRIx64 " no active trans exists for checkpoint transId:%d, it may have been cleared already",
           id.streamId, transId);

    if (pe == NULL) {
      // the task is no longer in the status buffer, so its active checkpointId cannot be verified
      mWarn("stream:0x%" PRIx64 " s-task:0x%x not found in task status buffer, recheck next time", id.streamId,
            (int32_t)id.taskId);
      return -1;
    }

    if (pe->checkpointInfo.activeId != 0 && pe->checkpointInfo.activeId != checkpointId) {
      mWarn("stream:0x%" PRIx64 " active checkpointId is not equalled to the required, current:%" PRId64
            ", req:%" PRId64 " recheck next time",
            id.streamId, pe->checkpointInfo.activeId, checkpointId);
      return -1;
    } else {
      //  do nothing
    }
  } else {
    if (pTransInfo->transId != transId) {
      mError("stream:0x%" PRIx64
             " checkpoint-report list info are expired, active transId:%d trans in list:%d, recheck next time",
             id.streamId, pTransInfo->transId, transId);
      return -1;
    }
  }

  mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", id.streamId,
         pName, numOfTasks);

  return TSDB_CODE_SUCCESS;
}
726

727
// Scan the checkpoint-report table (execInfo.pChkptStreams) under execInfo.lock.
// For each stream with a non-empty report list:
//   - if the stream object can no longer be found, the stream is queued for removal from the table;
//   - if all tasks have reported consistently and no conflicting transaction exists, a
//     checkpoint-info update transaction is created, the report list is cleared, and the scan
//     stops (at most one transaction is launched per call).
// Returns terrno when the temporary drop list cannot be allocated, TSDB_CODE_SUCCESS otherwise;
// failures of individual streams are only logged.
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  void   *pIter = NULL;
  int32_t code = 0;
  SArray *pDropped = taosArrayInit(4, sizeof(int64_t));
  if (pDropped == NULL) {
    return terrno;
  }

  mDebug("start to scan checkpoint report info");

  streamMutexLock(&execInfo.lock);

  while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
    SChkptReportInfo *px = (SChkptReportInfo *)pIter;
    if (taosArrayGetSize(px->pTaskList) == 0) {
      continue;
    }

    STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0);
    if (pInfo == NULL) {
      continue;
    }

    SStreamObj *pStream = NULL;
    code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
    if (pStream == NULL || code != 0) {
      mDebug("failed to acquire stream:0x%" PRIx64 " remove it from checkpoint-report list", pInfo->streamId);
      void *p = taosArrayPush(pDropped, &pInfo->streamId);
      if (p == NULL) {
        mError("failed to put stream into drop list:0x%" PRIx64, pInfo->streamId);
      }
      continue;
    }

    int32_t total = mndGetNumOfStreamTasks(pStream);
    int32_t ret = allTasksSendChkptReport(px, total, pStream->name);
    if (ret == 0) {
      code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
      if (code == 0) {
        code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList);
        if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {  // remove this entry
          // pInfo points into px->pTaskList: copy what is still needed before the list is cleared
          int64_t streamId = pInfo->streamId;
          int64_t reportedChkptId = pInfo->checkpointId;

          taosArrayClear(px->pTaskList);
          mInfo("stream:0x%" PRIx64 " clear checkpoint-report list and update the report checkpointId from:%" PRId64
                " to %" PRId64,
                streamId, px->reportChkpt, reportedChkptId);
          px->reportChkpt = reportedChkptId;
        } else {
          mDebug("stream:0x%" PRIx64 " not launch chkpt-info update trans, due to checkpoint not finished yet",
                 pInfo->streamId);
        }

        sdbRelease(pMnode->pSdb, pStream);
        // leaving the iteration early: hand the current position back to the hash table
        taosHashCancelIterate(execInfo.pChkptStreams, pIter);
        break;
      } else {
        mDebug("stream:0x%" PRIx64 " active checkpoint trans not finished yet, wait", pInfo->streamId);
      }
    }

    sdbRelease(pMnode->pSdb, pStream);
  }

  // remove the streams that no longer exist; done after the iteration has finished
  int32_t size = taosArrayGetSize(pDropped);
  if (size > 0) {
    for (int32_t i = 0; i < size; ++i) {
      int64_t *pStreamId = (int64_t *)taosArrayGet(pDropped, i);
      if (pStreamId == NULL) {
        continue;
      }

      code = taosHashRemove(execInfo.pChkptStreams, pStreamId, sizeof(*pStreamId));
      if (code) {
        mError("failed to remove stream in buf:0x%" PRIx64, *pStreamId);
      }
    }

    int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams);
    mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams);
  }

  streamMutexUnlock(&execInfo.lock);

  taosArrayDestroy(pDropped);

  mDebug("end to scan checkpoint report info");
  return TSDB_CODE_SUCCESS;
}
814

815
// Build and launch the transaction that sets the consensus checkpoint id of a single stream task.
//
// pMnode:       mnode context; its sdb handle is used to release pStream
// pStream:      stream that owns the task
// taskId:       id of the task that receives the consensus checkpoint id
// checkpointId: consensus checkpoint id handed to mndStreamSetChkptIdAction
// ts:           timestamp handed to mndStreamSetChkptIdAction
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared, terrno when the transaction
// could not be created, or the error code of the step that failed.
//
// pStream is released via sdbRelease() on every path except the "failed to create transaction" path, and the
// transaction handle is dropped on every path after it has been created.
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId,
                                          int64_t ts) {
  char         msg[128] = {0};
  STrans      *pTrans = NULL;
  SStreamTask *pTask = NULL;

  snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId);

  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg, &pTrans);
  if (pTrans == NULL || code != 0) {
    // nothing has been acquired here yet, and pStream is intentionally left untouched on this path
    return terrno;
  }

  STaskId id = {.streamId = pStream->uid, .taskId = taskId};
  code = mndGetStreamTask(&id, pStream, &pTask);
  if (code) {
    mError("failed to get task:0x%x in stream:%s, failed to create consensus-checkpointId", taskId, pStream->name);
    goto _over;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_CONSEN_NAME, pStream->uid);
  if (code) {
    goto _over;
  }

  code = mndStreamSetChkptIdAction(pMnode, pTrans, pTask, checkpointId, ts);
  if (code != 0) {
    goto _over;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    goto _over;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr());
    goto _over;
  }

  // the prepared transaction is reported to the caller as "in progress"
  code = TSDB_CODE_ACTION_IN_PROGRESS;

_over:
  // single exit: release the stream object and drop the local transaction handle, for success and failure alike
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
869

870
// Look up the consensus-checkpoint info of a stream in pHash, creating and inserting a new entry when absent.
//
// pHash:      hash keyed by the int64_t stream id, holding SCheckpointConsensusInfo values
// streamId:   stream to look up
// numOfTasks: task count stored in a newly created entry
// pInfo:      [out] the entry stored in the hash, or NULL on failure
//
// Returns 0 on success, terrno when the task list cannot be allocated, or the taosHashPut error code.
int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo) {
  *pInfo = NULL;

  void *px = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (px != NULL) {
    *pInfo = px;
    return 0;
  }

  SCheckpointConsensusInfo p = {
      .pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry)),
      .numOfTasks = numOfTasks,
      .streamId = streamId,
  };

  if (p.pTaskList == NULL) {
    return terrno;
  }

  int32_t code = taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
  if (code != 0) {
    // the entry was not stored, so the task list allocated above is still owned here and must be freed
    taosArrayDestroy(p.pTaskList);
    return code;
  }

  // hand back the copy stored in the hash, not the local variable
  *pInfo = (SCheckpointConsensusInfo *)taosHashGet(pHash, &streamId, sizeof(streamId));
  return code;
}
899

900
// no matter existed or not, add the request into info list anyway, since we need to send rsp manually
// discarding the msg may lead to lost connections.
902
// Record a restore-checkpoint request in the consensus info of its stream.
// When an entry with the same task id already exists, its startTs, checkpointId and transId are refreshed in
// place; otherwise a new entry stamped with the current time is appended to the task list.
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo) {
  SCheckpointConsensusEntry entry = {.ts = taosGetTimestampMs()};
  memcpy(&entry.req, pRestoreInfo, sizeof(entry.req));

  int32_t numOfEntries = (int32_t)taosArrayGetSize(pInfo->pTaskList);
  for (int32_t idx = 0; idx < numOfEntries; ++idx) {
    SCheckpointConsensusEntry *pExisted = taosArrayGet(pInfo->pTaskList, idx);
    if (pExisted == NULL || pExisted->req.taskId != entry.req.taskId) {
      continue;
    }

    // same task reported again: keep the slot, only refresh the request fields
    mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update ts %" PRId64
           "->%" PRId64 " checkpointId:%" PRId64 " -> %" PRId64 " total existed:%d",
           pRestoreInfo->taskId, pRestoreInfo->streamId, pExisted->req.startTs, entry.req.startTs,
           pExisted->req.checkpointId, entry.req.checkpointId, numOfEntries);
    pExisted->req.startTs = entry.req.startTs;
    pExisted->req.checkpointId = entry.req.checkpointId;
    pExisted->req.transId = entry.req.transId;
    return;
  }

  if (taosArrayPush(pInfo->pTaskList, &entry) == NULL) {
    mError("s-task:0x%x failed to put task into consensus-checkpointId list, code: out of memory", entry.req.taskId);
    return;
  }

  numOfEntries = taosArrayGetSize(pInfo->pTaskList);
  mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, stream:0x%" PRIx64
         " waiting tasks:%d",
         pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, numOfEntries);
}
935

936
// Free the task list held by a consensus info entry and reset the pointer to NULL.
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo) {
  SArray *pList = pInfo->pTaskList;

  taosArrayDestroy(pList);
  pInfo->pTaskList = NULL;
}
53✔
940

941
// Remove the consensus-checkpointId entry of a stream from pHash.
//
// Returns 0 when the hash is empty or the entry was removed, otherwise the taosHashRemove error code.
// The "remain" value in the log is sampled after the removal attempt, so it reflects the current hash size.
int32_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) {
  if (taosHashGetSize(pHash) == 0) {
    return 0;
  }

  int32_t code = taosHashRemove(pHash, &streamId, sizeof(streamId));
  int32_t numOfStreams = taosHashGetSize(pHash);
  if (code == 0) {
    mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, numOfStreams);
  } else {
    mError("failed to remove stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, numOfStreams);
  }

  return code;
}
957

958
// Remove the checkpoint-report entry of a stream from pHash.
//
// Returns 0 when the hash is empty or the entry was removed, otherwise the taosHashRemove error code.
// The "remain" value in the log is sampled after the removal attempt, so it reflects the current hash size.
int32_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId) {
  if (taosHashGetSize(pHash) == 0) {
    return 0;
  }

  int32_t code = taosHashRemove(pHash, &streamId, sizeof(streamId));
  int32_t numOfStreams = taosHashGetSize(pHash);
  if (code == 0) {
    mDebug("drop stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, numOfStreams);
  } else {
    mError("failed to remove stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, numOfStreams);
  }

  return code;
}
974

975
// Empty the checkpoint-report task list of a stream while keeping its hash entry.
// Returns 0 when the stream has an entry, TSDB_CODE_MND_STREAM_NOT_EXIST otherwise.
int32_t mndResetChkptReportInfo(SHashObj *pHash, int64_t streamId) {
  SChkptReportInfo *pReport = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (pReport == NULL) {
    return TSDB_CODE_MND_STREAM_NOT_EXIST;
  }

  taosArrayClear(pReport->pTaskList);
  mDebug("stream:0x%" PRIx64 " checkpoint-report list cleared, prev report checkpointId:%" PRId64, streamId,
         pReport->reportChkpt);
  return 0;
}
986

987
static void mndShowStreamStatus(char *dst, int8_t status) {
35,373✔
988
  if (status == STREAM_STATUS__NORMAL) {
35,373✔
989
    tstrncpy(dst, "ready", MND_STREAM_TRIGGER_NAME_SIZE);
35,347✔
990
  } else if (status == STREAM_STATUS__STOP) {
26!
991
    tstrncpy(dst, "stop", MND_STREAM_TRIGGER_NAME_SIZE);
×
992
  } else if (status == STREAM_STATUS__FAILED) {
26✔
993
    tstrncpy(dst, "failed", MND_STREAM_TRIGGER_NAME_SIZE);
1✔
994
  } else if (status == STREAM_STATUS__RECOVER) {
25!
995
    tstrncpy(dst, "recover", MND_STREAM_TRIGGER_NAME_SIZE);
×
996
  } else if (status == STREAM_STATUS__PAUSE) {
25✔
997
    tstrncpy(dst, "paused", MND_STREAM_TRIGGER_NAME_SIZE);
24✔
998
  } else if (status == STREAM_STATUS__INIT) {
1!
999
    tstrncpy(dst, "init", MND_STREAM_TRIGGER_NAME_SIZE);
2✔
1000
  }
1001
}
35,373✔
1002

1003
// Write the display name of the stream's trigger mode into dst (at most MND_STREAM_TRIGGER_NAME_SIZE bytes).
// dst is left untouched when the trigger is not one of the values handled below.
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
  const int8_t mode = pStream->conf.trigger;
  const char  *name = NULL;

  if (mode == STREAM_TRIGGER_AT_ONCE) {
    name = "at once";
  } else if (mode == STREAM_TRIGGER_WINDOW_CLOSE) {
    name = "window close";
  } else if (mode == STREAM_TRIGGER_MAX_DELAY) {
    name = "max delay";
  } else if (mode == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
    name = "force window close";
  }

  if (name != NULL) {
    tstrncpy(dst, name, MND_STREAM_TRIGGER_NAME_SIZE);
  }
}
35,306✔
1015

1016
// Render id as "0x<hex>" into pBuf using the var-string layout expected by the callers: the buffer is zeroed,
// the text starts at byte offset 2, and the stored length is the digit count returned by tintToHex plus the
// two prefix characters.
static void int64ToHexStr(int64_t id, char *pBuf, int32_t bufLen) {
  memset(pBuf, 0, bufLen);

  char *pText = &pBuf[2];  // payload starts right after the first two bytes
  pText[0] = '0';
  pText[1] = 'x';

  int32_t numOfDigits = tintToHex(id, &pText[2]);
  varDataSetLen(pBuf, numOfDigits + 2);
}
262,961✔
1024

1025
// Check whether every task of the stream that has an entry in execInfo.pTaskMap is in TASK_STATUS__PAUSE.
// Tasks without an entry in the map are ignored. The stream's read latch is held while iterating.
//
// pRes: [out] written only on success; true when no known task is in a non-paused state
// Returns TSDB_CODE_SUCCESS, or the error code of the task iterator.
static int32_t isAllTaskPaused(SStreamObj *pStream, bool *pRes) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamTaskIter *pTaskIter = NULL;
  bool             allPaused = true;

  taosRLockLatch(&pStream->lock);

  code = createStreamTaskIter(pStream, &pTaskIter);
  TSDB_CHECK_CODE(code, lino, _end);

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCurrent = NULL;
    code = streamTaskIterGetCurrent(pTaskIter, &pCurrent);
    TSDB_CHECK_CODE(code, lino, _end);

    STaskId           key = {.streamId = pCurrent->id.streamId, .taskId = pCurrent->id.taskId};
    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, &key, sizeof(key));
    if (pEntry != NULL && pEntry->status != TASK_STATUS__PAUSE) {
      allPaused = false;
    }
  }

  *pRes = allPaused;

_end:
  destroyStreamTaskIter(pTaskIter);
  taosRUnLockLatch(&pStream->lock);
  if (code != TSDB_CODE_SUCCESS) {
    mError("error happens when get stream status, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1059

1060
// Fill one row of the stream-list result block with the attributes of pStream.
//
// pStream:   stream whose attributes are shown
// pBlock:    result block; its columns are consumed in order, one per attribute
// numOfRows: index of the row to fill
//
// Returns 0 on success. On failure the code of the failing step is returned (terrno when a column is missing)
// and the location is logged.
int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // create time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id
  char buf[128] = {0};
  int64ToHexStr(pStream->uid, buf, tListLen(buf));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, buf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // related fill-history stream id, NULL when there is none
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  if (pStream->hTaskUid != 0) {
    int64ToHexStr(pStream->hTaskUid, buf, tListLen(buf));
    code = colDataSetVal(pColInfo, numOfRows, buf, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, buf, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // sql
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)sql, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status: shown as paused when all known tasks are paused and the stream has a task list
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char status2[MND_STREAM_TRIGGER_NAME_SIZE] = {0};
  bool isPaused = false;
  code = isAllTaskPaused(pStream, &isPaused);
  TSDB_CHECK_CODE(code, lino, _end);

  int8_t streamStatus = atomic_load_8(&pStream->status);
  if (isPaused && pStream->pTaskList != NULL) {
    streamStatus = STREAM_STATUS__PAUSE;
  }
  mndShowStreamStatus(status2, streamStatus);
  STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // source db
  char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sourceDB, mndGetDbStr(pStream->sourceDb), sizeof(sourceDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&sourceDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // target db
  char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(targetDB, mndGetDbStr(pStream->targetDb), sizeof(targetDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // target super table, NULL when the stream has none
  if (pStream->targetSTbName[0] == 0) {
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

    code = colDataSetVal(pColInfo, numOfRows, NULL, true);
  } else {
    char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(targetSTB, mndGetStbStr(pStream->targetSTbName), sizeof(targetSTB));
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

    code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetSTB, false);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // watermark
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // trigger
  char trigger[20 + VARSTR_HEADER_SIZE] = {0};
  char trigger2[MND_STREAM_TRIGGER_NAME_SIZE] = {0};
  mndShowStreamTrigger(trigger2, pStream);
  STR_WITH_MAXSIZE_TO_VARSTR(trigger, trigger2, sizeof(trigger));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // sink_quota: always reported as the constant "0"
  char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0};
  sinkQuota[0] = '0';
  char dstStr[20] = {0};
  STR_TO_VARSTR(dstStr, sinkQuota)
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint interval
  char tmp[20 + VARSTR_HEADER_SIZE] = {0};
  (void)tsnprintf(varDataVal(tmp), sizeof(tmp) - VARSTR_HEADER_SIZE, "%d sec", tsStreamCheckpointInterval);
  varDataSetLen(tmp, strlen(varDataVal(tmp)));

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)tmp, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint backup type: always reported as "none"
  char backup[20 + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(backup, "none")
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)backup, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // history scan idle: always reported as "100a"
  char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
  tstrncpy(scanHistoryIdle, "100a", sizeof(scanHistoryIdle));

  memset(dstStr, 0, tListLen(dstStr));
  STR_TO_VARSTR(dstStr, scanHistoryIdle)
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // message: the stream's reserve text for a failed stream, a single blank otherwise
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  if (streamStatus == STREAM_STATUS__FAILED) {
    // size-bounded copy, like the other name columns above, so msg cannot be overrun
    STR_WITH_MAXSIZE_TO_VARSTR(msg, pStream->reserve, sizeof(msg));
  } else {
    STR_TO_VARSTR(msg, " ")
  }
  code = colDataSetVal(pColInfo, numOfRows, (const char *)msg, false);
  TSDB_CHECK_CODE(code, lino, _end);  // record the location for the last column as well

_end:
  if (code) {
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1230

1231
// Fill one row of the stream-task result block with the attributes and runtime statistics of pTask.
//
// pStream:   stream that owns the task (used for the stream name and for logging)
// pTask:     task to describe
// pBlock:    result block; its columns are consumed in order, one per attribute
// numOfRows: index of the row to fill
// precision: precision handed to taosFormatUtcTime for force-window-close source tasks
//
// Returns 0 on success, TSDB_CODE_STREAM_TASK_NOT_EXIST when the task has no entry in execInfo.pTaskMap, or the
// code of the failing step (terrno when a column is missing).
int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows,
                              int32_t precision) {
  SColumnInfoData *pColInfo = NULL;
  int32_t          cols = 0;
  int32_t          code = 0;
  int32_t          lino = 0;

  STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};

  STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
  if (pe == NULL) {
    mError("task:0x%" PRIx64 " not exists in any vnodes, streamName:%s, streamId:0x%" PRIx64 " createTs:%" PRId64
           " no valid status/stage info",
           id.taskId, pStream->name, pStream->uid, pStream->createTime);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  // stream name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  char idstr[128] = {0};
  int64ToHexStr(pTask->id.taskId, idstr, tListLen(idstr));
  code = colDataSetVal(pColInfo, numOfRows, idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node type: "vnode" for a positive node id, "snode" otherwise
  char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
  varDataSetLen(nodeType, 5);
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pTask->info.nodeId > 0) {
    memcpy(varDataVal(nodeType), "vnode", 5);
  } else {
    memcpy(varDataVal(nodeType), "snode", 5);
  }
  code = colDataSetVal(pColInfo, numOfRows, nodeType, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node id, clamped to be non-negative
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  int64_t nodeId = TMAX(pTask->info.nodeId, 0);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // level
  char level[20 + VARSTR_HEADER_SIZE] = {0};
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    STR_WITH_SIZE_TO_VARSTR(level, "source", 6);
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    STR_WITH_SIZE_TO_VARSTR(level, "agg", 3);
  } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    STR_WITH_SIZE_TO_VARSTR(level, "sink", 4);
  }

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)level, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status
  char status[20 + VARSTR_HEADER_SIZE] = {0};

  const char *pStatus = streamTaskGetStatusStr(pe->status);
  STR_TO_VARSTR(status, pStatus);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stage
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // input queue
  char        vbuf[TSDB_STREAM_NOTIFY_STAT_LEN + 2] = {0};
  char        buf[TSDB_STREAM_NOTIFY_STAT_LEN] = {0};
  // a literal percent sign must be written as "%%"; a bare "%)" is not a valid conversion specification
  const char *queueInfoStr = "%4.2f MiB (%6.2f%%)";
  snprintf(buf, tListLen(buf), queueInfoStr, pe->inputQUsed, pe->inputRate);
  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // input total: switch to GiB once the value reaches 1024
  const char *formatTotalMb = "%7.2f MiB";
  const char *formatTotalGb = "%7.2f GiB";
  if (pe->procsTotal < 1024) {
    snprintf(buf, tListLen(buf), formatTotalMb, pe->procsTotal);
  } else {
    snprintf(buf, tListLen(buf), formatTotalGb, pe->procsTotal / 1024);
  }

  memset(vbuf, 0, tListLen(vbuf));
  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // process throughput: switch to MiB/s once the value reaches 1024
  const char *formatKb = "%7.2f KiB/s";
  const char *formatMb = "%7.2f MiB/s";
  if (pe->procsThroughput < 1024) {
    snprintf(buf, tListLen(buf), formatKb, pe->procsThroughput);
  } else {
    snprintf(buf, tListLen(buf), formatMb, pe->procsThroughput / 1024);
  }

  memset(vbuf, 0, tListLen(vbuf));
  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // output total, NULL for sink tasks
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    colDataSetNULL(pColInfo, numOfRows);
  } else {
    (void)tsnprintf(buf, sizeof(buf), formatTotalMb, pe->outputTotal);
    memset(vbuf, 0, tListLen(vbuf));
    STR_TO_VARSTR(vbuf, buf);

    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
    TSDB_CHECK_CODE(code, lino, _end);
  }

  // output throughput, NULL for sink tasks
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    colDataSetNULL(pColInfo, numOfRows);
  } else {
    if (pe->outputThroughput < 1024) {
      snprintf(buf, tListLen(buf), formatKb, pe->outputThroughput);
    } else {
      snprintf(buf, tListLen(buf), formatMb, pe->outputThroughput / 1024);
    }

    memset(vbuf, 0, tListLen(vbuf));
    STR_TO_VARSTR(vbuf, buf);

    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
    TSDB_CHECK_CODE(code, lino, _end);
  }

  // info: sink data size for sink tasks, progress/offset info for source tasks, empty for the rest
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    const char *sinkStr = "%.2f MiB";
    snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize);
  } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {  // offset info
    if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
      // for this trigger mode processedVer is formatted as a UTC time
      int32_t ret = taosFormatUtcTime(buf, tListLen(buf), pe->processedVer, precision);
      if (ret != 0) {
        mError("failed to format processed timewindow, skey:%" PRId64, pe->processedVer);
        memset(buf, 0, tListLen(buf));
      }
    } else {
      const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
      snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer);
    }
  } else {
    memset(buf, 0, tListLen(buf));
  }

  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start_time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start ver
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointVer, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint time, NULL while latestTime is still 0
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pe->checkpointInfo.latestTime != 0) {
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestTime, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, 0, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint_id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint version
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestVer, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint size: always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  colDataSetNULL(pColInfo, numOfRows);

  // checkpoint backup status: always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, 0, true);
  TSDB_CHECK_CODE(code, lino, _end);

  // ds_err_info: always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, 0, true);
  TSDB_CHECK_CODE(code, lino, _end);

  // history_task_id, NULL when there is none
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pe->hTaskId != 0) {
    int64ToHexStr(pe->hTaskId, idstr, tListLen(idstr));
    code = colDataSetVal(pColInfo, numOfRows, idstr, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, 0, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // history_task_status: always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, 0, true);
  TSDB_CHECK_CODE(code, lino, _end);

  // notify_event_stat: append one fragment per non-zero counter; offset tracks the write position in buf
  int32_t offset = 0;
  if (pe->notifyEventStat.notifyEventAddTimes > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Add %" PRId64 "x, %" PRId64 " elems in %lfs; ",
                        pe->notifyEventStat.notifyEventAddTimes, pe->notifyEventStat.notifyEventAddElems,
                        pe->notifyEventStat.notifyEventAddCostSec);
  }
  if (pe->notifyEventStat.notifyEventPushTimes > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Push %" PRId64 "x, %" PRId64 " elems in %lfs; ",
                        pe->notifyEventStat.notifyEventPushTimes, pe->notifyEventStat.notifyEventPushElems,
                        pe->notifyEventStat.notifyEventPushCostSec);
  }
  if (pe->notifyEventStat.notifyEventPackTimes > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Pack %" PRId64 "x, %" PRId64 " elems in %lfs; ",
                        pe->notifyEventStat.notifyEventPackTimes, pe->notifyEventStat.notifyEventPackElems,
                        pe->notifyEventStat.notifyEventPackCostSec);
  }
  if (pe->notifyEventStat.notifyEventSendTimes > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Send %" PRId64 "x, %" PRId64 " elems in %lfs; ",
                        pe->notifyEventStat.notifyEventSendTimes, pe->notifyEventStat.notifyEventSendElems,
                        pe->notifyEventStat.notifyEventSendCostSec);
  }
  if (pe->notifyEventStat.notifyEventHoldElems > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "[Hold %" PRId64 " elems] ",
                        pe->notifyEventStat.notifyEventHoldElems);
  }
  TSDB_CHECK_CONDITION(offset < sizeof(buf), code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
  buf[offset] = '\0';

  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  // no counter was non-zero: show NULL instead of an empty string
  if (offset == 0) {
    colDataSetNULL(pColInfo, numOfRows);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
    TSDB_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code) {
    mError("error happens during build task attr result blocks, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1567

1568
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
14,367✔
1569
  const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
14,367✔
1570
  const SEp *p = GET_ACTIVE_EP(pCurrent);
14,367✔
1571

1572
  if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
14,367!
1573
    return false;
14,367✔
1574
  }
1575
  return true;
×
1576
}
1577

1578
/**
 * Release the containers owned by a SVgroupChangeInfo.
 *
 * The struct itself is not freed. After the call both container pointers are
 * reset to NULL, so invoking this function again on the same struct does not
 * hand already-released pointers to taosArrayDestroy()/taosHashCleanup().
 *
 * @param pInfo  change info to clean up; NULL is accepted and ignored
 */
void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo) {
  if (pInfo == NULL) {
    return;
  }

  taosArrayDestroy(pInfo->pUpdateNodeList);
  pInfo->pUpdateNodeList = NULL;

  taosHashCleanup(pInfo->pDBMap);
  pInfo->pDBMap = NULL;
}
2,993✔
1584

1585
// 1. increasing the replica does not affect the stream process.
1586
// 2. decreasing the replica may affect the stream task execution in the way that there is one or more running stream
1587
// tasks on the replica that will be removed.
1588
// 3. vgroup redistribution is a combination operation of first increasing the replica and then decreasing it. So we
1589
// will handle it as mentioned in 1 & 2 items.
1590
int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
2,993✔
1591
                               SVgroupChangeInfo *pInfo) {
1592
  int32_t code = 0;
2,993✔
1593
  int32_t lino = 0;
2,993✔
1594

1595
  if (pInfo == NULL) {
2,993!
1596
    return TSDB_CODE_INVALID_PARA;
×
1597
  }
1598

1599
  pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo));
2,993✔
1600
  pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
2,993✔
1601

1602
  if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) {
2,993!
1603
    mndDestroyVgroupChangeInfo(pInfo);
×
1604
    TSDB_CHECK_NULL(NULL, code, lino, _err, terrno);
×
1605
  }
1606

1607
  int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
2,993✔
1608
  for (int32_t i = 0; i < numOfNodes; ++i) {
18,108✔
1609
    SNodeEntry *pPrevEntry = taosArrayGet(pPrevNodeList, i);
15,115✔
1610
    if (pPrevEntry == NULL) {
15,115!
1611
      continue;
×
1612
    }
1613

1614
    int32_t num = taosArrayGetSize(pNodeList);
15,115✔
1615
    for (int32_t j = 0; j < num; ++j) {
205,263✔
1616
      SNodeEntry *pCurrent = taosArrayGet(pNodeList, j);
204,532✔
1617
      if (pCurrent == NULL) {
204,532!
1618
        continue;
×
1619
      }
1620

1621
      if (pCurrent->nodeId == pPrevEntry->nodeId) {
204,532✔
1622
        if (pPrevEntry->stageUpdated || isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) {
14,384!
1623
          const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
17✔
1624

1625
          char buf[256] = {0};
17✔
1626
          code = epsetToStr(&pCurrent->epset, buf, tListLen(buf));  // ignore this error
17✔
1627
          if (code) {
17!
1628
            mError("failed to convert epset string, code:%s", tstrerror(code));
×
1629
            TSDB_CHECK_CODE(code, lino, _err);
×
1630
          }
1631

1632
          mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
17✔
1633
                 pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
1634

1635
          SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId};
17✔
1636
          epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
17✔
1637
          epsetAssign(&updateInfo.newEp, &pCurrent->epset);
17✔
1638

1639
          void *p = taosArrayPush(pInfo->pUpdateNodeList, &updateInfo);
17✔
1640
          TSDB_CHECK_NULL(p, code, lino, _err, terrno);
17!
1641
        }
1642

1643
        // todo handle the snode info
1644
        if (pCurrent->nodeId != SNODE_HANDLE) {
14,384✔
1645
          SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
12,568✔
1646
          code = taosHashPut(pInfo->pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
12,568✔
1647
          mndReleaseVgroup(pMnode, pVgroup);
12,568✔
1648
          TSDB_CHECK_CODE(code, lino, _err);
12,568!
1649
        }
1650

1651
        break;
14,384✔
1652
      }
1653
    }
1654
  }
1655

1656
  return code;
2,993✔
1657

1658
_err:
×
1659
  mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino);
×
1660
  mndDestroyVgroupChangeInfo(pInfo);
×
1661
  return code;
×
1662
}
1663

1664
/**
 * Decide whether any node used by stream tasks has been updated.
 *
 * Reads and writes the global execInfo; the visible caller invokes this
 * function while holding execInfo.lock.
 *
 * @param pMnode          mnode handle
 * @param ppNodeSnapshot  receives the vgroup snapshot produced by
 *                        mndTakeVgroupSnapshot(); this function never releases it
 * @return a boolean carried in an int32_t:
 *         - false when no stream node exists, when the change detection itself
 *           fails, or when no node change was found;
 *         - true when a node is flagged stageUpdated, when not all vnodes are
 *           ready, or when at least one node change was detected
 */
static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) {
  bool              allReady = false;
  SVgroupChangeInfo changeInfo = {0};

  int32_t numOfNodes = extractStreamNodeList(pMnode);
  if (numOfNodes == 0) {
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
    execInfo.ts = taosGetTimestampSec();
    return false;
  }

  for (int32_t i = 0; i < numOfNodes; ++i) {
    SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
    if (pNodeEntry == NULL) {
      continue;
    }

    if (pNodeEntry->stageUpdated) {
      mDebug("stream task not ready due to node update detected, checkpoint not issued");
      return true;
    }
  }

  int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot);
  if (code) {
    // Not fatal by itself: the allReady check below decides how to proceed.
    mError("failed to get the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    mWarn("not all vnodes ready, quit from vnodes status check");
    return true;
  }

  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, *ppNodeSnapshot, &changeInfo);
  if (code) {
    // On failure mndFindChangedNodeInfo() has already released changeInfo (pInfo is
    // non-NULL here, so every failure goes through its cleanup path). Destroying it
    // again would free the same containers twice.
    return false;
  }

  bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
  if (nodeUpdated) {
    mDebug("stream tasks not ready due to node update");
  }

  mndDestroyVgroupChangeInfo(&changeInfo);
  return nodeUpdated;
}
1712

1713
// check if the node update happens or not
1714
/**
 * Run the node-update check with execInfo.lock held.
 *
 * The vgroup snapshot that the check may produce is only a temporary here and
 * is released before returning.
 *
 * @param pMnode  mnode handle
 * @return the result of doCheckForUpdated() converted to bool
 */
bool mndStreamNodeIsUpdated(SMnode *pMnode) {
  SArray *pSnapshot = NULL;
  bool    isUpdated = false;

  streamMutexLock(&execInfo.lock);
  isUpdated = doCheckForUpdated(pMnode, &pSnapshot);
  streamMutexUnlock(&execInfo.lock);

  // released outside of the critical section
  taosArrayDestroy(pSnapshot);
  return isUpdated;
}
1724

1725
/**
 * Check the snode precondition for creating a stream on a source db.
 *
 * A db whose cfg.replications is 1 always passes. Any other db passes only if
 * at least one SDB_SNODE object can be fetched from the sdb.
 *
 * @param pMnode  mnode handle; its pSdb is queried for snode objects
 * @param pSrcDb  source db of the stream
 * @return TSDB_CODE_SUCCESS when the check passes, TSDB_CODE_SNODE_NOT_DEPLOYED
 *         when no snode object is found
 */
int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) {
  SSdb      *pSdb = pMnode->pSdb;
  SSnodeObj *pSnode = NULL;

  if (pSrcDb->cfg.replications == 1) {
    return TSDB_CODE_SUCCESS;
  }

  // Only existence matters, so a single fetch is enough.
  void *pIter = sdbFetch(pSdb, SDB_SNODE, NULL, (void **)&pSnode);
  if (pIter != NULL) {
    sdbRelease(pSdb, pSnode);
    sdbCancelFetch(pSdb, pIter);
    return TSDB_CODE_SUCCESS;
  }

  mError("snode not existed when trying to create stream in db with multiple replica");
  return TSDB_CODE_SNODE_NOT_DEPLOYED;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc