• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4069

12 May 2025 05:35AM UTC coverage: 63.048% (+0.5%) from 62.547%
#4069

push

travis-ci

web-flow
Merge pull request #31053 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

157521 of 317858 branches covered (49.56%)

Branch coverage included in aggregate %.

374 of 573 new or added lines in 31 files covered. (65.27%)

4949 existing lines in 87 files now uncovered.

242707 of 316936 relevant lines covered (76.58%)

18229906.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.23
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23

24
struct SStreamTaskIter {
25
  SStreamObj  *pStream;
26
  int32_t      level;
27
  int32_t      ordinalIndex;
28
  int32_t      totalLevel;
29
  SStreamTask *pTask;
30
};
31

32
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId);
33

34
int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter) {
109,948✔
35
  *pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter));
109,948!
36
  if (*pIter == NULL) {
109,955!
37
    return terrno;
×
38
  }
39

40
  (*pIter)->level = -1;
109,955✔
41
  (*pIter)->ordinalIndex = 0;
109,955✔
42
  (*pIter)->pStream = pStream;
109,955✔
43
  (*pIter)->totalLevel = taosArrayGetSize(pStream->pTaskList);
109,955✔
44
  (*pIter)->pTask = NULL;
109,925✔
45

46
  return 0;
109,925✔
47
}
48

49
bool streamTaskIterNextTask(SStreamTaskIter *pIter) {
495,030✔
50
  if (pIter->level >= pIter->totalLevel) {
495,030!
51
    pIter->pTask = NULL;
×
52
    return false;
×
53
  }
54

55
  if (pIter->level == -1) {
495,030✔
56
    pIter->level += 1;
109,928✔
57
  }
58

59
  while (pIter->level < pIter->totalLevel) {
709,729✔
60
    SArray *pList = taosArrayGetP(pIter->pStream->pTaskList, pIter->level);
599,110✔
61
    if (pIter->ordinalIndex >= taosArrayGetSize(pList)) {
596,036✔
62
      pIter->level += 1;
214,699✔
63
      pIter->ordinalIndex = 0;
214,699✔
64
      pIter->pTask = NULL;
214,699✔
65
      continue;
214,699✔
66
    }
67

68
    pIter->pTask = taosArrayGetP(pList, pIter->ordinalIndex);
383,130✔
69
    pIter->ordinalIndex += 1;
381,480✔
70
    return true;
381,480✔
71
  }
72

73
  pIter->pTask = NULL;
110,619✔
74
  return false;
110,619✔
75
}
76

77
int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask) {
382,035✔
78
  if (pTask) {
382,035!
79
    *pTask = pIter->pTask;
382,070✔
80
    if (*pTask != NULL) {
382,070✔
81
      return TSDB_CODE_SUCCESS;
382,069✔
82
    }
83
  }
84

85
  return TSDB_CODE_INVALID_PARA;
×
86
}
87

88
void destroyStreamTaskIter(SStreamTaskIter *pIter) { taosMemoryFree(pIter); }
109,719!
89

90
static bool checkStatusForEachReplica(SVgObj *pVgroup) {
166,391✔
91
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
337,315✔
92
    if (!pVgroup->vnodeGid[i].syncRestore) {
172,186✔
93
      mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId);
1,248!
94
      return false;
1,248✔
95
    }
96

97
    ESyncState state = pVgroup->vnodeGid[i].syncState;
170,938✔
98
    if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR || state == TAOS_SYNC_STATE_LEARNER ||
170,938!
99
        state == TAOS_SYNC_STATE_CANDIDATE) {
100
      mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups", pVgroup->vgId,
14!
101
            state);
102
      return false;
14✔
103
    }
104
  }
105

106
  return true;
165,129✔
107
}
108

109
static int32_t mndAddSnodeInfo(SMnode *pMnode, SArray *pVgroupList) {
16,997✔
110
  SSnodeObj *pObj = NULL;
16,997✔
111
  void      *pIter = NULL;
16,997✔
112
  int32_t    code = 0;
16,997✔
113

114
  while (1) {
7,229✔
115
    pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
24,226✔
116
    if (pIter == NULL) {
24,226✔
117
      break;
16,997✔
118
    }
119

120
    SNodeEntry entry = {.nodeId = SNODE_HANDLE};
7,229✔
121
    code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
7,229✔
122
    if (code) {
7,229!
123
      sdbRelease(pMnode->pSdb, pObj);
×
124
      sdbCancelFetch(pMnode->pSdb, pIter);
×
125
      mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn);
×
126
      return code;
×
127
    }
128

129
    char buf[256] = {0};
7,229✔
130
    code = epsetToStr(&entry.epset, buf, tListLen(buf));
7,229✔
131
    if (code != 0) {  // print error and continue
7,229!
132
      mError("failed to convert epset to str, code:%s", tstrerror(code));
×
133
    }
134

135
    void *p = taosArrayPush(pVgroupList, &entry);
7,229✔
136
    if (p == NULL) {
7,229!
137
      code = terrno;
×
138
      sdbRelease(pMnode->pSdb, pObj);
×
139
      sdbCancelFetch(pMnode->pSdb, pIter);
×
140
      mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code));
×
141
      return code;
×
142
    } else {
143
      mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
7,229✔
144
    }
145

146
    sdbRelease(pMnode->pSdb, pObj);
7,229✔
147
  }
148

149
  return code;
16,997✔
150
}
151

152
static int32_t mndCheckMnodeStatus(SMnode* pMnode) {
16,997✔
153
  int32_t    code = 0;
16,997✔
154
  ESdbStatus objStatus;
155
  void      *pIter = NULL;
16,997✔
156
  SMnodeObj *pObj = NULL;
16,997✔
157

158
  while (1) {
159
    pIter = sdbFetchAll(pMnode->pSdb, SDB_MNODE, pIter, (void **)&pObj, &objStatus, true);
34,636✔
160
    if (pIter == NULL) {
34,636✔
161
      break;
16,834✔
162
    }
163

164
    if (pObj->syncState != TAOS_SYNC_STATE_LEADER && pObj->syncState != TAOS_SYNC_STATE_FOLLOWER) {
17,802✔
165
      mDebug("mnode sync state:%d not leader/follower", pObj->syncState);
160!
166
      sdbRelease(pMnode->pSdb, pObj);
160✔
167
      sdbCancelFetch(pMnode->pSdb, pIter);
160✔
168
      return TSDB_CODE_FAILED;
160✔
169
    }
170

171
    if (objStatus != SDB_STATUS_READY) {
17,642✔
172
      mWarn("mnode status:%d not ready", objStatus);
3!
173
      sdbRelease(pMnode->pSdb, pObj);
3✔
174
      sdbCancelFetch(pMnode->pSdb, pIter);
3✔
175
      return TSDB_CODE_FAILED;
3✔
176
    }
177

178
    sdbRelease(pMnode->pSdb, pObj);
17,639✔
179
  }
180

181
  return TSDB_CODE_SUCCESS;
16,834✔
182
}
183

184
static int32_t mndCheckAndAddVgroupsInfo(SMnode *pMnode, SArray *pVgroupList, bool* allReady, SHashObj* pTermMap) {
16,997✔
185
  SSdb     *pSdb = pMnode->pSdb;
16,997✔
186
  void     *pIter = NULL;
16,997✔
187
  SVgObj   *pVgroup = NULL;
16,997✔
188
  int32_t   code = 0;
16,997✔
189
  SHashObj *pHash = NULL;
16,997✔
190

191
  pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
16,997✔
192
  if (pHash == NULL) {
16,997!
193
    mError("failed to prepare hashmap during take vgroup snapshot, code:%s", tstrerror(terrno));
×
194
    return terrno;
×
195
  }
196

197
  while (1) {
169,750✔
198
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
186,747✔
199
    if (pIter == NULL) {
186,747✔
200
      break;
16,997✔
201
    }
202

203
    SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime};
169,750✔
204
    entry.epset = mndGetVgroupEpset(pMnode, pVgroup);
169,750✔
205

206
    int8_t *pReplica = taosHashGet(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid));
169,750✔
207
    if (pReplica == NULL) {  // not exist, add it into hash map
169,750✔
208
      code = taosHashPut(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid), &pVgroup->replica, sizeof(pVgroup->replica));
82,784✔
209
      if (code) {
82,784!
210
        mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code));
×
211
        sdbRelease(pSdb, pVgroup);
×
212
        sdbCancelFetch(pSdb, pIter);
×
213
        goto _end;  // take snapshot failed, and not all ready
×
214
      }
215
    } else {
216
      if (*pReplica != pVgroup->replica) {
86,966✔
217
        mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations",
489!
218
              pVgroup->vgId, pVgroup->replica, *pReplica);
219
        *allReady = false;  // task snap success, but not all ready
489✔
220
      }
221
    }
222

223
    // if not all ready till now, no need to check the remaining vgroups,
224
    // but still we need to put the info of the existed vgroups into the snapshot list
225
    if (*allReady) {
169,750✔
226
      *allReady = checkStatusForEachReplica(pVgroup);
166,391✔
227
    }
228

229
    char buf[256] = {0};
169,750✔
230
    code = epsetToStr(&entry.epset, buf, tListLen(buf));
169,750✔
231
    if (code != 0) {  // print error and continue
169,750!
232
      mError("failed to convert epset to str, code:%s", tstrerror(code));
×
233
    }
234

235
    void *p = taosArrayPush(pVgroupList, &entry);
169,750✔
236
    if (p == NULL) {
169,750!
237
      mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId);
×
238
      code = terrno;
×
239
      sdbRelease(pSdb, pVgroup);
×
240
      sdbCancelFetch(pSdb, pIter);
×
241
      goto _end;
×
242
    } else {
243
      mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
169,750✔
244
    }
245

246
    if (pTermMap != NULL) {
169,750✔
247
      int64_t term = pVgroup->vnodeGid[0].syncTerm;
129,337✔
248
      code = taosHashPut(pTermMap, &pVgroup->vgId, sizeof(pVgroup->vgId), &term, sizeof(term));
129,337✔
249
      if (code) {
129,337!
250
        mError("failed to put vnode:%d term into hashMap, code:%s", pVgroup->vgId, tstrerror(code));
×
251
      }
252
    }
253

254
    sdbRelease(pSdb, pVgroup);
169,750✔
255
  }
256

257
_end:
16,997✔
258
  taosHashCleanup(pHash);
16,997✔
259
  return code;
16,997✔
260
}
261

262
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList, SHashObj* pTermMap) {
16,997✔
263
  int32_t   code = 0;
16,997✔
264
  SArray   *pVgroupList = NULL;
16,997✔
265

266
  *pList = NULL;
16,997✔
267
  *allReady = true;
16,997✔
268

269
  pVgroupList = taosArrayInit(4, sizeof(SNodeEntry));
16,997✔
270
  if (pVgroupList == NULL) {
16,997!
271
    mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno));
×
272
    code = terrno;
×
273
    goto _err;
×
274
  }
275

276
  // 1. check for all vnodes status
277
  code = mndCheckAndAddVgroupsInfo(pMnode, pVgroupList, allReady, pTermMap);
16,997✔
278
  if (code) {
16,997!
279
    goto _err;
×
280
  }
281

282
  // 2. add snode info
283
  code = mndAddSnodeInfo(pMnode, pVgroupList);
16,997✔
284
  if (code) {
16,997!
285
    goto _err;
×
286
  }
287

288
  // 3. check for mnode status
289
  code = mndCheckMnodeStatus(pMnode);
16,997✔
290
  if (code != TSDB_CODE_SUCCESS) {
16,997✔
291
    *allReady = false;
163✔
292
  }
293

294
  *pList = pVgroupList;
16,997✔
295
  return code;
16,997✔
296

297
_err:
×
298
  *allReady = false;
×
299
  taosArrayDestroy(pVgroupList);
×
300
  return code;
×
301
}
302

303
int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
9,771✔
304
  void *pIter = NULL;
9,771✔
305
  SSdb *pSdb = pMnode->pSdb;
9,771✔
306
  *pStream = NULL;
9,771✔
307

308
  SStreamObj *p = NULL;
9,771✔
309
  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&p)) != NULL) {
16,412✔
310
    if (p->uid == streamId) {
16,409✔
311
      sdbCancelFetch(pSdb, pIter);
9,768✔
312
      *pStream = p;
9,768✔
313
      return TSDB_CODE_SUCCESS;
9,768✔
314
    }
315
    sdbRelease(pSdb, p);
6,641✔
316
  }
317

318
  return TSDB_CODE_STREAM_TASK_NOT_EXIST;
3✔
319
}
320

321
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) {
×
322
  STrans *pTrans = mndAcquireTrans(pMnode, transId);
×
323
  if (pTrans != NULL) {
×
324
    mInfo("kill active transId:%d in Db:%s", transId, pDbName);
×
325
    int32_t code = mndKillTrans(pMnode, pTrans);
×
326
    mndReleaseTrans(pMnode, pTrans);
×
327
    if (code) {
×
328
      mError("failed to kill transId:%d, code:%s", pTrans->id, tstrerror(code));
×
329
    }
330
  } else {
331
    mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId);
×
332
  }
333
}
×
334

335
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId) {
14,666✔
336
  *hasEpset = false;
14,666✔
337

338
  pEpSet->numOfEps = 0;
14,666✔
339
  if (nodeId == SNODE_HANDLE) {
14,666✔
340
    SSnodeObj *pObj = NULL;
256✔
341
    void      *pIter = NULL;
256✔
342

343
    pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
256✔
344
    if (pIter != NULL) {
256!
345
      int32_t code = addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
256✔
346
      sdbRelease(pMnode->pSdb, pObj);
256✔
347
      sdbCancelFetch(pMnode->pSdb, pIter);
256✔
348
      if (code) {
256!
349
        *hasEpset = false;
×
350
        mError("failed to set epset");
×
351
      } else {
352
        *hasEpset = true;
256✔
353
      }
354
      return code;
256✔
355
    } else {
356
      mError("failed to acquire snode epset");
×
357
      return TSDB_CODE_INVALID_PARA;
×
358
    }
359
  } else {
360
    SVgObj *pVgObj = mndAcquireVgroup(pMnode, nodeId);
14,410✔
361
    if (pVgObj != NULL) {
14,410✔
362
      SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
14,403✔
363
      mndReleaseVgroup(pMnode, pVgObj);
14,403✔
364

365
      epsetAssign(pEpSet, &epset);
14,403✔
366
      *hasEpset = true;
14,403✔
367
      return TSDB_CODE_SUCCESS;
14,403✔
368
    } else {
369
      mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", taskId, nodeId);
7!
370
      return TSDB_CODE_SUCCESS;
7✔
371
    }
372
  }
373
}
374

375
int32_t mndGetStreamTask(STaskId *pId, SStreamObj *pStream, SStreamTask **pTask) {
×
376
  *pTask = NULL;
×
377

378
  SStreamTask     *p = NULL;
×
379
  SStreamTaskIter *pIter = NULL;
×
380
  int32_t          code = createStreamTaskIter(pStream, &pIter);
×
381
  if (code) {
×
382
    mError("failed to create stream task iter:%s", pStream->name);
×
383
    return code;
×
384
  }
385

386
  while (streamTaskIterNextTask(pIter)) {
×
387
    code = streamTaskIterGetCurrent(pIter, &p);
×
388
    if (code) {
×
389
      continue;
×
390
    }
391

392
    if (p->id.taskId == pId->taskId) {
×
393
      destroyStreamTaskIter(pIter);
×
394
      *pTask = p;
×
395
      return 0;
×
396
    }
397
  }
398

399
  destroyStreamTaskIter(pIter);
×
400
  return TSDB_CODE_FAILED;
×
401
}
402

403
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
76,651✔
404
  int32_t num = 0;
76,651✔
405
  for (int32_t i = 0; i < taosArrayGetSize(pStream->pTaskList); ++i) {
226,759✔
406
    SArray *pLevel = taosArrayGetP(pStream->pTaskList, i);
150,096✔
407
    num += taosArrayGetSize(pLevel);
150,119✔
408
  }
409

410
  return num;
76,484✔
411
}
412

413
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
56✔
414
  SSdb   *pSdb = pMnode->pSdb;
56✔
415
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
56✔
416
  if (pDb == NULL) {
56!
417
    TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
×
418
  }
419

420
  int32_t numOfStreams = 0;
56✔
421
  void   *pIter = NULL;
56✔
422
  while (1) {
×
423
    SStreamObj *pStream = NULL;
56✔
424
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
56✔
425
    if (pIter == NULL) break;
56!
426

427
    if (pStream->sourceDbUid == pDb->uid) {
×
428
      numOfStreams++;
×
429
    }
430

431
    sdbRelease(pSdb, pStream);
×
432
  }
433

434
  *pNumOfStreams = numOfStreams;
56✔
435
  mndReleaseDb(pMnode, pDb);
56✔
436
  return 0;
56✔
437
}
438

439
static void freeTaskList(void *param) {
1,234✔
440
  SArray **pList = (SArray **)param;
1,234✔
441
  taosArrayDestroy(*pList);
1,234✔
442
}
1,234✔
443

444
int32_t mndInitExecInfo() {
2,189✔
445
  int32_t code = taosThreadMutexInit(&execInfo.lock, NULL);
2,189✔
446
  if (code) {
2,189!
447
    return code;
×
448
  }
449

450
  _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
2,189✔
451

452
  execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
2,189✔
453
  execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
2,189✔
454
  execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
2,189✔
455
  execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
2,189✔
456
  execInfo.pChkptStreams = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
2,189✔
457
  execInfo.pStreamConsensus = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
2,189✔
458
  execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
2,189✔
459
  execInfo.pKilledChkptTrans = taosArrayInit(4, sizeof(SStreamTaskResetMsg));
2,189✔
460

461
  if (execInfo.pTaskList == NULL || execInfo.pTaskMap == NULL || execInfo.transMgmt.pDBTrans == NULL ||
2,189!
462
      execInfo.pTransferStateStreams == NULL || execInfo.pChkptStreams == NULL || execInfo.pStreamConsensus == NULL ||
2,189!
463
      execInfo.pNodeList == NULL || execInfo.pKilledChkptTrans == NULL) {
2,189!
464
    mError("failed to initialize the stream runtime env, code:%s", tstrerror(terrno));
×
465
    return terrno;
×
466
  }
467

468
  execInfo.role = NODE_ROLE_UNINIT;
2,189✔
469
  execInfo.switchFromFollower = false;
2,189✔
470

471
  taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
2,189✔
472
  taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList);
2,189✔
473
  taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList);
2,189✔
474
  return 0;
2,189✔
475
}
476

477
void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
1,313✔
478
  SArray *pValidList = taosArrayInit(4, sizeof(SNodeEntry));
1,313✔
479
  if (pValidList == NULL) {  // not continue
1,313!
480
    return;
×
481
  }
482

483
  int32_t size = taosArrayGetSize(pNodeSnapshot);
1,313✔
484
  int32_t oldSize = taosArrayGetSize(execInfo.pNodeList);
1,313✔
485

486
  for (int32_t i = 0; i < oldSize; ++i) {
8,329✔
487
    SNodeEntry *p = taosArrayGet(execInfo.pNodeList, i);
7,016✔
488
    if (p == NULL) {
7,016!
489
      continue;
×
490
    }
491

492
    for (int32_t j = 0; j < size; ++j) {
62,662✔
493
      SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);
61,687✔
494
      if (pEntry == NULL) {
61,687!
495
        continue;
×
496
      }
497

498
      if (pEntry->nodeId == p->nodeId) {
61,687✔
499
        p->hbTimestamp = pEntry->hbTimestamp;
6,041✔
500

501
        void *px = taosArrayPush(pValidList, p);
6,041✔
502
        if (px == NULL) {
6,041!
503
          mError("failed to put node into list, nodeId:%d", p->nodeId);
×
504
        } else {
505
          mDebug("vgId:%d ts:%" PRId64 " HbMsgId:%d is valid", p->nodeId, p->hbTimestamp, p->lastHbMsgId);
6,041✔
506
        }
507
        break;
6,041✔
508
      }
509
    }
510
  }
511

512
  taosArrayDestroy(execInfo.pNodeList);
1,313✔
513
  execInfo.pNodeList = pValidList;
1,313✔
514

515
  mDebug("remain %d valid node entries after clean expired nodes info, prev size:%d",
1,313✔
516
         (int32_t)taosArrayGetSize(pValidList), oldSize);
517
}
518

519
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
6,243✔
520
  void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
6,243✔
521
  if (p == NULL) {
6,243✔
522
    return TSDB_CODE_SUCCESS;
122✔
523
  }
524

525
  int32_t code = taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
6,121✔
526
  if (code) {
6,121!
527
    return code;
×
528
  }
529

530
  for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
14,609!
531
    STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
14,609✔
532
    if (pId == NULL) {
14,609!
533
      continue;
×
534
    }
535

536
    if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
14,609!
537
      taosArrayRemove(pExecNode->pTaskList, k);
6,121✔
538

539
      int32_t num = taosArrayGetSize(pExecNode->pTaskList);
6,121✔
540
      mInfo("s-task:0x%x removed from buffer, remain:%d in buffer list", (int32_t)pRemovedId->taskId, num);
6,121!
541
      break;
6,121✔
542
    }
543
  }
544

545
  return TSDB_CODE_SUCCESS;
6,121✔
546
}
547

548
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo) {
1,313✔
549
  for (int32_t i = 0; i < taosArrayGetSize(pTaskIds); ++i) {
1,319✔
550
    STaskId *pId = taosArrayGet(pTaskIds, i);
6✔
551
    if (pId == NULL) {
6!
552
      continue;
×
553
    }
554

555
    int32_t code = doRemoveTasks(pExecInfo, pId);
6✔
556
    if (code) {
6!
557
      mError("failed to remove task in buffer list, 0x%" PRIx64, pId->taskId);
×
558
    }
559
  }
560
}
1,313✔
561

562
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
1,187✔
563
  SStreamTaskIter *pIter = NULL;
1,187✔
564
  streamMutexLock(&pExecNode->lock);
1,187✔
565

566
  // 1. remove task entries
567
  int32_t code = createStreamTaskIter(pStream, &pIter);
1,187✔
568
  if (code) {
1,187!
569
    streamMutexUnlock(&pExecNode->lock);
×
570
    mError("failed to create stream task iter:%s", pStream->name);
×
571
    return;
×
572
  }
573

574
  while (streamTaskIterNextTask(pIter)) {
7,424✔
575
    SStreamTask *pTask = NULL;
6,237✔
576
    code = streamTaskIterGetCurrent(pIter, &pTask);
6,237✔
577
    if (code) {
6,237!
578
      continue;
×
579
    }
580

581
    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
6,237✔
582
    code = doRemoveTasks(pExecNode, &id);
6,237✔
583
    if (code) {
6,237!
584
      mError("failed to remove task in buffer list, 0x%" PRIx64, id.taskId);
×
585
    }
586
  }
587

588
  if (taosHashGetSize(pExecNode->pTaskMap) != taosArrayGetSize(pExecNode->pTaskList)) {
1,187!
589
    streamMutexUnlock(&pExecNode->lock);
×
590
    destroyStreamTaskIter(pIter);
×
591
    mError("task map size, task list size, not equal");
×
592
    return;
×
593
  }
594

595
  // 2. remove stream entry in consensus hash table and checkpoint-report hash table
596
  code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid);
1,187✔
597
  if (code) {
1,187!
598
    mError("failed to clear consensus checkpointId, code:%s", tstrerror(code));
×
599
  }
600

601
  code = mndClearChkptReportInfo(execInfo.pChkptStreams, pStream->uid);
1,187✔
602
  if (code) {
1,187✔
603
    mError("failed to clear the checkpoint report info, code:%s", tstrerror(code));
365!
604
  }
605

606
  streamMutexUnlock(&pExecNode->lock);
1,187✔
607
  destroyStreamTaskIter(pIter);
1,187✔
608
}
609

610
static bool taskNodeExists(SArray *pList, int32_t nodeId) {
13,565✔
611
  size_t num = taosArrayGetSize(pList);
13,565✔
612

613
  for (int32_t i = 0; i < num; ++i) {
75,325✔
614
    SNodeEntry *pEntry = taosArrayGet(pList, i);
75,319✔
615
    if (pEntry == NULL) {
75,319!
616
      continue;
×
617
    }
618

619
    if (pEntry->nodeId == nodeId) {
75,319✔
620
      return true;
13,559✔
621
    }
622
  }
623

624
  return false;
6✔
625
}
626

627
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
1,313✔
628
  SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
1,313✔
629
  if (pRemovedTasks == NULL) {
1,313!
630
    return terrno;
×
631
  }
632

633
  int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
1,313✔
634
  for (int32_t i = 0; i < numOfTask; ++i) {
15,576✔
635
    STaskId *pId = taosArrayGet(execInfo.pTaskList, i);
14,263✔
636
    if (pId == NULL) {
14,263!
637
      continue;
×
638
    }
639

640
    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
14,263✔
641
    if (pEntry == NULL) {
14,263!
642
      continue;
×
643
    }
644

645
    if (pEntry->nodeId == SNODE_HANDLE) {
14,263✔
646
      continue;
698✔
647
    }
648

649
    bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
13,565✔
650
    if (!existed) {
13,565✔
651
      void *p = taosArrayPush(pRemovedTasks, pId);
6✔
652
      if (p == NULL) {
6!
653
        mError("failed to put task entry into remove list, taskId:0x%" PRIx64, pId->taskId);
×
654
      }
655
    }
656
  }
657

658
  removeTasksInBuf(pRemovedTasks, &execInfo);
1,313✔
659

660
  mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
1,313✔
661
         (int32_t)taosArrayGetSize(execInfo.pTaskList));
662

663
  removeExpiredNodeInfo(pNodeSnapshot);
1,313✔
664

665
  taosArrayDestroy(pRemovedTasks);
1,313✔
666
  return 0;
1,313✔
667
}
668

669
static int32_t allTasksSendChkptReport(SChkptReportInfo* pReportInfo, int32_t numOfTasks, const char* pName) {
860✔
670
  int64_t checkpointId = -1;
860✔
671
  int32_t transId = -1;
860✔
672
  int32_t taskId = -1;
860✔
673

674
  int32_t existed = (int32_t)taosArrayGetSize(pReportInfo->pTaskList);
860✔
675
  if (existed != numOfTasks) {
860!
676
    mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pReportInfo->streamId, pName,
×
677
           existed, numOfTasks, numOfTasks - existed);
678
    return -1;
×
679
  }
680

681
  // acquire current active checkpointId, and do cross-check checkpointId info in exec.pTaskList
682
  for(int32_t i = 0; i < numOfTasks; ++i) {
4,873✔
683
    STaskChkptInfo *pInfo = taosArrayGet(pReportInfo->pTaskList, i);
4,013✔
684
    if (pInfo == NULL) {
4,013!
685
      continue;
×
686
    }
687

688
    if (checkpointId == -1) {
4,013✔
689
      checkpointId = pInfo->checkpointId;
860✔
690
      transId = pInfo->transId;
860✔
691
      taskId = pInfo->taskId;
860✔
692
    } else if (checkpointId != pInfo->checkpointId) {
3,153!
693
      mError("stream:0x%" PRIx64
×
694
             " checkpointId in checkpoint-report list are not identical, type 1 taskId:0x%x checkpointId:%" PRId64
695
             ", type 2 taskId:0x%x checkpointId:%" PRId64,
696
             pReportInfo->streamId, taskId, checkpointId, pInfo->taskId, pInfo->checkpointId);
697
      return -1;
×
698
    }
699
  }
700

701
  // check for the correct checkpointId for current task info in STaskChkptInfo
702
  STaskChkptInfo  *p = taosArrayGet(pReportInfo->pTaskList, 0);
860✔
703
  STaskId id = {.streamId = p->streamId, .taskId = p->taskId};
860✔
704
  STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
860✔
705

706
  // cross-check failed, there must be something unknown wrong
707
  SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, &id.streamId, sizeof(id.streamId));
860✔
708
  if (pTransInfo == NULL) {
860✔
709
    mWarn("stream:0x%" PRIx64 " no active trans exists for checkpoint transId:%d, it may have been cleared already",
176!
710
           id.streamId, transId);
711

712
    if (pe->checkpointInfo.activeId != 0 && pe->checkpointInfo.activeId != checkpointId) {
176!
713
      mWarn("stream:0x%" PRIx64 " active checkpointId is not equalled to the required, current:%" PRId64
×
714
            ", req:%" PRId64 " recheck next time",
715
            id.streamId, pe->checkpointInfo.activeId, checkpointId);
716
      return -1;
×
717
    } else {
718
      //  do nothing
719
    }
720
  } else {
721
    if (pTransInfo->transId != transId) {
684!
722
      mError("stream:0x%" PRIx64
×
723
             " checkpoint-report list info are expired, active transId:%d trans in list:%d, recheck next time",
724
             id.streamId, pTransInfo->transId, transId);
725
      return -1;
×
726
    }
727
  }
728

729
  mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", id.streamId,
860✔
730
         pName, numOfTasks);
731

732
  return TSDB_CODE_SUCCESS;
860✔
733
}
734

735
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
3,017✔
736
  SMnode *pMnode = pReq->info.node;
3,017✔
737
  void   *pIter = NULL;
3,017✔
738
  int32_t code = 0;
3,017✔
739
  int32_t lino = 0;
3,017✔
740
  SArray *pDropped = NULL;
3,017✔
741
  int64_t ts = 0;
3,017✔
742

743
  mDebug("start to scan checkpoint report info");
3,017✔
744

745
  streamMutexLock(&execInfo.lock);
3,017✔
746

747
  int32_t num = taosHashGetSize(execInfo.pChkptStreams);
3,017✔
748
  if (num == 0) {
3,017✔
749
    goto _end;
576✔
750
  }
751

752
  pDropped = taosArrayInit(4, sizeof(int64_t));
2,441✔
753
  TSDB_CHECK_NULL(pDropped, code, lino, _end, terrno);
2,441!
754

755
  while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
7,097✔
756
    SChkptReportInfo *px = (SChkptReportInfo *)pIter;
5,516✔
757
    if (taosArrayGetSize(px->pTaskList) == 0) {
5,516✔
758
      continue;
4,656✔
759
    }
760

761
    STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0);
860✔
762
    if (pInfo == NULL) {
860!
763
      continue;
×
764
    }
765

766
    SStreamObj *pStream = NULL;
860✔
767
    code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
860✔
768
    if (pStream == NULL || code != 0) {
860!
769
      mDebug("failed to acquire stream:0x%" PRIx64 " remove it from checkpoint-report list", pInfo->streamId);
×
770
      void *p = taosArrayPush(pDropped, &pInfo->streamId);
×
771
      if (p == NULL) {
×
772
        mError("failed to put stream into drop list:0x%" PRIx64, pInfo->streamId);
×
773
      }
774
      continue;
×
775
    }
776

777
    int32_t total = mndGetNumOfStreamTasks(pStream);
860✔
778
    int32_t ret = allTasksSendChkptReport(px, total, pStream->name);
860✔
779
    if (ret == 0) {
860!
780
      code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
860✔
781
      if (code == 0) {
860!
782
        code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList);
860✔
783
        if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {  // remove this entry
860!
784
          taosArrayClear(px->pTaskList);
860✔
785
          mInfo("stream:0x%" PRIx64 " clear checkpoint-report list and update the report checkpointId from:%" PRId64
860!
786
                " to %" PRId64,
787
                pInfo->streamId, px->reportChkpt, pInfo->checkpointId);
788
          px->reportChkpt = pInfo->checkpointId;
860✔
789
        } else {
790
          mDebug("stream:0x%" PRIx64 " not launch chkpt-info update trans, due to checkpoint not finished yet",
×
791
                 pInfo->streamId);
792
        }
793

794
        sdbRelease(pMnode->pSdb, pStream);
860✔
795
        break;
860✔
796
      } else {
797
        mDebug("stream:0x%" PRIx64 " active checkpoint trans not finished yet, wait", pInfo->streamId);
×
798
      }
799
    }
800

801
    sdbRelease(pMnode->pSdb, pStream);
×
802
  }
803

804
  int32_t size = taosArrayGetSize(pDropped);
2,441✔
805
  if (size > 0) {
2,441!
806
    for (int32_t i = 0; i < size; ++i) {
×
807
      int64_t *pStreamId = (int64_t *)taosArrayGet(pDropped, i);
×
808
      if (pStreamId == NULL) {
×
809
        continue;
×
810
      }
811

812
      code = taosHashRemove(execInfo.pChkptStreams, pStreamId, sizeof(*pStreamId));
×
813
      if (code) {
×
814
        mError("failed to remove stream in buf:0x%" PRIx64, *pStreamId);
×
815
      }
816
    }
817

818
    int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams);
×
819
    mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams);
×
820
  }
821

822
_end:
3,017✔
823

824
  ts = taosGetTimestampMs();
3,017✔
825
  execInfo.chkptReportScanTs = ts;
3,017✔
826

827
  streamMutexUnlock(&execInfo.lock);
3,017✔
828

829
  if (pDropped != NULL) {
3,017✔
830
    taosArrayDestroy(pDropped);
2,441✔
831
  }
832

833
  mDebug("end to scan checkpoint report info, ts:%"PRId64, ts);
3,017✔
834
  return code;
3,017✔
835
}
836

837
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId, SArray* pList) {
31✔
838
  char    msg[128] = {0};
31✔
839
  STrans *pTrans = NULL;
31✔
840

841
  snprintf(msg, tListLen(msg), "set consen-chkpt-id for stream:0x%" PRIx64, pStream->uid);
31✔
842

843
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg, &pTrans);
31✔
844
  if (pTrans == NULL || code != 0) {
31!
845
    return terrno;
×
846
  }
847

848
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_CONSEN_NAME, pStream->uid);
31✔
849
  if (code) {
31!
850
    return code;
×
851
  }
852

853
  code = mndStreamSetChkptIdAction(pMnode, pTrans, pStream, checkpointId, pList);
31✔
854
  if (code != 0) {
31!
855
    mndTransDrop(pTrans);
×
856
    return code;
×
857
  }
858

859
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
31✔
860
  if (code) {
31!
861
    mndTransDrop(pTrans);
×
862
    return code;
×
863
  }
864

865
  code = mndTransPrepare(pMnode, pTrans);
31✔
866

867
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
31!
868
    mError("trans:%d, failed to prepare set consensus-chkptId trans for stream:0x%" PRId64 " since %s", pTrans->id,
×
869
           pStream->uid, tstrerror(code));
870
    mndTransDrop(pTrans);
×
871
    return code;
×
872
  }
873

874
  mndTransDrop(pTrans);
31✔
875
  return TSDB_CODE_ACTION_IN_PROGRESS;
31✔
876
}
877

878
int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo) {
225✔
879
  *pInfo = NULL;
225✔
880

881
  void *px = taosHashGet(pHash, &streamId, sizeof(streamId));
225✔
882
  if (px != NULL) {
225✔
883
    *pInfo = px;
185✔
884
    return 0;
185✔
885
  }
886

887
  SCheckpointConsensusInfo p = {
40✔
888
      .pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry)),
40✔
889
      .numOfTasks = numOfTasks,
890
      .streamId = streamId,
891
  };
892

893
  if (p.pTaskList == NULL) {
40!
894
    return terrno;
×
895
  }
896

897
  int32_t code = taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
40✔
898
  if (code == 0) {
40!
899
    void *pChkptInfo = (SCheckpointConsensusInfo *)taosHashGet(pHash, &streamId, sizeof(streamId));
40✔
900
    *pInfo = pChkptInfo;
40✔
901
  } else {
902
    *pInfo = NULL;
×
903
  }
904

905
  return code;
40✔
906
}
907

908
// no matter existed or not, add the request into info list anyway, since we need to send rsp mannually
909
// discard the msg may lead to the lost of connections.
910
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo) {
225✔
911
  SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs()};
225✔
912
  memcpy(&info.req, pRestoreInfo, sizeof(info.req));
225✔
913

914
  int32_t num = (int32_t) taosArrayGetSize(pInfo->pTaskList);
225✔
915
  for (int32_t i = 0; i < num; ++i) {
815✔
916
    SCheckpointConsensusEntry *p = taosArrayGet(pInfo->pTaskList, i);
612✔
917
    if (p == NULL) {
612!
918
      continue;
×
919
    }
920

921
    if (p->req.taskId == info.req.taskId) {
612✔
922
      mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update send reqTs %" PRId64
22✔
923
             "->%" PRId64 " checkpointId:%" PRId64 " -> %" PRId64 " term:%d->%d total existed:%d",
924
             pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs, p->req.checkpointId,
925
             info.req.checkpointId, p->req.term, info.req.term, num);
926
      p->req.startTs = info.req.startTs;
22✔
927
      p->req.checkpointId = info.req.checkpointId;
22✔
928
      p->req.transId = info.req.transId;
22✔
929
      p->req.nodeId = info.req.nodeId;
22✔
930
      p->req.term = info.req.term;
22✔
931
      return;
22✔
932
    }
933
  }
934

935
  void *p = taosArrayPush(pInfo->pTaskList, &info);
203✔
936
  if (p == NULL) {
203!
937
    mError("s-task:0x%x failed to put task into consensus-checkpointId list, code: out of memory", info.req.taskId);
×
938
  } else {
939
    num = taosArrayGetSize(pInfo->pTaskList);
203✔
940
    mDebug("s-task:0x%x (vgId:%d) checkpointId:%" PRId64 " term:%d, reqTs:%" PRId64
203✔
941
           " added into consensus-checkpointId list, stream:0x%" PRIx64 " waiting tasks:%d",
942
           pRestoreInfo->taskId, pRestoreInfo->nodeId, pRestoreInfo->checkpointId, info.req.term,
943
           info.req.startTs, pRestoreInfo->streamId, num);
944
  }
945
}
946

947
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo) {
31✔
948
  taosArrayDestroy(pInfo->pTaskList);
31✔
949
  pInfo->pTaskList = NULL;
31✔
950
}
31✔
951

952
int32_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) {
1,223✔
953
  int32_t code = 0;
1,223✔
954
  int32_t numOfStreams = taosHashGetSize(pHash);
1,223✔
955
  if (numOfStreams == 0) {
1,223✔
956
    return code;
1,190✔
957
  }
958

959
  code = taosHashRemove(pHash, &streamId, sizeof(streamId));
33✔
960
  if (code == 0) {
33!
961
    numOfStreams = taosHashGetSize(pHash);
33✔
962
    mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, numOfStreams);
33✔
963
  } else {
964
    mError("failed to remove stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, numOfStreams);
×
965
  }
966

967
  return code;
33✔
968
}
969

970
int32_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId) {
1,187✔
971
  int32_t code = 0;
1,187✔
972
  int32_t numOfStreams = taosHashGetSize(pHash);
1,187✔
973
  if (numOfStreams == 0) {
1,187✔
974
    return code;
338✔
975
  }
976

977
  code = taosHashRemove(pHash, &streamId, sizeof(streamId));
849✔
978
  if (code == 0) {
849✔
979
    mDebug("drop stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, numOfStreams);
484✔
980
  } else {
981
    mError("failed to remove stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, numOfStreams);
365!
982
  }
983

984
  return code;
849✔
985
}
986

987
int32_t mndResetChkptReportInfo(SHashObj *pHash, int64_t streamId) {
×
988
  SChkptReportInfo *pInfo = taosHashGet(pHash, &streamId, sizeof(streamId));
×
989
  if (pInfo != NULL) {
×
990
    taosArrayClear(pInfo->pTaskList);
×
991
    mDebug("stream:0x%" PRIx64 " checkpoint-report list cleared, prev report checkpointId:%" PRId64, streamId,
×
992
           pInfo->reportChkpt);
993
    return 0;
×
994
  }
995

996
  return TSDB_CODE_MND_STREAM_NOT_EXIST;
×
997
}
998

999
static void mndShowStreamStatus(char *dst, int8_t status) {
35,408✔
1000
  if (status == STREAM_STATUS__NORMAL) {
35,408✔
1001
    tstrncpy(dst, "ready", MND_STREAM_TRIGGER_NAME_SIZE);
35,403✔
1002
  } else if (status == STREAM_STATUS__STOP) {
5!
1003
    tstrncpy(dst, "stop", MND_STREAM_TRIGGER_NAME_SIZE);
×
1004
  } else if (status == STREAM_STATUS__FAILED) {
5✔
1005
    tstrncpy(dst, "failed", MND_STREAM_TRIGGER_NAME_SIZE);
1✔
1006
  } else if (status == STREAM_STATUS__RECOVER) {
4!
1007
    tstrncpy(dst, "recover", MND_STREAM_TRIGGER_NAME_SIZE);
×
1008
  } else if (status == STREAM_STATUS__PAUSE) {
4✔
1009
    tstrncpy(dst, "paused", MND_STREAM_TRIGGER_NAME_SIZE);
2✔
1010
  } else if (status == STREAM_STATUS__INIT) {
2!
1011
    tstrncpy(dst, "init", MND_STREAM_TRIGGER_NAME_SIZE);
2✔
1012
  }
1013
}
35,408✔
1014

1015
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
35,279✔
1016
  int8_t trigger = pStream->conf.trigger;
35,279✔
1017
  if (trigger == STREAM_TRIGGER_AT_ONCE) {
35,279✔
1018
    tstrncpy(dst, "at once", MND_STREAM_TRIGGER_NAME_SIZE);
12,055✔
1019
  } else if (trigger == STREAM_TRIGGER_WINDOW_CLOSE) {
23,224✔
1020
    tstrncpy(dst, "window close", MND_STREAM_TRIGGER_NAME_SIZE);
11,630✔
1021
  } else if (trigger == STREAM_TRIGGER_MAX_DELAY) {
11,594✔
1022
    tstrncpy(dst, "max delay", MND_STREAM_TRIGGER_NAME_SIZE);
11,542✔
1023
  } else if (trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
52!
1024
    tstrncpy(dst, "force window close", MND_STREAM_TRIGGER_NAME_SIZE);
84✔
1025
  }
1026
}
35,279✔
1027

1028
static void int64ToHexStr(int64_t id, char *pBuf, int32_t bufLen) {
266,232✔
1029
  memset(pBuf, 0, bufLen);
266,232✔
1030
  pBuf[2] = '0';
266,232✔
1031
  pBuf[3] = 'x';
266,232✔
1032

1033
  int32_t len = tintToHex(id, &pBuf[4]);
266,232✔
1034
  varDataSetLen(pBuf, len + 2);
266,929✔
1035
}
266,929✔
1036

1037
static int32_t isAllTaskPaused(SStreamObj *pStream, bool *pRes) {
35,335✔
1038
  int32_t          code = TSDB_CODE_SUCCESS;
35,335✔
1039
  int32_t          lino = 0;
35,335✔
1040
  SStreamTaskIter *pIter = NULL;
35,335✔
1041
  bool             isPaused = true;
35,335✔
1042
  int32_t          num = 0;
35,335✔
1043

1044
  taosRLockLatch(&pStream->lock);
35,335✔
1045
  code = createStreamTaskIter(pStream, &pIter);
35,416✔
1046
  TSDB_CHECK_CODE(code, lino, _end);
35,388!
1047

1048
  while (streamTaskIterNextTask(pIter)) {
154,922✔
1049
    SStreamTask *pTask = NULL;
116,462✔
1050
    code = streamTaskIterGetCurrent(pIter, &pTask);
116,462✔
1051
    TSDB_CHECK_CODE(code, lino, _end);
117,474!
1052

1053
    STaskId           id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
117,474✔
1054
    STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
117,474✔
1055
    if (pe == NULL) {
119,442✔
1056
      continue;
122✔
1057
    }
1058

1059
    if (pe->status != TASK_STATUS__PAUSE) {
119,320✔
1060
      isPaused = false;
119,319✔
1061
      mInfo("stream:0x%" PRIx64 " taskId:0x%" PRIx64 ", status:%d not paused, stream status not paused",
119,319!
1062
            pe->id.streamId, pe->id.taskId, pe->status);
1063
    } else {
1064
      mInfo("stream:0x%" PRIx64 " taskId:0x%" PRIx64 ", status: paused, stream status paused", pe->id.streamId,
1!
1065
            pe->id.taskId);
1066
    }
1067

1068
    num += 1;
119,412✔
1069
  }
1070

1071
  if (num > 0) {
35,367✔
1072
    (*pRes) = isPaused;
35,344✔
1073
  }
1074

1075
_end:
23✔
1076
  destroyStreamTaskIter(pIter);
35,367✔
1077
  taosRUnLockLatch(&pStream->lock);
35,413✔
1078
  if (code != TSDB_CODE_SUCCESS) {
35,415!
UNCOV
1079
    mError("error happens when get stream status, lino:%d, code:%s", lino, tstrerror(code));
×
1080
  }
1081
  return code;
35,415✔
1082
}
1083

1084
int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows) {
35,399✔
1085
  int32_t code = 0;
35,399✔
1086
  int32_t cols = 0;
35,399✔
1087
  int32_t lino = 0;
35,399✔
1088

1089
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
35,399✔
1090
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
35,399✔
1091
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
35,392✔
1092
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
35,307!
1093

1094
  code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
35,307✔
1095
  TSDB_CHECK_CODE(code, lino, _end);
35,345!
1096

1097
  // create time
1098
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
35,345✔
1099
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
35,288!
1100
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
35,288✔
1101
  TSDB_CHECK_CODE(code, lino, _end);
35,306!
1102

1103
  // stream id
1104
  char buf[128] = {0};
35,306✔
1105
  int64ToHexStr(pStream->uid, buf, tListLen(buf));
35,306✔
1106
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
35,400✔
1107
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
35,335!
1108
  code = colDataSetVal(pColInfo, numOfRows, buf, false);
35,335✔
1109
  TSDB_CHECK_CODE(code, lino, _end);
35,358!
1110

1111
  // related fill-history stream id
1112
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
35,358✔
1113
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
35,296!
1114
  if (pStream->hTaskUid != 0) {
35,296!
UNCOV
1115
    int64ToHexStr(pStream->hTaskUid, buf, tListLen(buf));
×
UNCOV
1116
    code = colDataSetVal(pColInfo, numOfRows, buf, false);
×
1117
  } else {
1118
    code = colDataSetVal(pColInfo, numOfRows, buf, true);
35,296✔
1119
  }
1120
  TSDB_CHECK_CODE(code, lino, _end);
35,314!
1121

1122
  // related fill-history stream id
1123
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
35,314✔
1124
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql));
35,314✔
1125
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
35,314✔
1126
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
35,306!
1127
  code = colDataSetVal(pColInfo, numOfRows, (const char *)sql, false);
35,306✔
1128
  TSDB_CHECK_CODE(code, lino, _end);
35,336!
1129

1130
  char status[20 + VARSTR_HEADER_SIZE] = {0};
35,336✔
1131
  char status2[MND_STREAM_TRIGGER_NAME_SIZE] = {0};
35,336✔
1132
  bool isPaused = false;
35,336✔
1133
  code = isAllTaskPaused(pStream, &isPaused);
35,336✔
1134
  TSDB_CHECK_CODE(code, lino, _end);
35,414!
1135

1136
  int8_t streamStatus = atomic_load_8(&pStream->status);
35,414✔
1137
  if (isPaused && pStream->pTaskList != NULL) {
35,410!
1138
    streamStatus = STREAM_STATUS__PAUSE;
2✔
1139
  }
1140
  mndShowStreamStatus(status2, streamStatus);
35,410✔
1141
  STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
35,404✔
1142
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
35,404✔
1143
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
35,358!
1144

1145
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&status, false);
35,358✔
1146
  TSDB_CHECK_CODE(code, lino, _end);
35,370!
1147

1148
  char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
35,370✔
1149
  STR_WITH_MAXSIZE_TO_VARSTR(sourceDB, mndGetDbStr(pStream->sourceDb), sizeof(sourceDB));
35,370✔
1150
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
35,374✔
1151
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
35,327!
1152

1153
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&sourceDB, false);
35,327✔
1154
  TSDB_CHECK_CODE(code, lino, _end);
35,372!
1155

1156
  char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
35,372✔
1157
  STR_WITH_MAXSIZE_TO_VARSTR(targetDB, mndGetDbStr(pStream->targetDb), sizeof(targetDB));
35,372✔
1158
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
35,373✔
1159
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
35,325!
1160

1161
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetDB, false);
35,325✔
1162
  TSDB_CHECK_CODE(code, lino, _end);
35,356!
1163

1164
  if (pStream->targetSTbName[0] == 0) {
35,356!
UNCOV
1165
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1166
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
1167

UNCOV
1168
    code = colDataSetVal(pColInfo, numOfRows, NULL, true);
×
1169
  } else {
1170
    char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
35,356✔
1171
    STR_WITH_MAXSIZE_TO_VARSTR(targetSTB, mndGetStbStr(pStream->targetSTbName), sizeof(targetSTB));
35,356✔
1172
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
35,387✔
1173
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
35,326!
1174

1175
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetSTB, false);
35,326✔
1176
  }
1177
  TSDB_CHECK_CODE(code, lino, _end);
35,325!
1178

1179
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
35,325✔
1180
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
35,258!
1181

1182
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false);
35,258✔
1183
  TSDB_CHECK_CODE(code, lino, _end);
35,263!
1184

1185
  char trigger[20 + VARSTR_HEADER_SIZE] = {0};
35,263✔
1186
  char trigger2[MND_STREAM_TRIGGER_NAME_SIZE] = {0};
35,263✔
1187
  mndShowStreamTrigger(trigger2, pStream);
35,263✔
1188
  STR_WITH_MAXSIZE_TO_VARSTR(trigger, trigger2, sizeof(trigger));
35,346✔
1189
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
35,346✔
1190
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
35,320!
1191

1192
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);
35,320✔
1193
  TSDB_CHECK_CODE(code, lino, _end);
35,347!
1194

1195
  // sink_quota
1196
  char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0};
35,347✔
1197
  sinkQuota[0] = '0';
35,347✔
1198
  char dstStr[20] = {0};
35,347✔
1199
  STR_TO_VARSTR(dstStr, sinkQuota)
35,347✔
1200
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
35,347✔
1201
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
35,338!
1202

1203
  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
35,338✔
1204
  TSDB_CHECK_CODE(code, lino, _end);
35,337!
1205

1206
  // checkpoint interval
1207
  char tmp[20 + VARSTR_HEADER_SIZE] = {0};
35,337✔
1208
  (void)tsnprintf(varDataVal(tmp), sizeof(tmp) - VARSTR_HEADER_SIZE, "%d sec", tsStreamCheckpointInterval);
35,337✔
1209
  varDataSetLen(tmp, strlen(varDataVal(tmp)));
35,411✔
1210

1211
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
35,411✔
1212
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
35,321!
1213

1214
  code = colDataSetVal(pColInfo, numOfRows, (const char *)tmp, false);
35,321✔
1215
  TSDB_CHECK_CODE(code, lino, _end);
35,347!
1216

1217
  // checkpoint backup type
1218
  char backup[20 + VARSTR_HEADER_SIZE] = {0};
35,347✔
1219
  STR_TO_VARSTR(backup, "none")
35,347✔
1220
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
35,347✔
1221
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
35,274!
1222

1223
  code = colDataSetVal(pColInfo, numOfRows, (const char *)backup, false);
35,274✔
1224
  TSDB_CHECK_CODE(code, lino, _end);
35,342!
1225

1226
  // history scan idle
1227
  char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
35,342✔
1228
  tstrncpy(scanHistoryIdle, "100a", sizeof(scanHistoryIdle));
35,342✔
1229

1230
  memset(dstStr, 0, tListLen(dstStr));
35,342✔
1231
  STR_TO_VARSTR(dstStr, scanHistoryIdle)
35,342✔
1232
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
35,342✔
1233
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
35,315!
1234

1235
  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
35,315✔
1236
  TSDB_CHECK_CODE(code, lino, _end);
35,338!
1237

1238
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
35,338✔
1239
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
35,258!
1240
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
35,358✔
1241
  if (streamStatus == STREAM_STATUS__FAILED){
35,358✔
1242
    STR_TO_VARSTR(msg, pStream->reserve)
1✔
1243
  } else {
1244
    STR_TO_VARSTR(msg, " ")
35,357✔
1245
  }
1246
  code = colDataSetVal(pColInfo, numOfRows, (const char *)msg, false);
35,358✔
1247

1248
_end:
35,350✔
1249
  if (code) {
35,350!
UNCOV
1250
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
×
1251
  }
1252
  return code;
35,350✔
1253
}
1254

1255
int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows,
229,820✔
1256
                              int32_t precision) {
1257
  SColumnInfoData *pColInfo = NULL;
229,820✔
1258
  int32_t          cols = 0;
229,820✔
1259
  int32_t          code = 0;
229,820✔
1260
  int32_t          lino = 0;
229,820✔
1261

1262
  STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
229,820✔
1263

1264
  STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
229,820✔
1265
  if (pe == NULL) {
230,676!
UNCOV
1266
    mError("task:0x%" PRIx64 " not exists in any vnodes, streamName:%s, streamId:0x%" PRIx64 " createTs:%" PRId64
×
1267
           " no valid status/stage info",
1268
           id.taskId, pStream->name, pStream->uid, pStream->createTime);
UNCOV
1269
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
×
1270
  }
1271

1272
  // stream name
1273
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
230,676✔
1274
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
230,676✔
1275

1276
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
230,685✔
1277
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
229,956!
1278

1279
  code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
229,956✔
1280
  TSDB_CHECK_CODE(code, lino, _end);
229,941!
1281

1282
  // task id
1283
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
229,941✔
1284
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
229,465!
1285

1286
  char idstr[128] = {0};
229,465✔
1287
  int64ToHexStr(pTask->id.taskId, idstr, tListLen(idstr));
229,465✔
1288
  code = colDataSetVal(pColInfo, numOfRows, idstr, false);
230,732✔
1289
  TSDB_CHECK_CODE(code, lino, _end);
229,998!
1290

1291
  // node type
1292
  char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
229,998✔
1293
  varDataSetLen(nodeType, 5);
229,998✔
1294
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
229,998✔
1295
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
229,503!
1296

1297
  if (pTask->info.nodeId > 0) {
229,635✔
1298
    memcpy(varDataVal(nodeType), "vnode", 5);
211,560✔
1299
  } else {
1300
    memcpy(varDataVal(nodeType), "snode", 5);
18,075✔
1301
  }
1302
  code = colDataSetVal(pColInfo, numOfRows, nodeType, false);
229,635✔
1303
  TSDB_CHECK_CODE(code, lino, _end);
229,807!
1304

1305
  // node id
1306
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
229,807✔
1307
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
229,366!
1308

1309
  int64_t nodeId = TMAX(pTask->info.nodeId, 0);
229,366✔
1310
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false);
229,366✔
1311
  TSDB_CHECK_CODE(code, lino, _end);
229,310!
1312

1313
  // level
1314
  char level[20 + VARSTR_HEADER_SIZE] = {0};
229,310✔
1315
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
229,310✔
1316
    STR_WITH_SIZE_TO_VARSTR(level, "source", 6);
118,313✔
1317
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
110,997✔
1318
    STR_WITH_SIZE_TO_VARSTR(level, "agg", 3);
19,567✔
1319
  } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
91,430!
1320
    STR_WITH_SIZE_TO_VARSTR(level, "sink", 4);
92,179✔
UNCOV
1321
  } else if (pTask->info.taskLevel == TASK_LEVEL__MERGE) {
×
UNCOV
1322
    STR_WITH_SIZE_TO_VARSTR(level, "merge", 5);
×
1323
  }
1324

1325
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
229,310✔
1326
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
229,186!
1327

1328
  code = colDataSetVal(pColInfo, numOfRows, (const char *)level, false);
229,186✔
1329
  TSDB_CHECK_CODE(code, lino, _end);
229,826!
1330

1331
  // status
1332
  char status[20 + VARSTR_HEADER_SIZE] = {0};
229,826✔
1333

1334
  const char *pStatus = streamTaskGetStatusStr(pe->status);
229,826✔
1335
  STR_TO_VARSTR(status, pStatus);
229,986✔
1336

1337
  // status
1338
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
229,986✔
1339
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
229,771!
1340

1341
  code = colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
229,771✔
1342
  TSDB_CHECK_CODE(code, lino, _end);
229,745!
1343

1344
  // stage
1345
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
229,745✔
1346
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
229,338!
1347

1348
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false);
229,338✔
1349
  TSDB_CHECK_CODE(code, lino, _end);
229,312!
1350

1351
  // input queue
1352
  char        vbuf[TSDB_STREAM_NOTIFY_STAT_LEN + 2] = {0};
229,312✔
1353
  char        buf[TSDB_STREAM_NOTIFY_STAT_LEN] = {0};
229,312✔
1354
  const char *queueInfoStr = "%4.2f MiB (%6.2f%)";
229,312✔
1355
  snprintf(buf, tListLen(buf), queueInfoStr, pe->inputQUsed, pe->inputRate);
229,312✔
1356
  STR_TO_VARSTR(vbuf, buf);
229,312✔
1357

1358
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
229,312✔
1359
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
229,999!
1360

1361
  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
229,999✔
1362
  TSDB_CHECK_CODE(code, lino, _end);
229,875!
1363

1364
  // input total
1365
  const char *formatTotalMb = "%7.2f MiB";
229,875✔
1366
  const char *formatTotalGb = "%7.2f GiB";
229,875✔
1367
  if (pe->procsTotal < 1024) {
229,875!
1368
    snprintf(buf, tListLen(buf), formatTotalMb, pe->procsTotal);
229,901✔
1369
  } else {
UNCOV
1370
    snprintf(buf, tListLen(buf), formatTotalGb, pe->procsTotal / 1024);
×
1371
  }
1372

1373
  memset(vbuf, 0, tListLen(vbuf));
229,875✔
1374
  STR_TO_VARSTR(vbuf, buf);
229,875✔
1375

1376
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
229,875✔
1377
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
230,033!
1378

1379
  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
230,033✔
1380
  TSDB_CHECK_CODE(code, lino, _end);
229,904!
1381

1382
  // process throughput
1383
  const char *formatKb = "%7.2f KiB/s";
229,904✔
1384
  const char *formatMb = "%7.2f MiB/s";
229,904✔
1385
  if (pe->procsThroughput < 1024) {
229,904✔
1386
    snprintf(buf, tListLen(buf), formatKb, pe->procsThroughput);
229,802✔
1387
  } else {
1388
    snprintf(buf, tListLen(buf), formatMb, pe->procsThroughput / 1024);
102✔
1389
  }
1390

1391
  memset(vbuf, 0, tListLen(vbuf));
229,904✔
1392
  STR_TO_VARSTR(vbuf, buf);
229,904✔
1393

1394
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
229,904✔
1395
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
230,051!
1396

1397
  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
230,051✔
1398
  TSDB_CHECK_CODE(code, lino, _end);
229,823!
1399

1400
  // output total
1401
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
229,823✔
1402
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
229,331!
1403

1404
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
229,886✔
1405
    colDataSetNULL(pColInfo, numOfRows);
92,120!
1406
  } else {
1407
    (void)tsnprintf(buf, sizeof(buf), formatTotalMb, pe->outputTotal);
137,766✔
1408
    memset(vbuf, 0, tListLen(vbuf));
138,366✔
1409
    STR_TO_VARSTR(vbuf, buf);
138,366✔
1410

1411
    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
138,366✔
1412
    TSDB_CHECK_CODE(code, lino, _end);
138,051!
1413
  }
1414

1415
  // output throughput
1416
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
230,171✔
1417
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
229,423!
1418

1419
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
229,923✔
1420
    colDataSetNULL(pColInfo, numOfRows);
92,099!
1421
  } else {
1422
    if (pe->outputThroughput < 1024) {
137,824✔
1423
      snprintf(buf, tListLen(buf), formatKb, pe->outputThroughput);
137,677✔
1424
    } else {
1425
      snprintf(buf, tListLen(buf), formatMb, pe->outputThroughput / 1024);
147✔
1426
    }
1427

1428
    memset(vbuf, 0, tListLen(vbuf));
137,824✔
1429
    STR_TO_VARSTR(vbuf, buf);
137,824✔
1430

1431
    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
137,824✔
1432
    TSDB_CHECK_CODE(code, lino, _end);
138,032!
1433
  }
1434
  // info
1435
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
230,131✔
1436
    const char *sinkStr = "%.2f MiB";
92,106✔
1437
    snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize);
92,106✔
1438
  } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {  // offset info
138,025✔
1439
    if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
118,522✔
1440
      int32_t ret = taosFormatUtcTime(buf, tListLen(buf), pe->processedVer, precision);
8,262✔
1441
      if (ret != 0) {
8,262!
UNCOV
1442
        mError("failed to format processed timewindow, skey:%" PRId64, pe->processedVer);
×
UNCOV
1443
        memset(buf, 0, tListLen(buf));
×
1444
      }
1445
    } else {
1446
      const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
110,260✔
1447
      snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer);
110,260✔
1448
    }
1449
  } else {
1450
    memset(buf, 0, tListLen(buf));
19,503✔
1451
  }
1452

1453
  STR_TO_VARSTR(vbuf, buf);
230,131✔
1454

1455
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
230,131✔
1456
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
229,983!
1457

1458
  code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
229,983✔
1459
  TSDB_CHECK_CODE(code, lino, _end);
229,760!
1460

1461
  // start_time
1462
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
229,760✔
1463
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
229,256!
1464

1465
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startTime, false);
229,256✔
1466
  TSDB_CHECK_CODE(code, lino, _end);
229,148!
1467

1468
  // start id
1469
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
229,148✔
1470
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
228,743!
1471

1472
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointId, false);
228,743✔
1473
  TSDB_CHECK_CODE(code, lino, _end);
229,040!
1474

1475
  // start ver
1476
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
229,040✔
1477
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
228,650!
1478

1479
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointVer, false);
228,650✔
1480
  TSDB_CHECK_CODE(code, lino, _end);
228,890!
1481

1482
  // checkpoint time
1483
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
228,890✔
1484
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
228,577!
1485

1486
  if (pe->checkpointInfo.latestTime != 0) {
228,585✔
1487
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestTime, false);
180,548✔
1488
  } else {
1489
    code = colDataSetVal(pColInfo, numOfRows, 0, true);
48,037✔
1490
  }
1491
  TSDB_CHECK_CODE(code, lino, _end);
228,967!
1492

1493
  // checkpoint_id
1494
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
228,967✔
1495
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
228,617!
1496

1497
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestId, false);
228,617✔
1498
  TSDB_CHECK_CODE(code, lino, _end);
228,940!
1499

1500
  // checkpoint version
1501
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
228,940✔
1502
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
228,614!
1503

1504
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestVer, false);
228,614✔
1505
  TSDB_CHECK_CODE(code, lino, _end);
228,871!
1506

1507
  // checkpoint size
1508
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
228,871✔
1509
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
228,433✔
1510

1511
  colDataSetNULL(pColInfo, numOfRows);
228,431!
1512

1513
  // checkpoint backup status
1514
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
228,431✔
1515
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
228,465!
1516

1517
  code = colDataSetVal(pColInfo, numOfRows, 0, true);
228,465✔
1518
  TSDB_CHECK_CODE(code, lino, _end);
229,159!
1519

1520
  // ds_err_info
1521
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
229,159✔
1522
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
228,867!
1523

1524
  code = colDataSetVal(pColInfo, numOfRows, 0, true);
228,867✔
1525
  TSDB_CHECK_CODE(code, lino, _end);
229,293!
1526

1527
  // history_task_id
1528
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
229,293✔
1529
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
228,619!
1530

1531
  if (pe->hTaskId != 0) {
228,674✔
1532
    int64ToHexStr(pe->hTaskId, idstr, tListLen(idstr));
784✔
1533
    code = colDataSetVal(pColInfo, numOfRows, idstr, false);
784✔
1534
  } else {
1535
    code = colDataSetVal(pColInfo, numOfRows, 0, true);
227,890✔
1536
  }
1537
  TSDB_CHECK_CODE(code, lino, _end);
229,313!
1538

1539
  // history_task_status
1540
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
229,313✔
1541
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
228,801!
1542

1543
  code = colDataSetVal(pColInfo, numOfRows, 0, true);
228,801✔
1544
  TSDB_CHECK_CODE(code, lino, _end);
229,590!
1545

1546
  // notify_event_stat
1547
  int32_t offset =0;
229,590✔
1548
  if (pe->notifyEventStat.notifyEventAddTimes > 0) {
229,590!
UNCOV
1549
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Add %" PRId64 "x, %" PRId64 " elems in %lfs; ",
×
1550
                        pe->notifyEventStat.notifyEventAddTimes, pe->notifyEventStat.notifyEventAddElems,
1551
                        pe->notifyEventStat.notifyEventAddCostSec);
1552
  }
1553
  if (pe->notifyEventStat.notifyEventPushTimes > 0) {
229,590!
UNCOV
1554
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Push %" PRId64 "x, %" PRId64 " elems in %lfs; ",
×
1555
                        pe->notifyEventStat.notifyEventPushTimes, pe->notifyEventStat.notifyEventPushElems,
1556
                        pe->notifyEventStat.notifyEventPushCostSec);
1557
  }
1558
  if (pe->notifyEventStat.notifyEventPackTimes > 0) {
229,590!
UNCOV
1559
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Pack %" PRId64 "x, %" PRId64 " elems in %lfs; ",
×
1560
                        pe->notifyEventStat.notifyEventPackTimes, pe->notifyEventStat.notifyEventPackElems,
1561
                        pe->notifyEventStat.notifyEventPackCostSec);
1562
  }
1563
  if (pe->notifyEventStat.notifyEventSendTimes > 0) {
229,590!
UNCOV
1564
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "Send %" PRId64 "x, %" PRId64 " elems in %lfs; ",
×
1565
                        pe->notifyEventStat.notifyEventSendTimes, pe->notifyEventStat.notifyEventSendElems,
1566
                        pe->notifyEventStat.notifyEventSendCostSec);
1567
  }
1568
  if (pe->notifyEventStat.notifyEventHoldElems > 0) {
229,590!
UNCOV
1569
    offset += tsnprintf(buf + offset, sizeof(buf) - offset, "[Hold %" PRId64 " elems] ",
×
1570
                        pe->notifyEventStat.notifyEventHoldElems);
1571
  }
1572
  TSDB_CHECK_CONDITION(offset < sizeof(buf), code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
229,590!
1573
  buf[offset] = '\0';
229,590✔
1574

1575
  STR_TO_VARSTR(vbuf, buf);
229,590✔
1576

1577
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
229,590✔
1578
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
230,035!
1579

1580
  if (offset == 0) {
230,037!
1581
    colDataSetNULL(pColInfo, numOfRows);
230,037!
1582
  } else {
UNCOV
1583
    code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
×
UNCOV
1584
    TSDB_CHECK_CODE(code, lino, _end);
×
1585
  }
1586

UNCOV
1587
_end:
×
1588
  if (code) {
230,037!
UNCOV
1589
    mError("error happens during build task attr result blocks, lino:%d, code:%s", lino, tstrerror(code));
×
1590
  }
1591
  return code;
230,024✔
1592
}
1593

1594
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
10,548✔
1595
  const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
10,548✔
1596
  const SEp *p = GET_ACTIVE_EP(pCurrent);
10,548✔
1597

1598
  if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
10,548!
1599
    return false;
10,548✔
1600
  }
1601
  return true;
×
1602
}
1603

1604
void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo) {
2,356✔
1605
  if (pInfo != NULL) {
2,356!
1606
    taosArrayDestroy(pInfo->pUpdateNodeList);
2,356✔
1607
    taosHashCleanup(pInfo->pDBMap);
2,356✔
1608
  }
1609
}
2,356✔
1610

1611
// 1. increase the replica does not affect the stream process.
1612
// 2. decreasing the replica may affect the stream task execution in the way that there is one or more running stream
1613
// tasks on the will be removed replica.
1614
// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we
1615
// will handle it as mentioned in 1 & 2 items.
1616
int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
2,356✔
1617
                               SVgroupChangeInfo *pInfo) {
1618
  int32_t code = 0;
2,356✔
1619
  int32_t lino = 0;
2,356✔
1620

1621
  if (pInfo == NULL) {
2,356!
UNCOV
1622
    return TSDB_CODE_INVALID_PARA;
×
1623
  }
1624

1625
  pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo));
2,356✔
1626
  pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
2,356✔
1627

1628
  if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) {
2,356!
UNCOV
1629
    mndDestroyVgroupChangeInfo(pInfo);
×
UNCOV
1630
    TSDB_CHECK_NULL(NULL, code, lino, _err, terrno);
×
1631
  }
1632

1633
  int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
2,356✔
1634
  for (int32_t i = 0; i < numOfNodes; ++i) {
13,667✔
1635
    SNodeEntry *pPrevEntry = taosArrayGet(pPrevNodeList, i);
11,311✔
1636
    if (pPrevEntry == NULL) {
11,311!
UNCOV
1637
      continue;
×
1638
    }
1639

1640
    int32_t num = taosArrayGetSize(pNodeList);
11,311✔
1641
    for (int32_t j = 0; j < num; ++j) {
101,715✔
1642
      SNodeEntry *pCurrent = taosArrayGet(pNodeList, j);
100,961✔
1643
      if (pCurrent == NULL) {
100,961!
UNCOV
1644
        continue;
×
1645
      }
1646

1647
      if (pCurrent->nodeId == pPrevEntry->nodeId) {
100,961✔
1648
        if (pPrevEntry->stageUpdated || isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) {
10,557!
1649
          const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
9✔
1650

1651
          char buf[256] = {0};
9✔
1652
          code = epsetToStr(&pCurrent->epset, buf, tListLen(buf));  // ignore this error
9✔
1653
          if (code) {
9!
UNCOV
1654
            mError("failed to convert epset string, code:%s", tstrerror(code));
×
UNCOV
1655
            TSDB_CHECK_CODE(code, lino, _err);
×
1656
          }
1657

1658
          mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
9✔
1659
                 pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
1660

1661
          SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId};
9✔
1662
          epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
9✔
1663
          epsetAssign(&updateInfo.newEp, &pCurrent->epset);
9✔
1664

1665
          void *p = taosArrayPush(pInfo->pUpdateNodeList, &updateInfo);
9✔
1666
          TSDB_CHECK_NULL(p, code, lino, _err, terrno);
9!
1667
        }
1668

1669
        // todo handle the snode info
1670
        if (pCurrent->nodeId != SNODE_HANDLE) {
10,557✔
1671
          SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
9,276✔
1672
          code = taosHashPut(pInfo->pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
9,276✔
1673
          mndReleaseVgroup(pMnode, pVgroup);
9,276✔
1674
          TSDB_CHECK_CODE(code, lino, _err);
9,276!
1675
        }
1676

1677
        break;
10,557✔
1678
      }
1679
    }
1680
  }
1681

1682
  return code;
2,356✔
1683

UNCOV
1684
_err:
×
UNCOV
1685
  mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino);
×
UNCOV
1686
  mndDestroyVgroupChangeInfo(pInfo);
×
UNCOV
1687
  return code;
×
1688
}
1689

1690
static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) {
2,182✔
1691
  bool              allReady = false;
2,182✔
1692
  bool              nodeUpdated = false;
2,182✔
1693
  SVgroupChangeInfo changeInfo = {0};
2,182✔
1694

1695
  int32_t numOfNodes = extractStreamNodeList(pMnode);
2,182✔
1696

1697
  if (numOfNodes == 0) {
2,182✔
1698
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
1,104✔
1699
    execInfo.ts = taosGetTimestampSec();
1,104✔
1700
    return false;
1,104✔
1701
  }
1702

1703
  for (int32_t i = 0; i < numOfNodes; ++i) {
6,455✔
1704
    SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
5,394✔
1705
    if (pNodeEntry == NULL) {
5,394!
UNCOV
1706
      continue;
×
1707
    }
1708

1709
    if (pNodeEntry->stageUpdated) {
5,394✔
1710
      mDebug("stream task not ready due to node update detected, checkpoint not issued");
17✔
1711
      return true;
17✔
1712
    }
1713
  }
1714

1715
  int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot, NULL);
1,061✔
1716
  if (code) {
1,061!
UNCOV
1717
    mError("failed to get the vgroup snapshot, ignore it and continue");
×
1718
  }
1719

1720
  if (!allReady) {
1,061✔
1721
    mWarn("not all vnodes ready, quit from vnodes status check");
18!
1722
    return true;
18✔
1723
  }
1724

1725
  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, *ppNodeSnapshot, &changeInfo);
1,043✔
1726
  if (code) {
1,043!
UNCOV
1727
    nodeUpdated = false;
×
1728
  } else {
1729
    nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
1,043✔
1730
    if (nodeUpdated) {
1,043!
UNCOV
1731
      mDebug("stream tasks not ready due to node update");
×
1732
    }
1733
  }
1734

1735
  mndDestroyVgroupChangeInfo(&changeInfo);
1,043✔
1736
  return nodeUpdated;
1,043✔
1737
}
1738

1739
// check if the node update happens or not
1740
bool mndStreamNodeIsUpdated(SMnode *pMnode) {
2,182✔
1741
  SArray *pNodeSnapshot = NULL;
2,182✔
1742

1743
  streamMutexLock(&execInfo.lock);
2,182✔
1744
  bool updated = doCheckForUpdated(pMnode, &pNodeSnapshot);
2,182✔
1745
  streamMutexUnlock(&execInfo.lock);
2,182✔
1746

1747
  taosArrayDestroy(pNodeSnapshot);
2,182✔
1748
  return updated;
2,182✔
1749
}
1750

1751
int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) {
1,630✔
1752
  SSdb      *pSdb = pMnode->pSdb;
1,630✔
1753
  void      *pIter = NULL;
1,630✔
1754
  SSnodeObj *pObj = NULL;
1,630✔
1755

1756
  if (pSrcDb->cfg.replications == 1) {
1,630✔
1757
    return TSDB_CODE_SUCCESS;
1,627✔
1758
  } else {
1759
    while (1) {
1760
      pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
3✔
1761
      if (pIter == NULL) {
3✔
1762
        break;
2✔
1763
      }
1764

1765
      sdbRelease(pSdb, pObj);
1✔
1766
      sdbCancelFetch(pSdb, pIter);
1✔
1767
      return TSDB_CODE_SUCCESS;
1✔
1768
    }
1769

1770
    mError("snode not existed when trying to create stream in db with multiple replica");
2!
1771
    return TSDB_CODE_SNODE_NOT_DEPLOYED;
2✔
1772
  }
1773
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc