• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3552

11 Dec 2024 06:08AM UTC coverage: 62.526% (+0.7%) from 61.798%
#3552

push

travis-ci

web-flow
Merge pull request #29092 from taosdata/fix/3.0/TD-33146

fix:[TD-33146] stmt_get_tag_fields return error code

124833 of 255773 branches covered (48.81%)

Branch coverage included in aggregate %.

209830 of 279467 relevant lines covered (75.08%)

19111707.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.73
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23

24
struct SStreamTaskIter {
25
  SStreamObj  *pStream;
26
  int32_t      level;
27
  int32_t      ordinalIndex;
28
  int32_t      totalLevel;
29
  SStreamTask *pTask;
30
};
31

32
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId);
33

34
int32_t createStreamTaskIter(SStreamObj* pStream, SStreamTaskIter** pIter) {
66,269✔
35
  *pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter));
66,269✔
36
  if (*pIter == NULL) {
66,268!
37
    return terrno;
×
38
  }
39

40
  (*pIter)->level = -1;
66,268✔
41
  (*pIter)->ordinalIndex = 0;
66,268✔
42
  (*pIter)->pStream = pStream;
66,268✔
43
  (*pIter)->totalLevel = taosArrayGetSize(pStream->tasks);
66,268✔
44
  (*pIter)->pTask = NULL;
66,266✔
45

46
  return 0;
66,266✔
47
}
48

49
bool streamTaskIterNextTask(SStreamTaskIter* pIter) {
307,679✔
50
  if (pIter->level >= pIter->totalLevel) {
307,679!
51
    pIter->pTask = NULL;
×
52
    return false;
×
53
  }
54

55
  if (pIter->level == -1) {
307,679✔
56
    pIter->level += 1;
66,266✔
57
  }
58

59
  while(pIter->level < pIter->totalLevel) {
438,215✔
60
    SArray *pList = taosArrayGetP(pIter->pStream->tasks, pIter->level);
372,089✔
61
    if (pIter->ordinalIndex >= taosArrayGetSize(pList)) {
371,908✔
62
      pIter->level += 1;
130,536✔
63
      pIter->ordinalIndex = 0;
130,536✔
64
      pIter->pTask = NULL;
130,536✔
65
      continue;
130,536✔
66
    }
67

68
    pIter->pTask = taosArrayGetP(pList, pIter->ordinalIndex);
241,899✔
69
    pIter->ordinalIndex += 1;
241,942✔
70
    return true;
241,942✔
71
  }
72

73
  pIter->pTask = NULL;
66,126✔
74
  return false;
66,126✔
75
}
76

77
int32_t streamTaskIterGetCurrent(SStreamTaskIter* pIter, SStreamTask** pTask) {
241,993✔
78
  if (pTask) {
241,993!
79
    *pTask = pIter->pTask;
241,998✔
80
    if (*pTask != NULL) {
241,998!
81
      return TSDB_CODE_SUCCESS;
242,008✔
82
    }
83
  }
84

85
  return TSDB_CODE_INVALID_PARA;
×
86
}
87

88
void destroyStreamTaskIter(SStreamTaskIter* pIter) {
66,090✔
89
  taosMemoryFree(pIter);
66,090✔
90
}
66,157✔
91

92
static bool checkStatusForEachReplica(SVgObj *pVgroup) {
242,938✔
93
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
492,306✔
94
    if (!pVgroup->vnodeGid[i].syncRestore) {
250,544✔
95
      mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId);
1,171!
96
      return false;
1,171✔
97
    }
98

99
    ESyncState state = pVgroup->vnodeGid[i].syncState;
249,373✔
100
    if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR || state == TAOS_SYNC_STATE_LEARNER ||
249,373!
101
        state == TAOS_SYNC_STATE_CANDIDATE) {
102
      mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups", pVgroup->vgId,
5!
103
            state);
104
      return false;
5✔
105
    }
106
  }
107

108
  return true;
241,762✔
109
}
110

111
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
14,884✔
112
  SSdb     *pSdb = pMnode->pSdb;
14,884✔
113
  void     *pIter = NULL;
14,884✔
114
  SVgObj   *pVgroup = NULL;
14,884✔
115
  int32_t   code = 0;
14,884✔
116
  SArray   *pVgroupList = NULL;
14,884✔
117
  SHashObj *pHash = NULL;
14,884✔
118

119
  pVgroupList = taosArrayInit(4, sizeof(SNodeEntry));
14,884✔
120
  if (pVgroupList == NULL) {
14,884!
121
    mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno));
×
122
    code = terrno;
×
123
    goto _err;
×
124
  }
125

126
  pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
14,884✔
127
  if (pHash == NULL) {
14,884!
128
    mError("failed to prepare hashmap during take vgroup snapshot, code:%s", tstrerror(terrno));
×
129
    code = terrno;
×
130
    goto _err;
×
131
  }
132

133
  *allReady = true;
14,884✔
134

135
  while (1) {
245,187✔
136
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
260,071✔
137
    if (pIter == NULL) {
260,071✔
138
      break;
14,884✔
139
    }
140

141
    SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime};
245,187✔
142
    entry.epset = mndGetVgroupEpset(pMnode, pVgroup);
245,187✔
143

144
    int8_t *pReplica = taosHashGet(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid));
245,187✔
145
    if (pReplica == NULL) {  // not exist, add it into hash map
245,187✔
146
      code = taosHashPut(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid), &pVgroup->replica, sizeof(pVgroup->replica));
121,559✔
147
      if (code) {
121,559!
148
        mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code));
×
149
        sdbRelease(pSdb, pVgroup);
×
150
        sdbCancelFetch(pSdb, pIter);
×
151
        goto _err;  // take snapshot failed, and not all ready
×
152
      }
153
    } else {
154
      if (*pReplica != pVgroup->replica) {
123,628✔
155
        mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations",
798!
156
              pVgroup->vgId, pVgroup->replica, *pReplica);
157
        *allReady = false;  // task snap success, but not all ready
798✔
158
      }
159
    }
160

161
    // if not all ready till now, no need to check the remaining vgroups.
162
    // but still we need to put the info of the existed vgroups into the snapshot list
163
    if (*allReady) {
245,187✔
164
      *allReady = checkStatusForEachReplica(pVgroup);
242,938✔
165
    }
166

167
    char buf[256] = {0};
245,187✔
168
    code = epsetToStr(&entry.epset, buf, tListLen(buf));
245,187✔
169
    if (code != 0) {  // print error and continue
245,187!
170
      mError("failed to convert epset to str, code:%s", tstrerror(code));
×
171
    }
172

173
    void *p = taosArrayPush(pVgroupList, &entry);
245,187✔
174
    if (p == NULL) {
245,187!
175
      mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId);
×
176
      code = terrno;
×
177
      sdbRelease(pSdb, pVgroup);
×
178
      sdbCancelFetch(pSdb, pIter);
×
179
      goto _err;
×
180
    } else {
181
      mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
245,187✔
182
    }
183

184
    sdbRelease(pSdb, pVgroup);
245,187✔
185
  }
186

187
  SSnodeObj *pObj = NULL;
14,884✔
188
  while (1) {
6,192✔
189
    pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
21,076✔
190
    if (pIter == NULL) {
21,076✔
191
      break;
14,884✔
192
    }
193

194
    SNodeEntry entry = {.nodeId = SNODE_HANDLE};
6,192✔
195
    code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
6,192✔
196
    if (code) {
6,192!
197
      sdbRelease(pSdb, pObj);
×
198
      sdbCancelFetch(pSdb, pIter);
×
199
      mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn);
×
200
      goto _err;
×
201
    }
202

203
    char buf[256] = {0};
6,192✔
204
    code = epsetToStr(&entry.epset, buf, tListLen(buf));
6,192✔
205
    if (code != 0) {  // print error and continue
6,192!
206
      mError("failed to convert epset to str, code:%s", tstrerror(code));
×
207
    }
208

209
    void *p = taosArrayPush(pVgroupList, &entry);
6,192✔
210
    if (p == NULL) {
6,192!
211
      code = terrno;
×
212
      sdbRelease(pSdb, pObj);
×
213
      sdbCancelFetch(pSdb, pIter);
×
214
      mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code));
×
215
      goto _err;
×
216
    } else {
217
      mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
6,192✔
218
    }
219

220
    sdbRelease(pSdb, pObj);
6,192✔
221
  }
222

223
  *pList = pVgroupList;
14,884✔
224
  taosHashCleanup(pHash);
14,884✔
225
  return code;
14,884✔
226

227
_err:
×
228
  *allReady = false;
×
229
  taosArrayDestroy(pVgroupList);
×
230
  taosHashCleanup(pHash);
×
231

232
  return code;
×
233
}
234

235
int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
12,351✔
236
  void *pIter = NULL;
12,351✔
237
  SSdb *pSdb = pMnode->pSdb;
12,351✔
238
  *pStream = NULL;
12,351✔
239

240
  SStreamObj *p = NULL;
12,351✔
241
  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&p)) != NULL) {
20,755✔
242
    if (p->uid == streamId) {
20,751✔
243
      sdbCancelFetch(pSdb, pIter);
12,347✔
244
      *pStream = p;
12,347✔
245
      return TSDB_CODE_SUCCESS;
12,347✔
246
    }
247
    sdbRelease(pSdb, p);
8,404✔
248
  }
249

250
  return TSDB_CODE_STREAM_TASK_NOT_EXIST;
4✔
251
}
252

253
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) {
×
254
  STrans *pTrans = mndAcquireTrans(pMnode, transId);
×
255
  if (pTrans != NULL) {
×
256
    mInfo("kill active transId:%d in Db:%s", transId, pDbName);
×
257
    int32_t code = mndKillTrans(pMnode, pTrans);
×
258
    mndReleaseTrans(pMnode, pTrans);
×
259
    if (code) {
×
260
      mError("failed to kill transId:%d, code:%s", pTrans->id, tstrerror(code));
×
261
    }
262
  } else {
263
    mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId);
×
264
  }
265
}
×
266

267
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId) {
18,336✔
268
  *hasEpset = false;
18,336✔
269

270
  pEpSet->numOfEps = 0;
18,336✔
271
  if (nodeId == SNODE_HANDLE) {
18,336✔
272
    SSnodeObj *pObj = NULL;
293✔
273
    void      *pIter = NULL;
293✔
274

275
    pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
293✔
276
    if (pIter != NULL) {
293!
277
      int32_t code = addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
293✔
278
      sdbRelease(pMnode->pSdb, pObj);
293✔
279
      sdbCancelFetch(pMnode->pSdb, pIter);
293✔
280
      if (code) {
293!
281
        *hasEpset = false;
×
282
        mError("failed to set epset");
×
283
      } else {
284
        *hasEpset = true;
293✔
285
      }
286
      return code;
293✔
287
    } else {
288
      mError("failed to acquire snode epset");
×
289
      return TSDB_CODE_INVALID_PARA;
×
290
    }
291
  } else {
292
    SVgObj *pVgObj = mndAcquireVgroup(pMnode, nodeId);
18,043✔
293
    if (pVgObj != NULL) {
18,043✔
294
      SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
18,042✔
295
      mndReleaseVgroup(pMnode, pVgObj);
18,042✔
296

297
      epsetAssign(pEpSet, &epset);
18,042✔
298
      *hasEpset = true;
18,042✔
299
      return TSDB_CODE_SUCCESS;
18,042✔
300
    } else {
301
      mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", taskId, nodeId);
1!
302
      return TSDB_CODE_SUCCESS;
1✔
303
    }
304
  }
305
}
306

307
int32_t mndGetStreamTask(STaskId *pId, SStreamObj *pStream, SStreamTask **pTask) {
136✔
308
  *pTask = NULL;
136✔
309

310
  SStreamTask     *p = NULL;
136✔
311
  SStreamTaskIter *pIter = NULL;
136✔
312
  int32_t          code = createStreamTaskIter(pStream, &pIter);
136✔
313
  if (code) {
136!
314
    mError("failed to create stream task iter:%s", pStream->name);
×
315
    return code;
×
316
  }
317

318
  while (streamTaskIterNextTask(pIter)) {
464!
319
    code = streamTaskIterGetCurrent(pIter, &p);
464✔
320
    if (code) {
464!
321
      continue;
×
322
    }
323

324
    if (p->id.taskId == pId->taskId) {
464✔
325
      destroyStreamTaskIter(pIter);
136✔
326
      *pTask = p;
136✔
327
      return 0;
136✔
328
    }
329
  }
330

331
  destroyStreamTaskIter(pIter);
×
332
  return TSDB_CODE_FAILED;
×
333
}
334

335
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
70,090✔
336
  int32_t num = 0;
70,090✔
337
  for(int32_t i = 0; i < taosArrayGetSize(pStream->tasks); ++i) {
210,420✔
338
    SArray* pLevel = taosArrayGetP(pStream->tasks, i);
140,330✔
339
    num += taosArrayGetSize(pLevel);
140,342✔
340
  }
341

342
  return num;
70,060✔
343
}
344

345
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
114✔
346
  SSdb   *pSdb = pMnode->pSdb;
114✔
347
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
114✔
348
  if (pDb == NULL) {
114!
349
    TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
×
350
  }
351

352
  int32_t numOfStreams = 0;
114✔
353
  void   *pIter = NULL;
114✔
354
  while (1) {
4✔
355
    SStreamObj *pStream = NULL;
118✔
356
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
118✔
357
    if (pIter == NULL) break;
118✔
358

359
    if (pStream->sourceDbUid == pDb->uid) {
4!
360
      numOfStreams++;
4✔
361
    }
362

363
    sdbRelease(pSdb, pStream);
4✔
364
  }
365

366
  *pNumOfStreams = numOfStreams;
114✔
367
  mndReleaseDb(pMnode, pDb);
114✔
368
  return 0;
114✔
369
}
370

371
static void freeTaskList(void* param) {
1,525✔
372
  SArray** pList = (SArray **)param;
1,525✔
373
  taosArrayDestroy(*pList);
1,525✔
374
}
1,525✔
375

376
int32_t mndInitExecInfo() {
2,032✔
377
  int32_t code = taosThreadMutexInit(&execInfo.lock, NULL);
2,032✔
378
  if (code) {
2,032!
379
    return code;
×
380
  }
381

382
  _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
2,032✔
383

384
  execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
2,032✔
385
  execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
2,032✔
386
  execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
2,032✔
387
  execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
2,032✔
388
  execInfo.pChkptStreams = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
2,032✔
389
  execInfo.pStreamConsensus = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
2,032✔
390
  execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
2,032✔
391
  execInfo.pKilledChkptTrans = taosArrayInit(4, sizeof(SStreamTaskResetMsg));
2,032✔
392

393
  if (execInfo.pTaskList == NULL || execInfo.pTaskMap == NULL || execInfo.transMgmt.pDBTrans == NULL ||
2,032!
394
      execInfo.pTransferStateStreams == NULL || execInfo.pChkptStreams == NULL || execInfo.pStreamConsensus == NULL ||
2,032!
395
      execInfo.pNodeList == NULL || execInfo.pKilledChkptTrans == NULL) {
2,032!
396
    mError("failed to initialize the stream runtime env, code:%s", tstrerror(terrno));
×
397
    return terrno;
×
398
  }
399

400
  execInfo.role = NODE_ROLE_UNINIT;
2,032✔
401
  execInfo.switchFromFollower = false;
2,032✔
402

403
  taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
2,032✔
404
  taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList);
2,032✔
405
  taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList);
2,032✔
406
  return 0;
2,032✔
407
}
408

409
void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
1,250✔
410
  SArray *pValidList = taosArrayInit(4, sizeof(SNodeEntry));
1,250✔
411
  if (pValidList == NULL) {  // not continue
1,250!
412
    return;
×
413
  }
414

415
  int32_t size = taosArrayGetSize(pNodeSnapshot);
1,250✔
416
  int32_t oldSize = taosArrayGetSize(execInfo.pNodeList);
1,250✔
417

418
  for (int32_t i = 0; i < oldSize; ++i) {
8,817✔
419
    SNodeEntry *p = taosArrayGet(execInfo.pNodeList, i);
7,567✔
420
    if (p == NULL) {
7,567!
421
      continue;
×
422
    }
423

424
    for (int32_t j = 0; j < size; ++j) {
95,882✔
425
      SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);
94,853✔
426
      if (pEntry == NULL) {
94,853!
427
        continue;
×
428
      }
429

430
      if (pEntry->nodeId == p->nodeId) {
94,853✔
431
        p->hbTimestamp = pEntry->hbTimestamp;
6,538✔
432

433
        void* px = taosArrayPush(pValidList, p);
6,538✔
434
        if (px == NULL) {
6,538!
435
          mError("failed to put node into list, nodeId:%d", p->nodeId);
×
436
        } else {
437
          mDebug("vgId:%d ts:%" PRId64 " HbMsgId:%d is valid", p->nodeId, p->hbTimestamp, p->lastHbMsgId);
6,538✔
438
        }
439
        break;
6,538✔
440
      }
441
    }
442
  }
443

444
  taosArrayDestroy(execInfo.pNodeList);
1,250✔
445
  execInfo.pNodeList = pValidList;
1,250✔
446

447
  mDebug("remain %d valid node entries after clean expired nodes info, prev size:%d",
1,250✔
448
         (int32_t)taosArrayGetSize(pValidList), oldSize);
449
}
450

451
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
7,270✔
452
  void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
7,270✔
453
  if (p == NULL) {
7,270✔
454
    return TSDB_CODE_SUCCESS;
130✔
455
  }
456

457
  int32_t code = taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
7,140✔
458
  if (code) {
7,140!
459
    return code;
×
460
  }
461

462
  for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
20,536!
463
    STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
20,536✔
464
    if (pId == NULL) {
20,536!
465
      continue;
×
466
    }
467

468
    if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
20,536!
469
      taosArrayRemove(pExecNode->pTaskList, k);
7,140✔
470

471
      int32_t num = taosArrayGetSize(pExecNode->pTaskList);
7,140✔
472
      mInfo("s-task:0x%x removed from buffer, remain:%d in buffer list", (int32_t)pRemovedId->taskId, num);
7,140!
473
      break;
7,140✔
474
    }
475
  }
476

477
  return TSDB_CODE_SUCCESS;
7,140✔
478
}
479

480
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo* pExecInfo) {
2,898✔
481
  for (int32_t i = 0; i < taosArrayGetSize(pTaskIds); ++i) {
2,898!
482
    STaskId *pId = taosArrayGet(pTaskIds, i);
×
483
    if (pId == NULL) {
×
484
      continue;
×
485
    }
486

487
    int32_t code = doRemoveTasks(pExecInfo, pId);
×
488
    if (code) {
×
489
      mError("failed to remove task in buffer list, 0x%"PRIx64, pId->taskId);
×
490
    }
491
  }
492
}
2,898✔
493

494
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
1,336✔
495
  SStreamTaskIter *pIter = NULL;
1,336✔
496
  streamMutexLock(&pExecNode->lock);
1,336✔
497

498
  // 1. remove task entries
499
  int32_t code = createStreamTaskIter(pStream, &pIter);
1,336✔
500
  if (code) {
1,336!
501
    streamMutexUnlock(&pExecNode->lock);
×
502
    mError("failed to create stream task iter:%s", pStream->name);
×
503
    return;
×
504
  }
505

506
  while (streamTaskIterNextTask(pIter)) {
8,606✔
507
    SStreamTask *pTask = NULL;
7,270✔
508
    code = streamTaskIterGetCurrent(pIter, &pTask);
7,270✔
509
    if (code) {
7,270!
510
      continue;
×
511
    }
512

513
    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
7,270✔
514
    code = doRemoveTasks(pExecNode, &id);
7,270✔
515
    if (code) {
7,270!
516
      mError("failed to remove task in buffer list, 0x%"PRIx64, id.taskId);
×
517
    }
518
  }
519

520
  if (taosHashGetSize(pExecNode->pTaskMap) != taosArrayGetSize(pExecNode->pTaskList)) {
1,336!
521
    streamMutexUnlock(&pExecNode->lock);
×
522
    destroyStreamTaskIter(pIter);
×
523
    mError("task map size, task list size, not equal");
×
524
    return;
×
525
  }
526

527
  // 2. remove stream entry in consensus hash table and checkpoint-report hash table
528
  code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid);
1,336✔
529
  if (code) {
1,336!
530
    mError("failed to clear consensus checkpointId, code:%s", tstrerror(code));
×
531
  }
532

533
  code = mndClearChkptReportInfo(execInfo.pChkptStreams, pStream->uid);
1,336✔
534
  if (code) {
1,336✔
535
    mError("failed to clear the checkpoint report info, code:%s", tstrerror(code));
399!
536
  }
537

538
  streamMutexUnlock(&pExecNode->lock);
1,336✔
539
  destroyStreamTaskIter(pIter);
1,336✔
540
}
541

542
static bool taskNodeExists(SArray *pList, int32_t nodeId) {
12,468✔
543
  size_t num = taosArrayGetSize(pList);
12,468✔
544

545
  for (int32_t i = 0; i < num; ++i) {
103,032!
546
    SNodeEntry *pEntry = taosArrayGet(pList, i);
103,032✔
547
    if (pEntry == NULL) {
103,032!
548
      continue;
×
549
    }
550

551
    if (pEntry->nodeId == nodeId) {
103,032✔
552
      return true;
12,468✔
553
    }
554
  }
555

556
  return false;
×
557
}
558

559
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
1,250✔
560
  SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
1,250✔
561
  if (pRemovedTasks == NULL) {
1,250!
562
    return terrno;
×
563
  }
564

565
  int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
1,250✔
566
  for (int32_t i = 0; i < numOfTask; ++i) {
14,561✔
567
    STaskId *pId = taosArrayGet(execInfo.pTaskList, i);
13,311✔
568
    if (pId == NULL) {
13,311!
569
      continue;
×
570
    }
571

572
    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
13,311✔
573
    if (pEntry == NULL) {
13,311!
574
      continue;
×
575
    }
576

577
    if (pEntry->nodeId == SNODE_HANDLE) {
13,311✔
578
      continue;
843✔
579
    }
580

581
    bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
12,468✔
582
    if (!existed) {
12,468!
583
      void* p = taosArrayPush(pRemovedTasks, pId);
×
584
      if (p == NULL) {
×
585
        mError("failed to put task entry into remove list, taskId:0x%" PRIx64, pId->taskId);
×
586
      }
587
    }
588
  }
589

590
  removeTasksInBuf(pRemovedTasks, &execInfo);
1,250✔
591

592
  mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
1,250✔
593
         (int32_t)taosArrayGetSize(execInfo.pTaskList));
594

595
  removeExpiredNodeInfo(pNodeSnapshot);
1,250✔
596

597
  taosArrayDestroy(pRemovedTasks);
1,250✔
598
  return 0;
1,250✔
599
}
600

601
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
25,411✔
602
  SMnode *pMnode = pReq->info.node;
25,411✔
603
  void   *pIter = NULL;
25,411✔
604
  int32_t code = 0;
25,411✔
605
  SArray *pDropped = taosArrayInit(4, sizeof(int64_t));
25,411✔
606
  if (pDropped == NULL) {
25,411!
607
    return terrno;
×
608
  }
609

610
  mDebug("start to scan checkpoint report info");
25,411✔
611
  streamMutexLock(&execInfo.lock);
25,411✔
612

613
  while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
87,981✔
614
    SChkptReportInfo* px = (SChkptReportInfo *)pIter;
63,662✔
615
    if (taosArrayGetSize(px->pTaskList) == 0) {
63,662✔
616
      continue;
62,509✔
617
    }
618

619
    STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0);
1,153✔
620
    if (pInfo == NULL) {
1,153!
621
      continue;
×
622
    }
623

624
    SStreamObj *pStream = NULL;
1,153✔
625
    code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
1,153✔
626
    if (pStream == NULL || code != 0) {
1,153!
627
      mDebug("failed to acquire stream:0x%" PRIx64 " remove it from checkpoint-report list", pInfo->streamId);
×
628
      void *p = taosArrayPush(pDropped, &pInfo->streamId);
×
629
      if (p == NULL) {
×
630
        mError("failed to put stream into drop list:0x%" PRIx64, pInfo->streamId);
×
631
      }
632
      continue;
×
633
    }
634

635
    int32_t total = mndGetNumOfStreamTasks(pStream);
1,153✔
636
    int32_t existed = (int32_t)taosArrayGetSize(px->pTaskList);
1,153✔
637

638
    if (total == existed) {
1,153✔
639
      mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info",
1,092✔
640
             pStream->uid, pStream->name, total);
641

642
      code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
1,092✔
643
      if (code == 0) {
1,092!
644
        code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList);
1,092✔
645
        if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {  // remove this entry
1,092!
646
          taosArrayClear(px->pTaskList);
1,092✔
647
          px->reportChkpt = pInfo->checkpointId;
1,092✔
648
          mDebug("stream:0x%" PRIx64 " clear checkpoint-report list", pInfo->streamId);
1,092✔
649
        } else {
650
          mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet",
×
651
                 pInfo->streamId);
652
        }
653
        break;
1,092✔
654
      } else {
655
        mDebug("stream:0x%" PRIx64 " active checkpoint trans not finished yet, wait", pInfo->streamId);
×
656
      }
657
    } else {
658
      mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pInfo->streamId, pStream->name,
61✔
659
             existed, total, total - existed);
660
    }
661

662
    sdbRelease(pMnode->pSdb, pStream);
61✔
663
  }
664

665
  int32_t size = taosArrayGetSize(pDropped);
25,411✔
666
  if (size > 0) {
25,411!
667
    for (int32_t i = 0; i < size; ++i) {
×
668
      int64_t* pStreamId = (int64_t *)taosArrayGet(pDropped, i);
×
669
      if (pStreamId == NULL) {
×
670
        continue;
×
671
      }
672

673
      code = taosHashRemove(execInfo.pChkptStreams, pStreamId, sizeof(*pStreamId));
×
674
      if (code) {
×
675
        mError("failed to remove stream in buf:0x%"PRIx64, *pStreamId);
×
676
      }
677
    }
678

679
    int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams);
×
680
    mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams);
×
681
  }
682

683
  streamMutexUnlock(&execInfo.lock);
25,411✔
684

685
  taosArrayDestroy(pDropped);
25,411✔
686
  return TSDB_CODE_SUCCESS;
25,411✔
687
}
688

689
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId,
136✔
690
                                          int64_t ts) {
691
  char msg[128] = {0};
136✔
692
  snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId);
136✔
693

694
  STrans *pTrans = NULL;
136✔
695
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg, &pTrans);
136✔
696
  if (pTrans == NULL || code != 0) {
136!
697
    return terrno;
×
698
  }
699

700
  STaskId      id = {.streamId = pStream->uid, .taskId = taskId};
136✔
701
  SStreamTask *pTask = NULL;
136✔
702
  code = mndGetStreamTask(&id, pStream, &pTask);
136✔
703
  if (code) {
136!
704
    mError("failed to get task:0x%x in stream:%s, failed to create consensus-checkpointId", taskId, pStream->name);
×
705
    sdbRelease(pMnode->pSdb, pStream);
×
706
    return code;
×
707
  }
708

709
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_CONSEN_NAME, pStream->uid);
136✔
710
  if (code) {
136!
711
    sdbRelease(pMnode->pSdb, pStream);
×
712
    return code;
×
713
  }
714

715
  code = mndStreamSetChkptIdAction(pMnode, pTrans, pTask, checkpointId, ts);
136✔
716
  if (code != 0) {
136!
717
    sdbRelease(pMnode->pSdb, pStream);
×
718
    mndTransDrop(pTrans);
×
719
    return code;
×
720
  }
721

722
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
136✔
723
  if (code) {
136!
724
    sdbRelease(pMnode->pSdb, pStream);
×
725
    mndTransDrop(pTrans);
×
726
    return code;
×
727
  }
728

729
  code = mndTransPrepare(pMnode, pTrans);
136✔
730
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
136!
731
    mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr());
×
732
    sdbRelease(pMnode->pSdb, pStream);
×
733
    mndTransDrop(pTrans);
×
734
    return code;
×
735
  }
736

737
  sdbRelease(pMnode->pSdb, pStream);
136✔
738
  mndTransDrop(pTrans);
136✔
739

740
  return TSDB_CODE_ACTION_IN_PROGRESS;
136✔
741
}
742

743
int32_t mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo) {
164✔
744
  *pInfo = NULL;
164✔
745

746
  void* px = taosHashGet(pHash, &streamId, sizeof(streamId));
164✔
747
  if (px != NULL) {
164✔
748
    *pInfo = px;
129✔
749
    return 0;
129✔
750
  }
751

752
  SCheckpointConsensusInfo p = {
35✔
753
      .pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry)),
35✔
754
      .numOfTasks = numOfTasks,
755
      .streamId = streamId,
756
  };
757

758
  if (p.pTaskList == NULL) {
35!
759
    return terrno;
×
760
  }
761

762
  int32_t code = taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
35✔
763
  if (code == 0) {
35!
764
    void *pChkptInfo = (SCheckpointConsensusInfo *)taosHashGet(pHash, &streamId, sizeof(streamId));
35✔
765
    *pInfo = pChkptInfo;
35✔
766
  } else {
767
    *pInfo = NULL;
×
768
  }
769

770
  return code;
35✔
771
}
772

773
// no matter existed or not, add the request into info list anyway, since we need to send rsp mannually
774
// discard the msg may lead to the lost of connections.
775
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo) {
164✔
776
  SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs()};
164✔
777
  memcpy(&info.req, pRestoreInfo, sizeof(info.req));
164✔
778

779
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) {
487✔
780
    SCheckpointConsensusEntry *p = taosArrayGet(pInfo->pTaskList, i);
346✔
781
    if (p == NULL) {
346!
782
      continue;
×
783
    }
784

785
    if (p->req.taskId == info.req.taskId) {
346✔
786
      mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update ts %" PRId64
23✔
787
             "->%" PRId64 " total existed:%d",
788
             pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs,
789
             (int32_t)taosArrayGetSize(pInfo->pTaskList));
790
      p->req.startTs = info.req.startTs;
23✔
791
      return;
23✔
792
    }
793
  }
794

795
  void *p = taosArrayPush(pInfo->pTaskList, &info);
141✔
796
  if (p == NULL) {
141!
797
    mError("s-task:0x%x failed to put task into consensus-checkpointId list, code: out of memory", info.req.taskId);
×
798
  } else {
799
    int32_t num = taosArrayGetSize(pInfo->pTaskList);
141✔
800
    mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, stream:0x%" PRIx64
141✔
801
           " waiting tasks:%d",
802
           pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, num);
803
  }
804
}
805

806
void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) {
34✔
807
  taosArrayDestroy(pInfo->pTaskList);
34✔
808
  pInfo->pTaskList = NULL;
34✔
809
}
34✔
810

811
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) {
1,370✔
812
  int32_t code = 0;
1,370✔
813
  int32_t numOfStreams = taosHashGetSize(pHash);
1,370✔
814
  if (numOfStreams == 0) {
1,370✔
815
    return code;
1,336✔
816
  }
817

818
  code = taosHashRemove(pHash, &streamId, sizeof(streamId));
34✔
819
  if (code == 0) {
34!
820
    mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, numOfStreams);
34✔
821
  } else {
822
    mError("failed to remove stream:0x%"PRIx64" in consensus-checkpointId list, remain:%d", streamId, numOfStreams);
×
823
  }
824

825
  return code;
34✔
826
}
827

828
int64_t mndClearChkptReportInfo(SHashObj* pHash, int64_t streamId) {
1,336✔
829
  int32_t code = 0;
1,336✔
830
  int32_t numOfStreams = taosHashGetSize(pHash);
1,336✔
831
  if (numOfStreams == 0) {
1,336✔
832
    return code;
316✔
833
  }
834

835
  code = taosHashRemove(pHash, &streamId, sizeof(streamId));
1,020✔
836
  if (code == 0) {
1,020✔
837
    mDebug("drop stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, numOfStreams);
621✔
838
  } else {
839
    mError("failed to remove stream:0x%"PRIx64" in chkpt-report list, remain:%d", streamId, numOfStreams);
399!
840
  }
841

842
  return code;
1,020✔
843
}
844

845
int32_t mndResetChkptReportInfo(SHashObj* pHash, int64_t streamId) {
×
846
  SChkptReportInfo* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId));
×
847
  if (pInfo != NULL) {
×
848
    taosArrayClear(pInfo->pTaskList);
×
849
    mDebug("stream:0x%" PRIx64 " checkpoint-report list cleared, prev report checkpointId:%" PRId64, streamId,
×
850
           pInfo->reportChkpt);
851
    return 0;
×
852
  }
853

854
  return TSDB_CODE_MND_STREAM_NOT_EXIST;
×
855
}
856

857
static void mndShowStreamStatus(char *dst, SStreamObj *pStream) {
33,004✔
858
  int8_t status = atomic_load_8(&pStream->status);
33,004✔
859
  if (status == STREAM_STATUS__NORMAL) {
33,033!
860
    strcpy(dst, "ready");
33,035✔
861
  } else if (status == STREAM_STATUS__STOP) {
×
862
    strcpy(dst, "stop");
×
863
  } else if (status == STREAM_STATUS__FAILED) {
×
864
    strcpy(dst, "failed");
×
865
  } else if (status == STREAM_STATUS__RECOVER) {
×
866
    strcpy(dst, "recover");
×
867
  } else if (status == STREAM_STATUS__PAUSE) {
×
868
    strcpy(dst, "paused");
×
869
  }
870
}
33,033✔
871

872
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
33,096✔
873
  int8_t trigger = pStream->conf.trigger;
33,096✔
874
  if (trigger == STREAM_TRIGGER_AT_ONCE) {
33,096✔
875
    strcpy(dst, "at once");
11,352✔
876
  } else if (trigger == STREAM_TRIGGER_WINDOW_CLOSE) {
21,744✔
877
    strcpy(dst, "window close");
10,903✔
878
  } else if (trigger == STREAM_TRIGGER_MAX_DELAY) {
10,841!
879
    strcpy(dst, "max delay");
10,843✔
880
  } else if (trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
×
881
    strcpy(dst, "force window close");
14✔
882
  }
883
}
33,096✔
884

885
static void int64ToHexStr(int64_t id, char *pBuf, int32_t bufLen) {
234,538✔
886
  memset(pBuf, 0, bufLen);
234,538✔
887
  pBuf[2] = '0';
234,538✔
888
  pBuf[3] = 'x';
234,538✔
889

890
  int32_t len = tintToHex(id, &pBuf[4]);
234,538✔
891
  varDataSetLen(pBuf, len + 2);
234,725✔
892
}
234,725✔
893

894
int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows) {
33,115✔
895
  int32_t code = 0;
33,115✔
896
  int32_t cols = 0;
33,115✔
897
  int32_t lino = 0;
33,115✔
898

899
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
33,115✔
900
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
33,115✔
901
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
33,107✔
902
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
33,074!
903

904
  code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
33,074✔
905
  TSDB_CHECK_CODE(code, lino, _end);
33,067!
906

907
  // create time
908
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
33,067✔
909
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
33,070!
910
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
33,070✔
911
  TSDB_CHECK_CODE(code, lino, _end);
33,056!
912

913
  // stream id
914
  char buf[128] = {0};
33,056✔
915
  int64ToHexStr(pStream->uid, buf, tListLen(buf));
33,056✔
916
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
33,079✔
917
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
33,095!
918
  code = colDataSetVal(pColInfo, numOfRows, buf, false);
33,095✔
919
  TSDB_CHECK_CODE(code, lino, _end);
33,109!
920

921
  // related fill-history stream id
922
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
33,109✔
923
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
33,100✔
924
  if (pStream->hTaskUid != 0) {
33,079!
925
    int64ToHexStr(pStream->hTaskUid, buf, tListLen(buf));
×
926
    code = colDataSetVal(pColInfo, numOfRows, buf, false);
×
927
  } else {
928
    code = colDataSetVal(pColInfo, numOfRows, buf, true);
33,079✔
929
  }
930
  TSDB_CHECK_CODE(code, lino, _end);
33,080!
931

932
  // related fill-history stream id
933
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
33,080✔
934
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql));
33,080✔
935
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
33,080✔
936
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
33,109!
937
  code = colDataSetVal(pColInfo, numOfRows, (const char *)sql, false);
33,109✔
938
  TSDB_CHECK_CODE(code, lino, _end);
33,031!
939

940
  char status[20 + VARSTR_HEADER_SIZE] = {0};
33,031✔
941
  char status2[20] = {0};
33,031✔
942
  mndShowStreamStatus(status2, pStream);
33,031✔
943
  STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
33,030✔
944
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
33,030✔
945
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
33,110!
946

947
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&status, false);
33,110✔
948
  TSDB_CHECK_CODE(code, lino, _end);
33,120!
949

950
  char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
33,120✔
951
  STR_WITH_MAXSIZE_TO_VARSTR(sourceDB, mndGetDbStr(pStream->sourceDb), sizeof(sourceDB));
33,120✔
952
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
33,099✔
953
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
33,095!
954

955
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&sourceDB, false);
33,095✔
956
  TSDB_CHECK_CODE(code, lino, _end);
33,116!
957

958
  char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
33,116✔
959
  STR_WITH_MAXSIZE_TO_VARSTR(targetDB, mndGetDbStr(pStream->targetDb), sizeof(targetDB));
33,116✔
960
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
33,112✔
961
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
33,083!
962

963
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetDB, false);
33,083✔
964
  TSDB_CHECK_CODE(code, lino, _end);
33,117!
965

966
  if (pStream->targetSTbName[0] == 0) {
33,117✔
967
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2✔
968
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
2!
969

970
    code = colDataSetVal(pColInfo, numOfRows, NULL, true);
2✔
971
  } else {
972
    char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
33,115✔
973
    STR_WITH_MAXSIZE_TO_VARSTR(targetSTB, mndGetStbStr(pStream->targetSTbName), sizeof(targetSTB));
33,115✔
974
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
33,086✔
975
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
33,067!
976

977
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetSTB, false);
33,067✔
978
  }
979
  TSDB_CHECK_CODE(code, lino, _end);
33,094!
980

981
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
33,094✔
982
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
33,078!
983

984
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false);
33,078✔
985
  TSDB_CHECK_CODE(code, lino, _end);
33,089!
986

987
  char trigger[20 + VARSTR_HEADER_SIZE] = {0};
33,089✔
988
  char trigger2[20] = {0};
33,089✔
989
  mndShowStreamTrigger(trigger2, pStream);
33,089✔
990
  STR_WITH_MAXSIZE_TO_VARSTR(trigger, trigger2, sizeof(trigger));
33,090✔
991
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
33,090✔
992
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
33,117!
993

994
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);
33,117✔
995
  TSDB_CHECK_CODE(code, lino, _end);
33,114!
996

997
  // sink_quota
998
  char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0};
33,114✔
999
  sinkQuota[0] = '0';
33,114✔
1000
  char dstStr[20] = {0};
33,114✔
1001
  STR_TO_VARSTR(dstStr, sinkQuota)
33,114✔
1002
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
33,114✔
1003
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
33,118!
1004

1005
  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
33,118✔
1006
  TSDB_CHECK_CODE(code, lino, _end);
33,114!
1007

1008
  // checkpoint interval
1009
  char tmp[20 + VARSTR_HEADER_SIZE] = {0};
33,114✔
1010
  sprintf(varDataVal(tmp), "%d sec", tsStreamCheckpointInterval);
33,114✔
1011
  varDataSetLen(tmp, strlen(varDataVal(tmp)));
33,114✔
1012

1013
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
33,114✔
1014
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
33,083!
1015

1016
  code = colDataSetVal(pColInfo, numOfRows, (const char *)tmp, false);
33,083✔
1017
  TSDB_CHECK_CODE(code, lino, _end);
33,114!
1018

1019
  // checkpoint backup type
1020
  char backup[20 + VARSTR_HEADER_SIZE] = {0};
33,114✔
1021
  STR_TO_VARSTR(backup, "none")
33,114✔
1022
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
33,114✔
1023
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
33,116!
1024

1025
  code = colDataSetVal(pColInfo, numOfRows, (const char *)backup, false);
33,116✔
1026
  TSDB_CHECK_CODE(code, lino, _end);
33,108!
1027

1028
  // history scan idle
1029
  char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
33,108✔
1030
  strcpy(scanHistoryIdle, "100a");
33,108✔
1031

1032
  memset(dstStr, 0, tListLen(dstStr));
33,108✔
1033
  STR_TO_VARSTR(dstStr, scanHistoryIdle)
33,108✔
1034
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
33,108✔
1035
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
33,121!
1036

1037
  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
33,121✔
1038

1039
_end:
33,122✔
1040
  if (code) {
33,122!
1041
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
×
1042
  }
1043
  return code;
33,123✔
1044
}
1045

1046
int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows, int32_t precision) {
200,986✔
1047
  SColumnInfoData *pColInfo = NULL;
200,986✔
1048
  int32_t          cols = 0;
200,986✔
1049
  int32_t          code = 0;
200,986✔
1050
  int32_t          lino = 0;
200,986✔
1051

1052
  STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
200,986✔
1053

1054
  STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
200,986✔
1055
  if (pe == NULL) {
201,576!
1056
    mError("task:0x%" PRIx64 " not exists in any vnodes, streamName:%s, streamId:0x%" PRIx64 " createTs:%" PRId64
×
1057
               " no valid status/stage info",
1058
           id.taskId, pStream->name, pStream->uid, pStream->createTime);
1059
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
×
1060
  }
1061

1062
  // stream name
1063
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
201,576✔
1064
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
201,576✔
1065

1066
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
201,507✔
1067
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
201,354!
1068

1069
  code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
201,354✔
1070
  TSDB_CHECK_CODE(code, lino, _end);
201,265!
1071

1072
  // task id
1073
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
201,265✔
1074
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
201,245!
1075

1076
  char idstr[128] = {0};
201,245✔
1077
  int64ToHexStr(pTask->id.taskId, idstr, tListLen(idstr));
201,245✔
1078
  code = colDataSetVal(pColInfo, numOfRows, idstr, false);
201,434✔
1079
  TSDB_CHECK_CODE(code, lino, _end);
201,435!
1080

1081
  // node type
1082
  char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
201,435✔
1083
  varDataSetLen(nodeType, 5);
201,435✔
1084
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
201,435✔
1085
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
201,386!
1086

1087
  if (pTask->info.nodeId > 0) {
201,455✔
1088
    memcpy(varDataVal(nodeType), "vnode", 5);
184,162✔
1089
  } else {
1090
    memcpy(varDataVal(nodeType), "snode", 5);
17,293✔
1091
  }
1092
  code = colDataSetVal(pColInfo, numOfRows, nodeType, false);
201,455✔
1093
  TSDB_CHECK_CODE(code, lino, _end);
201,328!
1094

1095
  // node id
1096
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
201,328✔
1097
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
201,264!
1098

1099
  int64_t nodeId = TMAX(pTask->info.nodeId, 0);
201,264✔
1100
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false);
201,264✔
1101
  TSDB_CHECK_CODE(code, lino, _end);
201,188!
1102

1103
  // level
1104
  char level[20 + VARSTR_HEADER_SIZE] = {0};
201,188✔
1105
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
201,188✔
1106
    STR_WITH_SIZE_TO_VARSTR(level, "source", 6);
102,433✔
1107
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
98,755✔
1108
    STR_WITH_SIZE_TO_VARSTR(level, "agg", 3);
18,708✔
1109
  } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
80,047!
1110
    STR_WITH_SIZE_TO_VARSTR(level, "sink", 4);
80,168✔
1111
  }
1112

1113
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
201,188✔
1114
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
201,031!
1115

1116
  code = colDataSetVal(pColInfo, numOfRows, (const char *)level, false);
201,031✔
1117
  TSDB_CHECK_CODE(code, lino, _end);
201,223!
1118

1119
  // status
1120
  char status[20 + VARSTR_HEADER_SIZE] = {0};
201,223✔
1121

1122
  const char *pStatus = streamTaskGetStatusStr(pe->status);
201,223✔
1123
  STR_TO_VARSTR(status, pStatus);
201,178✔
1124

1125
  // status
1126
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
201,178✔
1127
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
201,125!
1128

1129
  code = colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
201,125✔
1130
  TSDB_CHECK_CODE(code, lino, _end);
201,273!
1131

1132
  // stage
1133
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
201,273✔
1134
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
201,226!
1135

1136
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false);
201,226✔
1137
  TSDB_CHECK_CODE(code, lino, _end);
201,097!
1138

1139
  // input queue
1140
  char        vbuf[40] = {0};
201,097✔
1141
  char        buf[38] = {0};
201,097✔
1142
  const char *queueInfoStr = "%4.2f MiB (%6.2f%)";
201,097✔
1143
  snprintf(buf, tListLen(buf), queueInfoStr, pe->inputQUsed, pe->inputRate);
201,097✔
1144
  STR_TO_VARSTR(vbuf, buf);
201,097✔
1145

1146
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
201,097✔
1147
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
201,573!
1148

1149
  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
201,573✔
1150
  TSDB_CHECK_CODE(code, lino, _end);
201,511!
1151

1152
  // input total
1153
  const char *formatTotalMb = "%7.2f MiB";
201,511✔
1154
  const char *formatTotalGb = "%7.2f GiB";
201,511✔
1155
  if (pe->procsTotal < 1024) {
201,511✔
1156
    snprintf(buf, tListLen(buf), formatTotalMb, pe->procsTotal);
201,364✔
1157
  } else {
1158
    snprintf(buf, tListLen(buf), formatTotalGb, pe->procsTotal / 1024);
147✔
1159
  }
1160

1161
  memset(vbuf, 0, tListLen(vbuf));
201,511✔
1162
  STR_TO_VARSTR(vbuf, buf);
201,511✔
1163

1164
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
201,511✔
1165
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
201,657!
1166

1167
  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
201,657✔
1168
  TSDB_CHECK_CODE(code, lino, _end);
201,559!
1169

1170
  // process throughput
1171
  const char *formatKb = "%7.2f KiB/s";
201,559✔
1172
  const char *formatMb = "%7.2f MiB/s";
201,559✔
1173
  if (pe->procsThroughput < 1024) {
201,559✔
1174
    snprintf(buf, tListLen(buf), formatKb, pe->procsThroughput);
201,388✔
1175
  } else {
1176
    snprintf(buf, tListLen(buf), formatMb, pe->procsThroughput / 1024);
171✔
1177
  }
1178

1179
  memset(vbuf, 0, tListLen(vbuf));
201,559✔
1180
  STR_TO_VARSTR(vbuf, buf);
201,559✔
1181

1182
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
201,559✔
1183
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
201,695!
1184

1185
  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
201,695✔
1186
  TSDB_CHECK_CODE(code, lino, _end);
201,491!
1187

1188
  // output total
1189
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
201,491✔
1190
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
201,407!
1191

1192
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
201,565✔
1193
    colDataSetNULL(pColInfo, numOfRows);
80,256!
1194
  } else {
1195
    sprintf(buf, formatTotalMb, pe->outputTotal);
121,309✔
1196
    memset(vbuf, 0, tListLen(vbuf));
121,309✔
1197
    STR_TO_VARSTR(vbuf, buf);
121,309✔
1198

1199
    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
121,309✔
1200
    TSDB_CHECK_CODE(code, lino, _end);
121,429!
1201
  }
1202

1203
  // output throughput
1204
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
201,685✔
1205
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
201,469!
1206

1207
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
201,613✔
1208
    colDataSetNULL(pColInfo, numOfRows);
80,221!
1209
  } else {
1210
    if (pe->outputThroughput < 1024) {
121,392✔
1211
      snprintf(buf, tListLen(buf), formatKb, pe->outputThroughput);
121,308✔
1212
    } else {
1213
      snprintf(buf, tListLen(buf), formatMb, pe->outputThroughput / 1024);
84✔
1214
    }
1215

1216
    memset(vbuf, 0, tListLen(vbuf));
121,392✔
1217
    STR_TO_VARSTR(vbuf, buf);
121,392✔
1218

1219
    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
121,392✔
1220
    TSDB_CHECK_CODE(code, lino, _end);
121,458!
1221
  }
1222

1223
  // output queue
1224
  //          sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
1225
  //        STR_TO_VARSTR(vbuf, buf);
1226

1227
  //        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1228
  //        colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
1229

1230
  // info
1231
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
201,679✔
1232
    const char *sinkStr = "%.2f MiB";
80,211✔
1233
    snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize);
80,211✔
1234
  } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { // offset info
121,468✔
1235
    if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
102,733✔
1236
      int32_t ret = taosFormatUtcTime(buf, tListLen(buf), pe->processedVer, precision);
1,732✔
1237
      if (ret != 0) {
1,732!
1238
        mError("failed to format processed timewindow, skey:%" PRId64, pe->processedVer);
×
1239
        memset(buf, 0, tListLen(buf));
×
1240
      }
1241
    } else {
1242
      const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
101,001✔
1243
      snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer);
101,001✔
1244
    }
1245
  } else {
1246
    memset(buf, 0, tListLen(buf));
18,735✔
1247
  }
1248

1249
  STR_TO_VARSTR(vbuf, buf);
201,679✔
1250

1251
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
201,679✔
1252
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
201,546!
1253

1254
  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
201,546✔
1255
  TSDB_CHECK_CODE(code, lino, _end);
201,531!
1256

1257
  // start_time
1258
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
201,531✔
1259
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
201,502!
1260

1261
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startTime, false);
201,502✔
1262
  TSDB_CHECK_CODE(code, lino, _end);
201,309!
1263

1264
  // start id
1265
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
201,309✔
1266
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
201,271!
1267

1268
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointId, false);
201,271✔
1269
  TSDB_CHECK_CODE(code, lino, _end);
201,071!
1270

1271
  // start ver
1272
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
201,071✔
1273
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
201,047!
1274

1275
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointVer, false);
201,047✔
1276
  TSDB_CHECK_CODE(code, lino, _end);
201,047!
1277

1278
  // checkpoint time
1279
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
201,047✔
1280
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
201,014!
1281

1282
  if (pe->checkpointInfo.latestTime != 0) {
201,026✔
1283
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestTime, false);
174,166✔
1284
  } else {
1285
    code = colDataSetVal(pColInfo, numOfRows, 0, true);
26,860✔
1286
  }
1287
  TSDB_CHECK_CODE(code, lino, _end);
201,033!
1288

1289
  // checkpoint_id
1290
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
201,033✔
1291
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
201,012!
1292

1293
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestId, false);
201,012✔
1294
  TSDB_CHECK_CODE(code, lino, _end);
200,900!
1295

1296
  // checkpoint version
1297
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
200,900✔
1298
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
200,876!
1299

1300
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestVer, false);
200,876✔
1301
  TSDB_CHECK_CODE(code, lino, _end);
200,840!
1302

1303
  // checkpoint size
1304
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
200,840✔
1305
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
200,828✔
1306

1307
  colDataSetNULL(pColInfo, numOfRows);
200,801!
1308

1309
  // checkpoint backup status
1310
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
200,801✔
1311
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
200,674!
1312

1313
  code = colDataSetVal(pColInfo, numOfRows, 0, true);
200,674✔
1314
  TSDB_CHECK_CODE(code, lino, _end);
200,737!
1315

1316
  // ds_err_info
1317
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
200,737✔
1318
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
200,662!
1319

1320
  code = colDataSetVal(pColInfo, numOfRows, 0, true);
200,662✔
1321
  TSDB_CHECK_CODE(code, lino, _end);
200,677!
1322

1323
  // history_task_id
1324
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
200,677✔
1325
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
200,591!
1326

1327
  if (pe->hTaskId != 0) {
200,594✔
1328
    int64ToHexStr(pe->hTaskId, idstr, tListLen(idstr));
193✔
1329
    code = colDataSetVal(pColInfo, numOfRows, idstr, false);
193✔
1330
  } else {
1331
    code = colDataSetVal(pColInfo, numOfRows, 0, true);
200,401✔
1332
  }
1333
  TSDB_CHECK_CODE(code, lino, _end);
200,642!
1334

1335
  // history_task_status
1336
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
200,642✔
1337
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
200,533!
1338

1339
  code = colDataSetVal(pColInfo, numOfRows, 0, true);
200,533✔
1340
  TSDB_CHECK_CODE(code, lino, _end);
200,607!
1341

1342
  _end:
200,607✔
1343
  if (code) {
200,607!
1344
    mError("error happens during build task attr result blocks, lino:%d, code:%s", lino, tstrerror(code));
×
1345
  }
1346
  return code;
200,626✔
1347
}
1348

1349
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
11,675✔
1350
  const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
11,675✔
1351
  const SEp *p = GET_ACTIVE_EP(pCurrent);
11,675✔
1352

1353
  if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
11,675!
1354
    return false;
11,675✔
1355
  }
1356
  return true;
×
1357
}
1358

1359
void mndDestroyVgroupChangeInfo(SVgroupChangeInfo* pInfo) {
2,417✔
1360
  if (pInfo != NULL) {
2,417!
1361
    taosArrayDestroy(pInfo->pUpdateNodeList);
2,417✔
1362
    taosHashCleanup(pInfo->pDBMap);
2,417✔
1363
  }
1364
}
2,417✔
1365

1366
// 1. increase the replica does not affect the stream process.
1367
// 2. decreasing the replica may affect the stream task execution in the way that there is one or more running stream
1368
// tasks on the will be removed replica.
1369
// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we
1370
// will handle it as mentioned in 1 & 2 items.
1371
int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
2,417✔
1372
                               SVgroupChangeInfo *pInfo) {
1373
  int32_t code = 0;
2,417✔
1374
  int32_t lino = 0;
2,417✔
1375

1376
  if (pInfo == NULL) {
2,417!
1377
    return TSDB_CODE_INVALID_PARA;
×
1378
  }
1379

1380
  pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)),
2,417✔
1381
      pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
2,417✔
1382

1383
  if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) {
2,417!
1384
    mndDestroyVgroupChangeInfo(pInfo);
×
1385
    TSDB_CHECK_NULL(NULL, code, lino, _err, terrno);
×
1386
  }
1387

1388
  int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
2,417✔
1389
  for (int32_t i = 0; i < numOfNodes; ++i) {
14,741✔
1390
    SNodeEntry *pPrevEntry = taosArrayGet(pPrevNodeList, i);
12,324✔
1391
    if (pPrevEntry == NULL) {
12,324!
1392
      continue;
×
1393
    }
1394

1395
    int32_t num = taosArrayGetSize(pNodeList);
12,324✔
1396
    for (int32_t j = 0; j < num; ++j) {
156,099✔
1397
      SNodeEntry *pCurrent = taosArrayGet(pNodeList, j);
155,471✔
1398
      if(pCurrent == NULL) {
155,471!
1399
        continue;
×
1400
      }
1401

1402
      if (pCurrent->nodeId == pPrevEntry->nodeId) {
155,471✔
1403
        if (pPrevEntry->stageUpdated || isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) {
11,696!
1404
          const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
21✔
1405

1406
          char buf[256] = {0};
21✔
1407
          code = epsetToStr(&pCurrent->epset, buf, tListLen(buf));  // ignore this error
21✔
1408
          if (code) {
21!
1409
            mError("failed to convert epset string, code:%s", tstrerror(code));
×
1410
            TSDB_CHECK_CODE(code, lino, _err);
×
1411
          }
1412

1413
          mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
21✔
1414
                 pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
1415

1416
          SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId};
21✔
1417
          epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
21✔
1418
          epsetAssign(&updateInfo.newEp, &pCurrent->epset);
21✔
1419

1420
          void* p = taosArrayPush(pInfo->pUpdateNodeList, &updateInfo);
21✔
1421
          TSDB_CHECK_NULL(p, code, lino, _err, terrno);
21!
1422
        }
1423

1424
        // todo handle the snode info
1425
        if (pCurrent->nodeId != SNODE_HANDLE) {
11,696✔
1426
          SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
10,205✔
1427
          code = taosHashPut(pInfo->pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
10,205✔
1428
          mndReleaseVgroup(pMnode, pVgroup);
10,205✔
1429
          TSDB_CHECK_CODE(code, lino, _err);
10,205!
1430
        }
1431

1432
        break;
11,696✔
1433
      }
1434
    }
1435
  }
1436

1437
  return code;
2,417✔
1438

1439
  _err:
×
1440
  mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino);
×
1441
  mndDestroyVgroupChangeInfo(pInfo);
×
1442
  return code;
×
1443
  }
1444

1445
static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) {
2,022✔
1446
  bool              allReady = false;
2,022✔
1447
  bool              nodeUpdated = false;
2,022✔
1448
  SVgroupChangeInfo changeInfo = {0};
2,022✔
1449

1450
  int32_t numOfNodes = extractStreamNodeList(pMnode);
2,022✔
1451

1452
  if (numOfNodes == 0) {
2,022✔
1453
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
814✔
1454
    execInfo.ts = taosGetTimestampSec();
814✔
1455
    return false;
814✔
1456
  }
1457

1458
  for (int32_t i = 0; i < numOfNodes; ++i) {
7,094✔
1459
    SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
5,908✔
1460
    if (pNodeEntry == NULL) {
5,908!
1461
      continue;
×
1462
    }
1463

1464
    if (pNodeEntry->stageUpdated) {
5,908✔
1465
      mDebug("stream task not ready due to node update detected, checkpoint not issued");
22✔
1466
      return true;
22✔
1467
    }
1468
  }
1469

1470
  int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot);
1,186✔
1471
  if (code) {
1,186!
1472
    mError("failed to get the vgroup snapshot, ignore it and continue");
×
1473
  }
1474

1475
  if (!allReady) {
1,186✔
1476
    mWarn("not all vnodes ready, quit from vnodes status check");
19!
1477
    return true;
19✔
1478
  }
1479

1480
  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, *ppNodeSnapshot, &changeInfo);
1,167✔
1481
  if (code) {
1,167!
1482
    nodeUpdated = false;
×
1483
  } else {
1484
    nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
1,167✔
1485
    if (nodeUpdated) {
1,167!
1486
      mDebug("stream tasks not ready due to node update");
×
1487
    }
1488
  }
1489

1490
  mndDestroyVgroupChangeInfo(&changeInfo);
1,167✔
1491
  return nodeUpdated;
1,167✔
1492
}
1493

1494
// check if the node update happens or not
1495
bool mndStreamNodeIsUpdated(SMnode *pMnode) {
2,022✔
1496
  SArray *pNodeSnapshot = NULL;
2,022✔
1497

1498
  streamMutexLock(&execInfo.lock);
2,022✔
1499
  bool updated = doCheckForUpdated(pMnode, &pNodeSnapshot);
2,022✔
1500
  streamMutexUnlock(&execInfo.lock);
2,022✔
1501

1502
  taosArrayDestroy(pNodeSnapshot);
2,022✔
1503
  return updated;
2,022✔
1504
}
1505

1506
int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) {
1,613✔
1507
  SSdb      *pSdb = pMnode->pSdb;
1,613✔
1508
  void      *pIter = NULL;
1,613✔
1509
  SSnodeObj *pObj = NULL;
1,613✔
1510

1511
  if (pSrcDb->cfg.replications == 1) {
1,613✔
1512
    return TSDB_CODE_SUCCESS;
1,610✔
1513
  } else {
1514
    while (1) {
1515
      pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
3✔
1516
      if (pIter == NULL) {
3✔
1517
        break;
2✔
1518
      }
1519

1520
      sdbRelease(pSdb, pObj);
1✔
1521
      sdbCancelFetch(pSdb, pIter);
1✔
1522
      return TSDB_CODE_SUCCESS;
1✔
1523
    }
1524

1525
    mError("snode not existed when trying to create stream in db with multiple replica");
2!
1526
    return TSDB_CODE_SNODE_NOT_DEPLOYED;
2✔
1527
  }
1528
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc