• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3615

18 Feb 2025 07:41AM UTC coverage: 62.953% (+1.6%) from 61.4%
#3615

push

travis-ci

web-flow
Merge pull request #29812 from taosdata/doc/analysis

doc: update tdgpt doc.

146885 of 299602 branches covered (49.03%)

Branch coverage included in aggregate %.

230802 of 300346 relevant lines covered (76.85%)

17263824.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.5
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23

24
struct SStreamTaskIter {
25
  SStreamObj  *pStream;
26
  int32_t      level;
27
  int32_t      ordinalIndex;
28
  int32_t      totalLevel;
29
  SStreamTask *pTask;
30
};
31

32
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId);
33

34
// Allocate a task iterator for pStream and store it in *pIter.
// Returns 0 on success; when the allocation fails *pIter is NULL and terrno is returned.
// The iterator starts before the first task (level == -1).
int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter) {
  SStreamTaskIter *pNew = taosMemoryCalloc(1, sizeof(SStreamTaskIter));
  *pIter = pNew;
  if (pNew == NULL) {
    return terrno;
  }

  pNew->level = -1;
  pNew->ordinalIndex = 0;
  pNew->pStream = pStream;
  pNew->totalLevel = taosArrayGetSize(pStream->tasks);
  pNew->pTask = NULL;

  return 0;
}
48

49
// Advance the iterator to the next task.
// Returns true and sets pIter->pTask when a task is available; returns false and
// clears pIter->pTask once every level has been consumed.
bool streamTaskIterNextTask(SStreamTaskIter *pIter) {
  // all levels were consumed by a previous call
  if (pIter->level >= pIter->totalLevel) {
    pIter->pTask = NULL;
    return false;
  }

  // first step: move onto the first level
  if (pIter->level == -1) {
    pIter->level = 0;
  }

  // when a level is exhausted, continue with the first slot of the next one
  for (; pIter->level < pIter->totalLevel; pIter->level += 1, pIter->ordinalIndex = 0) {
    SArray *pLevelTasks = taosArrayGetP(pIter->pStream->tasks, pIter->level);

    if (pIter->ordinalIndex < taosArrayGetSize(pLevelTasks)) {
      pIter->pTask = taosArrayGetP(pLevelTasks, pIter->ordinalIndex);
      pIter->ordinalIndex += 1;
      return true;
    }
  }

  pIter->pTask = NULL;
  return false;
}
76

77
// Copy the iterator's current task into *pTask.
// Returns TSDB_CODE_SUCCESS when a current task exists, TSDB_CODE_INVALID_PARA when
// pTask is NULL or the iterator has no current task.
int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask) {
  if (pTask == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  *pTask = pIter->pTask;
  return (*pTask != NULL) ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA;
}
87

88
// Release an iterator obtained from createStreamTaskIter.
void destroyStreamTaskIter(SStreamTaskIter *pIter) {
  taosMemoryFree(pIter);
}
105,600!
89

90
// Return true only if every replica of the vgroup has finished sync restore and is
// not in the offline/error/learner/candidate sync state.
static bool checkStatusForEachReplica(SVgObj *pVgroup) {
  for (int32_t idx = 0; idx < pVgroup->replica; ++idx) {
    if (!pVgroup->vnodeGid[idx].syncRestore) {
      mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId);
      return false;
    }

    ESyncState state = pVgroup->vnodeGid[idx].syncState;
    bool       notServing = (state == TAOS_SYNC_STATE_OFFLINE) || (state == TAOS_SYNC_STATE_ERROR) ||
                      (state == TAOS_SYNC_STATE_LEARNER) || (state == TAOS_SYNC_STATE_CANDIDATE);
    if (notServing) {
      mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups", pVgroup->vgId,
            state);
      return false;
    }
  }

  return true;
}
108

109
// Append one SNodeEntry (nodeId == SNODE_HANDLE) to pVgroupList for every snode in the sdb.
// Returns 0 on success, or the error from addEpIntoEpSet / taosArrayPush on failure.
// A failure to render the epset as text only affects the debug log and is not reported
// to the caller.
static int32_t mndAddSnodeInfo(SMnode *pMnode, SArray *pVgroupList) {
  SSnodeObj *pObj = NULL;
  void      *pIter = NULL;

  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
    if (pIter == NULL) {
      break;
    }

    SNodeEntry entry = {.nodeId = SNODE_HANDLE};
    int32_t    code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
    if (code) {
      // log before releasing: the message reads through pObj
      mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn);
      sdbRelease(pMnode->pSdb, pObj);
      sdbCancelFetch(pMnode->pSdb, pIter);
      return code;
    }

    char    buf[256] = {0};
    int32_t strCode = epsetToStr(&entry.epset, buf, tListLen(buf));
    if (strCode != 0) {  // print error and continue; kept separate so it never becomes the return value
      mError("failed to convert epset to str, code:%s", tstrerror(strCode));
    }

    void *p = taosArrayPush(pVgroupList, &entry);
    if (p == NULL) {
      code = terrno;
      sdbRelease(pMnode->pSdb, pObj);
      sdbCancelFetch(pMnode->pSdb, pIter);
      mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code));
      return code;
    }

    mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
    sdbRelease(pMnode->pSdb, pObj);
  }

  return 0;
}
151

152
// Walk all mnode objects and verify each one is a sync leader or follower and has
// sdb status SDB_STATUS_READY.
// Returns TSDB_CODE_SUCCESS when every mnode passes, TSDB_CODE_FAILED at the first one that does not.
static int32_t mndCheckMnodeStatus(SMnode* pMnode) {
  ESdbStatus status;
  SMnodeObj *pMnodeObj = NULL;
  void      *pCursor = NULL;

  for (;;) {
    pCursor = sdbFetchAll(pMnode->pSdb, SDB_MNODE, pCursor, (void **)&pMnodeObj, &status, true);
    if (pCursor == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    bool syncOk =
        (pMnodeObj->syncState == TAOS_SYNC_STATE_LEADER) || (pMnodeObj->syncState == TAOS_SYNC_STATE_FOLLOWER);

    if (syncOk && status == SDB_STATUS_READY) {
      sdbRelease(pMnode->pSdb, pMnodeObj);
      continue;
    }

    // the sync state is checked first, so it decides which message is printed
    if (!syncOk) {
      mDebug("mnode sync state:%d not leader/follower", pMnodeObj->syncState);
    } else {
      mWarn("mnode status:%d not ready", status);
    }

    sdbRelease(pMnode->pSdb, pMnodeObj);
    sdbCancelFetch(pMnode->pSdb, pCursor);
    return TSDB_CODE_FAILED;
  }
}
183

184
// Append one SNodeEntry per vgroup to pVgroupList and update *allReady.
// *allReady is cleared when vgroups of the same database (same dbUid) carry different
// replica counts, or when checkStatusForEachReplica() rejects a vgroup. Entries are
// still collected after *allReady has become false.
// Returns 0 on success, or the error of the failing hash/array operation.
// A failure to render the epset as text only affects the debug log.
static int32_t mndCheckAndAddVgroupsInfo(SMnode *pMnode, SArray *pVgroupList, bool* allReady) {
  SSdb     *pSdb = pMnode->pSdb;
  void     *pIter = NULL;
  SVgObj   *pVgroup = NULL;
  int32_t   code = 0;
  SHashObj *pHash = NULL;

  // dbUid -> replica count of the first vgroup seen for that database
  pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    code = terrno;  // capture before logging
    mError("failed to prepare hashmap during take vgroup snapshot, code:%s", tstrerror(code));
    return code;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime};
    entry.epset = mndGetVgroupEpset(pMnode, pVgroup);

    int8_t *pReplica = taosHashGet(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid));
    if (pReplica == NULL) {  // not exist, add it into hash map
      code = taosHashPut(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid), &pVgroup->replica, sizeof(pVgroup->replica));
      if (code) {
        mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code));
        sdbRelease(pSdb, pVgroup);
        sdbCancelFetch(pSdb, pIter);
        goto _end;  // take snapshot failed, and not all ready
      }
    } else if (*pReplica != pVgroup->replica) {
      mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations",
            pVgroup->vgId, pVgroup->replica, *pReplica);
      *allReady = false;  // task snap success, but not all ready
    }

    // if not all ready till now, no need to check the remaining vgroups,
    // but still we need to put the info of the existed vgroups into the snapshot list
    if (*allReady) {
      *allReady = checkStatusForEachReplica(pVgroup);
    }

    char    buf[256] = {0};
    int32_t strCode = epsetToStr(&entry.epset, buf, tListLen(buf));
    if (strCode != 0) {  // print error and continue; kept separate so it never becomes the return value
      mError("failed to convert epset to str, code:%s", tstrerror(strCode));
    }

    void *p = taosArrayPush(pVgroupList, &entry);
    if (p == NULL) {
      code = terrno;  // capture before logging
      mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId);
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      goto _end;
    }

    mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
    sdbRelease(pSdb, pVgroup);
  }

_end:
  taosHashCleanup(pHash);
  return code;
}
253

254
// Build a list of SNodeEntry covering all vgroups and snodes.
// If collecting the entries fails, the list is destroyed, *pList stays NULL, *allReady is
// false and the error is returned.
// Otherwise *pList receives the list and the result of mndCheckMnodeStatus() is returned;
// a non-success mnode check also clears *allReady.
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
  int32_t code = 0;

  *pList = NULL;
  *allReady = true;

  SArray *pEntries = taosArrayInit(4, sizeof(SNodeEntry));
  if (pEntries == NULL) {
    mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno));
    code = terrno;
  } else {
    // 1. check for all vnodes status
    code = mndCheckAndAddVgroupsInfo(pMnode, pEntries, allReady);

    // 2. add snode info
    if (code == 0) {
      code = mndAddSnodeInfo(pMnode, pEntries);
    }

    if (code == 0) {
      // 3. check for mnode status
      code = mndCheckMnodeStatus(pMnode);
      if (code != TSDB_CODE_SUCCESS) {
        *allReady = false;
      }

      *pList = pEntries;
      return code;
    }
  }

  // failure while collecting the entries
  *allReady = false;
  taosArrayDestroy(pEntries);
  return code;
}
294

295
// Look up the stream whose uid equals streamId.
// On a match *pStream is set and TSDB_CODE_SUCCESS is returned; the matching object is
// not released here. Non-matching objects are released while scanning.
// Returns TSDB_CODE_STREAM_TASK_NOT_EXIST (with *pStream == NULL) when nothing matches.
int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pCandidate = NULL;
  void       *pCursor = NULL;

  *pStream = NULL;

  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_STREAM, pCursor, (void **)&pCandidate);
    if (pCursor == NULL) {
      return TSDB_CODE_STREAM_TASK_NOT_EXIST;
    }

    if (pCandidate->uid != streamId) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    sdbCancelFetch(pSdb, pCursor);
    *pStream = pCandidate;
    return TSDB_CODE_SUCCESS;
  }
}
312

313
// Kill the transaction identified by transId; pDbName is only used for logging.
// Failures to acquire or to kill the transaction are logged and not propagated.
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) {
  STrans *pTrans = mndAcquireTrans(pMnode, transId);
  if (pTrans == NULL) {
    mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId);
    return;
  }

  mInfo("kill active transId:%d in Db:%s", transId, pDbName);
  int32_t code = mndKillTrans(pMnode, pTrans);
  if (code) {
    // must be logged before the release below, since the message reads pTrans->id
    mError("failed to kill transId:%d, code:%s", pTrans->id, tstrerror(code));
  }

  mndReleaseTrans(pMnode, pTrans);
}
×
326

327
// Fill pEpSet with the endpoint set of the node hosting a task.
// nodeId == SNODE_HANDLE: the first snode returned by sdbFetch is used; returns
//   TSDB_CODE_INVALID_PARA when no snode is found, otherwise the addEpIntoEpSet result.
// otherwise: nodeId is treated as a vgroup id. A missing vgroup is reported as
//   TSDB_CODE_SUCCESS with *hasEpset == false.
// *hasEpset is true only when pEpSet was filled successfully.
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId) {
  *hasEpset = false;
  pEpSet->numOfEps = 0;

  if (nodeId != SNODE_HANDLE) {
    SVgObj *pVgObj = mndAcquireVgroup(pMnode, nodeId);
    if (pVgObj == NULL) {
      mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", taskId, nodeId);
      return TSDB_CODE_SUCCESS;
    }

    SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
    mndReleaseVgroup(pMnode, pVgObj);

    epsetAssign(pEpSet, &epset);
    *hasEpset = true;
    return TSDB_CODE_SUCCESS;
  }

  SSnodeObj *pSnode = NULL;
  void      *pCursor = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void **)&pSnode);
  if (pCursor == NULL) {
    mError("failed to acquire snode epset");
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = addEpIntoEpSet(pEpSet, pSnode->pDnode->fqdn, pSnode->pDnode->port);
  sdbRelease(pMnode->pSdb, pSnode);
  sdbCancelFetch(pMnode->pSdb, pCursor);

  *hasEpset = (code == 0);
  if (code) {
    mError("failed to set epset");
  }
  return code;
}
366

367
// Find the task of pStream whose taskId equals pId->taskId.
// Returns 0 and sets *pTask on a match, TSDB_CODE_FAILED (with *pTask == NULL) when no task
// matches, or the error of createStreamTaskIter.
int32_t mndGetStreamTask(STaskId *pId, SStreamObj *pStream, SStreamTask **pTask) {
  *pTask = NULL;

  SStreamTaskIter *pTaskIter = NULL;
  int32_t          code = createStreamTaskIter(pStream, &pTaskIter);
  if (code) {
    mError("failed to create stream task iter:%s", pStream->name);
    return code;
  }

  int32_t      result = TSDB_CODE_FAILED;
  SStreamTask *pCurrent = NULL;

  while (streamTaskIterNextTask(pTaskIter)) {
    if (streamTaskIterGetCurrent(pTaskIter, &pCurrent) != 0) {
      continue;
    }

    if (pCurrent->id.taskId == pId->taskId) {
      *pTask = pCurrent;
      result = 0;
      break;
    }
  }

  destroyStreamTaskIter(pTaskIter);
  return result;
}
394

395
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
74,275✔
396
  int32_t num = 0;
74,275✔
397
  for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); ++i) {
220,449✔
398
    SArray *pLevel = taosArrayGetP(pStream->tasks, i);
146,113✔
399
    num += taosArrayGetSize(pLevel);
146,219✔
400
  }
401

402
  return num;
74,186✔
403
}
404

405
// Count the streams whose sourceDbUid equals the uid of database dbName and store the
// count in *pNumOfStreams.
// Returns 0 on success, or TSDB_CODE_MND_DB_NOT_SELECTED (via TAOS_RETURN) when the
// database cannot be acquired.
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
  SSdb   *pSdb = pMnode->pSdb;
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  int32_t matched = 0;
  void   *pCursor = NULL;

  for (;;) {
    SStreamObj *pStream = NULL;
    pCursor = sdbFetch(pSdb, SDB_STREAM, pCursor, (void **)&pStream);
    if (pCursor == NULL) {
      break;
    }

    if (pStream->sourceDbUid == pDb->uid) {
      matched += 1;
    }

    sdbRelease(pSdb, pStream);
  }

  *pNumOfStreams = matched;
  mndReleaseDb(pMnode, pDb);
  return 0;
}
430

431
static void freeTaskList(void *param) {
1,232✔
432
  SArray **pList = (SArray **)param;
1,232✔
433
  taosArrayDestroy(*pList);
1,232✔
434
}
1,232✔
435

436
int32_t mndInitExecInfo() {
1,737✔
437
  int32_t code = taosThreadMutexInit(&execInfo.lock, NULL);
1,737✔
438
  if (code) {
1,737!
439
    return code;
×
440
  }
441

442
  _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
1,737✔
443

444
  execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
1,737✔
445
  execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
1,737✔
446
  execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
1,737✔
447
  execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
1,737✔
448
  execInfo.pChkptStreams = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
1,737✔
449
  execInfo.pStreamConsensus = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
1,737✔
450
  execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
1,737✔
451
  execInfo.pKilledChkptTrans = taosArrayInit(4, sizeof(SStreamTaskResetMsg));
1,737✔
452

453
  if (execInfo.pTaskList == NULL || execInfo.pTaskMap == NULL || execInfo.transMgmt.pDBTrans == NULL ||
1,737!
454
      execInfo.pTransferStateStreams == NULL || execInfo.pChkptStreams == NULL || execInfo.pStreamConsensus == NULL ||
1,737!
455
      execInfo.pNodeList == NULL || execInfo.pKilledChkptTrans == NULL) {
1,737!
456
    mError("failed to initialize the stream runtime env, code:%s", tstrerror(terrno));
×
457
    return terrno;
×
458
  }
459

460
  execInfo.role = NODE_ROLE_UNINIT;
1,737✔
461
  execInfo.switchFromFollower = false;
1,737✔
462

463
  taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
1,737✔
464
  taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList);
1,737✔
465
  taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList);
1,737✔
466
  return 0;
1,737✔
467
}
468

469
void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
1,260✔
470
  SArray *pValidList = taosArrayInit(4, sizeof(SNodeEntry));
1,260✔
471
  if (pValidList == NULL) {  // not continue
1,260!
472
    return;
×
473
  }
474

475
  int32_t size = taosArrayGetSize(pNodeSnapshot);
1,260✔
476
  int32_t oldSize = taosArrayGetSize(execInfo.pNodeList);
1,260✔
477

478
  for (int32_t i = 0; i < oldSize; ++i) {
8,631✔
479
    SNodeEntry *p = taosArrayGet(execInfo.pNodeList, i);
7,371✔
480
    if (p == NULL) {
7,371!
481
      continue;
×
482
    }
483

484
    for (int32_t j = 0; j < size; ++j) {
100,572✔
485
      SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);
99,645✔
486
      if (pEntry == NULL) {
99,645!
487
        continue;
×
488
      }
489

490
      if (pEntry->nodeId == p->nodeId) {
99,645✔
491
        p->hbTimestamp = pEntry->hbTimestamp;
6,444✔
492

493
        void *px = taosArrayPush(pValidList, p);
6,444✔
494
        if (px == NULL) {
6,444!
495
          mError("failed to put node into list, nodeId:%d", p->nodeId);
×
496
        } else {
497
          mDebug("vgId:%d ts:%" PRId64 " HbMsgId:%d is valid", p->nodeId, p->hbTimestamp, p->lastHbMsgId);
6,444✔
498
        }
499
        break;
6,444✔
500
      }
501
    }
502
  }
503

504
  taosArrayDestroy(execInfo.pNodeList);
1,260✔
505
  execInfo.pNodeList = pValidList;
1,260✔
506

507
  mDebug("remain %d valid node entries after clean expired nodes info, prev size:%d",
1,260✔
508
         (int32_t)taosArrayGetSize(pValidList), oldSize);
509
}
510

511
// Remove one task id from both pExecNode->pTaskMap and pExecNode->pTaskList.
// Returns TSDB_CODE_SUCCESS when the id is absent from the map or was removed, or the
// error reported by taosHashRemove.
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
  if (taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)) == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
  if (code) {
    return code;
  }

  int32_t pos = 0;
  while (pos < taosArrayGetSize(pExecNode->pTaskList)) {
    STaskId *pId = taosArrayGet(pExecNode->pTaskList, pos);
    bool     matched = (pId != NULL) && (pId->taskId == pRemovedId->taskId) && (pId->streamId == pRemovedId->streamId);

    if (matched) {
      taosArrayRemove(pExecNode->pTaskList, pos);

      int32_t remain = taosArrayGetSize(pExecNode->pTaskList);
      mInfo("s-task:0x%x removed from buffer, remain:%d in buffer list", (int32_t)pRemovedId->taskId, remain);
      break;
    }

    pos += 1;
  }

  return TSDB_CODE_SUCCESS;
}
539

540
// Call doRemoveTasks() for every id in pTaskIds; failures are logged and skipped.
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo) {
  for (int32_t i = 0; i < taosArrayGetSize(pTaskIds); ++i) {
    STaskId *pId = taosArrayGet(pTaskIds, i);

    if (pId != NULL && doRemoveTasks(pExecInfo, pId) != 0) {
      mError("failed to remove task in buffer list, 0x%" PRIx64, pId->taskId);
    }
  }
}
1,260✔
553

554
// Under pExecNode->lock: remove every task of pStream from the task map/list, then clear
// the stream's entries in execInfo.pStreamConsensus and execInfo.pChkptStreams.
// The second step is skipped (and an error logged) when the task map and the task list
// end up with different sizes.
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
  SStreamTaskIter *pTaskIter = NULL;
  streamMutexLock(&pExecNode->lock);

  // 1. remove task entries
  int32_t code = createStreamTaskIter(pStream, &pTaskIter);
  if (code) {
    streamMutexUnlock(&pExecNode->lock);
    mError("failed to create stream task iter:%s", pStream->name);
    return;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pTask = NULL;
    if (streamTaskIterGetCurrent(pTaskIter, &pTask) != 0) {
      continue;
    }

    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    if (doRemoveTasks(pExecNode, &id) != 0) {
      mError("failed to remove task in buffer list, 0x%" PRIx64, id.taskId);
    }
  }

  bool consistent = (taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
  if (consistent) {
    // 2. remove stream entry in consensus hash table and checkpoint-report hash table
    code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid);
    if (code) {
      mError("failed to clear consensus checkpointId, code:%s", tstrerror(code));
    }

    code = mndClearChkptReportInfo(execInfo.pChkptStreams, pStream->uid);
    if (code) {
      mError("failed to clear the checkpoint report info, code:%s", tstrerror(code));
    }
  }

  streamMutexUnlock(&pExecNode->lock);
  destroyStreamTaskIter(pTaskIter);

  if (!consistent) {
    mError("task map size, task list size, not equal");
  }
}
601

602
// Return true when pList holds an SNodeEntry whose nodeId equals nodeId.
static bool taskNodeExists(SArray *pList, int32_t nodeId) {
  size_t  total = taosArrayGetSize(pList);
  int32_t pos = 0;

  while (pos < total) {
    SNodeEntry *pEntry = taosArrayGet(pList, pos);
    if (pEntry != NULL && pEntry->nodeId == nodeId) {
      return true;
    }
    ++pos;
  }

  return false;
}
618

619
// Drop from execInfo every task whose hosting node is missing from pNodeSnapshot (tasks
// with nodeId == SNODE_HANDLE are never dropped), then prune execInfo.pNodeList through
// removeExpiredNodeInfo().
// Returns 0, or terrno when the temporary id list cannot be allocated.
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
  SArray *pOrphans = taosArrayInit(4, sizeof(STaskId));
  if (pOrphans == NULL) {
    return terrno;
  }

  int32_t total = taosArrayGetSize(execInfo.pTaskList);
  for (int32_t i = 0; i < total; ++i) {
    STaskId *pId = taosArrayGet(execInfo.pTaskList, i);
    if (pId == NULL) {
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
    if (pEntry == NULL || pEntry->nodeId == SNODE_HANDLE) {
      continue;
    }

    if (taskNodeExists(pNodeSnapshot, pEntry->nodeId)) {
      continue;
    }

    if (taosArrayPush(pOrphans, pId) == NULL) {
      mError("failed to put task entry into remove list, taskId:0x%" PRIx64, pId->taskId);
    }
  }

  removeTasksInBuf(pOrphans, &execInfo);

  mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pOrphans),
         (int32_t)taosArrayGetSize(execInfo.pTaskList));

  removeExpiredNodeInfo(pNodeSnapshot);

  taosArrayDestroy(pOrphans);
  return 0;
}
660

661
// Decide whether the checkpoint-report list of a stream is complete and consistent.
// Returns TSDB_CODE_SUCCESS when
//   - exactly numOfTasks reports are present,
//   - all reports carry the same checkpointId, and
//   - the reports agree with the active transaction of the stream, or, when no active
//     transaction is registered, with the activeId recorded for the first reported task.
// Returns -1 in every other case, including an empty list or a first task that is
// missing from execInfo.pTaskMap. pName is only used for logging.
static int32_t allTasksSendChkptReport(SChkptReportInfo* pReportInfo, int32_t numOfTasks, const char* pName) {
  int64_t checkpointId = -1;
  int32_t transId = -1;
  int32_t taskId = -1;

  int32_t existed = (int32_t)taosArrayGetSize(pReportInfo->pTaskList);
  if (existed != numOfTasks) {
    mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pReportInfo->streamId, pName,
           existed, numOfTasks, numOfTasks - existed);
    return -1;
  }

  // acquire current active checkpointId, and do cross-check checkpointId info in exec.pTaskList
  for(int32_t i = 0; i < numOfTasks; ++i) {
    STaskChkptInfo *pInfo = taosArrayGet(pReportInfo->pTaskList, i);
    if (pInfo == NULL) {
      continue;
    }

    if (checkpointId == -1) {
      checkpointId = pInfo->checkpointId;
      transId = pInfo->transId;
      taskId = pInfo->taskId;
    } else if (checkpointId != pInfo->checkpointId) {
      mError("stream:0x%" PRIx64
             " checkpointId in checkpoint-report list are not identical, type 1 taskId:0x%x checkpointId:%" PRId64
             ", type 2 taskId:0x%x checkpointId:%" PRId64,
             pReportInfo->streamId, taskId, checkpointId, pInfo->taskId, pInfo->checkpointId);
      return -1;
    }
  }

  // check for the correct checkpointId for current task info in STaskChkptInfo
  STaskChkptInfo  *p = taosArrayGet(pReportInfo->pTaskList, 0);
  if (p == NULL) {
    // empty report list (numOfTasks == 0): nothing to cross-check against
    return -1;
  }

  STaskId id = {.streamId = p->streamId, .taskId = p->taskId};
  STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));

  // cross-check failed, there must be something unknown wrong
  SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, &id.streamId, sizeof(id.streamId));
  if (pTransInfo == NULL) {
    mWarn("stream:0x%" PRIx64 " no active trans exists for checkpoint transId:%d, it may have been cleared already",
           id.streamId, transId);

    if (pe == NULL) {
      // the task is not tracked in the task map, so its activeId cannot be verified
      mWarn("stream:0x%" PRIx64 " task:0x%" PRIx64 " not found in task map, recheck next time", id.streamId,
            id.taskId);
      return -1;
    }

    if (pe->checkpointInfo.activeId != 0 && pe->checkpointInfo.activeId != checkpointId) {
      mWarn("stream:0x%" PRIx64 " active checkpointId is not equalled to the required, current:%" PRId64
            ", req:%" PRId64 " recheck next time",
            id.streamId, pe->checkpointInfo.activeId, checkpointId);
      return -1;
    }
  } else if (pTransInfo->transId != transId) {
    mError("stream:0x%" PRIx64
           " checkpoint-report list info are expired, active transId:%d trans in list:%d, recheck next time",
           id.streamId, pTransInfo->transId, transId);
    return -1;
  }

  mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", id.streamId,
         pName, numOfTasks);

  return TSDB_CODE_SUCCESS;
}
726

727
// Scan execInfo.pChkptStreams (under execInfo.lock) for a stream whose checkpoint-report
// list is complete, and try to launch the checkpoint-info update transaction for it.
// The scan stops after the first such attempt. Streams that can no longer be acquired
// are removed from execInfo.pChkptStreams afterwards.
// Returns terrno when the temporary list cannot be allocated, otherwise TSDB_CODE_SUCCESS.
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  void   *pIter = NULL;
  int32_t code = 0;
  SArray *pDropped = taosArrayInit(4, sizeof(int64_t));
  if (pDropped == NULL) {
    return terrno;
  }

  mDebug("start to scan checkpoint report info");

  streamMutexLock(&execInfo.lock);

  while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
    SChkptReportInfo *px = (SChkptReportInfo *)pIter;
    if (taosArrayGetSize(px->pTaskList) == 0) {
      continue;
    }

    STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0);
    if (pInfo == NULL) {
      continue;
    }

    // copy what is needed from the first report up front: pInfo points into px->pTaskList,
    // which is cleared below
    int64_t streamId = pInfo->streamId;
    int64_t reportedChkptId = pInfo->checkpointId;

    SStreamObj *pStream = NULL;
    code = mndGetStreamObj(pMnode, streamId, &pStream);
    if (pStream == NULL || code != 0) {
      mDebug("failed to acquire stream:0x%" PRIx64 " remove it from checkpoint-report list", streamId);
      void *p = taosArrayPush(pDropped, &streamId);
      if (p == NULL) {
        mError("failed to put stream into drop list:0x%" PRIx64, streamId);
      }
      continue;
    }

    int32_t total = mndGetNumOfStreamTasks(pStream);
    int32_t ret = allTasksSendChkptReport(px, total, pStream->name);
    if (ret == 0) {
      code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
      if (code == 0) {
        code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList);
        if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {  // remove this entry
          taosArrayClear(px->pTaskList);
          mInfo("stream:0x%" PRIx64 " clear checkpoint-report list and update the report checkpointId from:%" PRId64
                " to %" PRId64,
                streamId, px->reportChkpt, reportedChkptId);
          px->reportChkpt = reportedChkptId;
        } else {
          mDebug("stream:0x%" PRIx64 " not launch chkpt-info update trans, due to checkpoint not finished yet",
                 streamId);
        }

        sdbRelease(pMnode->pSdb, pStream);
        // leaving the iteration early: hand the iterator back to the hash table
        taosHashCancelIterate(execInfo.pChkptStreams, pIter);
        break;
      } else {
        mDebug("stream:0x%" PRIx64 " active checkpoint trans not finished yet, wait", streamId);
      }
    }

    sdbRelease(pMnode->pSdb, pStream);
  }

  int32_t size = taosArrayGetSize(pDropped);
  if (size > 0) {
    for (int32_t i = 0; i < size; ++i) {
      int64_t *pStreamId = (int64_t *)taosArrayGet(pDropped, i);
      if (pStreamId == NULL) {
        continue;
      }

      code = taosHashRemove(execInfo.pChkptStreams, pStreamId, sizeof(*pStreamId));
      if (code) {
        mError("failed to remove stream in buf:0x%" PRIx64, *pStreamId);
      }
    }

    int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams);
    mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams);
  }

  streamMutexUnlock(&execInfo.lock);

  taosArrayDestroy(pDropped);

  mDebug("end to scan checkpoint report info");
  return TSDB_CODE_SUCCESS;
}
814

815
// Create, persist and prepare a transaction that sets the consensus checkpointId for task
// taskId of pStream.
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared, terrno
// when the transaction cannot be created, or the error of the failing step.
// Once the transaction exists, every exit path releases pStream through sdbRelease and
// drops the local transaction handle.
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId,
                                          int64_t ts) {
  char msg[128] = {0};
  snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId);

  STrans *pTrans = NULL;
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg, &pTrans);
  if (pTrans == NULL || code != 0) {
    // NOTE(review): unlike the paths below, pStream is not released here — confirm against callers
    return terrno;
  }

  STaskId      id = {.streamId = pStream->uid, .taskId = taskId};
  SStreamTask *pTask = NULL;
  code = mndGetStreamTask(&id, pStream, &pTask);
  if (code) {
    mError("failed to get task:0x%x in stream:%s, failed to create consensus-checkpointId", taskId, pStream->name);
    goto _end;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_CONSEN_NAME, pStream->uid);
  if (code) {
    goto _end;
  }

  code = mndStreamSetChkptIdAction(pMnode, pTrans, pTask, checkpointId, ts);
  if (code != 0) {
    goto _end;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr());
    goto _end;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_end:
  // single cleanup point, so no error path can skip dropping the transaction handle
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
868

869
// Fetch the consensus-checkpoint info stored in pHash for streamId, creating an empty
// entry (with numOfTasks recorded) when none exists yet.
// On success returns 0 and *pInfo is the result of the lookup in pHash.
// On failure *pInfo is NULL and terrno (list allocation) or the taosHashPut error is returned.
int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo) {
  *pInfo = NULL;

  void *px = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (px != NULL) {
    *pInfo = px;
    return 0;
  }

  SCheckpointConsensusInfo p = {
      .pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry)),
      .numOfTasks = numOfTasks,
      .streamId = streamId,
  };

  if (p.pTaskList == NULL) {
    return terrno;
  }

  int32_t code = taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
  if (code != 0) {
    // the entry was not stored, so the new list is still owned here
    taosArrayDestroy(p.pTaskList);
    return code;
  }

  *pInfo = (SCheckpointConsensusInfo *)taosHashGet(pHash, &streamId, sizeof(streamId));
  return code;
}
898

899
// no matter existed or not, add the request into info list anyway, since we need to send rsp mannually
900
// discard the msg may lead to the lost of connections.
901
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo) {
168✔
902
  SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs()};
168✔
903
  memcpy(&info.req, pRestoreInfo, sizeof(info.req));
168✔
904

905
  int32_t num = (int32_t) taosArrayGetSize(pInfo->pTaskList);
168✔
906
  for (int32_t i = 0; i < num; ++i) {
605✔
907
    SCheckpointConsensusEntry *p = taosArrayGet(pInfo->pTaskList, i);
441✔
908
    if (p == NULL) {
441!
909
      continue;
×
910
    }
911

912
    if (p->req.taskId == info.req.taskId) {
441✔
913
      mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update ts %" PRId64
4!
914
             "->%" PRId64 " checkpointId:%" PRId64 " -> %" PRId64 " total existed:%d",
915
             pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs, p->req.checkpointId,
916
             info.req.checkpointId, num);
917
      p->req.startTs = info.req.startTs;
4✔
918
      p->req.checkpointId = info.req.checkpointId;
4✔
919
      p->req.transId = info.req.transId;
4✔
920
      return;
4✔
921
    }
922
  }
923

924
  void *p = taosArrayPush(pInfo->pTaskList, &info);
164✔
925
  if (p == NULL) {
164!
926
    mError("s-task:0x%x failed to put task into consensus-checkpointId list, code: out of memory", info.req.taskId);
×
927
  } else {
928
    num = taosArrayGetSize(pInfo->pTaskList);
164✔
929
    mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, stream:0x%" PRIx64
164✔
930
           " waiting tasks:%d",
931
           pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, num);
932
  }
933
}
934

935
// Release the task list owned by a consensus entry and leave the member as NULL.
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo) {
  SArray *pList = pInfo->pTaskList;
  pInfo->pTaskList = NULL;
  taosArrayDestroy(pList);
}
36✔
939

940
/**
 * Remove the consensus-checkpointId entry of a stream from the hash table.
 *
 * @param pHash    hash table keyed by the int64_t stream id
 * @param streamId id of the stream to drop
 * @return 0 when the table is empty or the entry was removed, otherwise the code returned by taosHashRemove
 */
int32_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) {
  int32_t numOfStreams = taosHashGetSize(pHash);
  if (numOfStreams == 0) {
    return 0;
  }

  int32_t code = taosHashRemove(pHash, &streamId, sizeof(streamId));
  if (code == 0) {
    // report the number of entries left after the removal rather than the stale pre-removal count
    int32_t remain = taosHashGetSize(pHash);
    mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, remain);
  } else {
    // nothing was removed, so the count taken before the attempt is still accurate
    mError("failed to remove stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, numOfStreams);
  }

  return code;
}
956

957
/**
 * Remove the checkpoint-report entry of a stream from the hash table.
 *
 * @param pHash    hash table keyed by the int64_t stream id
 * @param streamId id of the stream to drop
 * @return 0 when the table is empty or the entry was removed, otherwise the code returned by taosHashRemove
 */
int32_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId) {
  int32_t numOfStreams = taosHashGetSize(pHash);
  if (numOfStreams == 0) {
    return 0;
  }

  int32_t code = taosHashRemove(pHash, &streamId, sizeof(streamId));
  if (code == 0) {
    // report the number of entries left after the removal rather than the stale pre-removal count
    int32_t remain = taosHashGetSize(pHash);
    mDebug("drop stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, remain);
  } else {
    // nothing was removed, so the count taken before the attempt is still accurate
    mError("failed to remove stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, numOfStreams);
  }

  return code;
}
973

974
/**
 * Empty the reported-task list of a stream's checkpoint-report entry while keeping the entry itself.
 *
 * @param pHash    hash table keyed by the int64_t stream id
 * @param streamId id of the stream whose report list is cleared
 * @return 0 on success, TSDB_CODE_MND_STREAM_NOT_EXIST when the stream has no entry in pHash
 */
int32_t mndResetChkptReportInfo(SHashObj *pHash, int64_t streamId) {
  SChkptReportInfo *pReport = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (pReport == NULL) {
    return TSDB_CODE_MND_STREAM_NOT_EXIST;
  }

  taosArrayClear(pReport->pTaskList);
  mDebug("stream:0x%" PRIx64 " checkpoint-report list cleared, prev report checkpointId:%" PRId64, streamId,
         pReport->reportChkpt);
  return 0;
}
985

986
static void mndShowStreamStatus(char *dst, int8_t status) {
33,168✔
987
  if (status == STREAM_STATUS__NORMAL) {
33,168✔
988
    tstrncpy(dst, "ready", MND_STREAM_TRIGGER_NAME_SIZE);
33,148✔
989
  } else if (status == STREAM_STATUS__STOP) {
20!
990
    tstrncpy(dst, "stop", MND_STREAM_TRIGGER_NAME_SIZE);
×
991
  } else if (status == STREAM_STATUS__FAILED) {
20!
992
    tstrncpy(dst, "failed", MND_STREAM_TRIGGER_NAME_SIZE);
×
993
  } else if (status == STREAM_STATUS__RECOVER) {
20!
994
    tstrncpy(dst, "recover", MND_STREAM_TRIGGER_NAME_SIZE);
×
995
  } else if (status == STREAM_STATUS__PAUSE) {
20!
996
    tstrncpy(dst, "paused", MND_STREAM_TRIGGER_NAME_SIZE);
27✔
997
  }
998
}
33,168✔
999

1000
// Write the display name of the stream's trigger mode into dst (at most MND_STREAM_TRIGGER_NAME_SIZE bytes).
// dst is left untouched when the trigger mode is not one of the known values.
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
  const int8_t trigger = pStream->conf.trigger;
  const char  *name = NULL;

  if (trigger == STREAM_TRIGGER_AT_ONCE) {
    name = "at once";
  } else if (trigger == STREAM_TRIGGER_WINDOW_CLOSE) {
    name = "window close";
  } else if (trigger == STREAM_TRIGGER_MAX_DELAY) {
    name = "max delay";
  } else if (trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
    name = "force window close";
  }

  if (name != NULL) {
    tstrncpy(dst, name, MND_STREAM_TRIGGER_NAME_SIZE);
  }
}
33,082✔
1012

1013
// Render id as a "0x"-prefixed hex string into pBuf.
// Layout: the first two bytes are skipped here (presumably the var-length header that varDataSetLen fills in --
// TODO confirm), "0x" goes to bytes 2..3 and the digits produced by tintToHex start at byte 4.
// The recorded length is the digit count plus 2 for the "0x" prefix.
// NOTE(review): bufLen is only used to zero the buffer; nothing checks that the digits fit.
static void int64ToHexStr(int64_t id, char *pBuf, int32_t bufLen) {
  memset(pBuf, 0, bufLen);

  char *pPrefix = &pBuf[2];
  pPrefix[0] = '0';
  pPrefix[1] = 'x';

  int32_t numOfDigits = tintToHex(id, pPrefix + 2);
  varDataSetLen(pBuf, numOfDigits + 2);
}
256,317✔
1021

1022
/**
 * Check whether every task of the stream that has a status entry in execInfo.pTaskMap is paused.
 *
 * Tasks without a status entry are skipped, so a stream none of whose tasks has an entry is reported as paused.
 * NOTE(review): confirm that this is the intended result for such a stream.
 *
 * @param pStream stream to inspect; its lock is held in read mode while the tasks are iterated
 * @param pRes    [out] true when no task with a status entry is in a state other than TASK_STATUS__PAUSE;
 *                only written on success
 * @return TSDB_CODE_SUCCESS, or the error from creating the iterator / fetching the current task
 */
static int32_t isAllTaskPaused(SStreamObj *pStream, bool *pRes) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamTaskIter *pIter = NULL;
  bool             isPaused = true;

  taosRLockLatch(&pStream->lock);
  code = createStreamTaskIter(pStream, &pIter);
  TSDB_CHECK_CODE(code, lino, _end);

  while (streamTaskIterNextTask(pIter)) {
    SStreamTask *pTask = NULL;
    code = streamTaskIterGetCurrent(pIter, &pTask);
    TSDB_CHECK_CODE(code, lino, _end);

    STaskId           id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (pe == NULL) {
      continue;
    }

    if (pe->status != TASK_STATUS__PAUSE) {
      // one running task settles the answer; stop scanning so the read latch is released sooner
      isPaused = false;
      break;
    }
  }
  (*pRes) = isPaused;

_end:
  destroyStreamTaskIter(pIter);
  taosRUnLockLatch(&pStream->lock);
  if (code != TSDB_CODE_SUCCESS) {
    mError("error happens when get stream status, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1056

1057
/**
 * Fill one row of the stream listing result block with the attributes of pStream.
 *
 * Columns are written in this order: stream name, create time, stream id, related fill-history stream id, sql,
 * status, source db, target db, target super table, watermark, trigger, sink quota, checkpoint interval,
 * checkpoint backup type, history scan idle.
 *
 * @param pStream   stream whose attributes are shown
 * @param pBlock    result block; pBlock->pDataBlock must hold one column per attribute listed above
 * @param numOfRows index of the row to fill
 * @return 0 on success, otherwise the first error met (also logged together with the failing line number)
 */
int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // create time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id
  char buf[128] = {0};
  int64ToHexStr(pStream->uid, buf, tListLen(buf));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, buf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // related fill-history stream id, NULL when the stream has none
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  if (pStream->hTaskUid != 0) {
    int64ToHexStr(pStream->hTaskUid, buf, tListLen(buf));
    code = colDataSetVal(pColInfo, numOfRows, buf, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, buf, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // sql
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)sql, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status: shown as paused when all tasks are paused, otherwise the status stored in the stream object
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char status2[MND_STREAM_TRIGGER_NAME_SIZE] = {0};
  bool isPaused = false;
  code = isAllTaskPaused(pStream, &isPaused);
  TSDB_CHECK_CODE(code, lino, _end);

  int8_t streamStatus = atomic_load_8(&pStream->status);
  if (isPaused) {
    streamStatus = STREAM_STATUS__PAUSE;
  }
  mndShowStreamStatus(status2, streamStatus);
  STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // source db
  char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sourceDB, mndGetDbStr(pStream->sourceDb), sizeof(sourceDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)sourceDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // target db
  char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(targetDB, mndGetDbStr(pStream->targetDb), sizeof(targetDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)targetDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // target super table, NULL when the stream has no target super table name
  if (pStream->targetSTbName[0] == 0) {
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

    code = colDataSetVal(pColInfo, numOfRows, NULL, true);
  } else {
    char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(targetSTB, mndGetStbStr(pStream->targetSTbName), sizeof(targetSTB));
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

    code = colDataSetVal(pColInfo, numOfRows, (const char *)targetSTB, false);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // watermark
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // trigger
  char trigger[20 + VARSTR_HEADER_SIZE] = {0};
  char trigger2[MND_STREAM_TRIGGER_NAME_SIZE] = {0};
  mndShowStreamTrigger(trigger2, pStream);
  STR_WITH_MAXSIZE_TO_VARSTR(trigger, trigger2, sizeof(trigger));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)trigger, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // sink_quota: always shown as "0"
  char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0};
  sinkQuota[0] = '0';
  char dstStr[20] = {0};
  STR_TO_VARSTR(dstStr, sinkQuota);
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint interval
  char tmp[20 + VARSTR_HEADER_SIZE] = {0};
  (void)tsnprintf(varDataVal(tmp), sizeof(tmp) - VARSTR_HEADER_SIZE, "%d sec", tsStreamCheckpointInterval);
  varDataSetLen(tmp, strlen(varDataVal(tmp)));

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)tmp, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint backup type: always shown as "none"
  char backup[20 + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(backup, "none");
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)backup, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // history scan idle: always shown as "100a"
  char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
  tstrncpy(scanHistoryIdle, "100a", sizeof(scanHistoryIdle));

  memset(dstStr, 0, tListLen(dstStr));
  STR_TO_VARSTR(dstStr, scanHistoryIdle);
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
  // checked like every other column so that a failure here is logged with its line number
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1216

1217
/**
 * Fill one row of the stream-task listing result block with the attributes of pTask.
 *
 * The runtime figures come from the task's status entry in execInfo.pTaskMap. Columns are written in this order:
 * stream name, task id, node type, node id, level, status, stage, input queue, input total, process throughput,
 * output total, output throughput, info, start time, start checkpoint id, start checkpoint version,
 * checkpoint time, checkpoint id, checkpoint version, checkpoint size, checkpoint backup status, ds_err_info,
 * history task id, history task status, notify event stat.
 *
 * @param pStream   stream that owns the task
 * @param pTask     task to show
 * @param pBlock    result block; pBlock->pDataBlock must hold one column per attribute listed above
 * @param numOfRows index of the row to fill
 * @param precision passed to taosFormatUtcTime when the processed time window is formatted
 * @return 0 on success, TSDB_CODE_STREAM_TASK_NOT_EXIST when the task has no status entry, otherwise the first
 *         error met (also logged together with the failing line number)
 */
int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows,
                              int32_t precision) {
  SColumnInfoData *pColInfo = NULL;
  int32_t          cols = 0;
  int32_t          code = 0;
  int32_t          lino = 0;

  STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};

  STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
  if (pe == NULL) {
    mError("task:0x%" PRIx64 " not exists in any vnodes, streamName:%s, streamId:0x%" PRIx64 " createTs:%" PRId64
           " no valid status/stage info",
           id.taskId, pStream->name, pStream->uid, pStream->createTime);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  // stream name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  char idstr[128] = {0};
  int64ToHexStr(pTask->id.taskId, idstr, tListLen(idstr));
  code = colDataSetVal(pColInfo, numOfRows, idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node type: a positive node id is shown as "vnode", anything else as "snode"
  char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
  varDataSetLen(nodeType, 5);
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pTask->info.nodeId > 0) {
    memcpy(varDataVal(nodeType), "vnode", 5);
  } else {
    memcpy(varDataVal(nodeType), "snode", 5);
  }
  code = colDataSetVal(pColInfo, numOfRows, nodeType, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node id, negative ids are shown as 0
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  int64_t nodeId = TMAX(pTask->info.nodeId, 0);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // level
  char level[20 + VARSTR_HEADER_SIZE] = {0};
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    STR_WITH_SIZE_TO_VARSTR(level, "source", 6);
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    STR_WITH_SIZE_TO_VARSTR(level, "agg", 3);
  } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    STR_WITH_SIZE_TO_VARSTR(level, "sink", 4);
  }

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)level, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status
  char status[20 + VARSTR_HEADER_SIZE] = {0};

  const char *pStatus = streamTaskGetStatusStr(pe->status);
  STR_TO_VARSTR(status, pStatus);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stage
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // input queue
  char        vbuf[TSDB_STREAM_NOTIFY_STAT_LEN + 2] = {0};
  char        buf[TSDB_STREAM_NOTIFY_STAT_LEN] = {0};
  // a literal percent sign must be written as "%%"; a lone "%" followed by ")" is not a valid conversion
  const char *queueInfoStr = "%4.2f MiB (%6.2f%%)";
  snprintf(buf, tListLen(buf), queueInfoStr, pe->inputQUsed, pe->inputRate);
  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // input total, switched from MiB to GiB once the value reaches 1024
  const char *formatTotalMb = "%7.2f MiB";
  const char *formatTotalGb = "%7.2f GiB";
  if (pe->procsTotal < 1024) {
    snprintf(buf, tListLen(buf), formatTotalMb, pe->procsTotal);
  } else {
    snprintf(buf, tListLen(buf), formatTotalGb, pe->procsTotal / 1024);
  }

  memset(vbuf, 0, tListLen(vbuf));
  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // process throughput, switched from KiB/s to MiB/s once the value reaches 1024
  const char *formatKb = "%7.2f KiB/s";
  const char *formatMb = "%7.2f MiB/s";
  if (pe->procsThroughput < 1024) {
    snprintf(buf, tListLen(buf), formatKb, pe->procsThroughput);
  } else {
    snprintf(buf, tListLen(buf), formatMb, pe->procsThroughput / 1024);
  }

  memset(vbuf, 0, tListLen(vbuf));
  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // output total, NULL for sink tasks
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    colDataSetNULL(pColInfo, numOfRows);
  } else {
    (void)tsnprintf(buf, sizeof(buf), formatTotalMb, pe->outputTotal);
    memset(vbuf, 0, tListLen(vbuf));
    STR_TO_VARSTR(vbuf, buf);

    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
    TSDB_CHECK_CODE(code, lino, _end);
  }

  // output throughput, NULL for sink tasks
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    colDataSetNULL(pColInfo, numOfRows);
  } else {
    if (pe->outputThroughput < 1024) {
      snprintf(buf, tListLen(buf), formatKb, pe->outputThroughput);
    } else {
      snprintf(buf, tListLen(buf), formatMb, pe->outputThroughput / 1024);
    }

    memset(vbuf, 0, tListLen(vbuf));
    STR_TO_VARSTR(vbuf, buf);

    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
    TSDB_CHECK_CODE(code, lino, _end);
  }

  // info: sink data size for sink tasks, processed offset/time window for source tasks, empty otherwise
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    const char *sinkStr = "%.2f MiB";
    snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize);
  } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {  // offset info
    if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
      int32_t ret = taosFormatUtcTime(buf, tListLen(buf), pe->processedVer, precision);
      if (ret != 0) {
        mError("failed to format processed timewindow, skey:%" PRId64, pe->processedVer);
        memset(buf, 0, tListLen(buf));
      }
    } else {
      const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
      snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer);
    }
  } else {
    memset(buf, 0, tListLen(buf));
  }

  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start_time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start ver
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointVer, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint time, NULL while latestTime is still 0
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pe->checkpointInfo.latestTime != 0) {
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestTime, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, 0, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint_id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint version
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestVer, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint size: always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  colDataSetNULL(pColInfo, numOfRows);

  // checkpoint backup status: always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, 0, true);
  TSDB_CHECK_CODE(code, lino, _end);

  // ds_err_info: always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, 0, true);
  TSDB_CHECK_CODE(code, lino, _end);

  // history_task_id, NULL when hTaskId is 0
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pe->hTaskId != 0) {
    int64ToHexStr(pe->hTaskId, idstr, tListLen(idstr));
    code = colDataSetVal(pColInfo, numOfRows, idstr, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, 0, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // history_task_status: always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, 0, true);
  TSDB_CHECK_CODE(code, lino, _end);

  // notify_event_stat: one fragment per counter that is greater than zero, appended one after another into buf
  int32_t offset = 0;
  if (pe->notifyEventStat.notifyEventAddTimes > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Add %" PRId64 "x, %" PRId64 " elems in %lfs; ",
                        pe->notifyEventStat.notifyEventAddTimes, pe->notifyEventStat.notifyEventAddElems,
                        pe->notifyEventStat.notifyEventAddCostSec);
  }
  if (pe->notifyEventStat.notifyEventPushTimes > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Push %" PRId64 "x, %" PRId64 " elems in %lfs; ",
                        pe->notifyEventStat.notifyEventPushTimes, pe->notifyEventStat.notifyEventPushElems,
                        pe->notifyEventStat.notifyEventPushCostSec);
  }
  if (pe->notifyEventStat.notifyEventPackTimes > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Pack %" PRId64 "x, %" PRId64 " elems in %lfs; ",
                        pe->notifyEventStat.notifyEventPackTimes, pe->notifyEventStat.notifyEventPackElems,
                        pe->notifyEventStat.notifyEventPackCostSec);
  }
  if (pe->notifyEventStat.notifyEventSendTimes > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Send %" PRId64 "x, %" PRId64 " elems in %lfs; ",
                        pe->notifyEventStat.notifyEventSendTimes, pe->notifyEventStat.notifyEventSendElems,
                        pe->notifyEventStat.notifyEventSendCostSec);
  }
  if (pe->notifyEventStat.notifyEventHoldElems > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "[Hold %" PRId64 " elems] ",
                        pe->notifyEventStat.notifyEventHoldElems);
  }
  TSDB_CHECK_CONDITION(offset < sizeof(buf), code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
  buf[offset] = '\0';

  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  // no fragment was written: show NULL instead of an empty string
  if (offset == 0) {
    colDataSetNULL(pColInfo, numOfRows);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
    TSDB_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code) {
    mError("error happens during build task attr result blocks, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1553

1554
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
11,401✔
1555
  const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
11,401✔
1556
  const SEp *p = GET_ACTIVE_EP(pCurrent);
11,401✔
1557

1558
  if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
11,401!
1559
    return false;
11,401✔
1560
  }
1561
  return true;
×
1562
}
1563

1564
// Release the node list and the db map held by pInfo; the SVgroupChangeInfo object itself is not freed.
// Both members are reset to NULL afterwards, so a repeated call on the same object does not touch
// already released memory. A NULL pInfo is ignored.
void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo) {
  if (pInfo == NULL) {
    return;
  }

  taosArrayDestroy(pInfo->pUpdateNodeList);
  pInfo->pUpdateNodeList = NULL;

  taosHashCleanup(pInfo->pDBMap);
  pInfo->pDBMap = NULL;
}
2,413✔
1570

1571
// 1. increase the replica does not affect the stream process.
1572
// 2. decreasing the replica may affect the stream task execution in the way that there is one or more running stream
1573
// tasks on the replica that will be removed.
1574
// 3. vgroup redistribution is a combined operation that first increases and then decreases the replica. So we
1575
// will handle it as mentioned in 1 & 2 items.
1576
int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
2,413✔
1577
                               SVgroupChangeInfo *pInfo) {
1578
  int32_t code = 0;
2,413✔
1579
  int32_t lino = 0;
2,413✔
1580

1581
  if (pInfo == NULL) {
2,413!
1582
    return TSDB_CODE_INVALID_PARA;
×
1583
  }
1584

1585
  pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo));
2,413✔
1586
  pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
2,413✔
1587

1588
  if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) {
2,413!
1589
    mndDestroyVgroupChangeInfo(pInfo);
×
1590
    TSDB_CHECK_NULL(NULL, code, lino, _err, terrno);
×
1591
  }
1592

1593
  int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
2,413✔
1594
  for (int32_t i = 0; i < numOfNodes; ++i) {
14,494✔
1595
    SNodeEntry *pPrevEntry = taosArrayGet(pPrevNodeList, i);
12,081✔
1596
    if (pPrevEntry == NULL) {
12,081!
1597
      continue;
×
1598
    }
1599

1600
    int32_t num = taosArrayGetSize(pNodeList);
12,081✔
1601
    for (int32_t j = 0; j < num; ++j) {
164,468✔
1602
      SNodeEntry *pCurrent = taosArrayGet(pNodeList, j);
163,802✔
1603
      if (pCurrent == NULL) {
163,802!
1604
        continue;
×
1605
      }
1606

1607
      if (pCurrent->nodeId == pPrevEntry->nodeId) {
163,802✔
1608
        if (pPrevEntry->stageUpdated || isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) {
11,415!
1609
          const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
14✔
1610

1611
          char buf[256] = {0};
14✔
1612
          code = epsetToStr(&pCurrent->epset, buf, tListLen(buf));  // ignore this error
14✔
1613
          if (code) {
14!
1614
            mError("failed to convert epset string, code:%s", tstrerror(code));
×
1615
            TSDB_CHECK_CODE(code, lino, _err);
×
1616
          }
1617

1618
          mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
14!
1619
                 pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
1620

1621
          SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId};
14✔
1622
          epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
14✔
1623
          epsetAssign(&updateInfo.newEp, &pCurrent->epset);
14✔
1624

1625
          void *p = taosArrayPush(pInfo->pUpdateNodeList, &updateInfo);
14✔
1626
          TSDB_CHECK_NULL(p, code, lino, _err, terrno);
14!
1627
        }
1628

1629
        // todo handle the snode info
1630
        if (pCurrent->nodeId != SNODE_HANDLE) {
11,415✔
1631
          SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
9,938✔
1632
          code = taosHashPut(pInfo->pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
9,938✔
1633
          mndReleaseVgroup(pMnode, pVgroup);
9,938✔
1634
          TSDB_CHECK_CODE(code, lino, _err);
9,938!
1635
        }
1636

1637
        break;
11,415✔
1638
      }
1639
    }
1640
  }
1641

1642
  return code;
2,413✔
1643

1644
_err:
×
1645
  mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino);
×
1646
  mndDestroyVgroupChangeInfo(pInfo);
×
1647
  return code;
×
1648
}
1649

1650
// Decide whether the stream nodes have been updated since the node list in execInfo was recorded.
// Called by mndStreamNodeIsUpdated() with execInfo.lock held.
//
// pMnode:         mnode used to refresh the node list and take the vgroup snapshot.
// ppNodeSnapshot: receives the snapshot produced by mndTakeVgroupSnapshot(); the caller destroys it.
//
// Returns a boolean value (as int32_t):
//   false - no stream node exists, comparing the node lists failed, or no node change was found;
//   true  - a recorded node has stageUpdated set, not all vnodes are ready, or at least one node changed.
static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) {
  bool              allReady = false;
  bool              nodeUpdated = false;
  SVgroupChangeInfo changeInfo = {0};

  int32_t numOfNodes = extractStreamNodeList(pMnode);

  if (numOfNodes == 0) {
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
    execInfo.ts = taosGetTimestampSec();
    return false;
  }

  for (int32_t i = 0; i < numOfNodes; ++i) {
    SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
    if (pNodeEntry == NULL) {
      continue;
    }

    if (pNodeEntry->stageUpdated) {
      mDebug("stream task not ready due to node update detected, checkpoint not issued");
      return true;
    }
  }

  int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot);
  if (code) {
    mError("failed to get the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    mWarn("not all vnodes ready, quit from vnodes status check");
    return true;
  }

  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, *ppNodeSnapshot, &changeInfo);
  if (code) {
    // mndFindChangedNodeInfo() releases changeInfo itself when it fails; destroying it here again would free the
    // same array and hash map twice
    return false;
  }

  nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
  if (nodeUpdated) {
    mDebug("stream tasks not ready due to node update");
  }

  mndDestroyVgroupChangeInfo(&changeInfo);
  return nodeUpdated;
}
1698

1699
// check if the node update happens or not
1700
// Run the node-update check while holding execInfo.lock and return its result.
// The vgroup snapshot handed back by the check is only a by-product here and is destroyed before returning.
bool mndStreamNodeIsUpdated(SMnode *pMnode) {
  SArray *pSnapshot = NULL;
  bool    isUpdated = false;

  streamMutexLock(&execInfo.lock);
  isUpdated = doCheckForUpdated(pMnode, &pSnapshot);
  streamMutexUnlock(&execInfo.lock);

  taosArrayDestroy(pSnapshot);

  return isUpdated;
}
1710

1711
// Verify the snode precondition for a source db.
// A db whose cfg.replications is 1 always passes. Any other db passes only when at least one SDB_SNODE object can
// be fetched from the sdb; otherwise TSDB_CODE_SNODE_NOT_DEPLOYED is returned.
int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) {
  SSdb *pSdb = pMnode->pSdb;

  if (pSrcDb->cfg.replications == 1) {
    return TSDB_CODE_SUCCESS;
  }

  // a single fetch is sufficient: only the existence of one snode matters
  SSnodeObj *pSnode = NULL;
  void      *pFetchIter = sdbFetch(pSdb, SDB_SNODE, NULL, (void **)&pSnode);

  if (pFetchIter != NULL) {
    sdbRelease(pSdb, pSnode);
    sdbCancelFetch(pSdb, pFetchIter);
    return TSDB_CODE_SUCCESS;
  }

  mError("snode not existed when trying to create stream in db with multiple replica");
  return TSDB_CODE_SNODE_NOT_DEPLOYED;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc