• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3523

06 Nov 2024 02:29AM UTC coverage: 55.861% (-2.4%) from 58.216%
#3523

push

travis-ci

web-flow
Merge pull request #28551 from taosdata/feat/TS-5215-2

test(blob): testing & fixes for blob

106075 of 245834 branches covered (43.15%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 2 files covered. (0.0%)

17003 existing lines in 254 files now uncovered.

181910 of 269703 relevant lines covered (67.45%)

1527639.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.84
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23

24
struct SStreamTaskIter {
25
  SStreamObj  *pStream;
26
  int32_t      level;
27
  int32_t      ordinalIndex;
28
  int32_t      totalLevel;
29
  SStreamTask *pTask;
30
};
31

32
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId);
33

34
/*
 * Allocate an iterator over all tasks of pStream.
 *
 * On success *pIter points to a new iterator positioned before the first task and 0 is
 * returned; free it with destroyStreamTaskIter(). On allocation failure *pIter is NULL and
 * terrno is returned.
 */
int32_t createStreamTaskIter(SStreamObj* pStream, SStreamTaskIter** pIter) {
  SStreamTaskIter *pNew = taosMemoryCalloc(1, sizeof(SStreamTaskIter));
  *pIter = pNew;
  if (pNew == NULL) {
    return terrno;
  }

  pNew->pStream = pStream;
  pNew->totalLevel = taosArrayGetSize(pStream->tasks);
  pNew->level = -1;  // "not started": the first advance moves to level 0
  pNew->ordinalIndex = 0;
  pNew->pTask = NULL;

  return 0;
}
48

49
/*
 * Advance the iterator to the next task.
 *
 * Returns true and stores the task in pIter->pTask when one is available. Empty levels are
 * skipped. Once every level is exhausted, returns false and pIter->pTask is NULL.
 */
bool streamTaskIterNextTask(SStreamTaskIter* pIter) {
  if (pIter->level >= pIter->totalLevel) {
    pIter->pTask = NULL;
    return false;
  }

  if (pIter->level == -1) {  // first call: start with level 0
    pIter->level = 0;
  }

  for (; pIter->level < pIter->totalLevel; pIter->level += 1, pIter->ordinalIndex = 0) {
    SArray *pLevelTasks = taosArrayGetP(pIter->pStream->tasks, pIter->level);
    if (pIter->ordinalIndex < taosArrayGetSize(pLevelTasks)) {
      pIter->pTask = taosArrayGetP(pLevelTasks, pIter->ordinalIndex);
      pIter->ordinalIndex += 1;
      return true;
    }

    pIter->pTask = NULL;  // this level is exhausted, continue with the next one
  }

  pIter->pTask = NULL;
  return false;
}
76

77
/*
 * Retrieve the task produced by the last streamTaskIterNextTask() call.
 *
 * Returns TSDB_CODE_SUCCESS with *pTask set when there is a current task. Returns
 * TSDB_CODE_INVALID_PARA when pTask is NULL, or when there is no current task; in the latter
 * case *pTask is set to NULL.
 */
int32_t streamTaskIterGetCurrent(SStreamTaskIter* pIter, SStreamTask** pTask) {
  if (pTask == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  *pTask = pIter->pTask;
  return (*pTask == NULL) ? TSDB_CODE_INVALID_PARA : TSDB_CODE_SUCCESS;
}
87

88
/*
 * Release an iterator created by createStreamTaskIter().
 * Only the iterator allocation is freed; the stream and tasks it references are left untouched.
 */
void destroyStreamTaskIter(SStreamTaskIter* pIter) {
  taosMemoryFree(pIter);
}
91

92
/*
 * Check whether every replica of pVgroup is usable for checkpoint and other stream operations.
 *
 * Returns false at the first replica that has not finished restoring (syncRestore is false)
 * or whose sync state is OFFLINE, ERROR, LEARNER or CANDIDATE. Returns true when all replicas
 * pass.
 */
static bool checkStatusForEachReplica(SVgObj *pVgroup) {
  for (int32_t r = 0; r < pVgroup->replica; ++r) {
    if (!pVgroup->vnodeGid[r].syncRestore) {
      mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId);
      return false;
    }

    ESyncState state = pVgroup->vnodeGid[r].syncState;
    switch (state) {
      case TAOS_SYNC_STATE_OFFLINE:
      case TAOS_SYNC_STATE_ERROR:
      case TAOS_SYNC_STATE_LEARNER:
      case TAOS_SYNC_STATE_CANDIDATE:
        mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups", pVgroup->vgId,
              state);
        return false;
      default:
        break;
    }
  }

  return true;
}
110

111
/*
 * Take a snapshot of all vgroups and snodes known to the mnode.
 *
 * On success, *pList receives a new SArray of SNodeEntry (vgroups first, then snodes; the
 * caller owns it) and TSDB_CODE_SUCCESS is returned. *allReady tells whether:
 *   - the vgroups of each database agree on the replica number, and
 *   - every vgroup checked so far passed checkStatusForEachReplica().
 * Once *allReady turns false, the remaining vgroups are still added to the list but are no
 * longer checked.
 *
 * On failure, the partial list is freed, *pList is not written, *allReady is false and the
 * error code is returned. A failure to format an epset for the debug log is only logged and
 * does not affect the return code.
 */
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
  SSdb     *pSdb = pMnode->pSdb;
  void     *pIter = NULL;
  SVgObj   *pVgroup = NULL;
  int32_t   code = 0;
  SArray   *pVgroupList = NULL;
  SHashObj *pHash = NULL;

  pVgroupList = taosArrayInit(4, sizeof(SNodeEntry));
  if (pVgroupList == NULL) {
    mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno));
    code = terrno;
    goto _err;
  }

  // dbUid -> replica number of the first vgroup seen for that database
  pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    mError("failed to prepare hashmap during take vgroup snapshot, code:%s", tstrerror(terrno));
    code = terrno;
    goto _err;
  }

  *allReady = true;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime};
    entry.epset = mndGetVgroupEpset(pMnode, pVgroup);

    int8_t *pReplica = taosHashGet(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid));
    if (pReplica == NULL) {  // not exist, add it into hash map
      code = taosHashPut(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid), &pVgroup->replica, sizeof(pVgroup->replica));
      if (code) {
        mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code));
        sdbRelease(pSdb, pVgroup);
        sdbCancelFetch(pSdb, pIter);
        goto _err;  // take snapshot failed, and not all ready
      }
    } else if (*pReplica != pVgroup->replica) {
      mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations",
            pVgroup->vgId, pVgroup->replica, *pReplica);
      *allReady = false;  // task snap success, but not all ready
    }

    // if not all ready till now, no need to check the remaining vgroups.
    // but still we need to put the info of the existed vgroups into the snapshot list
    if (*allReady) {
      *allReady = checkStatusForEachReplica(pVgroup);
    }

    // best effort, only used for logging: keep its result out of `code`
    char    buf[256] = {0};
    int32_t ret = epsetToStr(&entry.epset, buf, tListLen(buf));
    if (ret != 0) {  // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(ret));
    }

    void *p = taosArrayPush(pVgroupList, &entry);
    if (p == NULL) {
      mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId);
      code = terrno;
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      goto _err;
    } else {
      mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
    }

    sdbRelease(pSdb, pVgroup);
  }

  SSnodeObj *pObj = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
    if (pIter == NULL) {
      break;
    }

    SNodeEntry entry = {.nodeId = SNODE_HANDLE};
    code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
    if (code) {
      // log while pObj is still held: it must not be dereferenced after sdbRelease()
      mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn);
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      goto _err;
    }

    char    buf[256] = {0};
    int32_t ret = epsetToStr(&entry.epset, buf, tListLen(buf));
    if (ret != 0) {  // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(ret));
    }

    void *p = taosArrayPush(pVgroupList, &entry);
    if (p == NULL) {
      code = terrno;
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code));
      goto _err;
    } else {
      mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
    }

    sdbRelease(pSdb, pObj);
  }

  *pList = pVgroupList;
  taosHashCleanup(pHash);
  return TSDB_CODE_SUCCESS;

_err:
  *allReady = false;
  taosArrayDestroy(pVgroupList);
  taosHashCleanup(pHash);

  return code;
}
234

235
/*
 * Find the stream whose uid equals streamId by scanning the SDB_STREAM table.
 *
 * On success *pStream points to the matching object and TSDB_CODE_SUCCESS is returned. The
 * object is returned without sdbRelease() (every non-matching object is released), so the
 * caller must release it. Otherwise *pStream is NULL and TSDB_CODE_STREAM_TASK_NOT_EXIST is
 * returned.
 */
int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pObj = NULL;

  *pStream = NULL;

  for (void *pCursor = sdbFetch(pSdb, SDB_STREAM, NULL, (void **)&pObj); pCursor != NULL;
       pCursor = sdbFetch(pSdb, SDB_STREAM, pCursor, (void **)&pObj)) {
    if (pObj->uid != streamId) {
      sdbRelease(pSdb, pObj);
      continue;
    }

    sdbCancelFetch(pSdb, pCursor);
    *pStream = pObj;
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
252

UNCOV
253
/*
 * Kill the active transaction transId; pDbName is only used in log messages.
 *
 * Failures to acquire or to kill the transaction are only logged.
 */
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) {
  STrans *pTrans = mndAcquireTrans(pMnode, transId);
  if (pTrans == NULL) {
    mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId);
    return;
  }

  mInfo("kill active transId:%d in Db:%s", transId, pDbName);
  int32_t code = mndKillTrans(pMnode, pTrans);
  if (code) {
    // report before releasing: pTrans must not be dereferenced after mndReleaseTrans()
    mError("failed to kill transId:%d, code:%s", pTrans->id, tstrerror(code));
  }

  mndReleaseTrans(pMnode, pTrans);
}
266

267
/*
 * Resolve the endpoint set of the node a task runs on.
 *
 * For nodeId == SNODE_HANDLE, the endpoint of the first snode in the SDB is used. Otherwise
 * the epset of vgroup nodeId is copied.
 *
 * *hasEpset is true only when pEpSet was filled. A vgroup that no longer exists (an orphaned
 * task) yields TSDB_CODE_SUCCESS with *hasEpset false. TSDB_CODE_INVALID_PARA is returned when
 * no snode exists, and the addEpIntoEpSet() error when the snode endpoint cannot be added.
 */
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId) {
  *hasEpset = false;
  pEpSet->numOfEps = 0;

  if (nodeId != SNODE_HANDLE) {
    SVgObj *pVgObj = mndAcquireVgroup(pMnode, nodeId);
    if (pVgObj == NULL) {
      mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", taskId, nodeId);
      return TSDB_CODE_SUCCESS;
    }

    SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
    mndReleaseVgroup(pMnode, pVgObj);

    epsetAssign(pEpSet, &epset);
    *hasEpset = true;
    return TSDB_CODE_SUCCESS;
  }

  SSnodeObj *pObj = NULL;
  void      *pCursor = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void **)&pObj);
  if (pCursor == NULL) {
    mError("failed to acquire snode epset");
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
  sdbRelease(pMnode->pSdb, pObj);
  sdbCancelFetch(pMnode->pSdb, pCursor);

  *hasEpset = (code == 0);
  if (code) {
    mError("failed to set epset");
  }
  return code;
}
306

307
/*
 * Locate the task of pStream whose taskId equals pId->taskId.
 *
 * Returns 0 with *pTask set when the task is found. Returns TSDB_CODE_FAILED (with *pTask
 * NULL) when no task matches, or the iterator-creation error.
 */
int32_t mndGetStreamTask(STaskId *pId, SStreamObj *pStream, SStreamTask **pTask) {
  SStreamTaskIter *pIter = NULL;

  *pTask = NULL;

  int32_t code = createStreamTaskIter(pStream, &pIter);
  if (code) {
    mError("failed to create stream task iter:%s", pStream->name);
    return code;
  }

  SStreamTask *pFound = NULL;
  while (pFound == NULL && streamTaskIterNextTask(pIter)) {
    SStreamTask *pCur = NULL;
    if (streamTaskIterGetCurrent(pIter, &pCur) != 0) {
      continue;
    }

    if (pCur->id.taskId == pId->taskId) {
      pFound = pCur;
    }
  }

  destroyStreamTaskIter(pIter);

  if (pFound == NULL) {
    return TSDB_CODE_FAILED;
  }

  *pTask = pFound;
  return 0;
}
334

335
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
9,787✔
336
  int32_t num = 0;
9,787✔
337
  for(int32_t i = 0; i < taosArrayGetSize(pStream->tasks); ++i) {
27,689✔
338
    SArray* pLevel = taosArrayGetP(pStream->tasks, i);
17,902✔
339
    num += taosArrayGetSize(pLevel);
17,902✔
340
  }
341

342
  return num;
9,787✔
343
}
344

345
/*
 * Count the streams whose source database is dbName and store the count in *pNumOfStreams.
 *
 * Returns 0 on success, or TSDB_CODE_MND_DB_NOT_SELECTED (via TAOS_RETURN) when the database
 * cannot be acquired; *pNumOfStreams is untouched in that case.
 */
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  SSdb       *pSdb = pMnode->pSdb;
  int32_t     count = 0;
  SStreamObj *pStream = NULL;

  for (void *pCursor = sdbFetch(pSdb, SDB_STREAM, NULL, (void **)&pStream); pCursor != NULL;
       pCursor = sdbFetch(pSdb, SDB_STREAM, pCursor, (void **)&pStream)) {
    if (pStream->sourceDbUid == pDb->uid) {
      count += 1;
    }
    sdbRelease(pSdb, pStream);
  }

  *pNumOfStreams = count;
  mndReleaseDb(pMnode, pDb);
  return 0;
}
370

371
static void freeTaskList(void* param) {
493✔
372
  SArray** pList = (SArray **)param;
493✔
373
  taosArrayDestroy(*pList);
493✔
374
}
493✔
375

376
int32_t mndInitExecInfo() {
716✔
377
  int32_t code = taosThreadMutexInit(&execInfo.lock, NULL);
716✔
378
  if (code) {
716!
379
    return code;
×
380
  }
381

382
  _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
716✔
383

384
  execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
716✔
385
  execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
716✔
386
  execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
716✔
387
  execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
716✔
388
  execInfo.pChkptStreams = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
716✔
389
  execInfo.pStreamConsensus = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
716✔
390
  execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
716✔
391
  execInfo.pKilledChkptTrans = taosArrayInit(4, sizeof(SStreamTaskResetMsg));
716✔
392

393
  if (execInfo.pTaskList == NULL || execInfo.pTaskMap == NULL || execInfo.transMgmt.pDBTrans == NULL ||
716!
394
      execInfo.pTransferStateStreams == NULL || execInfo.pChkptStreams == NULL || execInfo.pStreamConsensus == NULL ||
716!
395
      execInfo.pNodeList == NULL || execInfo.pKilledChkptTrans == NULL) {
716!
396
    mError("failed to initialize the stream runtime env, code:%s", tstrerror(terrno));
×
397
    return terrno;
×
398
  }
399

400
  execInfo.role = NODE_ROLE_UNINIT;
716✔
401
  execInfo.switchFromFollower = false;
716✔
402

403
  taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
716✔
404
  taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList);
716✔
405
  taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList);
716✔
406
  return 0;
716✔
407
}
408

409
void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
950✔
410
  SArray *pValidList = taosArrayInit(4, sizeof(SNodeEntry));
950✔
411
  if (pValidList == NULL) {  // not continue
950!
412
    return;
×
413
  }
414

415
  int32_t size = taosArrayGetSize(pNodeSnapshot);
950✔
416
  int32_t oldSize = taosArrayGetSize(execInfo.pNodeList);
950✔
417

418
  for (int32_t i = 0; i < oldSize; ++i) {
5,354✔
419
    SNodeEntry *p = taosArrayGet(execInfo.pNodeList, i);
4,404✔
420
    if (p == NULL) {
4,404!
421
      continue;
×
422
    }
423

424
    for (int32_t j = 0; j < size; ++j) {
147,407✔
425
      SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);
146,965✔
426
      if (pEntry == NULL) {
146,965!
427
        continue;
×
428
      }
429

430
      if (pEntry->nodeId == p->nodeId) {
146,965✔
431
        p->hbTimestamp = pEntry->hbTimestamp;
3,962✔
432

433
        void* px = taosArrayPush(pValidList, p);
3,962✔
434
        if (px == NULL) {
3,962!
435
          mError("failed to put node into list, nodeId:%d", p->nodeId);
×
436
        } else {
437
          mDebug("vgId:%d ts:%" PRId64 " HbMsgId:%d is valid", p->nodeId, p->hbTimestamp, p->lastHbMsgId);
3,962✔
438
        }
439
        break;
3,962✔
440
      }
441
    }
442
  }
443

444
  taosArrayDestroy(execInfo.pNodeList);
950✔
445
  execInfo.pNodeList = pValidList;
950✔
446

447
  mDebug("remain %d valid node entries after clean expired nodes info, prev size:%d",
950✔
448
         (int32_t)taosArrayGetSize(pValidList), oldSize);
449
}
450

451
/*
 * Remove one task from the exec-info buffers.
 *
 * Drops the task's entry from pTaskMap and the first matching id from pTaskList. A task
 * absent from pTaskMap is treated as already removed. Returns the taosHashRemove() error if
 * it fails, otherwise TSDB_CODE_SUCCESS.
 */
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
  if (taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)) == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
  if (code != 0) {
    return code;
  }

  int32_t k = 0;
  while (k < taosArrayGetSize(pExecNode->pTaskList)) {
    STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
    if (pId != NULL && pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
      taosArrayRemove(pExecNode->pTaskList, k);
      mInfo("s-task:0x%x removed from buffer, remain:%d in buffer list", (int32_t)pRemovedId->taskId,
            (int32_t)taosArrayGetSize(pExecNode->pTaskList));
      break;
    }
    k += 1;
  }

  return TSDB_CODE_SUCCESS;
}
479

480
/* Remove every task id listed in pTaskIds (an SArray of STaskId) from pExecInfo; failures are only logged. */
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo* pExecInfo) {
  int32_t idx = 0;
  while (idx < taosArrayGetSize(pTaskIds)) {
    STaskId *pId = taosArrayGet(pTaskIds, idx);
    idx += 1;

    if (pId != NULL && doRemoveTasks(pExecInfo, pId) != 0) {
      mError("failed to remove task in buffer list, 0x%"PRIx64, pId->taskId);
    }
  }
}
493

494
/*
 * Purge all runtime bookkeeping of pStream from pExecNode, under pExecNode->lock:
 *   1. every task of the stream is removed from pTaskMap / pTaskList;
 *   2. the stream's entries in the consensus-checkpointId and checkpoint-report hash tables
 *      are dropped.
 * Step 2 is skipped (and an error logged) when pTaskMap and pTaskList disagree in size
 * afterwards. All failures are only logged.
 */
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
  SStreamTaskIter *pIter = NULL;
  streamMutexLock(&pExecNode->lock);

  // 1. remove task entries
  int32_t code = createStreamTaskIter(pStream, &pIter);
  if (code) {
    streamMutexUnlock(&pExecNode->lock);
    mError("failed to create stream task iter:%s", pStream->name);
    return;
  }

  while (streamTaskIterNextTask(pIter)) {
    SStreamTask *pTask = NULL;
    code = streamTaskIterGetCurrent(pIter, &pTask);
    if (code) {
      continue;
    }

    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    code = doRemoveTasks(pExecNode, &id);
    if (code) {
      mError("failed to remove task in buffer list, 0x%"PRIx64, id.taskId);
    }
  }

  destroyStreamTaskIter(pIter);  // not needed on any path below

  if (taosHashGetSize(pExecNode->pTaskMap) != taosArrayGetSize(pExecNode->pTaskList)) {
    streamMutexUnlock(&pExecNode->lock);
    mError("task map size, task list size, not equal");
    return;
  }

  // 2. remove stream entry in consensus hash table and checkpoint-report hash table.
  // Use the tables of pExecNode, whose lock is held here, not those of the global execInfo.
  code = mndClearConsensusCheckpointId(pExecNode->pStreamConsensus, pStream->uid);
  if (code) {
    mError("failed to clear consensus checkpointId, code:%s", tstrerror(code));
  }

  code = mndClearChkptReportInfo(pExecNode->pChkptStreams, pStream->uid);
  if (code) {
    mError("failed to clear the checkpoint report info, code:%s", tstrerror(code));
  }

  streamMutexUnlock(&pExecNode->lock);
}
541

542
/* Return true when pList (an SArray of SNodeEntry) contains an entry whose nodeId equals nodeId. */
static bool taskNodeExists(SArray *pList, int32_t nodeId) {
  size_t  count = taosArrayGetSize(pList);
  int32_t pos = 0;

  while (pos < count) {
    SNodeEntry *pEntry = taosArrayGet(pList, pos);
    if (pEntry != NULL && pEntry->nodeId == nodeId) {
      return true;
    }
    pos += 1;
  }

  return false;
}
558

559
/*
 * Drop from the exec-info buffers every task whose node is missing from pNodeSnapshot, then
 * prune execInfo.pNodeList against the same snapshot. Tasks on the snode
 * (nodeId == SNODE_HANDLE) are never dropped here.
 *
 * Returns 0, or terrno when the temporary removal list cannot be allocated.
 */
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
  SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
  if (pRemovedTasks == NULL) {
    return terrno;
  }

  int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
  for (int32_t i = 0; i < numOfTask; ++i) {
    STaskId          *pId = taosArrayGet(execInfo.pTaskList, i);
    STaskStatusEntry *pEntry = (pId == NULL) ? NULL : taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));

    if (pEntry == NULL || pEntry->nodeId == SNODE_HANDLE || taskNodeExists(pNodeSnapshot, pEntry->nodeId)) {
      continue;
    }

    if (taosArrayPush(pRemovedTasks, pId) == NULL) {
      mError("failed to put task entry into remove list, taskId:0x%" PRIx64, pId->taskId);
    }
  }

  removeTasksInBuf(pRemovedTasks, &execInfo);

  mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
         (int32_t)taosArrayGetSize(execInfo.pTaskList));

  removeExpiredNodeInfo(pNodeSnapshot);

  taosArrayDestroy(pRemovedTasks);
  return 0;
}
600

601
/*
 * Scan the checkpoint-report buffer (execInfo.pChkptStreams) under execInfo.lock.
 *
 * For a stream whose tasks have ALL reported, and when no conflicting trans is active, the
 * checkpoint-info update trans is launched. If that returns success or in-progress, the
 * stream's report list is cleared and reportChkpt is recorded. The scan stops after the
 * first such launch attempt.
 *
 * Entries whose stream can no longer be acquired are removed from the buffer.
 * Returns TSDB_CODE_SUCCESS, or terrno if the temporary drop list cannot be allocated.
 */
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  void   *pIter = NULL;
  int32_t code = 0;
  SArray *pDropped = taosArrayInit(4, sizeof(int64_t));
  if (pDropped == NULL) {
    return terrno;
  }

  mDebug("start to scan checkpoint report info");
  streamMutexLock(&execInfo.lock);

  while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
    SChkptReportInfo* px = (SChkptReportInfo *)pIter;
    if (taosArrayGetSize(px->pTaskList) == 0) {
      continue;
    }

    STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0);
    if (pInfo == NULL) {
      continue;
    }

    SStreamObj *pStream = NULL;
    code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
    if (pStream == NULL || code != 0) {
      mDebug("failed to acquire stream:0x%" PRIx64 " remove it from checkpoint-report list", pInfo->streamId);
      void *p = taosArrayPush(pDropped, &pInfo->streamId);
      if (p == NULL) {
        mError("failed to put stream into drop list:0x%" PRIx64, pInfo->streamId);
      }
      continue;
    }

    int32_t total = mndGetNumOfStreamTasks(pStream);
    int32_t existed = (int32_t)taosArrayGetSize(px->pTaskList);

    if (total == existed) {
      mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info",
             pStream->uid, pStream->name, total);

      code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
      if (code == 0) {
        // NOTE(review): pStream is not released on this path. As with
        // mndCreateSetConsensusChkptIdTrans, the trans-creation call is presumed to take over the
        // reference — confirm.
        code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList);
        if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {  // remove this entry
          // pInfo points into px->pTaskList: read what is needed before clearing the list
          int64_t streamId = pInfo->streamId;
          px->reportChkpt = pInfo->checkpointId;
          taosArrayClear(px->pTaskList);
          mDebug("stream:0x%" PRIx64 " clear checkpoint-report list", streamId);
        } else {
          mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet",
                 pInfo->streamId);
        }

        // leaving the iteration early: release the hash node still held by the iterator
        taosHashCancelIterate(execInfo.pChkptStreams, pIter);
        break;
      } else {
        mDebug("stream:0x%" PRIx64 " active checkpoint trans not finished yet, wait", pInfo->streamId);
      }
    } else {
      mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pInfo->streamId, pStream->name,
             existed, total, total - existed);
    }

    sdbRelease(pMnode->pSdb, pStream);
  }

  int32_t size = taosArrayGetSize(pDropped);
  if (size > 0) {
    for (int32_t i = 0; i < size; ++i) {
      int64_t* pStreamId = (int64_t *)taosArrayGet(pDropped, i);
      if (pStreamId == NULL) {
        continue;
      }

      code = taosHashRemove(execInfo.pChkptStreams, pStreamId, sizeof(*pStreamId));
      if (code) {
        mError("failed to remove stream in buf:0x%"PRIx64, *pStreamId);
      }
    }

    int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams);
    mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams);
  }

  streamMutexUnlock(&execInfo.lock);

  taosArrayDestroy(pDropped);
  return TSDB_CODE_SUCCESS;
}
688

689
/*
 * Build and prepare the trans that sets the consensus checkpointId (and ts) for task taskId
 * of pStream.
 *
 * Consumes the caller's reference to pStream: it is released with sdbRelease() on every path.
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the trans has been prepared. Otherwise returns the
 * first error encountered, and any trans already built is dropped.
 */
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId,
                                          int64_t ts) {
  char         msg[128] = {0};
  STrans      *pTrans = NULL;
  SStreamTask *pTask = NULL;
  STaskId      id = {.streamId = pStream->uid, .taskId = taskId};

  snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId);

  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg, &pTrans);
  if (pTrans == NULL || code != 0) {
    code = (code != 0) ? code : terrno;
    goto _end;
  }

  code = mndGetStreamTask(&id, pStream, &pTask);
  if (code) {
    mError("failed to get task:0x%x in stream:%s, failed to create consensus-checkpointId", taskId, pStream->name);
    goto _end;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_CONSEN_NAME, pStream->uid);
  if (code) {
    goto _end;
  }

  code = mndStreamSetChkptIdAction(pMnode, pTrans, pTask, checkpointId, ts);
  if (code != 0) {
    goto _end;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr());
    goto _end;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_end:
  sdbRelease(pMnode->pSdb, pStream);
  if (pTrans != NULL) {
    mndTransDrop(pTrans);
  }
  return code;
}
742

743
/*
 * Look up the consensus-checkpoint info of streamId in pHash, creating an empty entry
 * (numOfTasks expected tasks, no collected requests) when none exists yet.
 *
 * On success *pInfo points at the entry stored inside the hash table and 0 is returned.
 * On failure *pInfo is NULL, the error code is returned and nothing stays allocated.
 */
int32_t mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo) {
  *pInfo = NULL;

  void* px = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (px != NULL) {
    *pInfo = px;
    return 0;
  }

  SCheckpointConsensusInfo p = {
      .pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry)),
      .numOfTasks = numOfTasks,
      .streamId = streamId,
  };

  if (p.pTaskList == NULL) {
    return terrno;
  }

  // taosHashPut copies the struct by value; if the put fails, the task list is still ours to free
  int32_t code = taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
  if (code != 0) {
    taosArrayDestroy(p.pTaskList);
    return code;
  }

  *pInfo = (SCheckpointConsensusInfo *)taosHashGet(pHash, &streamId, sizeof(streamId));
  return code;
}
772

773
// no matter existed or not, add the request into info list anyway, since we need to send rsp mannually
774
// discard the msg may lead to the lost of connections.
775
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo) {
76✔
776
  SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs()};
76✔
777
  memcpy(&info.req, pRestoreInfo, sizeof(info.req));
76✔
778

779
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) {
256✔
780
    SCheckpointConsensusEntry *p = taosArrayGet(pInfo->pTaskList, i);
180✔
781
    if (p == NULL) {
180!
782
      continue;
×
783
    }
784

785
    if (p->req.taskId == info.req.taskId) {
180!
UNCOV
786
      mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update ts %" PRId64
×
787
             "->%" PRId64 " total existed:%d",
788
             pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs,
789
             (int32_t)taosArrayGetSize(pInfo->pTaskList));
UNCOV
790
      p->req.startTs = info.req.startTs;
×
UNCOV
791
      return;
×
792
    }
793
  }
794

795
  void *p = taosArrayPush(pInfo->pTaskList, &info);
76✔
796
  if (p == NULL) {
76!
797
    mError("s-task:0x%x failed to put task into consensus-checkpointId list, code: out of memory", info.req.taskId);
×
798
  } else {
799
    int32_t num = taosArrayGetSize(pInfo->pTaskList);
76✔
800
    mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, stream:0x%" PRIx64
76✔
801
           " waiting tasks:%d",
802
           pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, num);
803
  }
804
}
805

806
// Release the task list held by pInfo and leave pInfo->pTaskList as NULL.
void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) {
  SArray *pList = pInfo->pTaskList;
  pInfo->pTaskList = NULL;
  taosArrayDestroy(pList);
}
16✔
810

811
// Remove the consensus-checkpointId entry of `streamId` from pHash.
// Returns 0 when the hash is empty or the entry was removed, otherwise the error code of taosHashRemove.
// The success log reports the number of entries left AFTER the removal.
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) {
  int32_t numOfStreams = taosHashGetSize(pHash);
  if (numOfStreams == 0) {
    return 0;
  }

  int32_t code = taosHashRemove(pHash, &streamId, sizeof(streamId));
  if (code == 0) {
    // query the size again: numOfStreams was taken before the removal and would over-report by one
    mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId,
           (int32_t)taosHashGetSize(pHash));
  } else {
    mError("failed to remove stream:0x%"PRIx64" in consensus-checkpointId list, remain:%d", streamId, numOfStreams);
  }

  return code;
}
827

828
// Remove the checkpoint-report entry of `streamId` from pHash.
// Returns 0 when the hash is empty or the entry was removed, otherwise the error code of taosHashRemove.
// The success log reports the number of entries left AFTER the removal.
int64_t mndClearChkptReportInfo(SHashObj* pHash, int64_t streamId) {
  int32_t numOfStreams = taosHashGetSize(pHash);
  if (numOfStreams == 0) {
    return 0;
  }

  int32_t code = taosHashRemove(pHash, &streamId, sizeof(streamId));
  if (code == 0) {
    // query the size again: numOfStreams was taken before the removal and would over-report by one
    mDebug("drop stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, (int32_t)taosHashGetSize(pHash));
  } else {
    mError("failed to remove stream:0x%"PRIx64" in chkpt-report list, remain:%d", streamId, numOfStreams);
  }

  return code;
}
844

UNCOV
845
// Clear the reported task list of `streamId` in pHash, keeping the entry itself.
// Returns 0 on success, TSDB_CODE_MND_STREAM_NOT_EXIST if the stream has no entry.
int32_t mndResetChkptReportInfo(SHashObj* pHash, int64_t streamId) {
  SChkptReportInfo* pReport = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (pReport == NULL) {
    return TSDB_CODE_MND_STREAM_NOT_EXIST;
  }

  taosArrayClear(pReport->pTaskList);
  mDebug("stream:0x%" PRIx64 " checkpoint-report list cleared, prev report checkpointId:%" PRId64, streamId,
         pReport->reportChkpt);
  return 0;
}
856

857
// Write the display name of the stream status into dst; dst is left untouched for an unrecognized status.
static void mndShowStreamStatus(char *dst, SStreamObj *pStream) {
  const char *pLabel = NULL;
  int8_t      status = atomic_load_8(&pStream->status);

  if (status == STREAM_STATUS__NORMAL) {
    pLabel = "ready";
  } else if (status == STREAM_STATUS__STOP) {
    pLabel = "stop";
  } else if (status == STREAM_STATUS__FAILED) {
    pLabel = "failed";
  } else if (status == STREAM_STATUS__RECOVER) {
    pLabel = "recover";
  } else if (status == STREAM_STATUS__PAUSE) {
    pLabel = "paused";
  }

  if (pLabel != NULL) {
    strcpy(dst, pLabel);
  }
}
354✔
871

872
// Write the display name of the stream trigger mode into dst; dst is left untouched for an unrecognized mode.
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
  const char *pLabel = NULL;
  int8_t      trigger = pStream->conf.trigger;

  if (trigger == STREAM_TRIGGER_AT_ONCE) {
    pLabel = "at once";
  } else if (trigger == STREAM_TRIGGER_WINDOW_CLOSE) {
    pLabel = "window close";
  } else if (trigger == STREAM_TRIGGER_MAX_DELAY) {
    pLabel = "max delay";
  }

  if (pLabel != NULL) {
    strcpy(dst, pLabel);
  }
}
354✔
882

883
// Format `id` as a varstr "0x<hex>" into pBuf (bufLen bytes, zero-filled first).
// The payload is addressed through varDataVal() instead of hard-coded offsets so it always follows the varstr
// length header, whatever its size.
static void int64ToHexStr(int64_t id, char *pBuf, int32_t bufLen) {
  memset(pBuf, 0, bufLen);

  char *pPayload = varDataVal(pBuf);
  pPayload[0] = '0';
  pPayload[1] = 'x';

  int32_t numOfDigits = tintToHex(id, pPayload + 2);
  varDataSetLen(pBuf, numOfDigits + 2);  // "0x" prefix plus the hex digits
}
22,829✔
891

892
// Fill row `numOfRows` of pBlock with the attributes of pStream, one column per attribute, in this order:
// stream name, create time, stream id, related fill-history stream id, sql, status, source db, target db,
// target stable, watermark, trigger, sink quota, checkpoint interval, checkpoint backup type, history scan idle.
// Returns 0 on success, or the first error met while locating a column or writing a value into it.
int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // create time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id
  char buf[128] = {0};
  int64ToHexStr(pStream->uid, buf, tListLen(buf));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, buf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // related fill-history stream id, NULL when hTaskUid is 0
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  if (pStream->hTaskUid != 0) {
    int64ToHexStr(pStream->hTaskUid, buf, tListLen(buf));
    code = colDataSetVal(pColInfo, numOfRows, buf, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, buf, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // sql
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)sql, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char status2[20] = {0};
  mndShowStreamStatus(status2, pStream);
  STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // source db
  char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sourceDB, mndGetDbStr(pStream->sourceDb), sizeof(sourceDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&sourceDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // target db
  char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(targetDB, mndGetDbStr(pStream->targetDb), sizeof(targetDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // target stable, NULL when no target stable name is set
  if (pStream->targetSTbName[0] == 0) {
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

    code = colDataSetVal(pColInfo, numOfRows, NULL, true);
  } else {
    char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(targetSTB, mndGetStbStr(pStream->targetSTbName), sizeof(targetSTB));
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

    code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetSTB, false);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // watermark
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // trigger
  char trigger[20 + VARSTR_HEADER_SIZE] = {0};
  char trigger2[20] = {0};
  mndShowStreamTrigger(trigger2, pStream);
  STR_WITH_MAXSIZE_TO_VARSTR(trigger, trigger2, sizeof(trigger));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // sink_quota, always reported as "0"
  char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0};
  sinkQuota[0] = '0';
  char dstStr[20] = {0};
  STR_TO_VARSTR(dstStr, sinkQuota);
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint interval; the write is bounded by the payload area behind the varstr header
  char tmp[20 + VARSTR_HEADER_SIZE] = {0};
  snprintf(varDataVal(tmp), sizeof(tmp) - VARSTR_HEADER_SIZE, "%d sec", tsStreamCheckpointInterval);
  varDataSetLen(tmp, strlen(varDataVal(tmp)));

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)tmp, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint backup type
  char backup[20 + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(backup, "none");
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)backup, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // history scan idle, always reported as "100a"
  char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
  strcpy(scanHistoryIdle, "100a");

  memset(dstStr, 0, tListLen(dstStr));
  STR_TO_VARSTR(dstStr, scanHistoryIdle);
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);

_end:
  if (code) {
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1043

1044
// Fill row `numOfRows` of pBlock with the runtime attributes of pTask (a task of pStream).
// The task's runtime status entry is looked up in execInfo.pTaskMap; TSDB_CODE_STREAM_TASK_NOT_EXIST is returned
// when it is missing. Columns are written in this order: stream name, task id, node type, node id, level, status,
// stage, input queue, input total, process throughput, output total, output throughput, info, start time,
// start checkpoint id, start checkpoint ver, checkpoint time, checkpoint id, checkpoint version, checkpoint size,
// checkpoint backup status, ds_err_info, history task id, history task status.
// Returns 0 on success, or the first error met while locating a column or writing a value into it.
int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows) {
  SColumnInfoData *pColInfo = NULL;
  int32_t          cols = 0;
  int32_t          code = 0;
  int32_t          lino = 0;

  STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};

  STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
  if (pe == NULL) {
    mError("task:0x%" PRIx64 " not exists in any vnodes, streamName:%s, streamId:0x%" PRIx64 " createTs:%" PRId64
               " no valid status/stage info",
           id.taskId, pStream->name, pStream->uid, pStream->createTime);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  // stream name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  char idstr[128] = {0};
  int64ToHexStr(pTask->id.taskId, idstr, tListLen(idstr));
  code = colDataSetVal(pColInfo, numOfRows, idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node type: "vnode" for a positive node id, "snode" otherwise
  char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
  varDataSetLen(nodeType, 5);
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pTask->info.nodeId > 0) {
    memcpy(varDataVal(nodeType), "vnode", 5);
  } else {
    memcpy(varDataVal(nodeType), "snode", 5);
  }
  code = colDataSetVal(pColInfo, numOfRows, nodeType, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node id, clamped to 0 for non-positive ids
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  int64_t nodeId = TMAX(pTask->info.nodeId, 0);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // level
  char level[20 + VARSTR_HEADER_SIZE] = {0};
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    memcpy(varDataVal(level), "source", 6);
    varDataSetLen(level, 6);
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    memcpy(varDataVal(level), "agg", 3);
    varDataSetLen(level, 3);
  } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    memcpy(varDataVal(level), "sink", 4);
    varDataSetLen(level, 4);
  }

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)level, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status
  char status[20 + VARSTR_HEADER_SIZE] = {0};

  const char *pStatus = streamTaskGetStatusStr(pe->status);
  STR_TO_VARSTR(status, pStatus);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stage
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // input queue. "%%" emits a literal percent sign; the former bare "%)" was an invalid conversion specification,
  // which is undefined behavior in the printf family.
  char        vbuf[40] = {0};
  char        buf[38] = {0};
  const char *queueInfoStr = "%4.2f MiB (%6.2f%%)";
  snprintf(buf, tListLen(buf), queueInfoStr, pe->inputQUsed, pe->inputRate);
  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // input total
  const char *formatTotalMb = "%7.2f MiB";
  const char *formatTotalGb = "%7.2f GiB";
  if (pe->procsTotal < 1024) {
    snprintf(buf, tListLen(buf), formatTotalMb, pe->procsTotal);
  } else {
    snprintf(buf, tListLen(buf), formatTotalGb, pe->procsTotal / 1024);
  }

  memset(vbuf, 0, tListLen(vbuf));
  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // process throughput
  const char *formatKb = "%7.2f KiB/s";
  const char *formatMb = "%7.2f MiB/s";
  if (pe->procsThroughput < 1024) {
    snprintf(buf, tListLen(buf), formatKb, pe->procsThroughput);
  } else {
    snprintf(buf, tListLen(buf), formatMb, pe->procsThroughput / 1024);
  }

  memset(vbuf, 0, tListLen(vbuf));
  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // output total, NULL for sink tasks
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    colDataSetNULL(pColInfo, numOfRows);
  } else {
    // bounded write: a large double formatted with "%7.2f" can exceed buf
    snprintf(buf, tListLen(buf), formatTotalMb, pe->outputTotal);
    memset(vbuf, 0, tListLen(vbuf));
    STR_TO_VARSTR(vbuf, buf);

    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
    TSDB_CHECK_CODE(code, lino, _end);
  }

  // output throughput, NULL for sink tasks
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    colDataSetNULL(pColInfo, numOfRows);
  } else {
    if (pe->outputThroughput < 1024) {
      snprintf(buf, tListLen(buf), formatKb, pe->outputThroughput);
    } else {
      snprintf(buf, tListLen(buf), formatMb, pe->outputThroughput / 1024);
    }

    memset(vbuf, 0, tListLen(vbuf));
    STR_TO_VARSTR(vbuf, buf);

    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
    TSDB_CHECK_CODE(code, lino, _end);
  }

  // info: sink data size for sink tasks, processed version and version range for source tasks, empty otherwise
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    const char *sinkStr = "%.2f MiB";
    snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize);
  } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
    snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer);
  } else {
    memset(buf, 0, tListLen(buf));
  }

  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start_time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start ver
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointVer, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint time, NULL when latestTime is 0
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pe->checkpointInfo.latestTime != 0) {
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestTime, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, 0, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint_id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint version
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestVer, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint size, always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  colDataSetNULL(pColInfo, numOfRows);

  // checkpoint backup status, always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, 0, true);
  TSDB_CHECK_CODE(code, lino, _end);

  // ds_err_info, always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, 0, true);
  TSDB_CHECK_CODE(code, lino, _end);

  // history_task_id, NULL when hTaskId is 0
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pe->hTaskId != 0) {
    int64ToHexStr(pe->hTaskId, idstr, tListLen(idstr));
    code = colDataSetVal(pColInfo, numOfRows, idstr, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, 0, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // history_task_status, always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, 0, true);
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens during build task attr result blocks, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1342

1343
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
6,627✔
1344
  const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
6,627✔
1345
  const SEp *p = GET_ACTIVE_EP(pCurrent);
6,627✔
1346

1347
  if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
6,627!
1348
    return false;
6,627✔
1349
  }
UNCOV
1350
  return true;
×
1351
}
1352

1353
// Release the update-node list and db map held by pInfo (the struct itself is not freed).
// The handles are reset to NULL afterwards so that calling this function again on the same pInfo is harmless;
// mndFindChangedNodeInfo and its callers may both run this cleanup on the same object, which previously freed the
// same memory twice. A NULL pInfo is ignored.
void mndDestroyVgroupChangeInfo(SVgroupChangeInfo* pInfo) {
  if (pInfo == NULL) {
    return;
  }

  taosArrayDestroy(pInfo->pUpdateNodeList);
  pInfo->pUpdateNodeList = NULL;

  taosHashCleanup(pInfo->pDBMap);
  pInfo->pDBMap = NULL;
}
1,592✔
1359

1360
// 1. increase the replica does not affect the stream process.
1361
// 2. decreasing the replica may affect the stream task execution in the way that there is one or more running stream
1362
// tasks on the will be removed replica.
1363
// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we
1364
// will handle it as mentioned in 1 & 2 items.
1365
int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
1,592✔
1366
                               SVgroupChangeInfo *pInfo) {
1367
  int32_t code = 0;
1,592✔
1368
  int32_t lino = 0;
1,592✔
1369

1370
  if (pInfo == NULL) {
1,592!
UNCOV
1371
    return TSDB_CODE_INVALID_PARA;
×
1372
  }
1373

1374
  pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)),
1,592✔
1375
      pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
1,592✔
1376

1377
  if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) {
1,592!
UNCOV
1378
    mndDestroyVgroupChangeInfo(pInfo);
×
UNCOV
1379
    TSDB_CHECK_NULL(NULL, code, lino, _err, terrno);
×
1380
  }
1381

1382
  int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
1,592✔
1383
  for (int32_t i = 0; i < numOfNodes; ++i) {
8,480✔
1384
    SNodeEntry *pPrevEntry = taosArrayGet(pPrevNodeList, i);
6,888✔
1385
    if (pPrevEntry == NULL) {
6,888!
UNCOV
1386
      continue;
×
1387
    }
1388

1389
    int32_t num = taosArrayGetSize(pNodeList);
6,888✔
1390
    for (int32_t j = 0; j < num; ++j) {
243,157✔
1391
      SNodeEntry *pCurrent = taosArrayGet(pNodeList, j);
242,896✔
1392
      if(pCurrent == NULL) {
242,896!
UNCOV
1393
        continue;
×
1394
      }
1395

1396
      if (pCurrent->nodeId == pPrevEntry->nodeId) {
242,896✔
1397
        if (pPrevEntry->stageUpdated || isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) {
6,627!
UNCOV
1398
          const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
×
1399

UNCOV
1400
          char buf[256] = {0};
×
UNCOV
1401
          code = epsetToStr(&pCurrent->epset, buf, tListLen(buf));  // ignore this error
×
UNCOV
1402
          if (code) {
×
UNCOV
1403
            mError("failed to convert epset string, code:%s", tstrerror(code));
×
UNCOV
1404
            TSDB_CHECK_CODE(code, lino, _err);
×
1405
          }
1406

UNCOV
1407
          mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
×
1408
                 pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
1409

UNCOV
1410
          SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId};
×
UNCOV
1411
          epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
×
UNCOV
1412
          epsetAssign(&updateInfo.newEp, &pCurrent->epset);
×
1413

UNCOV
1414
          void* p = taosArrayPush(pInfo->pUpdateNodeList, &updateInfo);
×
UNCOV
1415
          TSDB_CHECK_NULL(p, code, lino, _err, terrno);
×
1416
        }
1417

1418
        // todo handle the snode info
1419
        if (pCurrent->nodeId != SNODE_HANDLE) {
6,627✔
1420
          SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
5,170✔
1421
          code = taosHashPut(pInfo->pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
5,170✔
1422
          mndReleaseVgroup(pMnode, pVgroup);
5,170✔
1423
          TSDB_CHECK_CODE(code, lino, _err);
5,170!
1424
        }
1425

1426
        break;
6,627✔
1427
      }
1428
    }
1429
  }
1430

1431
  return code;
1,592✔
1432

UNCOV
1433
  _err:
×
UNCOV
1434
  mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino);
×
1435
  mndDestroyVgroupChangeInfo(pInfo);
×
1436
  return code;
×
1437
  }
1438

1439
// Detect whether any stream node was updated. The result is used as a boolean (non-zero = updated / not ready).
// Returns false when no vgroup exists, and true when a node reports stageUpdated or not all vnodes are ready.
// Otherwise the vgroup snapshot (returned through ppNodeSnapshot, owned by the caller) is compared with
// execInfo.pNodeList. A failed comparison is reported as "not updated".
static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) {
  bool              allReady = false;
  bool              nodeUpdated = false;
  SVgroupChangeInfo changeInfo = {0};

  int32_t numOfNodes = extractStreamNodeList(pMnode);

  if (numOfNodes == 0) {
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
    execInfo.ts = taosGetTimestampSec();
    return false;
  }

  for (int32_t i = 0; i < numOfNodes; ++i) {
    SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
    if (pNodeEntry == NULL) {
      continue;
    }

    if (pNodeEntry->stageUpdated) {
      mDebug("stream task not ready due to node update detected, checkpoint not issued");
      return true;
    }
  }

  int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot);
  if (code) {
    mError("failed to get the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    mWarn("not all vnodes ready, quit from vnodes status check");
    return true;
  }

  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, *ppNodeSnapshot, &changeInfo);
  if (code) {
    // mndFindChangedNodeInfo already released changeInfo on failure; releasing it again here would double free
    return false;
  }

  nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
  if (nodeUpdated) {
    mDebug("stream tasks not ready due to node update");
  }

  mndDestroyVgroupChangeInfo(&changeInfo);
  return nodeUpdated;
}
1487

1488
// check if the node update happens or not
1489
bool mndStreamNodeIsUpdated(SMnode *pMnode) {
1,306✔
1490
  SArray *pNodeSnapshot = NULL;
1,306✔
1491

1492
  streamMutexLock(&execInfo.lock);
1,306✔
1493
  bool updated = doCheckForUpdated(pMnode, &pNodeSnapshot);
1,306✔
1494
  streamMutexUnlock(&execInfo.lock);
1,306✔
1495

1496
  taosArrayDestroy(pNodeSnapshot);
1,306✔
1497
  return updated;
1,306✔
1498
}
1499

1500
// Creating a stream in a db with more than one replica requires a deployed snode.
// Returns TSDB_CODE_SUCCESS for single-replica dbs or when at least one snode exists,
// TSDB_CODE_SNODE_NOT_DEPLOYED otherwise.
int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) {
  if (pSrcDb->cfg.replications == 1) {
    return TSDB_CODE_SUCCESS;
  }

  SSdb      *pSdb = pMnode->pSdb;
  SSnodeObj *pSnode = NULL;

  // only the first snode matters, so the fetch is cancelled right after one hit
  void *pIter = sdbFetch(pSdb, SDB_SNODE, NULL, (void **)&pSnode);
  if (pIter != NULL) {
    sdbRelease(pSdb, pSnode);
    sdbCancelFetch(pSdb, pIter);
    return TSDB_CODE_SUCCESS;
  }

  mError("snode not existed when trying to create stream in db with multiple replica");
  return TSDB_CODE_SNODE_NOT_DEPLOYED;
}
1523

1524
// Seed for taosRandR() in streamTransRandomErrorGen(); a value of 0 means "not yet initialized", in which case that
// function sets it from taosGetTimestampSec() before first use.
uint32_t seed = 0;
UNCOV
1525
// Build an RPC request that duplicates pAction's payload into a freshly allocated rpc buffer.
// `signature` is carried in info.ahandle (marked notFreeAhandle) and `traceId` becomes the root trace id.
// If the buffer allocation fails, the message is returned with pCont == NULL and no payload copied.
static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) {
  SRpcMsg msg = {.msgType = pAction->msgType};
  msg.contLen = pAction->contLen;
  msg.info.ahandle = (void *)signature;

  msg.pCont = rpcMallocCont(pAction->contLen);
  if (msg.pCont != NULL) {
    msg.info.traceId.rootId = traceId;
    msg.info.notFreeAhandle = 1;
    memcpy(msg.pCont, pAction->pCont, pAction->contLen);
  }

  return msg;
}
1538

UNCOV
1539
/*
 * Resend a copy of pAction's payload `times` times, logging each attempt as "***repeat <i>***".
 * If a copy cannot be built, or a send fails, the failure is logged and the remaining attempts still run.
 */
static void streamRepeatSendTransMsg(STransAction *pAction, STrans *pTrans, int64_t signature, int32_t times) {
  for (int32_t i = 1; i <= times; ++i) {
    mError("***repeat %d***", i);

    SRpcMsg rpcMsg = createRpcMsg(pAction, pTrans->mTraceId, signature);
    if (rpcMsg.pCont == NULL) {
      // createRpcMsg could not allocate the body; do not send a request without payload
      mError("***repeat %d skipped, failed to build msg***", i);
      continue;
    }

    int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg);
    if (code != TSDB_CODE_SUCCESS) {
      mError("***repeat %d failed to send msg, code:%s***", i, tstrerror(code));
    }
  }
}

/*
 * Fault-injection hook for stream checkpoint transaction actions.
 *
 * It only acts on these actions:
 *   - TDMT_STREAM_TASK_UPDATE_CHKPT with id > 2,
 *   - every TDMT_STREAM_CONSEN_CHKPT,
 *   - TDMT_VND_STREAM_CHECK_POINT_SOURCE with id > 2.
 *
 * For those it draws a pseudo-random choice out of 5 (seeded lazily from the current time):
 *   0 - sleep 5s, then terminate the process (see the NOTE on the consensus branch),
 *   1 - send 3 extra copies of the action's msg,
 *   2 - sleep 30s before the caller sends the msg,
 *   otherwise - inject nothing.
 */
void streamTransRandomErrorGen(STransAction *pAction, STrans *pTrans, int64_t signature) {
  enum {
    STREAM_RAND_ERR_CHOICES = 5,
    STREAM_RAND_ERR_CRASH_DELAY_MS = 5000,
    STREAM_RAND_ERR_IDLE_MS = 30000,
    STREAM_RAND_ERR_REPEAT_TIMES = 3,
  };

  bool isTarget = (pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT && pAction->id > 2) ||
                  (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) ||
                  (pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE && pAction->id > 2);
  if (!isTarget) {
    return;
  }

  if (seed == 0) {
    seed = (uint32_t)taosGetTimestampSec();
  }

  uint32_t v = taosRandR(&seed);
  int32_t  choseItem = (int32_t)(v % STREAM_RAND_ERR_CHOICES);

  switch (choseItem) {
    case 0:
      // 1. one of the msgs is not sent: terminate, so that it is sent again after the restart
      taosMsleep(STREAM_RAND_ERR_CRASH_DELAY_MS);
      if (pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT) {
        mError(
            "***sleep 5s and core dump, following tasks will not recv update-checkpoint info, so the checkpoint will "
            "rollback***");
        exit(-1);
      } else if (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) {
        // NOTE(review): the log announces a core dump, but unlike the other two branches this one does
        // not exit. Confirm whether the missing exit(-1) is intentional.
        mError(
            "***sleep 5s and core dump, following tasks will not recv consen-checkpoint info, so the tasks will "
            "not started***");
      } else {  // TDMT_VND_STREAM_CHECK_POINT_SOURCE
        mError(
            "***sleep 5s and core dump, following tasks will not recv checkpoint-source info, so the tasks will "
            "started after restart***");
        exit(-1);
      }
      break;

    case 1:
      // 2. repeatedly send the update-checkpoint/consensus/checkpoint-source msg
      mError("***repeat send update-checkpoint/consensus/checkpoint trans msg 3times to vnode***");
      streamRepeatSendTransMsg(pAction, pTrans, signature, STREAM_RAND_ERR_REPEAT_TIMES);
      break;

    case 2:
      // 3. sleep 30s and then let the caller send the msg
      mError("***idle for 30s, and then send msg***");
      taosMsleep(STREAM_RAND_ERR_IDLE_MS);
      break;

    default:
      // no error injected
      break;
  }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc