• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4495

12 Jul 2025 07:47AM UTC coverage: 62.706% (+0.07%) from 62.635%
#4495

push

travis-ci

web-flow
docs: update stream docs (#31822)

159683 of 324087 branches covered (49.27%)

Branch coverage included in aggregate %.

245972 of 322830 relevant lines covered (76.19%)

23684066.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.76
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23

24
// Cursor used to walk every task of a stream, one level of pStream->pTaskList after another.
struct SStreamTaskIter {
25
  SStreamObj  *pStream;  // stream whose pTaskList is being traversed
26
  int32_t      level;  // index of the level currently visited; -1 until the first advance
27
  int32_t      ordinalIndex;  // index of the next task to hand out inside the current level
28
  int32_t      totalLevel;  // number of levels, taken from pTaskList when the iterator is created
29
  SStreamTask *pTask;  // task produced by the last successful advance, NULL otherwise
30
};
31

32
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId);
33

34
/**
 * Allocate a task iterator positioned before the first task of pStream.
 *
 * @param pStream stream whose task list will be traversed
 * @param pIter   out: the new iterator, or NULL when the allocation fails
 * @return 0 on success, terrno when the allocation fails
 */
int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter) {
  SStreamTaskIter *pNew = taosMemoryCalloc(1, sizeof(SStreamTaskIter));

  *pIter = pNew;
  if (pNew == NULL) {
    return terrno;
  }

  // level -1 marks "not started"; the first advance moves to level 0
  pNew->level = -1;
  pNew->ordinalIndex = 0;
  pNew->pStream = pStream;
  pNew->totalLevel = taosArrayGetSize(pStream->pTaskList);
  pNew->pTask = NULL;

  return 0;
}
48

49
/**
 * Advance the iterator to the next task, moving on to the following level
 * whenever the current one is exhausted.
 *
 * @return true when pIter->pTask now refers to a task, false (with pTask set
 *         to NULL) when every level has been consumed
 */
bool streamTaskIterNextTask(SStreamTaskIter *pIter) {
  if (pIter->level >= pIter->totalLevel) {
    pIter->pTask = NULL;
    return false;
  }

  if (pIter->level == -1) {
    pIter->level += 1;
  }

  for (; pIter->level < pIter->totalLevel; pIter->level += 1, pIter->ordinalIndex = 0) {
    SArray *pLevelTasks = taosArrayGetP(pIter->pStream->pTaskList, pIter->level);

    if (pIter->ordinalIndex < taosArrayGetSize(pLevelTasks)) {
      pIter->pTask = taosArrayGetP(pLevelTasks, pIter->ordinalIndex);
      pIter->ordinalIndex += 1;
      return true;
    }

    // this level is exhausted; the loop step moves to the start of the next one
    pIter->pTask = NULL;
  }

  pIter->pTask = NULL;
  return false;
}
76

77
/**
 * Fetch the task the iterator currently points at.
 *
 * @param pIter iterator to read
 * @param pTask out: receives pIter->pTask (may be NULL)
 * @return TSDB_CODE_SUCCESS when a task is available, TSDB_CODE_INVALID_PARA
 *         when pTask is NULL or the iterator holds no current task
 */
int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask) {
  if (pTask == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  *pTask = pIter->pTask;
  return (*pTask != NULL) ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA;
}
87

88
/** Release an iterator obtained from createStreamTaskIter(). */
void destroyStreamTaskIter(SStreamTaskIter *pIter) {
  taosMemoryFree(pIter);
}
201,499!
89

90
/**
 * Check that every replica of a vgroup has finished sync restore and is in a
 * usable sync state.
 *
 * @return false as soon as one replica is not restored, or is offline, in
 *         error, a learner or a candidate; true otherwise
 */
static bool checkStatusForEachReplica(SVgObj *pVgroup) {
  int32_t idx = 0;

  while (idx < pVgroup->replica) {
    if (!pVgroup->vnodeGid[idx].syncRestore) {
      mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId);
      return false;
    }

    ESyncState state = pVgroup->vnodeGid[idx].syncState;
    switch (state) {
      case TAOS_SYNC_STATE_OFFLINE:
      case TAOS_SYNC_STATE_ERROR:
      case TAOS_SYNC_STATE_LEARNER:
      case TAOS_SYNC_STATE_CANDIDATE:
        mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups", pVgroup->vgId,
              state);
        return false;
      default:
        break;
    }

    ++idx;
  }

  return true;
}
108

109
/**
 * Append one SNodeEntry (nodeId = SNODE_HANDLE) to pVgroupList for every snode
 * found in the sdb.
 *
 * @param pMnode      mnode whose sdb is scanned
 * @param pVgroupList array of SNodeEntry receiving the entries
 * @return 0 on success; the error from addEpIntoEpSet() or terrno from a failed
 *         array push otherwise. A failure to stringify the epset only affects
 *         the debug log and is never returned.
 */
static int32_t mndAddSnodeInfo(SMnode *pMnode, SArray *pVgroupList) {
  SSnodeObj *pObj = NULL;
  void      *pIter = NULL;

  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
    if (pIter == NULL) {
      break;
    }

    SNodeEntry entry = {.nodeId = SNODE_HANDLE};
    int32_t    code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
    if (code) {
      // log before dropping the reference: pObj must not be touched once released
      mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn);
      sdbRelease(pMnode->pSdb, pObj);
      sdbCancelFetch(pMnode->pSdb, pIter);
      return code;
    }

    // the string form is only used for logging, so keep its status out of the return code
    char    buf[256] = {0};
    int32_t strCode = epsetToStr(&entry.epset, buf, tListLen(buf));
    if (strCode != 0) {  // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(strCode));
    }

    void *p = taosArrayPush(pVgroupList, &entry);
    if (p == NULL) {
      code = terrno;
      sdbRelease(pMnode->pSdb, pObj);
      sdbCancelFetch(pMnode->pSdb, pIter);
      mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code));
      return code;
    }

    mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
    sdbRelease(pMnode->pSdb, pObj);
  }

  return 0;
}
151

152
/**
 * Verify that every mnode object in the sdb is a sync leader or follower and
 * that its sdb status is SDB_STATUS_READY.
 *
 * @return TSDB_CODE_SUCCESS when all mnodes pass, TSDB_CODE_FAILED on the
 *         first one that does not
 */
static int32_t mndCheckMnodeStatus(SMnode* pMnode) {
  SSdb      *pSdb = pMnode->pSdb;
  SMnodeObj *pMnodeObj = NULL;
  void      *pCursor = NULL;
  ESdbStatus status;

  while ((pCursor = sdbFetchAll(pSdb, SDB_MNODE, pCursor, (void **)&pMnodeObj, &status, true)) != NULL) {
    bool syncOk =
        (pMnodeObj->syncState == TAOS_SYNC_STATE_LEADER) || (pMnodeObj->syncState == TAOS_SYNC_STATE_FOLLOWER);
    bool passed = syncOk && (status == SDB_STATUS_READY);

    // the sync state is examined first, so it wins when both checks fail
    if (!syncOk) {
      mDebug("mnode sync state:%d not leader/follower", pMnodeObj->syncState);
    } else if (!passed) {
      mWarn("mnode status:%d not ready", status);
    }

    sdbRelease(pSdb, pMnodeObj);

    if (!passed) {
      sdbCancelFetch(pSdb, pCursor);
      return TSDB_CODE_FAILED;
    }
  }

  return TSDB_CODE_SUCCESS;
}
183

184
/**
 * Walk all vgroups in the sdb, append one SNodeEntry per vgroup to pVgroupList
 * and work out whether every vgroup is ready for stream operations.
 *
 * Vgroups with a non-zero mountVgId are skipped. *allReady is cleared when the
 * vgroups of one database disagree on their replica count, or when
 * checkStatusForEachReplica() rejects a vgroup; entries are still collected
 * after that point.
 *
 * @param pMnode      mnode whose sdb is scanned
 * @param pVgroupList array of SNodeEntry receiving the snapshot
 * @param allReady    in/out readiness flag; only ever cleared here
 * @param pTermMap    optional map vgId -> sync term of the first replica; may be NULL
 * @return 0 on success, otherwise the error that aborted the snapshot (hash
 *         creation, replica-map insertion or list push). Failures that are only
 *         logged (epset stringification, term-map insertion) are not returned.
 */
static int32_t mndCheckAndAddVgroupsInfo(SMnode *pMnode, SArray *pVgroupList, bool* allReady, SHashObj* pTermMap) {
  SSdb     *pSdb = pMnode->pSdb;
  void     *pIter = NULL;
  SVgObj   *pVgroup = NULL;
  int32_t   code = 0;
  SHashObj *pHash = NULL;

  // dbUid -> replica count of the first vgroup seen for that database
  pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    code = terrno;  // capture before logging can disturb it
    mError("failed to prepare hashmap during take vgroup snapshot, code:%s", tstrerror(code));
    return code;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    if (pVgroup->mountVgId) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime};
    entry.epset = mndGetVgroupEpset(pMnode, pVgroup);

    int8_t *pReplica = taosHashGet(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid));
    if (pReplica == NULL) {  // not exist, add it into hash map
      code = taosHashPut(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid), &pVgroup->replica, sizeof(pVgroup->replica));
      if (code) {
        mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code));
        sdbRelease(pSdb, pVgroup);
        sdbCancelFetch(pSdb, pIter);
        goto _end;  // take snapshot failed, and not all ready
      }
    } else if (*pReplica != pVgroup->replica) {
      mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations",
            pVgroup->vgId, pVgroup->replica, *pReplica);
      *allReady = false;  // task snap success, but not all ready
    }

    // if not all ready till now, no need to check the remaining vgroups,
    // but still we need to put the info of the existed vgroups into the snapshot list
    if (*allReady) {
      *allReady = checkStatusForEachReplica(pVgroup);
    }

    // the string form is only used for logging; keep its status out of `code`
    char    buf[256] = {0};
    int32_t ret = epsetToStr(&entry.epset, buf, tListLen(buf));
    if (ret != 0) {  // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(ret));
    }

    void *p = taosArrayPush(pVgroupList, &entry);
    if (p == NULL) {
      code = terrno;
      mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId);
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      goto _end;
    }

    mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);

    if (pTermMap != NULL) {
      int64_t term = pVgroup->vnodeGid[0].syncTerm;
      // logged only: a missing term entry does not abort the snapshot
      ret = taosHashPut(pTermMap, &pVgroup->vgId, sizeof(pVgroup->vgId), &term, sizeof(term));
      if (ret) {
        mError("failed to put vnode:%d term into hashMap, code:%s", pVgroup->vgId, tstrerror(ret));
      }
    }

    sdbRelease(pSdb, pVgroup);
  }

_end:
  taosHashCleanup(pHash);
  return code;
}
265

266
/**
 * Build a snapshot list of all vgroup and snode entries and report whether the
 * cluster is ready for stream operations.
 *
 * @param pMnode   mnode to inspect
 * @param allReady out: true only when the vgroup checks and the mnode status check all pass
 * @param pList    out: array of SNodeEntry on success path, NULL when building the list failed
 * @param pTermMap optional map forwarded to mndCheckAndAddVgroupsInfo(); may be NULL
 * @return error of the failing build step (list is destroyed), otherwise the
 *         result of mndCheckMnodeStatus() with the list handed to the caller
 */
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList, SHashObj* pTermMap) {
  int32_t code = 0;
  bool    buildFailed = false;

  *pList = NULL;
  *allReady = true;

  SArray *pSnapshot = taosArrayInit(4, sizeof(SNodeEntry));
  if (pSnapshot == NULL) {
    mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno));
    code = terrno;
    buildFailed = true;
  }

  // 1. check for all vnodes status
  if (!buildFailed) {
    code = mndCheckAndAddVgroupsInfo(pMnode, pSnapshot, allReady, pTermMap);
    buildFailed = (code != 0);
  }

  // 2. add snode info
  if (!buildFailed) {
    code = mndAddSnodeInfo(pMnode, pSnapshot);
    buildFailed = (code != 0);
  }

  if (buildFailed) {
    *allReady = false;
    taosArrayDestroy(pSnapshot);
    return code;
  }

  // 3. check for mnode status
  code = mndCheckMnodeStatus(pMnode);
  if (code != TSDB_CODE_SUCCESS) {
    *allReady = false;
  }

  *pList = pSnapshot;
  return code;
}
306

307
/**
 * Look up a stream object by its uid.
 *
 * @param pMnode   mnode whose sdb is scanned
 * @param streamId uid to match
 * @param pStream  out: the matching object, handed over without being released
 *                 here, or NULL when nothing matches
 * @return TSDB_CODE_SUCCESS when found, TSDB_CODE_STREAM_TASK_NOT_EXIST otherwise
 */
int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pCandidate = NULL;
  void       *pCursor = NULL;

  *pStream = NULL;

  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_STREAM, pCursor, (void **)&pCandidate);
    if (pCursor == NULL) {
      return TSDB_CODE_STREAM_TASK_NOT_EXIST;
    }

    if (pCandidate->uid != streamId) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    sdbCancelFetch(pSdb, pCursor);
    *pStream = pCandidate;
    return TSDB_CODE_SUCCESS;
  }
}
324

325
/**
 * Kill the transaction identified by transId, logging the outcome.
 *
 * @param pMnode  mnode owning the transaction
 * @param transId id of the transaction to kill
 * @param pDbName database name, used for logging only
 */
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) {
  STrans *pTrans = mndAcquireTrans(pMnode, transId);
  if (pTrans == NULL) {
    mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId);
    return;
  }

  mInfo("kill active transId:%d in Db:%s", transId, pDbName);

  int32_t code = mndKillTrans(pMnode, pTrans);
  if (code) {
    mError("failed to kill transId:%d, code:%s", pTrans->id, tstrerror(code));
  }

  // release last: pTrans must not be dereferenced once the reference is dropped
  mndReleaseTrans(pMnode, pTrans);
}
×
338

339
/**
 * Resolve the endpoint set of the node a task runs on.
 *
 * @param pMnode   mnode used for the lookups
 * @param pEpSet   out: endpoint set; numOfEps is reset to 0 before the lookup
 * @param hasEpset out: true only when pEpSet was filled
 * @param taskId   task id, used for logging only
 * @param nodeId   SNODE_HANDLE for the snode, otherwise a vgroup id
 * @return snode case: the result of addEpIntoEpSet(), or TSDB_CODE_INVALID_PARA
 *         when no snode can be fetched; vgroup case: TSDB_CODE_SUCCESS even
 *         when the vgroup is missing (hasEpset stays false)
 */
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId) {
  *hasEpset = false;
  pEpSet->numOfEps = 0;

  if (nodeId != SNODE_HANDLE) {
    SVgObj *pVgObj = mndAcquireVgroup(pMnode, nodeId);
    if (pVgObj == NULL) {
      mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", taskId, nodeId);
      return TSDB_CODE_SUCCESS;
    }

    SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
    mndReleaseVgroup(pMnode, pVgObj);

    epsetAssign(pEpSet, &epset);
    *hasEpset = true;
    return TSDB_CODE_SUCCESS;
  }

  // snode: only the first snode object returned by the sdb is used
  SSnodeObj *pSnode = NULL;
  void      *pCursor = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void **)&pSnode);
  if (pCursor == NULL) {
    mError("failed to acquire snode epset");
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = addEpIntoEpSet(pEpSet, pSnode->pDnode->fqdn, pSnode->pDnode->port);
  sdbRelease(pMnode->pSdb, pSnode);
  sdbCancelFetch(pMnode->pSdb, pCursor);

  *hasEpset = (code == 0);
  if (code != 0) {
    mError("failed to set epset");
  }

  return code;
}
378

379
/**
 * Find the task of pStream whose taskId equals pId->taskId.
 *
 * @param pId     id to look for (only taskId is compared)
 * @param pStream stream whose tasks are searched
 * @param pTask   out: the matching task, or NULL
 * @return 0 when found, the iterator creation error, or TSDB_CODE_FAILED when
 *         no task matches
 */
int32_t mndGetStreamTask(STaskId *pId, SStreamObj *pStream, SStreamTask **pTask) {
  SStreamTaskIter *pTaskIter = NULL;
  SStreamTask     *pFound = NULL;

  *pTask = NULL;

  int32_t code = createStreamTaskIter(pStream, &pTaskIter);
  if (code != 0) {
    mError("failed to create stream task iter:%s", pStream->name);
    return code;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCurrent = NULL;
    if (streamTaskIterGetCurrent(pTaskIter, &pCurrent) != 0) {
      continue;
    }

    if (pCurrent->id.taskId == pId->taskId) {
      pFound = pCurrent;
      break;
    }
  }

  destroyStreamTaskIter(pTaskIter);

  if (pFound == NULL) {
    return TSDB_CODE_FAILED;
  }

  *pTask = pFound;
  return 0;
}
406

407
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
135,081✔
408
  int32_t num = 0;
135,081✔
409
  for (int32_t i = 0; i < taosArrayGetSize(pStream->pTaskList); ++i) {
401,051✔
410
    SArray *pLevel = taosArrayGetP(pStream->pTaskList, i);
265,480✔
411
    num += taosArrayGetSize(pLevel);
266,170✔
412
  }
413

414
  return num;
134,714✔
415
}
416

417
/**
 * Count the streams whose source database is dbName.
 *
 * @param pMnode        mnode whose sdb is scanned
 * @param dbName        name of the source database
 * @param pNumOfStreams out: number of streams with a matching sourceDbUid
 * @return 0 on success, TSDB_CODE_MND_DB_NOT_SELECTED when the database cannot
 *         be acquired
 */
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
  SSdb   *pSdb = pMnode->pSdb;
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  int32_t matched = 0;
  void   *pCursor = NULL;

  for (;;) {
    SStreamObj *pStream = NULL;

    pCursor = sdbFetch(pSdb, SDB_STREAM, pCursor, (void **)&pStream);
    if (pCursor == NULL) {
      break;
    }

    matched += (pStream->sourceDbUid == pDb->uid) ? 1 : 0;
    sdbRelease(pSdb, pStream);
  }

  *pNumOfStreams = matched;
  mndReleaseDb(pMnode, pDb);
  return 0;
}
442

443
static void freeTaskList(void *param) {
1,552✔
444
  SArray **pList = (SArray **)param;
1,552✔
445
  taosArrayDestroy(*pList);
1,552✔
446
}
1,552✔
447

448
int32_t mndInitExecInfo() {
2,506✔
449
  int32_t code = taosThreadMutexInit(&execInfo.lock, NULL);
2,506✔
450
  if (code) {
2,506!
451
    return code;
×
452
  }
453

454
  _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
2,506✔
455

456
  execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
2,506✔
457
  execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
2,506✔
458
  execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
2,506✔
459
  execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
2,506✔
460
  execInfo.pChkptStreams = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
2,506✔
461
  execInfo.pStreamConsensus = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
2,506✔
462
  execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
2,506✔
463
  execInfo.pKilledChkptTrans = taosArrayInit(4, sizeof(SStreamTaskResetMsg));
2,506✔
464

465
  if (execInfo.pTaskList == NULL || execInfo.pTaskMap == NULL || execInfo.transMgmt.pDBTrans == NULL ||
2,506!
466
      execInfo.pTransferStateStreams == NULL || execInfo.pChkptStreams == NULL || execInfo.pStreamConsensus == NULL ||
2,506!
467
      execInfo.pNodeList == NULL || execInfo.pKilledChkptTrans == NULL) {
2,506!
468
    mError("failed to initialize the stream runtime env, code:%s", tstrerror(terrno));
×
469
    return terrno;
×
470
  }
471

472
  execInfo.role = NODE_ROLE_UNINIT;
2,506✔
473
  execInfo.switchFromFollower = false;
2,506✔
474

475
  taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
2,506✔
476
  taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList);
2,506✔
477
  taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList);
2,506✔
478
  return 0;
2,506✔
479
}
480

481
void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
126✔
482
  SArray *pValidList = taosArrayInit(4, sizeof(SNodeEntry));
126✔
483
  if (pValidList == NULL) {  // not continue
126!
484
    return;
×
485
  }
486

487
  int32_t size = taosArrayGetSize(pNodeSnapshot);
126✔
488
  int32_t oldSize = taosArrayGetSize(execInfo.pNodeList);
126✔
489

490
  for (int32_t i = 0; i < oldSize; ++i) {
1,615✔
491
    SNodeEntry *p = taosArrayGet(execInfo.pNodeList, i);
1,489✔
492
    if (p == NULL) {
1,489!
493
      continue;
×
494
    }
495

496
    for (int32_t j = 0; j < size; ++j) {
14,187✔
497
      SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);
13,371✔
498
      if (pEntry == NULL) {
13,371!
499
        continue;
×
500
      }
501

502
      if (pEntry->nodeId == p->nodeId) {
13,371✔
503
        p->hbTimestamp = pEntry->hbTimestamp;
673✔
504

505
        void *px = taosArrayPush(pValidList, p);
673✔
506
        if (px == NULL) {
673!
507
          mError("failed to put node into list, nodeId:%d", p->nodeId);
×
508
        } else {
509
          mDebug("vgId:%d ts:%" PRId64 " HbMsgId:%d is valid", p->nodeId, p->hbTimestamp, p->lastHbMsgId);
673✔
510
        }
511
        break;
673✔
512
      }
513
    }
514
  }
515

516
  taosArrayDestroy(execInfo.pNodeList);
126✔
517
  execInfo.pNodeList = pValidList;
126✔
518

519
  mDebug("remain %d valid node entries after clean expired nodes info, prev size:%d",
126✔
520
         (int32_t)taosArrayGetSize(pValidList), oldSize);
521
}
522

523
/**
 * Remove one task from both the task map and the task list of pExecNode.
 *
 * @param pExecNode  runtime state holding pTaskMap and pTaskList
 * @param pRemovedId id of the task to remove
 * @return TSDB_CODE_SUCCESS when the task was removed or was not in the map,
 *         otherwise the error returned by taosHashRemove()
 */
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
  if (taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)) == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
  if (code != 0) {
    return code;
  }

  // drop the first list entry carrying the same stream/task id pair
  int32_t pos = 0;
  while (pos < taosArrayGetSize(pExecNode->pTaskList)) {
    STaskId *pCur = taosArrayGet(pExecNode->pTaskList, pos);
    bool     same = (pCur != NULL) && (pCur->taskId == pRemovedId->taskId) && (pCur->streamId == pRemovedId->streamId);

    if (same) {
      taosArrayRemove(pExecNode->pTaskList, pos);

      int32_t remain = taosArrayGetSize(pExecNode->pTaskList);
      mInfo("s-task:0x%x removed from buffer, remain:%d in buffer list", (int32_t)pRemovedId->taskId, remain);
      break;
    }

    ++pos;
  }

  return TSDB_CODE_SUCCESS;
}
551

552
/**
 * Remove every task listed in pTaskIds from the buffers of pExecInfo; a failed
 * removal is logged and the remaining ids are still processed.
 *
 * @param pTaskIds  array of STaskId to remove
 * @param pExecInfo runtime state to remove them from
 */
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo) {
  int32_t idx = 0;

  while (idx < taosArrayGetSize(pTaskIds)) {
    STaskId *pTaskId = taosArrayGet(pTaskIds, idx);
    ++idx;

    if (pTaskId == NULL) {
      continue;
    }

    if (doRemoveTasks(pExecInfo, pTaskId) != 0) {
      mError("failed to remove task in buffer list, 0x%" PRIx64, pTaskId->taskId);
    }
  }
}
126✔
565

566
/**
 * Remove every task of pStream from the task buffers of pExecNode, then clear
 * the stream's entries in the consensus-checkpoint and checkpoint-report
 * tables. The whole operation runs under pExecNode->lock.
 *
 * All tables touched belong to pExecNode, i.e. the same object whose lock is
 * held, rather than to the global execInfo.
 *
 * @param pStream   stream being removed
 * @param pExecNode runtime state to clean up
 */
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
  SStreamTaskIter *pIter = NULL;
  streamMutexLock(&pExecNode->lock);

  // 1. remove task entries
  int32_t code = createStreamTaskIter(pStream, &pIter);
  if (code) {
    mError("failed to create stream task iter:%s", pStream->name);
    goto _end;
  }

  while (streamTaskIterNextTask(pIter)) {
    SStreamTask *pTask = NULL;
    code = streamTaskIterGetCurrent(pIter, &pTask);
    if (code) {
      continue;
    }

    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    code = doRemoveTasks(pExecNode, &id);
    if (code) {
      mError("failed to remove task in buffer list, 0x%" PRIx64, id.taskId);
    }
  }

  // the map and the list must describe the same set of tasks; stop if they diverged
  if (taosHashGetSize(pExecNode->pTaskMap) != taosArrayGetSize(pExecNode->pTaskList)) {
    mError("task map size, task list size, not equal");
    goto _end;
  }

  // 2. remove stream entry in consensus hash table and checkpoint-report hash table
  code = mndClearConsensusCheckpointId(pExecNode->pStreamConsensus, pStream->uid);
  if (code) {
    mError("failed to clear consensus checkpointId, code:%s", tstrerror(code));
  }

  code = mndClearChkptReportInfo(pExecNode->pChkptStreams, pStream->uid);
  if (code) {
    mError("failed to clear the checkpoint report info, code:%s", tstrerror(code));
  }

_end:
  streamMutexUnlock(&pExecNode->lock);
  destroyStreamTaskIter(pIter);  // pIter is NULL when the iterator could not be created
}
613

614
/**
 * Tell whether pList (array of SNodeEntry) contains an entry for nodeId.
 */
static bool taskNodeExists(SArray *pList, int32_t nodeId) {
  size_t  total = taosArrayGetSize(pList);
  int32_t idx = 0;

  while (idx < total) {
    SNodeEntry *pNode = taosArrayGet(pList, idx);
    ++idx;

    if (pNode != NULL && pNode->nodeId == nodeId) {
      return true;
    }
  }

  return false;
}
630

631
/**
 * Drop from execInfo every task whose node no longer appears in pNodeSnapshot,
 * then prune the expired entries of execInfo.pNodeList. Tasks on the snode
 * (nodeId == SNODE_HANDLE) are never treated as expired.
 *
 * @param pNodeSnapshot array of SNodeEntry describing the nodes that still exist
 * @return 0 on success, terrno when the temporary list cannot be allocated
 */
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
  SArray *pExpired = taosArrayInit(4, sizeof(STaskId));
  if (pExpired == NULL) {
    return terrno;
  }

  // collect first and remove afterwards, so pTaskList is not modified while it is scanned
  int32_t total = taosArrayGetSize(execInfo.pTaskList);
  for (int32_t idx = 0; idx < total; ++idx) {
    STaskId *pTaskId = taosArrayGet(execInfo.pTaskList, idx);
    if (pTaskId == NULL) {
      continue;
    }

    STaskStatusEntry *pStatus = taosHashGet(execInfo.pTaskMap, pTaskId, sizeof(*pTaskId));
    if (pStatus == NULL || pStatus->nodeId == SNODE_HANDLE) {
      continue;
    }

    if (taskNodeExists(pNodeSnapshot, pStatus->nodeId)) {
      continue;
    }

    if (taosArrayPush(pExpired, pTaskId) == NULL) {
      mError("failed to put task entry into remove list, taskId:0x%" PRIx64, pTaskId->taskId);
    }
  }

  removeTasksInBuf(pExpired, &execInfo);

  mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pExpired),
         (int32_t)taosArrayGetSize(execInfo.pTaskList));

  removeExpiredNodeInfo(pNodeSnapshot);

  taosArrayDestroy(pExpired);
  return 0;
}
672

673
/**
 * Decide whether the checkpoint-report list of a stream is complete and
 * consistent, i.e. whether the checkpoint info can now be updated.
 *
 * The list qualifies when it holds exactly numOfTasks reports, all reports
 * carry the same checkpointId, and the transId of the reports agrees with the
 * active transaction registered for the stream. When no transaction is
 * registered, the active checkpointId recorded for the first reporting task is
 * checked instead.
 *
 * @param pReportInfo report info of one stream
 * @param numOfTasks  number of tasks the stream has
 * @param pName       stream name, used for logging only
 * @return TSDB_CODE_SUCCESS when the update may proceed, -1 when it has to be
 *         re-checked later
 */
static int32_t allTasksSendChkptReport(SChkptReportInfo* pReportInfo, int32_t numOfTasks, const char* pName) {
  int64_t checkpointId = -1;
  int32_t transId = -1;
  int32_t taskId = -1;

  int32_t existed = (int32_t)taosArrayGetSize(pReportInfo->pTaskList);
  if (existed != numOfTasks) {
    mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pReportInfo->streamId, pName,
           existed, numOfTasks, numOfTasks - existed);
    return -1;
  }

  // acquire current active checkpointId, and do cross-check checkpointId info in exec.pTaskList
  for(int32_t i = 0; i < numOfTasks; ++i) {
    STaskChkptInfo *pInfo = taosArrayGet(pReportInfo->pTaskList, i);
    if (pInfo == NULL) {
      continue;
    }

    if (checkpointId == -1) {
      checkpointId = pInfo->checkpointId;
      transId = pInfo->transId;
      taskId = pInfo->taskId;
    } else if (checkpointId != pInfo->checkpointId) {
      mError("stream:0x%" PRIx64
             " checkpointId in checkpoint-report list are not identical, type 1 taskId:0x%x checkpointId:%" PRId64
             ", type 2 taskId:0x%x checkpointId:%" PRId64,
             pReportInfo->streamId, taskId, checkpointId, pInfo->taskId, pInfo->checkpointId);
      return -1;
    }
  }

  // check for the correct checkpointId for current task info in STaskChkptInfo
  STaskChkptInfo  *p = taosArrayGet(pReportInfo->pTaskList, 0);
  if (p == NULL) {
    // an empty list (numOfTasks == 0) has no report to cross-check against
    mError("stream:0x%" PRIx64 " %s checkpoint-report list is empty, recheck next time", pReportInfo->streamId,
           pName);
    return -1;
  }

  STaskId id = {.streamId = p->streamId, .taskId = p->taskId};
  STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));

  // cross-check failed, there must be something unknown wrong
  SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, &id.streamId, sizeof(id.streamId));
  if (pTransInfo == NULL) {
    mWarn("stream:0x%" PRIx64 " no active trans exists for checkpoint transId:%d, it may have been cleared already",
           id.streamId, transId);

    if (pe == NULL) {
      // without a status entry the active checkpointId cannot be verified
      mWarn("stream:0x%" PRIx64 " s-task:0x%x not found in task map, recheck next time", id.streamId,
            (int32_t)id.taskId);
      return -1;
    }

    if (pe->checkpointInfo.activeId != 0 && pe->checkpointInfo.activeId != checkpointId) {
      mWarn("stream:0x%" PRIx64 " active checkpointId is not equalled to the required, current:%" PRId64
            ", req:%" PRId64 " recheck next time",
            id.streamId, pe->checkpointInfo.activeId, checkpointId);
      return -1;
    }
  } else if (pTransInfo->transId != transId) {
    mError("stream:0x%" PRIx64
           " checkpoint-report list info are expired, active transId:%d trans in list:%d, recheck next time",
           id.streamId, pTransInfo->transId, transId);
    return -1;
  }

  mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", id.streamId,
         pName, numOfTasks);

  return TSDB_CODE_SUCCESS;
}
738

739
/**
 * Scan the checkpoint-report table and, for the first stream whose tasks have
 * all reported, launch the transaction that updates its checkpoint info.
 * Entries whose stream can no longer be acquired are removed from the table.
 * The scan runs under execInfo.lock and records its finish time in
 * execInfo.chkptReportScanTs.
 *
 * @param pReq rpc message; only pReq->info.node (the mnode) is used
 * @return the status code of the last operation performed during the scan
 */
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  void   *pIter = NULL;
  int32_t code = 0;
  int32_t lino = 0;
  SArray *pDropped = NULL;
  int64_t ts = 0;

  mDebug("start to scan checkpoint report info");

  streamMutexLock(&execInfo.lock);

  int32_t num = taosHashGetSize(execInfo.pChkptStreams);
  if (num == 0) {
    goto _end;
  }

  pDropped = taosArrayInit(4, sizeof(int64_t));
  TSDB_CHECK_NULL(pDropped, code, lino, _end, terrno);

  while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
    SChkptReportInfo *px = (SChkptReportInfo *)pIter;
    if (taosArrayGetSize(px->pTaskList) == 0) {
      continue;
    }

    STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0);
    if (pInfo == NULL) {
      continue;
    }

    // copy what is needed from the first report: the list it lives in is cleared further down
    int64_t streamId = pInfo->streamId;
    int64_t checkpointId = pInfo->checkpointId;

    SStreamObj *pStream = NULL;
    code = mndGetStreamObj(pMnode, streamId, &pStream);
    if (pStream == NULL || code != 0) {
      mDebug("failed to acquire stream:0x%" PRIx64 " remove it from checkpoint-report list", streamId);
      void *p = taosArrayPush(pDropped, &streamId);
      if (p == NULL) {
        mError("failed to put stream into drop list:0x%" PRIx64, streamId);
      }
      continue;
    }

    int32_t total = mndGetNumOfStreamTasks(pStream);
    int32_t ret = allTasksSendChkptReport(px, total, pStream->name);
    if (ret == 0) {
      code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
      if (code == 0) {
        code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList);
        if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {  // remove this entry
          taosArrayClear(px->pTaskList);
          mInfo("stream:0x%" PRIx64 " clear checkpoint-report list and update the report checkpointId from:%" PRId64
                " to %" PRId64,
                streamId, px->reportChkpt, checkpointId);
          px->reportChkpt = checkpointId;
        } else {
          mDebug("stream:0x%" PRIx64 " not launch chkpt-info update trans, due to checkpoint not finished yet",
                 streamId);
        }

        sdbRelease(pMnode->pSdb, pStream);
        // leaving the iteration early: hand the iterator back to the hash table
        taosHashCancelIterate(execInfo.pChkptStreams, pIter);
        break;
      } else {
        mDebug("stream:0x%" PRIx64 " active checkpoint trans not finished yet, wait", streamId);
      }
    }

    sdbRelease(pMnode->pSdb, pStream);
  }

  int32_t size = taosArrayGetSize(pDropped);
  if (size > 0) {
    for (int32_t i = 0; i < size; ++i) {
      int64_t *pStreamId = (int64_t *)taosArrayGet(pDropped, i);
      if (pStreamId == NULL) {
        continue;
      }

      code = taosHashRemove(execInfo.pChkptStreams, pStreamId, sizeof(*pStreamId));
      if (code) {
        mError("failed to remove stream in buf:0x%" PRIx64, *pStreamId);
      }
    }

    int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams);
    mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams);
  }

_end:

  ts = taosGetTimestampMs();
  execInfo.chkptReportScanTs = ts;

  streamMutexUnlock(&execInfo.lock);

  if (pDropped != NULL) {
    taosArrayDestroy(pDropped);
  }

  mDebug("end to scan checkpoint report info, ts:%"PRId64, ts);
  return code;
}
840

841
/**
 * Create, register, persist and prepare the transaction that sets the
 * consensus checkpointId for the tasks of a stream.
 *
 * The local reference to the transaction is dropped on every path once the
 * transaction has been created.
 *
 * @param pMnode       mnode that runs the transaction
 * @param pStream      stream the transaction belongs to
 * @param checkpointId consensus checkpointId to set
 * @param pList        list forwarded to mndStreamSetChkptIdAction()
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared,
 *         otherwise the error of the step that failed
 */
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId, SArray* pList) {
  char    msg[128] = {0};
  STrans *pTrans = NULL;

  snprintf(msg, tListLen(msg), "set consen-chkpt-id for stream:0x%" PRIx64, pStream->uid);

  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg, &pTrans);
  if (pTrans == NULL || code != 0) {
    // prefer the explicit status; fall back to terrno when only the NULL pointer signals failure
    return (code != 0) ? code : terrno;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_CONSEN_NAME, pStream->uid);
  if (code) {
    mndTransDrop(pTrans);  // same cleanup as every later failure path
    return code;
  }

  code = mndStreamSetChkptIdAction(pMnode, pTrans, pStream, checkpointId, pList);
  if (code != 0) {
    mndTransDrop(pTrans);
    return code;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);

  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare set consensus-chkptId trans for stream:0x%" PRIx64 " since %s", pTrans->id,
           pStream->uid, tstrerror(code));
    mndTransDrop(pTrans);
    return code;
  }

  mndTransDrop(pTrans);
  return TSDB_CODE_ACTION_IN_PROGRESS;
}
881

882
/**
 * Return the consensus-checkpoint info of a stream, creating and inserting an
 * empty one when the stream has no entry in pHash yet.
 *
 * @param pHash      table keyed by streamId holding SCheckpointConsensusInfo values
 * @param streamId   stream to look up
 * @param numOfTasks task count stored in a newly created entry
 * @param pInfo      out: pointer to the entry stored in pHash, NULL on failure
 * @return 0 on success, terrno when the task list cannot be allocated, or the
 *         error returned by taosHashPut()
 */
int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo) {
  *pInfo = NULL;

  void *px = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (px != NULL) {
    *pInfo = px;
    return 0;
  }

  SCheckpointConsensusInfo p = {
      .pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry)),
      .numOfTasks = numOfTasks,
      .streamId = streamId,
  };

  if (p.pTaskList == NULL) {
    return terrno;
  }

  int32_t code = taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
  if (code != 0) {
    // the table did not take the entry, so the freshly created list is still ours to free
    taosArrayDestroy(p.pTaskList);
    return code;
  }

  // hand out the copy stored in the table, not the local template
  *pInfo = (SCheckpointConsensusInfo *)taosHashGet(pHash, &streamId, sizeof(streamId));
  return code;
}
911

912
// no matter existed or not, add the request into info list anyway, since we need to send rsp mannually
913
// discard the msg may lead to the lost of connections.
914
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo) {
254✔
915
  SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs()};
254✔
916
  memcpy(&info.req, pRestoreInfo, sizeof(info.req));
254✔
917

918
  int32_t num = (int32_t) taosArrayGetSize(pInfo->pTaskList);
254✔
919
  for (int32_t i = 0; i < num; ++i) {
921✔
920
    SCheckpointConsensusEntry *p = taosArrayGet(pInfo->pTaskList, i);
697✔
921
    if (p == NULL) {
697!
922
      continue;
×
923
    }
924

925
    if (p->req.taskId == info.req.taskId) {
697✔
926
      mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update send reqTs %" PRId64
30✔
927
             "->%" PRId64 " checkpointId:%" PRId64 " -> %" PRId64 " term:%d->%d total existed:%d",
928
             pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs, p->req.checkpointId,
929
             info.req.checkpointId, p->req.term, info.req.term, num);
930
      p->req.startTs = info.req.startTs;
30✔
931
      p->req.checkpointId = info.req.checkpointId;
30✔
932
      p->req.transId = info.req.transId;
30✔
933
      p->req.nodeId = info.req.nodeId;
30✔
934
      p->req.term = info.req.term;
30✔
935
      return;
30✔
936
    }
937
  }
938

939
  void *p = taosArrayPush(pInfo->pTaskList, &info);
224✔
940
  if (p == NULL) {
224!
941
    mError("s-task:0x%x failed to put task into consensus-checkpointId list, code: out of memory", info.req.taskId);
×
942
  } else {
943
    num = taosArrayGetSize(pInfo->pTaskList);
224✔
944
    mInfo("s-task:0x%x (vgId:%d) checkpointId:%" PRId64 " term:%d, reqTs:%" PRId64
224!
945
           " added into consensus-checkpointId list, stream:0x%" PRIx64 " waiting tasks:%d, total tasks:%d",
946
           pRestoreInfo->taskId, pRestoreInfo->nodeId, pRestoreInfo->checkpointId, info.req.term,
947
           info.req.startTs, pRestoreInfo->streamId, num, pInfo->numOfTasks);
948
  }
949
}
950

951
// Release the task list held by a consensus info entry and reset the pointer, so the entry no longer refers to
// freed memory. The entry itself is not freed.
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo) {
  taosArrayDestroy(pInfo->pTaskList);
  pInfo->pTaskList = NULL;
}
29✔
955

956
// Remove the consensus-checkpointId entry of a stream from pHash.
// Returns 0 when the table is empty or the entry was removed, otherwise the error code of taosHashRemove.
int32_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) {
  int32_t remain = taosHashGetSize(pHash);
  if (remain == 0) {
    return 0;  // nothing to remove
  }

  int32_t code = taosHashRemove(pHash, &streamId, sizeof(streamId));
  if (code != 0) {
    mError("failed to remove stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, remain);
    return code;
  }

  // report the number of streams left after the removal
  remain = taosHashGetSize(pHash);
  mInfo("drop stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, remain);
  return code;
}
973

974
// Remove the checkpoint-report entry of a stream from pHash.
// Returns 0 when the table is empty or the entry was removed, otherwise the error code of taosHashRemove.
int32_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId) {
  int32_t code = 0;
  int32_t numOfStreams = taosHashGetSize(pHash);
  if (numOfStreams == 0) {
    return code;
  }

  code = taosHashRemove(pHash, &streamId, sizeof(streamId));
  if (code == 0) {
    // refresh the counter so that "remain" reflects the table after the removal, as
    // mndClearConsensusCheckpointId does
    numOfStreams = taosHashGetSize(pHash);
    mDebug("drop stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, numOfStreams);
  } else {
    mError("failed to remove stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, numOfStreams);
  }

  return code;
}
990

991
// Empty the reported-task list of a stream's checkpoint-report entry while keeping the entry in pHash.
// Returns 0 on success, TSDB_CODE_MND_STREAM_NOT_EXIST when the stream has no entry.
int32_t mndResetChkptReportInfo(SHashObj *pHash, int64_t streamId) {
  SChkptReportInfo *pReport = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (pReport == NULL) {
    return TSDB_CODE_MND_STREAM_NOT_EXIST;
  }

  taosArrayClear(pReport->pTaskList);
  mDebug("stream:0x%" PRIx64 " checkpoint-report list cleared, prev report checkpointId:%" PRId64, streamId,
         pReport->reportChkpt);
  return 0;
}
1002

1003
static void mndShowStreamStatus(char *dst, int8_t status) {
70,219✔
1004
  if (status == STREAM_STATUS__NORMAL) {
70,219!
1005
    tstrncpy(dst, "ready", MND_STREAM_TRIGGER_NAME_SIZE);
70,219✔
1006
  } else if (status == STREAM_STATUS__STOP) {
×
1007
    tstrncpy(dst, "stop", MND_STREAM_TRIGGER_NAME_SIZE);
×
1008
  } else if (status == STREAM_STATUS__FAILED) {
×
1009
    tstrncpy(dst, "failed", MND_STREAM_TRIGGER_NAME_SIZE);
1✔
1010
  } else if (status == STREAM_STATUS__RECOVER) {
×
1011
    tstrncpy(dst, "recover", MND_STREAM_TRIGGER_NAME_SIZE);
×
1012
  } else if (status == STREAM_STATUS__PAUSE) {
×
1013
    tstrncpy(dst, "paused", MND_STREAM_TRIGGER_NAME_SIZE);
2✔
1014
  } else if (status == STREAM_STATUS__INIT) {
×
1015
    tstrncpy(dst, "init", MND_STREAM_TRIGGER_NAME_SIZE);
2✔
1016
  }
1017
}
70,219✔
1018

1019
// Write the display name of the stream's trigger mode into dst (at most MND_STREAM_TRIGGER_NAME_SIZE bytes).
// dst is left untouched when the trigger mode is not one of the known values.
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
  const int8_t mode = pStream->conf.trigger;
  const char  *name = NULL;

  if (mode == STREAM_TRIGGER_AT_ONCE) {
    name = "at once";
  } else if (mode == STREAM_TRIGGER_WINDOW_CLOSE) {
    name = "window close";
  } else if (mode == STREAM_TRIGGER_MAX_DELAY) {
    name = "max delay";
  } else if (mode == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
    name = "force window close";
  }

  if (name != NULL) {
    tstrncpy(dst, name, MND_STREAM_TRIGGER_NAME_SIZE);
  }
}
70,040✔
1031

1032
// Format id as a "0x..." hexadecimal var-string in pBuf.
// The first bufLen bytes of pBuf are zeroed, the text starts at byte 2 and the var-string length is set to the
// number of hex digits plus the two prefix characters.
// NOTE(review): bytes 0-1 are presumably the var-string length header written by varDataSetLen - confirm.
static void int64ToHexStr(int64_t id, char *pBuf, int32_t bufLen) {
  memset(pBuf, 0, bufLen);

  char *pText = &pBuf[2];
  pText[0] = '0';
  pText[1] = 'x';

  int32_t numOfDigits = tintToHex(id, pText + 2);
  varDataSetLen(pBuf, numOfDigits + 2);
}
484,516✔
1040

1041
// Check whether every task of the stream that has a status entry in execInfo.pTaskMap is paused.
// The stream's read latch is held while its tasks are iterated.
//
// pRes  [out] written only when at least one task has a status entry: true if all such tasks are paused
//
// Returns TSDB_CODE_SUCCESS, or the error code of the task iterator.
static int32_t isAllTaskPaused(SStreamObj *pStream, bool *pRes) {
  SStreamTaskIter *pTaskIter = NULL;
  int32_t          numOfChecked = 0;
  bool             allPaused = true;
  int32_t          lino = 0;
  int32_t          code = TSDB_CODE_SUCCESS;

  taosRLockLatch(&pStream->lock);
  code = createStreamTaskIter(pStream, &pTaskIter);
  TSDB_CHECK_CODE(code, lino, _end);

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pTask = NULL;
    code = streamTaskIterGetCurrent(pTaskIter, &pTask);
    TSDB_CHECK_CODE(code, lino, _end);

    STaskId           key = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, &key, sizeof(key));
    if (pEntry == NULL) {
      // tasks without a status entry do not take part in the decision
      continue;
    }

    if (pEntry->status == TASK_STATUS__PAUSE) {
      mInfo("stream:0x%" PRIx64 " taskId:0x%" PRIx64 ", status: paused, stream status paused", pEntry->id.streamId,
            pEntry->id.taskId);
    } else {
      allPaused = false;
      mInfo("stream:0x%" PRIx64 " taskId:0x%" PRIx64 ", status:%d not paused, stream status not paused",
            pEntry->id.streamId, pEntry->id.taskId, pEntry->status);
    }

    numOfChecked += 1;
  }

  if (numOfChecked > 0) {
    (*pRes) = allPaused;
  }

_end:
  destroyStreamTaskIter(pTaskIter);
  taosRUnLockLatch(&pStream->lock);
  if (code != TSDB_CODE_SUCCESS) {
    mError("error happens when get stream status, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1087

1088
// Fill row numOfRows of the result block with the attributes of one stream.
// Columns are written in order: stream name, create time, stream id, related fill-history stream id, sql, status,
// source db, target db, target super table, watermark, trigger, sink quota, checkpoint interval, checkpoint backup
// type, history scan idle, and the message column (pStream->reserve when the stream status is failed).
//
// Returns 0 on success; otherwise the first error met (terrno for a missing column, or the code returned by
// colDataSetVal / isAllTaskPaused). The failing source line is logged.
int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // create time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id
  char buf[128] = {0};
  int64ToHexStr(pStream->uid, buf, tListLen(buf));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, buf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // related fill-history stream id, NULL when the stream has no fill-history stream
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  if (pStream->hTaskUid != 0) {
    int64ToHexStr(pStream->hTaskUid, buf, tListLen(buf));
    code = colDataSetVal(pColInfo, numOfRows, buf, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, buf, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // sql
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)sql, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status: shown as paused when all tasks with a status entry are paused, otherwise the stream's own status
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char status2[MND_STREAM_TRIGGER_NAME_SIZE] = {0};
  bool isPaused = false;
  code = isAllTaskPaused(pStream, &isPaused);
  TSDB_CHECK_CODE(code, lino, _end);

  int8_t streamStatus = atomic_load_8(&pStream->status);
  if (isPaused && pStream->pTaskList != NULL) {
    streamStatus = STREAM_STATUS__PAUSE;
  }
  mndShowStreamStatus(status2, streamStatus);
  STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // source db
  char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sourceDB, mndGetDbStr(pStream->sourceDb), sizeof(sourceDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&sourceDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // target db
  char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(targetDB, mndGetDbStr(pStream->targetDb), sizeof(targetDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // target super table, NULL when the stream has none
  if (pStream->targetSTbName[0] == 0) {
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

    code = colDataSetVal(pColInfo, numOfRows, NULL, true);
  } else {
    char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(targetSTB, mndGetStbStr(pStream->targetSTbName), sizeof(targetSTB));
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

    code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetSTB, false);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // watermark
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // trigger
  char trigger[20 + VARSTR_HEADER_SIZE] = {0};
  char trigger2[MND_STREAM_TRIGGER_NAME_SIZE] = {0};
  mndShowStreamTrigger(trigger2, pStream);
  STR_WITH_MAXSIZE_TO_VARSTR(trigger, trigger2, sizeof(trigger));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // sink_quota, always reported as "0"
  char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0};
  sinkQuota[0] = '0';
  char dstStr[20] = {0};
  STR_TO_VARSTR(dstStr, sinkQuota)
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint interval
  char tmp[20 + VARSTR_HEADER_SIZE] = {0};
  (void)tsnprintf(varDataVal(tmp), sizeof(tmp) - VARSTR_HEADER_SIZE, "%d sec", tsStreamCheckpointInterval);
  varDataSetLen(tmp, strlen(varDataVal(tmp)));

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)tmp, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint backup type, always reported as "none"
  char backup[20 + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(backup, "none")
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)backup, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // history scan idle, always reported as "100a"
  char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
  tstrncpy(scanHistoryIdle, "100a", sizeof(scanHistoryIdle));

  memset(dstStr, 0, tListLen(dstStr));
  STR_TO_VARSTR(dstStr, scanHistoryIdle)
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // message: pStream->reserve for a failed stream, a single blank otherwise
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  if (streamStatus == STREAM_STATUS__FAILED){
    STR_TO_VARSTR(msg, pStream->reserve)
  } else {
    STR_TO_VARSTR(msg, " ")
  }
  code = colDataSetVal(pColInfo, numOfRows, (const char *)msg, false);
  // record the failing line for the last column as well, so the log below does not print a stale lino
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1258

1259
// Fill row numOfRows of the result block with the attributes of one stream task.
// The runtime values come from the task's status entry in execInfo.pTaskMap. Columns are written in order: stream
// name, task id, node type, node id, level, status, stage, input queue, input total, process throughput, output
// total, output throughput, info, start_time, start id, start ver, checkpoint time, checkpoint_id, checkpoint
// version, checkpoint size, checkpoint backup status, ds_err_info, history_task_id, history_task_status and
// notify_event_stat.
//
// precision  timestamp precision used to format the processed time window of force-window-close source tasks
//
// Returns 0 on success, TSDB_CODE_STREAM_TASK_NOT_EXIST when the task has no status entry, otherwise the first
// error met (terrno for a missing column, the code returned by colDataSetVal, or TSDB_CODE_INTERNAL_ERROR when the
// notify-event text does not fit into the buffer).
int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows,
                              int32_t precision) {
  SColumnInfoData *pColInfo = NULL;
  int32_t          cols = 0;
  int32_t          code = 0;
  int32_t          lino = 0;

  STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};

  STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
  if (pe == NULL) {
    mError("task:0x%" PRIx64 " not exists in any vnodes, streamName:%s, streamId:0x%" PRIx64 " createTs:%" PRId64
           " no valid status/stage info",
           id.taskId, pStream->name, pStream->uid, pStream->createTime);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  // stream name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  char idstr[128] = {0};
  int64ToHexStr(pTask->id.taskId, idstr, tListLen(idstr));
  code = colDataSetVal(pColInfo, numOfRows, idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node type
  char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
  varDataSetLen(nodeType, 5);
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pTask->info.nodeId > 0) {
    memcpy(varDataVal(nodeType), "vnode", 5);
  } else {
    memcpy(varDataVal(nodeType), "snode", 5);
  }
  code = colDataSetVal(pColInfo, numOfRows, nodeType, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  int64_t nodeId = TMAX(pTask->info.nodeId, 0);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // level
  char level[20 + VARSTR_HEADER_SIZE] = {0};
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    STR_WITH_SIZE_TO_VARSTR(level, "source", 6);
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    STR_WITH_SIZE_TO_VARSTR(level, "agg", 3);
  } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    STR_WITH_SIZE_TO_VARSTR(level, "sink", 4);
  } else if (pTask->info.taskLevel == TASK_LEVEL__MERGE) {
    STR_WITH_SIZE_TO_VARSTR(level, "merge", 5);
  }

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)level, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status
  char status[20 + VARSTR_HEADER_SIZE] = {0};

  const char *pStatus = streamTaskGetStatusStr(pe->status);
  STR_TO_VARSTR(status, pStatus);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stage
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // input queue
  char        vbuf[TSDB_STREAM_NOTIFY_STAT_LEN + 2] = {0};
  char        buf[TSDB_STREAM_NOTIFY_STAT_LEN] = {0};
  // a literal percent sign must be written as "%%"; a bare "%)" is an invalid conversion specification
  const char *queueInfoStr = "%4.2f MiB (%6.2f%%)";
  snprintf(buf, tListLen(buf), queueInfoStr, pe->inputQUsed, pe->inputRate);
  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // input total, switched from MiB to GiB once the value reaches 1024
  const char *formatTotalMb = "%7.2f MiB";
  const char *formatTotalGb = "%7.2f GiB";
  if (pe->procsTotal < 1024) {
    snprintf(buf, tListLen(buf), formatTotalMb, pe->procsTotal);
  } else {
    snprintf(buf, tListLen(buf), formatTotalGb, pe->procsTotal / 1024);
  }

  memset(vbuf, 0, tListLen(vbuf));
  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // process throughput, switched from KiB/s to MiB/s once the value reaches 1024
  const char *formatKb = "%7.2f KiB/s";
  const char *formatMb = "%7.2f MiB/s";
  if (pe->procsThroughput < 1024) {
    snprintf(buf, tListLen(buf), formatKb, pe->procsThroughput);
  } else {
    snprintf(buf, tListLen(buf), formatMb, pe->procsThroughput / 1024);
  }

  memset(vbuf, 0, tListLen(vbuf));
  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // output total, NULL for sink tasks
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    colDataSetNULL(pColInfo, numOfRows);
  } else {
    (void)tsnprintf(buf, sizeof(buf), formatTotalMb, pe->outputTotal);
    memset(vbuf, 0, tListLen(vbuf));
    STR_TO_VARSTR(vbuf, buf);

    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
    TSDB_CHECK_CODE(code, lino, _end);
  }

  // output throughput, NULL for sink tasks
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    colDataSetNULL(pColInfo, numOfRows);
  } else {
    if (pe->outputThroughput < 1024) {
      snprintf(buf, tListLen(buf), formatKb, pe->outputThroughput);
    } else {
      snprintf(buf, tListLen(buf), formatMb, pe->outputThroughput / 1024);
    }

    memset(vbuf, 0, tListLen(vbuf));
    STR_TO_VARSTR(vbuf, buf);

    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
    TSDB_CHECK_CODE(code, lino, _end);
  }

  // info: sink data size for sink tasks, offset info for source tasks, empty for the others
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    const char *sinkStr = "%.2f MiB";
    snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize);
  } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {  // offset info
    if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
      int32_t ret = taosFormatUtcTime(buf, tListLen(buf), pe->processedVer, precision);
      if (ret != 0) {
        mError("failed to format processed timewindow, skey:%" PRId64, pe->processedVer);
        memset(buf, 0, tListLen(buf));
      }
    } else {
      const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
      snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer);
    }
  } else {
    memset(buf, 0, tListLen(buf));
  }

  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start_time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start ver
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointVer, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint time, NULL while no checkpoint time has been recorded
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pe->checkpointInfo.latestTime != 0) {
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestTime, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, NULL, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint_id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint version
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestVer, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint size, always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  colDataSetNULL(pColInfo, numOfRows);

  // checkpoint backup status, always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, NULL, true);
  TSDB_CHECK_CODE(code, lino, _end);

  // ds_err_info, always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, NULL, true);
  TSDB_CHECK_CODE(code, lino, _end);

  // history_task_id, NULL when the status entry has no history task id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pe->hTaskId != 0) {
    int64ToHexStr(pe->hTaskId, idstr, tListLen(idstr));
    code = colDataSetVal(pColInfo, numOfRows, idstr, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, NULL, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // history_task_status, always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, NULL, true);
  TSDB_CHECK_CODE(code, lino, _end);

  // notify_event_stat: each non-zero counter group is appended to buf; offset tracks the text length so far
  int32_t offset = 0;
  if (pe->notifyEventStat.notifyEventAddTimes > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Add %" PRId64 "x, %" PRId64 " elems in %lfs; ",
                        pe->notifyEventStat.notifyEventAddTimes, pe->notifyEventStat.notifyEventAddElems,
                        pe->notifyEventStat.notifyEventAddCostSec);
  }
  if (pe->notifyEventStat.notifyEventPushTimes > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Push %" PRId64 "x, %" PRId64 " elems in %lfs; ",
                        pe->notifyEventStat.notifyEventPushTimes, pe->notifyEventStat.notifyEventPushElems,
                        pe->notifyEventStat.notifyEventPushCostSec);
  }
  if (pe->notifyEventStat.notifyEventPackTimes > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Pack %" PRId64 "x, %" PRId64 " elems in %lfs; ",
                        pe->notifyEventStat.notifyEventPackTimes, pe->notifyEventStat.notifyEventPackElems,
                        pe->notifyEventStat.notifyEventPackCostSec);
  }
  if (pe->notifyEventStat.notifyEventSendTimes > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Send %" PRId64 "x, %" PRId64 " elems in %lfs; ",
                        pe->notifyEventStat.notifyEventSendTimes, pe->notifyEventStat.notifyEventSendElems,
                        pe->notifyEventStat.notifyEventSendCostSec);
  }
  if (pe->notifyEventStat.notifyEventHoldElems > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "[Hold %" PRId64 " elems] ",
                        pe->notifyEventStat.notifyEventHoldElems);
  }
  TSDB_CHECK_CONDITION(offset < sizeof(buf), code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
  buf[offset] = '\0';

  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  // no counter was set: report NULL instead of an empty string
  if (offset == 0) {
    colDataSetNULL(pColInfo, numOfRows);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
    TSDB_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code) {
    mError("error happens during build task attr result blocks, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1597

1598
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
7,581✔
1599
  const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
7,581✔
1600
  const SEp *p = GET_ACTIVE_EP(pCurrent);
7,581✔
1601

1602
  if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
7,581!
1603
    return false;
7,581✔
1604
  }
1605
  return true;
×
1606
}
1607

1608
/*
 * Release the containers owned by a SVgroupChangeInfo.
 *
 * The structure itself is not freed. After the containers are released the members are reset to NULL, so
 * calling this function again on the same structure (e.g. once on an error path and once more by the owner)
 * does not free the same memory twice.
 *
 * @param pInfo  change info to clean up; NULL is accepted and ignored
 */
void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo) {
  if (pInfo == NULL) {
    return;
  }

  taosArrayDestroy(pInfo->pUpdateNodeList);
  pInfo->pUpdateNodeList = NULL;

  taosHashCleanup(pInfo->pDBMap);
  pInfo->pDBMap = NULL;
}
1,567✔
1614

1615
// 1. increase the replica does not affect the stream process.
1616
// 2. decreasing the replica may affect the stream task execution in the way that there is one or more running stream
1617
// tasks on the will be removed replica.
1618
// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we
1619
// will handle it as mentioned in 1 & 2 items.
1620
int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
1,567✔
1621
                               SVgroupChangeInfo *pInfo) {
1622
  int32_t code = 0;
1,567✔
1623
  int32_t lino = 0;
1,567✔
1624

1625
  if (pInfo == NULL) {
1,567!
1626
    return TSDB_CODE_INVALID_PARA;
×
1627
  }
1628

1629
  pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo));
1,567✔
1630
  pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
1,567✔
1631

1632
  if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) {
1,567!
1633
    mndDestroyVgroupChangeInfo(pInfo);
×
1634
    TSDB_CHECK_NULL(NULL, code, lino, _err, terrno);
×
1635
  }
1636

1637
  int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
1,567✔
1638
  for (int32_t i = 0; i < numOfNodes; ++i) {
15,258✔
1639
    SNodeEntry *pPrevEntry = taosArrayGet(pPrevNodeList, i);
13,691✔
1640
    if (pPrevEntry == NULL) {
13,691!
1641
      continue;
×
1642
    }
1643

1644
    int32_t num = taosArrayGetSize(pNodeList);
13,691✔
1645
    for (int32_t j = 0; j < num; ++j) {
124,423✔
1646
      SNodeEntry *pCurrent = taosArrayGet(pNodeList, j);
118,323✔
1647
      if (pCurrent == NULL) {
118,323!
1648
        continue;
×
1649
      }
1650

1651
      if (pCurrent->nodeId == pPrevEntry->nodeId) {
118,323✔
1652
        if (pPrevEntry->stageUpdated || isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) {
7,591!
1653
          const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
10✔
1654

1655
          char buf[256] = {0};
10✔
1656
          code = epsetToStr(&pCurrent->epset, buf, tListLen(buf));  // ignore this error
10✔
1657
          if (code) {
10!
1658
            mError("failed to convert epset string, code:%s", tstrerror(code));
×
1659
            TSDB_CHECK_CODE(code, lino, _err);
×
1660
          }
1661

1662
          mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
10✔
1663
                 pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
1664

1665
          SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId};
10✔
1666
          epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
10✔
1667
          epsetAssign(&updateInfo.newEp, &pCurrent->epset);
10✔
1668

1669
          void *p = taosArrayPush(pInfo->pUpdateNodeList, &updateInfo);
10✔
1670
          TSDB_CHECK_NULL(p, code, lino, _err, terrno);
10!
1671
        }
1672

1673
        // todo handle the snode info
1674
        if (pCurrent->nodeId != SNODE_HANDLE) {
7,591✔
1675
          SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
6,530✔
1676
          code = taosHashPut(pInfo->pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
6,530✔
1677
          mndReleaseVgroup(pMnode, pVgroup);
6,530✔
1678
          TSDB_CHECK_CODE(code, lino, _err);
6,530!
1679
        }
1680

1681
        break;
7,591✔
1682
      }
1683
    }
1684
  }
1685

1686
  return code;
1,567✔
1687

1688
_err:
×
1689
  mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino);
×
1690
  mndDestroyVgroupChangeInfo(pInfo);
×
1691
  return code;
×
1692
}
1693

1694
/*
 * Decide whether the stream nodes have been updated, or are not ready to be checked yet.
 *
 * The result is a boolean carried in an int32_t:
 *   - false: no stream node exists (execInfo.ts is refreshed in this case), the change detection failed, or no
 *            node change was detected;
 *   - true:  a node entry in execInfo.pNodeList is flagged stageUpdated, not all vnodes are ready, or at least
 *            one changed node was found.
 *
 * The only caller visible here (mndStreamNodeIsUpdated) holds execInfo.lock around this call.
 *
 * @param pMnode          mnode handle
 * @param ppNodeSnapshot  receives the vgroup snapshot taken by mndTakeVgroupSnapshot(); it is not released here,
 *                        the caller destroys it
 */
static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) {
  bool              allReady = false;
  SVgroupChangeInfo changeInfo = {0};

  int32_t numOfNodes = extractStreamNodeList(pMnode);

  if (numOfNodes == 0) {
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
    execInfo.ts = taosGetTimestampSec();
    return false;
  }

  for (int32_t i = 0; i < numOfNodes; ++i) {
    SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
    if (pNodeEntry == NULL) {
      continue;
    }

    if (pNodeEntry->stageUpdated) {
      mDebug("stream task not ready due to node update detected, checkpoint not issued");
      return true;
    }
  }

  int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot, NULL);
  if (code) {
    mError("failed to get the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    mWarn("not all vnodes ready, quit from vnodes status check");
    return true;
  }

  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, *ppNodeSnapshot, &changeInfo);
  if (code) {
    // mndFindChangedNodeInfo has already released the contents of changeInfo on its failure path; destroying
    // it once more here would free the same containers twice
    return false;
  }

  bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
  if (nodeUpdated) {
    mDebug("stream tasks not ready due to node update");
  }

  mndDestroyVgroupChangeInfo(&changeInfo);
  return nodeUpdated;
}
1742

1743
// check if the node update happens or not
1744
bool mndStreamNodeIsUpdated(SMnode *pMnode) {
2,829✔
1745
  SArray *pNodeSnapshot = NULL;
2,829✔
1746

1747
  streamMutexLock(&execInfo.lock);
2,829✔
1748
  bool updated = doCheckForUpdated(pMnode, &pNodeSnapshot);
2,829✔
1749
  streamMutexUnlock(&execInfo.lock);
2,829✔
1750

1751
  taosArrayDestroy(pNodeSnapshot);
2,829✔
1752
  return updated;
2,829✔
1753
}
1754

1755
/*
 * Verify the snode precondition for creating a stream on the given source db.
 *
 * A db whose replications setting is 1 always passes. For any other setting at least one snode object must
 * be present in the sdb.
 *
 * @param pMnode  mnode handle, provides the sdb
 * @param pSrcDb  source database of the stream
 * @return TSDB_CODE_SUCCESS when the precondition holds, TSDB_CODE_SNODE_NOT_DEPLOYED when no snode is found
 */
int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) {
  SSdb      *pSdb = pMnode->pSdb;
  SSnodeObj *pSnode = NULL;

  if (pSrcDb->cfg.replications == 1) {
    return TSDB_CODE_SUCCESS;
  }

  // a single fetch is enough: only the existence of any snode matters
  void *pIter = sdbFetch(pSdb, SDB_SNODE, NULL, (void **)&pSnode);
  if (pIter != NULL) {
    sdbRelease(pSdb, pSnode);
    sdbCancelFetch(pSdb, pIter);
    return TSDB_CODE_SUCCESS;
  }

  mError("snode not existed when trying to create stream in db with multiple replica");
  return TSDB_CODE_SNODE_NOT_DEPLOYED;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc