• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23

24
struct SStreamTaskIter {
25
  SStreamObj  *pStream;
26
  int32_t      level;
27
  int32_t      ordinalIndex;
28
  int32_t      totalLevel;
29
  SStreamTask *pTask;
30
};
31

32
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId);
33

UNCOV
34
/**
 * Allocate a task iterator for the given stream.
 *
 * @param pStream stream whose tasks will be traversed
 * @param pIter   receives the new iterator, or NULL when the allocation fails
 * @return 0 on success, terrno when the allocation fails
 */
int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter) {
  SStreamTaskIter *pNew = taosMemoryCalloc(1, sizeof(SStreamTaskIter));

  *pIter = pNew;
  if (pNew == NULL) {
    return terrno;
  }

  // level == -1 marks an iterator that has not been advanced yet
  pNew->pStream = pStream;
  pNew->totalLevel = taosArrayGetSize(pStream->tasks);
  pNew->level = -1;
  pNew->ordinalIndex = 0;
  pNew->pTask = NULL;

  return 0;
}
48

UNCOV
49
/**
 * Advance the iterator to the next task, moving on to the following level
 * whenever the current one is exhausted.
 *
 * @return true when pIter->pTask now refers to a task; false (with pIter->pTask
 *         reset to NULL) once every level has been consumed
 */
bool streamTaskIterNextTask(SStreamTaskIter *pIter) {
  if (pIter->level < pIter->totalLevel) {
    // first call: step from the "not started" marker onto level 0
    if (pIter->level == -1) {
      pIter->level += 1;
    }

    for (; pIter->level < pIter->totalLevel; pIter->level += 1, pIter->ordinalIndex = 0) {
      SArray *pLevelTasks = taosArrayGetP(pIter->pStream->tasks, pIter->level);

      if (pIter->ordinalIndex < taosArrayGetSize(pLevelTasks)) {
        pIter->pTask = taosArrayGetP(pLevelTasks, pIter->ordinalIndex);
        pIter->ordinalIndex += 1;
        return true;
      }
    }
  }

  pIter->pTask = NULL;
  return false;
}
76

UNCOV
77
/**
 * Fetch the task the iterator currently points at.
 *
 * @param pIter iterator to read
 * @param pTask receives pIter->pTask when non-NULL
 * @return TSDB_CODE_SUCCESS when a task is available; TSDB_CODE_INVALID_PARA when
 *         pTask is NULL or the iterator has no current task
 */
int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask) {
  if (pTask == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  *pTask = pIter->pTask;
  return (*pTask == NULL) ? TSDB_CODE_INVALID_PARA : TSDB_CODE_SUCCESS;
}
87

UNCOV
88
/* Release an iterator obtained from createStreamTaskIter(). */
void destroyStreamTaskIter(SStreamTaskIter *pIter) {
  taosMemoryFree(pIter);
}
×
89

UNCOV
90
/**
 * Check whether every replica of a vgroup is restored and in a usable sync state.
 *
 * @return false as soon as one replica is not restored, or is in the
 *         OFFLINE / ERROR / LEARNER / CANDIDATE state; true otherwise
 */
static bool checkStatusForEachReplica(SVgObj *pVgroup) {
  for (int32_t idx = 0; idx < pVgroup->replica; ++idx) {
    if (!pVgroup->vnodeGid[idx].syncRestore) {
      mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId);
      return false;
    }

    ESyncState state = pVgroup->vnodeGid[idx].syncState;
    switch (state) {
      case TAOS_SYNC_STATE_OFFLINE:
      case TAOS_SYNC_STATE_ERROR:
      case TAOS_SYNC_STATE_LEARNER:
      case TAOS_SYNC_STATE_CANDIDATE: {
        mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups", pVgroup->vgId,
              state);
        return false;
      }
      default:
        break;
    }
  }

  return true;
}
108

UNCOV
109
/**
 * Append one SNodeEntry (nodeId == SNODE_HANDLE) to pVgroupList for every snode
 * found in the sdb.
 *
 * @param pMnode      mnode whose sdb is scanned
 * @param pVgroupList array of SNodeEntry that receives the snode entries
 * @return 0 on success; the error from addEpIntoEpSet(), or terrno when the
 *         entry cannot be pushed into the list
 */
static int32_t mndAddSnodeInfo(SMnode *pMnode, SArray *pVgroupList) {
  SSnodeObj *pObj = NULL;
  void      *pIter = NULL;
  int32_t    code = 0;

  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
    if (pIter == NULL) {
      break;
    }

    SNodeEntry entry = {.nodeId = SNODE_HANDLE};
    code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
    if (code) {
      // log while the object is still referenced; pObj must not be touched after sdbRelease()
      mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn);
      sdbRelease(pMnode->pSdb, pObj);
      sdbCancelFetch(pMnode->pSdb, pIter);
      return code;
    }

    // the string form is only used for logging: keep its status out of `code`, so that a
    // formatting failure on the last snode is not reported as a snapshot failure
    char    buf[256] = {0};
    int32_t strCode = epsetToStr(&entry.epset, buf, tListLen(buf));
    if (strCode != 0) {  // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(strCode));
    }

    void *p = taosArrayPush(pVgroupList, &entry);
    if (p == NULL) {
      code = terrno;
      sdbRelease(pMnode->pSdb, pObj);
      sdbCancelFetch(pMnode->pSdb, pIter);
      mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code));
      return code;
    } else {
      mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
    }

    sdbRelease(pMnode->pSdb, pObj);
  }

  return code;
}
151

UNCOV
152
/**
 * Verify that every mnode is a sync leader/follower and that its sdb row is ready.
 *
 * @return TSDB_CODE_SUCCESS when all mnodes pass; TSDB_CODE_FAILED on the first
 *         mnode that does not
 */
static int32_t mndCheckMnodeStatus(SMnode *pMnode) {
  SSdb      *pSdb = pMnode->pSdb;
  SMnodeObj *pMnodeObj = NULL;
  ESdbStatus rowStatus;
  void      *pFetch = NULL;

  while ((pFetch = sdbFetchAll(pSdb, SDB_MNODE, pFetch, (void **)&pMnodeObj, &rowStatus, true)) != NULL) {
    bool healthy = true;

    if (pMnodeObj->syncState != TAOS_SYNC_STATE_LEADER && pMnodeObj->syncState != TAOS_SYNC_STATE_FOLLOWER) {
      mDebug("mnode sync state:%d not leader/follower", pMnodeObj->syncState);
      healthy = false;
    } else if (rowStatus != SDB_STATUS_READY) {
      mWarn("mnode status:%d not ready", rowStatus);
      healthy = false;
    }

    sdbRelease(pSdb, pMnodeObj);

    if (!healthy) {
      // leaving the scan early: the fetch cursor has to be cancelled explicitly
      sdbCancelFetch(pSdb, pFetch);
      return TSDB_CODE_FAILED;
    }
  }

  return TSDB_CODE_SUCCESS;
}
183

UNCOV
184
/**
 * Append one SNodeEntry per vgroup to pVgroupList and evaluate whether all
 * vgroups are ready for stream operations.
 *
 * *allReady is cleared when a vgroup's replica count differs from the one first
 * seen for the same database, or when checkStatusForEachReplica() rejects a
 * vgroup. Entries keep being collected after *allReady has turned false.
 *
 * @param pMnode      mnode whose sdb is scanned
 * @param pVgroupList array of SNodeEntry receiving the vgroup entries
 * @param allReady    in/out readiness flag; only ever cleared here
 * @return 0 on success; terrno / the hash or array error code on failure
 */
static int32_t mndCheckAndAddVgroupsInfo(SMnode *pMnode, SArray *pVgroupList, bool *allReady) {
  SSdb     *pSdb = pMnode->pSdb;
  void     *pIter = NULL;
  SVgObj   *pVgroup = NULL;
  int32_t   code = 0;
  SHashObj *pHash = NULL;

  // dbUid -> replica count of the first vgroup seen for that database
  pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    mError("failed to prepare hashmap during take vgroup snapshot, code:%s", tstrerror(terrno));
    return terrno;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime};
    entry.epset = mndGetVgroupEpset(pMnode, pVgroup);

    int8_t *pReplica = taosHashGet(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid));
    if (pReplica == NULL) {  // not exist, add it into hash map
      code = taosHashPut(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid), &pVgroup->replica, sizeof(pVgroup->replica));
      if (code) {
        mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code));
        sdbRelease(pSdb, pVgroup);
        sdbCancelFetch(pSdb, pIter);
        goto _end;  // take snapshot failed, and not all ready
      }
    } else {
      if (*pReplica != pVgroup->replica) {
        mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations",
              pVgroup->vgId, pVgroup->replica, *pReplica);
        *allReady = false;  // task snap success, but not all ready
      }
    }

    // if not all ready till now, no need to check the remaining vgroups,
    // but still we need to put the info of the existed vgroups into the snapshot list
    if (*allReady) {
      *allReady = checkStatusForEachReplica(pVgroup);
    }

    // the string form is only used for logging: keep its status out of `code`, so that a
    // formatting failure on the last vgroup is not returned as a snapshot failure
    char    buf[256] = {0};
    int32_t strCode = epsetToStr(&entry.epset, buf, tListLen(buf));
    if (strCode != 0) {  // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(strCode));
    }

    void *p = taosArrayPush(pVgroupList, &entry);
    if (p == NULL) {
      mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId);
      code = terrno;
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      goto _end;
    } else {
      mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
    }

    sdbRelease(pSdb, pVgroup);
  }

_end:
  taosHashCleanup(pHash);
  return code;
}
253

UNCOV
254
/**
 * Build a snapshot list of SNodeEntry covering all vgroups and snodes, and
 * report whether vnodes and mnodes are all ready.
 *
 * @param pMnode   mnode to inspect
 * @param allReady set to true only when every readiness check passes
 * @param pList    receives the snapshot array; stays NULL when collecting the
 *                 vgroup or snode entries fails
 * @return the code of the failing collection step, otherwise the result of the
 *         mnode status check (the list is still handed out in that case)
 */
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
  int32_t code = 0;
  bool    failed = false;

  *pList = NULL;
  *allReady = true;

  SArray *pEntries = taosArrayInit(4, sizeof(SNodeEntry));
  if (pEntries == NULL) {
    mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno));
    code = terrno;
    failed = true;
  }

  // 1. check for all vnodes status
  if (!failed) {
    code = mndCheckAndAddVgroupsInfo(pMnode, pEntries, allReady);
    failed = (code != 0);
  }

  // 2. add snode info
  if (!failed) {
    code = mndAddSnodeInfo(pMnode, pEntries);
    failed = (code != 0);
  }

  if (failed) {
    *allReady = false;
    taosArrayDestroy(pEntries);
    return code;
  }

  // 3. check for mnode status
  code = mndCheckMnodeStatus(pMnode);
  if (code != TSDB_CODE_SUCCESS) {
    *allReady = false;
  }

  *pList = pEntries;
  return code;
}
294

UNCOV
295
/**
 * Look up a stream object by uid with a linear scan over SDB_STREAM.
 *
 * @param pMnode   mnode whose sdb is scanned
 * @param streamId uid of the wanted stream
 * @param pStream  receives the matching object (not released here), or NULL
 * @return TSDB_CODE_SUCCESS when found, TSDB_CODE_STREAM_TASK_NOT_EXIST otherwise
 */
int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pCandidate = NULL;
  void       *pFetch = NULL;

  *pStream = NULL;

  for (;;) {
    pFetch = sdbFetch(pSdb, SDB_STREAM, pFetch, (void **)&pCandidate);
    if (pFetch == NULL) {
      return TSDB_CODE_STREAM_TASK_NOT_EXIST;
    }

    if (pCandidate->uid != streamId) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    // match: stop the scan and hand the still-referenced object to the caller
    sdbCancelFetch(pSdb, pFetch);
    *pStream = pCandidate;
    return TSDB_CODE_SUCCESS;
  }
}
312

UNCOV
313
/**
 * Kill the active transaction identified by transId.
 *
 * @param pMnode  mnode owning the transaction
 * @param transId id of the transaction to kill
 * @param pDbName database name, used for logging only
 */
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) {
  STrans *pTrans = mndAcquireTrans(pMnode, transId);
  if (pTrans == NULL) {
    mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId);
    return;
  }

  mInfo("kill active transId:%d in Db:%s", transId, pDbName);

  int32_t code = mndKillTrans(pMnode, pTrans);
  if (code) {
    mError("failed to kill transId:%d, code:%s", pTrans->id, tstrerror(code));
  }

  // release last: pTrans must not be dereferenced once the reference is given back
  mndReleaseTrans(pMnode, pTrans);
}
×
326

UNCOV
327
/**
 * Resolve the endpoint set of the node a task runs on.
 *
 * @param pMnode   mnode used for the lookup
 * @param pEpSet   output endpoint set; numOfEps is reset to 0 first
 * @param hasEpset set to true only when pEpSet has been filled
 * @param taskId   task id, used for logging only
 * @param nodeId   SNODE_HANDLE for the snode, otherwise a vgroup id
 * @return TSDB_CODE_SUCCESS (also when the vgroup no longer exists, with
 *         *hasEpset left false); TSDB_CODE_INVALID_PARA when no snode is found;
 *         the addEpIntoEpSet() error otherwise
 */
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId) {
  *hasEpset = false;
  pEpSet->numOfEps = 0;

  if (nodeId != SNODE_HANDLE) {
    SVgObj *pVgObj = mndAcquireVgroup(pMnode, nodeId);
    if (pVgObj == NULL) {
      mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", taskId, nodeId);
      return TSDB_CODE_SUCCESS;
    }

    SEpSet vgEpset = mndGetVgroupEpset(pMnode, pVgObj);
    mndReleaseVgroup(pMnode, pVgObj);

    epsetAssign(pEpSet, &vgEpset);
    *hasEpset = true;
    return TSDB_CODE_SUCCESS;
  }

  // snode: use the first snode object returned by the sdb
  SSnodeObj *pSnode = NULL;
  void      *pFetch = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void **)&pSnode);
  if (pFetch == NULL) {
    mError("failed to acquire snode epset");
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = addEpIntoEpSet(pEpSet, pSnode->pDnode->fqdn, pSnode->pDnode->port);
  sdbRelease(pMnode->pSdb, pSnode);
  sdbCancelFetch(pMnode->pSdb, pFetch);

  *hasEpset = (code == 0);
  if (code) {
    mError("failed to set epset");
  }

  return code;
}
366

UNCOV
367
/**
 * Find the task of pStream whose taskId equals pId->taskId.
 *
 * @param pId     id to look for (only taskId is compared)
 * @param pStream stream to search
 * @param pTask   receives the matching task, or NULL when none matches
 * @return 0 when found; the iterator creation error; TSDB_CODE_FAILED when no
 *         task matches
 */
int32_t mndGetStreamTask(STaskId *pId, SStreamObj *pStream, SStreamTask **pTask) {
  SStreamTaskIter *pTaskIter = NULL;

  *pTask = NULL;

  int32_t code = createStreamTaskIter(pStream, &pTaskIter);
  if (code) {
    mError("failed to create stream task iter:%s", pStream->name);
    return code;
  }

  int32_t result = TSDB_CODE_FAILED;
  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCurrent = NULL;
    if (streamTaskIterGetCurrent(pTaskIter, &pCurrent) != 0) {
      continue;
    }

    if (pCurrent->id.taskId == pId->taskId) {
      *pTask = pCurrent;
      result = 0;
      break;
    }
  }

  destroyStreamTaskIter(pTaskIter);
  return result;
}
394

UNCOV
395
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
×
UNCOV
396
  int32_t num = 0;
×
UNCOV
397
  for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); ++i) {
×
UNCOV
398
    SArray *pLevel = taosArrayGetP(pStream->tasks, i);
×
UNCOV
399
    num += taosArrayGetSize(pLevel);
×
400
  }
401

UNCOV
402
  return num;
×
403
}
404

UNCOV
405
/**
 * Count the streams whose source database is dbName.
 *
 * @param pMnode        mnode whose sdb is scanned
 * @param dbName        name of the source database
 * @param pNumOfStreams receives the count (untouched when the db is not found)
 * @return 0 on success, TSDB_CODE_MND_DB_NOT_SELECTED when the db cannot be acquired
 */
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  SSdb   *pSdb = pMnode->pSdb;
  void   *pFetch = NULL;
  int32_t matched = 0;

  for (;;) {
    SStreamObj *pStream = NULL;
    pFetch = sdbFetch(pSdb, SDB_STREAM, pFetch, (void **)&pStream);
    if (pFetch == NULL) {
      break;
    }

    if (pStream->sourceDbUid == pDb->uid) {
      matched += 1;
    }

    sdbRelease(pSdb, pStream);
  }

  *pNumOfStreams = matched;
  mndReleaseDb(pMnode, pDb);
  return 0;
}
430

UNCOV
431
static void freeTaskList(void *param) {
×
UNCOV
432
  SArray **pList = (SArray **)param;
×
UNCOV
433
  taosArrayDestroy(*pList);
×
UNCOV
434
}
×
435

UNCOV
436
int32_t mndInitExecInfo() {
×
UNCOV
437
  int32_t code = taosThreadMutexInit(&execInfo.lock, NULL);
×
UNCOV
438
  if (code) {
×
439
    return code;
×
440
  }
441

UNCOV
442
  _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
×
443

UNCOV
444
  execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
×
UNCOV
445
  execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
×
UNCOV
446
  execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
×
UNCOV
447
  execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
×
UNCOV
448
  execInfo.pChkptStreams = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
×
UNCOV
449
  execInfo.pStreamConsensus = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
×
UNCOV
450
  execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
×
UNCOV
451
  execInfo.pKilledChkptTrans = taosArrayInit(4, sizeof(SStreamTaskResetMsg));
×
452

UNCOV
453
  if (execInfo.pTaskList == NULL || execInfo.pTaskMap == NULL || execInfo.transMgmt.pDBTrans == NULL ||
×
UNCOV
454
      execInfo.pTransferStateStreams == NULL || execInfo.pChkptStreams == NULL || execInfo.pStreamConsensus == NULL ||
×
UNCOV
455
      execInfo.pNodeList == NULL || execInfo.pKilledChkptTrans == NULL) {
×
456
    mError("failed to initialize the stream runtime env, code:%s", tstrerror(terrno));
×
457
    return terrno;
×
458
  }
459

UNCOV
460
  execInfo.role = NODE_ROLE_UNINIT;
×
UNCOV
461
  execInfo.switchFromFollower = false;
×
462

UNCOV
463
  taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
×
UNCOV
464
  taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList);
×
UNCOV
465
  taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList);
×
UNCOV
466
  return 0;
×
467
}
468

UNCOV
469
void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
×
UNCOV
470
  SArray *pValidList = taosArrayInit(4, sizeof(SNodeEntry));
×
UNCOV
471
  if (pValidList == NULL) {  // not continue
×
472
    return;
×
473
  }
474

UNCOV
475
  int32_t size = taosArrayGetSize(pNodeSnapshot);
×
UNCOV
476
  int32_t oldSize = taosArrayGetSize(execInfo.pNodeList);
×
477

UNCOV
478
  for (int32_t i = 0; i < oldSize; ++i) {
×
UNCOV
479
    SNodeEntry *p = taosArrayGet(execInfo.pNodeList, i);
×
UNCOV
480
    if (p == NULL) {
×
481
      continue;
×
482
    }
483

UNCOV
484
    for (int32_t j = 0; j < size; ++j) {
×
UNCOV
485
      SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);
×
UNCOV
486
      if (pEntry == NULL) {
×
487
        continue;
×
488
      }
489

UNCOV
490
      if (pEntry->nodeId == p->nodeId) {
×
UNCOV
491
        p->hbTimestamp = pEntry->hbTimestamp;
×
492

UNCOV
493
        void *px = taosArrayPush(pValidList, p);
×
UNCOV
494
        if (px == NULL) {
×
495
          mError("failed to put node into list, nodeId:%d", p->nodeId);
×
496
        } else {
UNCOV
497
          mDebug("vgId:%d ts:%" PRId64 " HbMsgId:%d is valid", p->nodeId, p->hbTimestamp, p->lastHbMsgId);
×
498
        }
UNCOV
499
        break;
×
500
      }
501
    }
502
  }
503

UNCOV
504
  taosArrayDestroy(execInfo.pNodeList);
×
UNCOV
505
  execInfo.pNodeList = pValidList;
×
506

UNCOV
507
  mDebug("remain %d valid node entries after clean expired nodes info, prev size:%d",
×
508
         (int32_t)taosArrayGetSize(pValidList), oldSize);
509
}
510

UNCOV
511
/**
 * Remove one task from both the task map and the task list of pExecNode.
 *
 * @return TSDB_CODE_SUCCESS when the task is absent from the map or has been
 *         removed; the taosHashRemove() error otherwise
 */
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
  if (taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)) == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
  if (code) {
    return code;
  }

  // drop the first list entry with the same (streamId, taskId) pair
  int32_t pos = 0;
  while (pos < taosArrayGetSize(pExecNode->pTaskList)) {
    STaskId *pCur = taosArrayGet(pExecNode->pTaskList, pos);
    bool     same = (pCur != NULL) && (pCur->taskId == pRemovedId->taskId) && (pCur->streamId == pRemovedId->streamId);

    if (same) {
      taosArrayRemove(pExecNode->pTaskList, pos);

      int32_t remain = taosArrayGetSize(pExecNode->pTaskList);
      mInfo("s-task:0x%x removed from buffer, remain:%d in buffer list", (int32_t)pRemovedId->taskId, remain);
      break;
    }

    pos += 1;
  }

  return TSDB_CODE_SUCCESS;
}
539

UNCOV
540
/* Remove every task id listed in pTaskIds from pExecInfo; failures are logged and skipped. */
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo) {
  int32_t idx = 0;

  while (idx < taosArrayGetSize(pTaskIds)) {
    STaskId *pTaskId = taosArrayGet(pTaskIds, idx);
    idx += 1;

    if (pTaskId == NULL) {
      continue;
    }

    if (doRemoveTasks(pExecInfo, pTaskId) != 0) {
      mError("failed to remove task in buffer list, 0x%" PRIx64, pTaskId->taskId);
    }
  }
}
×
553

UNCOV
554
/**
 * Remove all tasks of pStream from pExecNode and, when the task map and task
 * list are still the same size afterwards, clear the stream's consensus
 * checkpoint and checkpoint-report entries held in the global execInfo.
 * The whole operation runs under pExecNode->lock.
 */
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
  SStreamTaskIter *pTaskIter = NULL;

  streamMutexLock(&pExecNode->lock);

  // 1. remove task entries
  int32_t code = createStreamTaskIter(pStream, &pTaskIter);
  if (code) {
    streamMutexUnlock(&pExecNode->lock);
    mError("failed to create stream task iter:%s", pStream->name);
    return;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCurrent = NULL;
    if (streamTaskIterGetCurrent(pTaskIter, &pCurrent) != 0) {
      continue;
    }

    STaskId id = {.streamId = pCurrent->id.streamId, .taskId = pCurrent->id.taskId};
    if (doRemoveTasks(pExecNode, &id) != 0) {
      mError("failed to remove task in buffer list, 0x%" PRIx64, id.taskId);
    }
  }

  bool consistent = (taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
  if (consistent) {
    // 2. remove stream entry in consensus hash table and checkpoint-report hash table
    code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid);
    if (code) {
      mError("failed to clear consensus checkpointId, code:%s", tstrerror(code));
    }

    code = mndClearChkptReportInfo(execInfo.pChkptStreams, pStream->uid);
    if (code) {
      mError("failed to clear the checkpoint report info, code:%s", tstrerror(code));
    }
  }

  streamMutexUnlock(&pExecNode->lock);
  destroyStreamTaskIter(pTaskIter);

  if (!consistent) {
    mError("task map size, task list size, not equal");
  }
}
601

UNCOV
602
/* Return true when pList (array of SNodeEntry) holds an entry with the given nodeId. */
static bool taskNodeExists(SArray *pList, int32_t nodeId) {
  size_t total = taosArrayGetSize(pList);
  bool   found = false;

  for (int32_t i = 0; i < total && !found; ++i) {
    SNodeEntry *pNode = taosArrayGet(pList, i);
    if (pNode != NULL && pNode->nodeId == nodeId) {
      found = true;
    }
  }

  return found;
}
618

UNCOV
619
/**
 * Drop from the global execInfo every task whose node no longer appears in
 * pNodeSnapshot (tasks on SNODE_HANDLE are kept), then prune the node list.
 *
 * @return 0 on success, terrno when the scratch list cannot be allocated
 */
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
  SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
  if (pRemovedTasks == NULL) {
    return terrno;
  }

  int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
  for (int32_t i = 0; i < numOfTask; ++i) {
    STaskId *pTaskId = taosArrayGet(execInfo.pTaskList, i);
    if (pTaskId == NULL) {
      continue;
    }

    STaskStatusEntry *pStatus = taosHashGet(execInfo.pTaskMap, pTaskId, sizeof(*pTaskId));

    // keep: unknown tasks, snode tasks, and tasks whose node is still in the snapshot
    if (pStatus == NULL || pStatus->nodeId == SNODE_HANDLE || taskNodeExists(pNodeSnapshot, pStatus->nodeId)) {
      continue;
    }

    if (taosArrayPush(pRemovedTasks, pTaskId) == NULL) {
      mError("failed to put task entry into remove list, taskId:0x%" PRIx64, pTaskId->taskId);
    }
  }

  removeTasksInBuf(pRemovedTasks, &execInfo);

  mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
         (int32_t)taosArrayGetSize(execInfo.pTaskList));

  removeExpiredNodeInfo(pNodeSnapshot);

  taosArrayDestroy(pRemovedTasks);
  return 0;
}
660

UNCOV
661
/**
 * Decide whether a stream's checkpoint-report list is complete and consistent
 * enough to launch the checkpoint-info update.
 *
 * @param pReportInfo report list of one stream
 * @param numOfTasks  number of tasks expected to have reported
 * @param pName       stream name, used for logging only
 * @return TSDB_CODE_SUCCESS when all tasks reported the same checkpointId and
 *         the cross-check against the active transaction / task status passes;
 *         -1 when the caller should re-check later
 */
static int32_t allTasksSendChkptReport(SChkptReportInfo *pReportInfo, int32_t numOfTasks, const char *pName) {
  int64_t checkpointId = -1;
  int32_t transId = -1;
  int32_t taskId = -1;

  int32_t existed = (int32_t)taosArrayGetSize(pReportInfo->pTaskList);
  if (existed != numOfTasks) {
    mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pReportInfo->streamId, pName,
           existed, numOfTasks, numOfTasks - existed);
    return -1;
  }

  // acquire current active checkpointId, and do cross-check checkpointId info in exec.pTaskList
  for (int32_t i = 0; i < numOfTasks; ++i) {
    STaskChkptInfo *pInfo = taosArrayGet(pReportInfo->pTaskList, i);
    if (pInfo == NULL) {
      continue;
    }

    if (checkpointId == -1) {
      checkpointId = pInfo->checkpointId;
      transId = pInfo->transId;
      taskId = pInfo->taskId;
    } else if (checkpointId != pInfo->checkpointId) {
      mError("stream:0x%" PRIx64
             " checkpointId in checkpoint-report list are not identical, type 1 taskId:0x%x checkpointId:%" PRId64
             ", type 2 taskId:0x%x checkpointId:%" PRId64,
             pReportInfo->streamId, taskId, checkpointId, pInfo->taskId, pInfo->checkpointId);
      return -1;
    }
  }

  // check for the correct checkpointId for current task info in STaskChkptInfo
  STaskChkptInfo *p = taosArrayGet(pReportInfo->pTaskList, 0);
  if (p == NULL) {
    // empty list (numOfTasks == 0): there is nothing to cross-check against
    mDebug("stream:0x%" PRIx64 " %s checkpoint-report list is empty, recheck next time", pReportInfo->streamId, pName);
    return -1;
  }

  STaskId           id = {.streamId = p->streamId, .taskId = p->taskId};
  STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));

  // cross-check failed, there must be something unknown wrong
  SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, &id.streamId, sizeof(id.streamId));
  if (pTransInfo == NULL) {
    mWarn("stream:0x%" PRIx64 " no active trans exists for checkpoint transId:%d, it may have been cleared already",
          id.streamId, transId);

    if (pe == NULL) {
      // the task has no entry in the task map, so its active checkpointId cannot be verified
      mWarn("stream:0x%" PRIx64 " s-task:0x%" PRIx64 " not found in task status buffer, recheck next time",
            id.streamId, id.taskId);
      return -1;
    }

    if (pe->checkpointInfo.activeId != 0 && pe->checkpointInfo.activeId != checkpointId) {
      mWarn("stream:0x%" PRIx64 " active checkpointId is not equalled to the required, current:%" PRId64
            ", req:%" PRId64 " recheck next time",
            id.streamId, pe->checkpointInfo.activeId, checkpointId);
      return -1;
    }
  } else {
    if (pTransInfo->transId != transId) {
      mError("stream:0x%" PRIx64
             " checkpoint-report list info are expired, active transId:%d trans in list:%d, recheck next time",
             id.streamId, pTransInfo->transId, transId);
      return -1;
    }
  }

  mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", id.streamId,
         pName, numOfTasks);

  return TSDB_CODE_SUCCESS;
}
726

UNCOV
727
/**
 * Scan the checkpoint-report table (execInfo.pChkptStreams) under execInfo.lock.
 *
 * For the first stream whose tasks have all reported and which passes the
 * transaction conflict check, try to create the checkpoint-info update
 * transaction and stop scanning. Streams that can no longer be acquired are
 * removed from the table once the scan is over.
 *
 * @param pReq request whose info.node is the mnode
 * @return terrno when the scratch list cannot be allocated, otherwise
 *         TSDB_CODE_SUCCESS
 */
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  void   *pIter = NULL;
  int32_t code = 0;
  SArray *pDropped = taosArrayInit(4, sizeof(int64_t));
  if (pDropped == NULL) {
    return terrno;
  }

  mDebug("start to scan checkpoint report info");

  streamMutexLock(&execInfo.lock);

  while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
    SChkptReportInfo *px = (SChkptReportInfo *)pIter;
    if (taosArrayGetSize(px->pTaskList) == 0) {
      continue;
    }

    STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0);
    if (pInfo == NULL) {
      continue;
    }

    // pInfo points into px->pTaskList, which may be cleared below: copy what is needed later
    int64_t streamId = pInfo->streamId;
    int64_t checkpointId = pInfo->checkpointId;

    SStreamObj *pStream = NULL;
    code = mndGetStreamObj(pMnode, streamId, &pStream);
    if (pStream == NULL || code != 0) {
      mDebug("failed to acquire stream:0x%" PRIx64 " remove it from checkpoint-report list", streamId);
      void *p = taosArrayPush(pDropped, &streamId);
      if (p == NULL) {
        mError("failed to put stream into drop list:0x%" PRIx64, streamId);
      }
      continue;
    }

    int32_t total = mndGetNumOfStreamTasks(pStream);
    int32_t ret = allTasksSendChkptReport(px, total, pStream->name);
    if (ret == 0) {
      code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
      if (code == 0) {
        code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList);
        if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {  // remove this entry
          taosArrayClear(px->pTaskList);
          mInfo("stream:0x%" PRIx64 " clear checkpoint-report list and update the report checkpointId from:%" PRId64
                " to %" PRId64,
                streamId, px->reportChkpt, checkpointId);
          px->reportChkpt = checkpointId;
        } else {
          mDebug("stream:0x%" PRIx64 " not launch chkpt-info update trans, due to checkpoint not finished yet",
                 streamId);
        }

        sdbRelease(pMnode->pSdb, pStream);
        // leaving the iteration early: give back the reference the iterator holds on the current node
        taosHashCancelIterate(execInfo.pChkptStreams, pIter);
        break;
      } else {
        mDebug("stream:0x%" PRIx64 " active checkpoint trans not finished yet, wait", streamId);
      }
    }

    sdbRelease(pMnode->pSdb, pStream);
  }

  int32_t size = taosArrayGetSize(pDropped);
  if (size > 0) {
    for (int32_t i = 0; i < size; ++i) {
      int64_t *pStreamId = (int64_t *)taosArrayGet(pDropped, i);
      if (pStreamId == NULL) {
        continue;
      }

      code = taosHashRemove(execInfo.pChkptStreams, pStreamId, sizeof(*pStreamId));
      if (code) {
        mError("failed to remove stream in buf:0x%" PRIx64, *pStreamId);
      }
    }

    int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams);
    mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams);
  }

  streamMutexUnlock(&execInfo.lock);

  taosArrayDestroy(pDropped);

  mDebug("end to scan checkpoint report info");
  return TSDB_CODE_SUCCESS;
}
814

UNCOV
815
/**
 * Create and prepare the transaction that sets the consensus checkpointId for
 * one task of a stream.
 *
 * Once the transaction object exists, every exit path releases pStream and
 * drops the local transaction handle.
 *
 * @param pMnode       mnode that owns the transaction
 * @param pStream      stream the task belongs to
 * @param taskId       id of the task to update
 * @param checkpointId consensus checkpoint id to set
 * @param ts           timestamp passed to mndStreamSetChkptIdAction()
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared;
 *         terrno when the transaction cannot be created; otherwise the error
 *         code of the failing step
 */
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId,
                                          int64_t ts) {
  char         msg[128] = {0};
  STrans      *pTrans = NULL;
  SStreamTask *pTask = NULL;

  snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId);

  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg, &pTrans);
  if (pTrans == NULL || code != 0) {
    // NOTE(review): pStream is not released on this path, unlike every path below;
    // left as in the original - confirm against doCreateTrans() and the callers
    return terrno;
  }

  STaskId id = {.streamId = pStream->uid, .taskId = taskId};
  code = mndGetStreamTask(&id, pStream, &pTask);
  if (code) {
    mError("failed to get task:0x%x in stream:%s, failed to create consensus-checkpointId", taskId, pStream->name);
    goto _end;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_CONSEN_NAME, pStream->uid);
  if (code) {
    goto _end;
  }

  code = mndStreamSetChkptIdAction(pMnode, pTrans, pTask, checkpointId, ts);
  if (code != 0) {
    goto _end;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr());
    goto _end;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_end:
  // single cleanup point: the transaction handle is dropped on every path, including the
  // task-lookup and registration failures
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
869

UNCOV
870
/**
 * Get the consensus-checkpoint info of a stream from pHash, creating an empty
 * entry when none exists yet.
 *
 * @param pHash      table keyed by streamId holding SCheckpointConsensusInfo values
 * @param streamId   stream to look up
 * @param numOfTasks task count stored in a newly created entry
 * @param pInfo      receives the entry stored in the table, or NULL on failure
 * @return 0 on success; terrno when the task list cannot be allocated; the
 *         taosHashPut() error otherwise
 */
int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo) {
  *pInfo = NULL;

  void *px = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (px != NULL) {
    *pInfo = px;
    return 0;
  }

  SCheckpointConsensusInfo p = {
      .pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry)),
      .numOfTasks = numOfTasks,
      .streamId = streamId,
  };

  if (p.pTaskList == NULL) {
    return terrno;
  }

  int32_t code = taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
  if (code == 0) {
    // hand out the copy stored in the table, not the local template
    void *pChkptInfo = (SCheckpointConsensusInfo *)taosHashGet(pHash, &streamId, sizeof(streamId));
    *pInfo = pChkptInfo;
  } else {
    // the entry was not stored, so nothing else references the task list: free it here
    taosArrayDestroy(p.pTaskList);
    *pInfo = NULL;
  }

  return code;
}
899

900
// Whether or not the task already exists, add the request to the info list anyway, since we need to send the rsp manually;
901
// discarding the msg may lead to lost connections.
UNCOV
902
// Record a restore-checkpoint request in the consensus task list of a stream.
// If a request of the same task is already present, its startTs, checkpointId and transId are overwritten with
// the new values; otherwise a new entry stamped with the current time (ms) is appended.
// A failed append is only logged, nothing is reported to the caller.
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo) {
  SCheckpointConsensusEntry entry = {.ts = taosGetTimestampMs()};
  memcpy(&entry.req, pRestoreInfo, sizeof(entry.req));

  int32_t numOfExisted = (int32_t)taosArrayGetSize(pInfo->pTaskList);

  for (int32_t idx = 0; idx < numOfExisted; ++idx) {
    SCheckpointConsensusEntry *pExisted = taosArrayGet(pInfo->pTaskList, idx);
    if (pExisted == NULL || pExisted->req.taskId != entry.req.taskId) {
      continue;
    }

    // same task reported again: refresh the stored request in place
    mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update ts %" PRId64
           "->%" PRId64 " checkpointId:%" PRId64 " -> %" PRId64 " total existed:%d",
           pRestoreInfo->taskId, pRestoreInfo->streamId, pExisted->req.startTs, entry.req.startTs,
           pExisted->req.checkpointId, entry.req.checkpointId, numOfExisted);

    pExisted->req.startTs = entry.req.startTs;
    pExisted->req.checkpointId = entry.req.checkpointId;
    pExisted->req.transId = entry.req.transId;
    return;
  }

  if (taosArrayPush(pInfo->pTaskList, &entry) == NULL) {
    mError("s-task:0x%x failed to put task into consensus-checkpointId list, code: out of memory", entry.req.taskId);
    return;
  }

  int32_t numOfWaiting = taosArrayGetSize(pInfo->pTaskList);
  mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, stream:0x%" PRIx64
         " waiting tasks:%d",
         pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, numOfWaiting);
}
935

UNCOV
936
// Release the task list held by a consensus info entry and reset the pointer, so that a repeated call
// does not destroy the same array twice. A NULL pInfo is ignored.
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo) {
  if (pInfo == NULL) {
    return;
  }

  taosArrayDestroy(pInfo->pTaskList);
  pInfo->pTaskList = NULL;
}
×
940

UNCOV
941
// Remove the consensus-checkpointId entry of a stream from pHash.
// Returns 0 when the table is empty or the entry was removed, otherwise the error code of taosHashRemove.
int32_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) {
  if (taosHashGetSize(pHash) == 0) {
    return 0;
  }

  int32_t code = taosHashRemove(pHash, &streamId, sizeof(streamId));

  // read the size after the removal attempt, so that "remain" in the log is the number of entries left
  int32_t remain = taosHashGetSize(pHash);
  if (code == 0) {
    mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, remain);
  } else {
    mError("failed to remove stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, remain);
  }

  return code;
}
957

UNCOV
958
// Remove the checkpoint-report entry of a stream from pHash.
// Returns 0 when the table is empty or the entry was removed, otherwise the error code of taosHashRemove.
int32_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId) {
  if (taosHashGetSize(pHash) == 0) {
    return 0;
  }

  int32_t code = taosHashRemove(pHash, &streamId, sizeof(streamId));

  // read the size after the removal attempt, so that "remain" in the log is the number of entries left
  int32_t remain = taosHashGetSize(pHash);
  if (code == 0) {
    mDebug("drop stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, remain);
  } else {
    mError("failed to remove stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, remain);
  }

  return code;
}
974

UNCOV
975
// Empty the reported-task list of a stream's checkpoint-report entry while keeping the entry itself in pHash.
// Returns 0 on success, TSDB_CODE_MND_STREAM_NOT_EXIST when pHash has no entry for streamId.
int32_t mndResetChkptReportInfo(SHashObj *pHash, int64_t streamId) {
  SChkptReportInfo *pReport = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (pReport == NULL) {
    return TSDB_CODE_MND_STREAM_NOT_EXIST;
  }

  taosArrayClear(pReport->pTaskList);
  mDebug("stream:0x%" PRIx64 " checkpoint-report list cleared, prev report checkpointId:%" PRId64, streamId,
         pReport->reportChkpt);
  return 0;
}
986

UNCOV
987
static void mndShowStreamStatus(char *dst, int8_t status) {
×
UNCOV
988
  if (status == STREAM_STATUS__NORMAL) {
×
UNCOV
989
    tstrncpy(dst, "ready", MND_STREAM_TRIGGER_NAME_SIZE);
×
UNCOV
990
  } else if (status == STREAM_STATUS__STOP) {
×
991
    tstrncpy(dst, "stop", MND_STREAM_TRIGGER_NAME_SIZE);
×
UNCOV
992
  } else if (status == STREAM_STATUS__FAILED) {
×
993
    tstrncpy(dst, "failed", MND_STREAM_TRIGGER_NAME_SIZE);
×
UNCOV
994
  } else if (status == STREAM_STATUS__RECOVER) {
×
995
    tstrncpy(dst, "recover", MND_STREAM_TRIGGER_NAME_SIZE);
×
UNCOV
996
  } else if (status == STREAM_STATUS__PAUSE) {
×
UNCOV
997
    tstrncpy(dst, "paused", MND_STREAM_TRIGGER_NAME_SIZE);
×
998
  }
UNCOV
999
}
×
1000

UNCOV
1001
// Copy the display name of the stream's trigger mode (pStream->conf.trigger) into dst, at most
// MND_STREAM_TRIGGER_NAME_SIZE bytes. dst is left untouched for a trigger value not handled below.
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
  const int8_t mode = pStream->conf.trigger;
  const char  *name = NULL;

  if (mode == STREAM_TRIGGER_AT_ONCE) {
    name = "at once";
  } else if (mode == STREAM_TRIGGER_WINDOW_CLOSE) {
    name = "window close";
  } else if (mode == STREAM_TRIGGER_MAX_DELAY) {
    name = "max delay";
  } else if (mode == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
    name = "force window close";
  }

  if (name != NULL) {
    tstrncpy(dst, name, MND_STREAM_TRIGGER_NAME_SIZE);
  }
}
×
1013

UNCOV
1014
// Render id as "0x<hex digits>" into pBuf in var-string layout: the text starts at offset 2 and the
// length (hex digits plus the two prefix characters) is stored with varDataSetLen.
// The whole buffer (bufLen bytes) is zeroed first; the caller must supply a buffer large enough for the
// prefix and the digits produced by tintToHex, no bound check is done here.
static void int64ToHexStr(int64_t id, char *pBuf, int32_t bufLen) {
  char *text = pBuf + 2;

  memset(pBuf, 0, bufLen);
  text[0] = '0';
  text[1] = 'x';

  int32_t numOfDigits = tintToHex(id, text + 2);
  varDataSetLen(pBuf, numOfDigits + 2);
}
×
1022

UNCOV
1023
// Check whether every task of the stream that has a status entry in execInfo.pTaskMap is paused.
// Tasks without a status entry are skipped, so a stream with no entry at all is reported as paused.
// The stream's read latch is held while its tasks are iterated.
//
// pRes  [out] written only on success: true when no visited entry has a status other than TASK_STATUS__PAUSE
//
// Returns TSDB_CODE_SUCCESS, or the error of createStreamTaskIter/streamTaskIterGetCurrent.
static int32_t isAllTaskPaused(SStreamObj *pStream, bool *pRes) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamTaskIter *pTaskIter = NULL;
  bool             allPaused = true;

  taosRLockLatch(&pStream->lock);

  code = createStreamTaskIter(pStream, &pTaskIter);
  TSDB_CHECK_CODE(code, lino, _end);

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCurTask = NULL;
    code = streamTaskIterGetCurrent(pTaskIter, &pCurTask);
    TSDB_CHECK_CODE(code, lino, _end);

    STaskId           key = {.streamId = pCurTask->id.streamId, .taskId = pCurTask->id.taskId};
    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, &key, sizeof(key));

    if (pEntry != NULL && pEntry->status != TASK_STATUS__PAUSE) {
      allPaused = false;
    }
  }

  *pRes = allPaused;

_end:
  destroyStreamTaskIter(pTaskIter);
  taosRUnLockLatch(&pStream->lock);

  if (code != TSDB_CODE_SUCCESS) {
    mError("error happens when get stream status, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1057

UNCOV
1058
// Fill one row of the stream listing result block with the attributes of pStream.
// The columns of pBlock->pDataBlock are written in order, starting from column 0; numOfRows is passed to
// colDataSetVal as the row to write.
//
// Returns 0 on success. On the first failure (missing column, colDataSetVal error, or a failure while
// querying the task status) the remaining columns are skipped, the error is logged and returned.
int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // create time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id
  char buf[128] = {0};
  int64ToHexStr(pStream->uid, buf, tListLen(buf));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, buf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // related fill-history stream id, NULL when there is none
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  if (pStream->hTaskUid != 0) {
    int64ToHexStr(pStream->hTaskUid, buf, tListLen(buf));
    code = colDataSetVal(pColInfo, numOfRows, buf, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, buf, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // sql
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)sql, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status: shown as paused when all tasks with a status entry are paused, regardless of pStream->status
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char status2[MND_STREAM_TRIGGER_NAME_SIZE] = {0};
  bool isPaused = false;
  code = isAllTaskPaused(pStream, &isPaused);
  TSDB_CHECK_CODE(code, lino, _end);

  int8_t streamStatus = atomic_load_8(&pStream->status);
  if (isPaused) {
    streamStatus = STREAM_STATUS__PAUSE;
  }
  mndShowStreamStatus(status2, streamStatus);
  STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // source db
  char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sourceDB, mndGetDbStr(pStream->sourceDb), sizeof(sourceDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&sourceDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // target db
  char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(targetDB, mndGetDbStr(pStream->targetDb), sizeof(targetDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // target super table, NULL when the stream has no target super table name
  if (pStream->targetSTbName[0] == 0) {
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

    code = colDataSetVal(pColInfo, numOfRows, NULL, true);
  } else {
    char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(targetSTB, mndGetStbStr(pStream->targetSTbName), sizeof(targetSTB));
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

    code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetSTB, false);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // watermark
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // trigger
  char trigger[20 + VARSTR_HEADER_SIZE] = {0};
  char trigger2[MND_STREAM_TRIGGER_NAME_SIZE] = {0};
  mndShowStreamTrigger(trigger2, pStream);
  STR_WITH_MAXSIZE_TO_VARSTR(trigger, trigger2, sizeof(trigger));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // sink_quota: always reported as the constant "0"
  char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0};
  sinkQuota[0] = '0';
  char dstStr[20] = {0};
  STR_TO_VARSTR(dstStr, sinkQuota)
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint interval
  char tmp[20 + VARSTR_HEADER_SIZE] = {0};
  (void)tsnprintf(varDataVal(tmp), sizeof(tmp) - VARSTR_HEADER_SIZE, "%d sec", tsStreamCheckpointInterval);
  varDataSetLen(tmp, strlen(varDataVal(tmp)));

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)tmp, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint backup type: always reported as the constant "none"
  char backup[20 + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(backup, "none")
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)backup, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // history scan idle: always reported as the constant "100a"
  char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
  tstrncpy(scanHistoryIdle, "100a", sizeof(scanHistoryIdle));

  memset(dstStr, 0, tListLen(dstStr));
  STR_TO_VARSTR(dstStr, scanHistoryIdle)
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
  // checked like every other column, so that a failure here is logged with its own line number
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1217

UNCOV
1218
// Fill one row of the stream-task listing result block with the attributes of pTask.
// Static attributes come from pTask/pStream, runtime figures come from the task's status entry in
// execInfo.pTaskMap. The columns of pBlock->pDataBlock are written in order, starting from column 0;
// numOfRows is passed to colDataSetVal as the row to write.
//
// precision  passed to taosFormatUtcTime when the processed time window of a source task is formatted
//
// Returns 0 on success, TSDB_CODE_STREAM_TASK_NOT_EXIST when the task has no status entry, or the code of
// the first failing column operation (the remaining columns are then skipped and the error is logged).
int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows,
                              int32_t precision) {
  SColumnInfoData *pColInfo = NULL;
  int32_t          cols = 0;
  int32_t          code = 0;
  int32_t          lino = 0;

  STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};

  STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
  if (pe == NULL) {
    mError("task:0x%" PRIx64 " not exists in any vnodes, streamName:%s, streamId:0x%" PRIx64 " createTs:%" PRId64
           " no valid status/stage info",
           id.taskId, pStream->name, pStream->uid, pStream->createTime);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  // stream name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  char idstr[128] = {0};
  int64ToHexStr(pTask->id.taskId, idstr, tListLen(idstr));
  code = colDataSetVal(pColInfo, numOfRows, idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node type: "vnode" for a positive node id, "snode" otherwise
  char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
  varDataSetLen(nodeType, 5);
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pTask->info.nodeId > 0) {
    memcpy(varDataVal(nodeType), "vnode", 5);
  } else {
    memcpy(varDataVal(nodeType), "snode", 5);
  }
  code = colDataSetVal(pColInfo, numOfRows, nodeType, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node id, negative ids are shown as 0
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  int64_t nodeId = TMAX(pTask->info.nodeId, 0);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // level, left as an empty string for a level not handled below
  char level[20 + VARSTR_HEADER_SIZE] = {0};
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    STR_WITH_SIZE_TO_VARSTR(level, "source", 6);
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    STR_WITH_SIZE_TO_VARSTR(level, "agg", 3);
  } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    STR_WITH_SIZE_TO_VARSTR(level, "sink", 4);
  }

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)level, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status
  char status[20 + VARSTR_HEADER_SIZE] = {0};

  const char *pStatus = streamTaskGetStatusStr(pe->status);
  STR_TO_VARSTR(status, pStatus);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stage
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // input queue; buf is the plain-text scratch buffer and vbuf its var-string form, both are reused for
  // all the formatted columns below
  char        vbuf[TSDB_STREAM_NOTIFY_STAT_LEN + 2] = {0};
  char        buf[TSDB_STREAM_NOTIFY_STAT_LEN] = {0};
  const char *queueInfoStr = "%4.2f MiB (%6.2f%%)";  // "%%" emits a literal percent sign; a bare "%)" is undefined
  snprintf(buf, tListLen(buf), queueInfoStr, pe->inputQUsed, pe->inputRate);
  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // input total, switched from MiB to GiB at 1024
  const char *formatTotalMb = "%7.2f MiB";
  const char *formatTotalGb = "%7.2f GiB";
  if (pe->procsTotal < 1024) {
    snprintf(buf, tListLen(buf), formatTotalMb, pe->procsTotal);
  } else {
    snprintf(buf, tListLen(buf), formatTotalGb, pe->procsTotal / 1024);
  }

  memset(vbuf, 0, tListLen(vbuf));
  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // process throughput, switched from KiB/s to MiB/s at 1024
  const char *formatKb = "%7.2f KiB/s";
  const char *formatMb = "%7.2f MiB/s";
  if (pe->procsThroughput < 1024) {
    snprintf(buf, tListLen(buf), formatKb, pe->procsThroughput);
  } else {
    snprintf(buf, tListLen(buf), formatMb, pe->procsThroughput / 1024);
  }

  memset(vbuf, 0, tListLen(vbuf));
  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // output total, NULL for sink tasks
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    colDataSetNULL(pColInfo, numOfRows);
  } else {
    (void)tsnprintf(buf, sizeof(buf), formatTotalMb, pe->outputTotal);
    memset(vbuf, 0, tListLen(vbuf));
    STR_TO_VARSTR(vbuf, buf);

    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
    TSDB_CHECK_CODE(code, lino, _end);
  }

  // output throughput, NULL for sink tasks
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    colDataSetNULL(pColInfo, numOfRows);
  } else {
    if (pe->outputThroughput < 1024) {
      snprintf(buf, tListLen(buf), formatKb, pe->outputThroughput);
    } else {
      snprintf(buf, tListLen(buf), formatMb, pe->outputThroughput / 1024);
    }

    memset(vbuf, 0, tListLen(vbuf));
    STR_TO_VARSTR(vbuf, buf);

    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
    TSDB_CHECK_CODE(code, lino, _end);
  }

  // info: sink data size for sink tasks, processed time window or version range for source tasks,
  // empty for the other levels
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    const char *sinkStr = "%.2f MiB";
    snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize);
  } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {  // offset info
    if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
      int32_t ret = taosFormatUtcTime(buf, tListLen(buf), pe->processedVer, precision);
      if (ret != 0) {
        mError("failed to format processed timewindow, skey:%" PRId64, pe->processedVer);
        memset(buf, 0, tListLen(buf));
      }
    } else {
      const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
      snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer);
    }
  } else {
    memset(buf, 0, tListLen(buf));
  }

  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start_time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start ver
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointVer, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint time, NULL while latestTime is 0
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pe->checkpointInfo.latestTime != 0) {
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestTime, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, NULL, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint_id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint version
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestVer, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // checkpoint size: always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  colDataSetNULL(pColInfo, numOfRows);

  // checkpoint backup status: always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, NULL, true);
  TSDB_CHECK_CODE(code, lino, _end);

  // ds_err_info: always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, NULL, true);
  TSDB_CHECK_CODE(code, lino, _end);

  // history_task_id, NULL when the task has no related history task
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (pe->hTaskId != 0) {
    int64ToHexStr(pe->hTaskId, idstr, tListLen(idstr));
    code = colDataSetVal(pColInfo, numOfRows, idstr, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, NULL, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // history_task_status: always NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, NULL, true);
  TSDB_CHECK_CODE(code, lino, _end);

  // notify_event_stat: one fragment per counter that is non-zero, appended to buf at offset;
  // NULL when no fragment was written
  int32_t offset = 0;
  if (pe->notifyEventStat.notifyEventAddTimes > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Add %" PRId64 "x, %" PRId64 " elems in %lfs; ",
                        pe->notifyEventStat.notifyEventAddTimes, pe->notifyEventStat.notifyEventAddElems,
                        pe->notifyEventStat.notifyEventAddCostSec);
  }
  if (pe->notifyEventStat.notifyEventPushTimes > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Push %" PRId64 "x, %" PRId64 " elems in %lfs; ",
                        pe->notifyEventStat.notifyEventPushTimes, pe->notifyEventStat.notifyEventPushElems,
                        pe->notifyEventStat.notifyEventPushCostSec);
  }
  if (pe->notifyEventStat.notifyEventPackTimes > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Pack %" PRId64 "x, %" PRId64 " elems in %lfs; ",
                        pe->notifyEventStat.notifyEventPackTimes, pe->notifyEventStat.notifyEventPackElems,
                        pe->notifyEventStat.notifyEventPackCostSec);
  }
  if (pe->notifyEventStat.notifyEventSendTimes > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Send %" PRId64 "x, %" PRId64 " elems in %lfs; ",
                        pe->notifyEventStat.notifyEventSendTimes, pe->notifyEventStat.notifyEventSendElems,
                        pe->notifyEventStat.notifyEventSendCostSec);
  }
  if (pe->notifyEventStat.notifyEventHoldElems > 0) {
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "[Hold %" PRId64 " elems] ",
                        pe->notifyEventStat.notifyEventHoldElems);
  }
  TSDB_CHECK_CONDITION(offset < sizeof(buf), code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
  buf[offset] = '\0';

  STR_TO_VARSTR(vbuf, buf);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  if (offset == 0) {
    colDataSetNULL(pColInfo, numOfRows);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
    TSDB_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code) {
    mError("error happens during build task attr result blocks, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1554

UNCOV
1555
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
×
UNCOV
1556
  const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
×
UNCOV
1557
  const SEp *p = GET_ACTIVE_EP(pCurrent);
×
1558

UNCOV
1559
  if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
×
UNCOV
1560
    return false;
×
1561
  }
1562
  return true;
×
1563
}
1564

UNCOV
1565
// Release the update-node list and the db map owned by pInfo; pInfo itself is not freed. NULL is ignored.
// Both members are reset to NULL afterwards, so that destroying the same object again is harmless
// (e.g. by a caller of mndFindChangedNodeInfo, which already destroys pInfo on its allocation-failure path).
void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo) {
  if (pInfo == NULL) {
    return;
  }

  taosArrayDestroy(pInfo->pUpdateNodeList);
  pInfo->pUpdateNodeList = NULL;

  taosHashCleanup(pInfo->pDBMap);
  pInfo->pDBMap = NULL;
}
×
1571

1572
// 1. increase the replica does not affect the stream process.
1573
// 2. decreasing the replica may affect the stream task execution in the way that there is one or more running stream
1574
// tasks on the replica that is about to be removed.
1575
// 3. vgroup redistribution is a combination of first increasing the replica and then decreasing it. So we
1576
// will handle it as mentioned in 1 & 2 items.
UNCOV
1577
int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
×
1578
                               SVgroupChangeInfo *pInfo) {
UNCOV
1579
  int32_t code = 0;
×
UNCOV
1580
  int32_t lino = 0;
×
1581

UNCOV
1582
  if (pInfo == NULL) {
×
1583
    return TSDB_CODE_INVALID_PARA;
×
1584
  }
1585

UNCOV
1586
  pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo));
×
UNCOV
1587
  pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
×
1588

UNCOV
1589
  if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) {
×
1590
    mndDestroyVgroupChangeInfo(pInfo);
×
1591
    TSDB_CHECK_NULL(NULL, code, lino, _err, terrno);
×
1592
  }
1593

UNCOV
1594
  int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
×
UNCOV
1595
  for (int32_t i = 0; i < numOfNodes; ++i) {
×
UNCOV
1596
    SNodeEntry *pPrevEntry = taosArrayGet(pPrevNodeList, i);
×
UNCOV
1597
    if (pPrevEntry == NULL) {
×
1598
      continue;
×
1599
    }
1600

UNCOV
1601
    int32_t num = taosArrayGetSize(pNodeList);
×
UNCOV
1602
    for (int32_t j = 0; j < num; ++j) {
×
UNCOV
1603
      SNodeEntry *pCurrent = taosArrayGet(pNodeList, j);
×
UNCOV
1604
      if (pCurrent == NULL) {
×
1605
        continue;
×
1606
      }
1607

UNCOV
1608
      if (pCurrent->nodeId == pPrevEntry->nodeId) {
×
UNCOV
1609
        if (pPrevEntry->stageUpdated || isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) {
×
UNCOV
1610
          const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
×
1611

UNCOV
1612
          char buf[256] = {0};
×
UNCOV
1613
          code = epsetToStr(&pCurrent->epset, buf, tListLen(buf));  // ignore this error
×
UNCOV
1614
          if (code) {
×
1615
            mError("failed to convert epset string, code:%s", tstrerror(code));
×
1616
            TSDB_CHECK_CODE(code, lino, _err);
×
1617
          }
1618

UNCOV
1619
          mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
×
1620
                 pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
1621

UNCOV
1622
          SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId};
×
UNCOV
1623
          epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
×
UNCOV
1624
          epsetAssign(&updateInfo.newEp, &pCurrent->epset);
×
1625

UNCOV
1626
          void *p = taosArrayPush(pInfo->pUpdateNodeList, &updateInfo);
×
UNCOV
1627
          TSDB_CHECK_NULL(p, code, lino, _err, terrno);
×
1628
        }
1629

1630
        // todo handle the snode info
UNCOV
1631
        if (pCurrent->nodeId != SNODE_HANDLE) {
×
UNCOV
1632
          SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
×
UNCOV
1633
          code = taosHashPut(pInfo->pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
×
UNCOV
1634
          mndReleaseVgroup(pMnode, pVgroup);
×
UNCOV
1635
          TSDB_CHECK_CODE(code, lino, _err);
×
1636
        }
1637

UNCOV
1638
        break;
×
1639
      }
1640
    }
1641
  }
1642

UNCOV
1643
  return code;
×
1644

1645
_err:
×
1646
  mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino);
×
1647
  mndDestroyVgroupChangeInfo(pInfo);
×
1648
  return code;
×
1649
}
1650

UNCOV
1651
// Decide whether a node update is pending for the stream tasks.
//
// Steps, in order:
//   1. refresh the stream node list; with no nodes, record the current time in execInfo.ts and
//      report "not updated";
//   2. report "updated" as soon as any entry in execInfo.pNodeList is flagged stageUpdated;
//   3. take a vgroup snapshot into *ppNodeSnapshot; if not all vnodes are ready report "updated";
//   4. diff execInfo.pNodeList against the snapshot and report "updated" when at least one node
//      changed.
//
// pMnode:         mnode handle
// ppNodeSnapshot: output, receives the snapshot produced by mndTakeVgroupSnapshot(); this function
//                 does not release it
//
// Returns a boolean value (true/false) carried in an int32_t.
// NOTE(review): the visible caller holds execInfo.lock around this call - confirm for others.
static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) {
  bool              allReady = false;
  bool              nodeUpdated = false;
  SVgroupChangeInfo changeInfo = {0};

  int32_t numOfNodes = extractStreamNodeList(pMnode);

  if (numOfNodes == 0) {
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
    execInfo.ts = taosGetTimestampSec();
    return false;
  }

  for (int32_t i = 0; i < numOfNodes; ++i) {
    SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
    if (pNodeEntry == NULL) {
      continue;
    }

    if (pNodeEntry->stageUpdated) {
      mDebug("stream task not ready due to node update detected, checkpoint not issued");
      return true;
    }
  }

  int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot);
  if (code) {
    // deliberately best-effort: the outcome is decided by allReady below
    mError("failed to get the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    mWarn("not all vnodes ready, quit from vnodes status check");
    return true;
  }

  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, *ppNodeSnapshot, &changeInfo);
  if (code) {
    // On failure mndFindChangedNodeInfo() has already released changeInfo at its _err label;
    // destroying it again here would free the same containers twice.
    return false;
  }

  nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
  if (nodeUpdated) {
    mDebug("stream tasks not ready due to node update");
  }

  mndDestroyVgroupChangeInfo(&changeInfo);
  return nodeUpdated;
}
1699

1700
// check if the node update happens or not
UNCOV
1701
bool mndStreamNodeIsUpdated(SMnode *pMnode) {
×
UNCOV
1702
  SArray *pNodeSnapshot = NULL;
×
1703

UNCOV
1704
  streamMutexLock(&execInfo.lock);
×
UNCOV
1705
  bool updated = doCheckForUpdated(pMnode, &pNodeSnapshot);
×
UNCOV
1706
  streamMutexUnlock(&execInfo.lock);
×
1707

UNCOV
1708
  taosArrayDestroy(pNodeSnapshot);
×
UNCOV
1709
  return updated;
×
1710
}
1711

UNCOV
1712
// Verify the snode precondition for the given source db.
//
// Returns TSDB_CODE_SUCCESS when pSrcDb->cfg.replications is 1, or when at least one SDB_SNODE
// object can be fetched from the sdb. Otherwise logs an error and returns
// TSDB_CODE_SNODE_NOT_DEPLOYED.
int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) {
  SSdb      *pSdb = pMnode->pSdb;
  SSnodeObj *pSnode = NULL;

  if (pSrcDb->cfg.replications == 1) {
    return TSDB_CODE_SUCCESS;
  }

  // only the existence of a first snode matters, so a single fetch is sufficient
  void *pFetchIter = sdbFetch(pSdb, SDB_SNODE, NULL, (void **)&pSnode);
  if (pFetchIter != NULL) {
    sdbRelease(pSdb, pSnode);
    sdbCancelFetch(pSdb, pFetchIter);
    return TSDB_CODE_SUCCESS;
  }

  mError("snode not existed when trying to create stream in db with multiple replica");
  return TSDB_CODE_SNODE_NOT_DEPLOYED;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc