• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3581

15 Jan 2025 01:12AM UTC coverage: 63.809% (+0.3%) from 63.556%
#3581

push

travis-ci

web-flow
Merge pull request #29561 from taosdata/fix/TD-33504

fix:[TD-33504]add test case

141492 of 284535 branches covered (49.73%)

Branch coverage included in aggregate %.

219814 of 281694 relevant lines covered (78.03%)

19173854.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.33
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "audit.h"
17
#include "mndDb.h"
18
#include "mndPrivilege.h"
19
#include "mndScheduler.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndStream.h"
23
#include "mndTrans.h"
24
#include "osMemory.h"
25
#include "parser.h"
26
#include "taoserror.h"
27
#include "tmisce.h"
28
#include "tname.h"
29

30
#define MND_STREAM_MAX_NUM 60
31

32
// Struct whose only member is a placeholder byte.
typedef struct {
33
  int8_t placeHolder;  // placeholder member, defined to fix a Windows compile error
34
} SMStreamNodeCheckMsg;
35

36
static int32_t  mndNodeCheckSentinel = 0;
37
SStreamExecInfo execInfo;
38

39
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
41
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
42
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
43
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
44

45
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
46
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
47

48
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq);
49
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
50
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
51
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
52
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
53
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
54
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
55
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq);
56
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
57
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
58
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
59
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
60
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList);
61
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
62
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
63
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
64
static void    doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
65
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq);
66
static void    saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
67

68
static void     addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
69
static void     removeExpiredNodeInfo(const SArray *pNodeSnapshot);
70
static int32_t  doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
71
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
72

73
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
74
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
75
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
76
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
77
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
78

79
// Wires the stream module into the mnode: registers the message handlers and
// the show retrieve/free-iterator handlers, initialises the exec info, and
// installs the SDB_STREAM and SDB_STREAM_SEQ tables.
//
// Returns 0 on success, otherwise the first non-zero code reported by
// mndInitExecInfo() or sdbSetTable().
int32_t mndInitStream(SMnode *pMnode) {
  SSdbTable streamTable = {
      .sdbType = SDB_STREAM,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
  };
  SSdbTable streamSeqTable = {
      .sdbType = SDB_STREAM_SEQ,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamSeqActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamSeqActionDecode,
      .insertFp = (SdbInsertFp)mndStreamSeqActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamSeqActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamSeqActionDelete,
  };

  // Message type -> handler, registered below in exactly this order.
  const struct {
    int32_t msgType;
    int32_t (*handler)(SRpcMsg *);
  } msgHandlers[] = {
      {TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq},
      {TDMT_MND_DROP_STREAM, mndProcessDropStreamReq},
      {TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck},

      {TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp},
      {TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp},
      {TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp},
      {TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp},
      {TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp},
      {TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp},
      {TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp},
      {TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp},
      {TDMT_STREAM_CONSEN_CHKPT_RSP, mndTransProcessRsp},

      // for msgs inside mnode
      // TODO change the name
      {TDMT_STREAM_CREATE, mndProcessCreateStreamReqFromMNode},
      {TDMT_STREAM_CREATE_RSP, mndTransProcessRsp},
      {TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode},
      {TDMT_STREAM_DROP_RSP, mndTransProcessRsp},

      {TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp},
      {TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint},
      {TDMT_MND_STREAM_DROP_ORPHANTASKS, mndProcessDropOrphanTaskReq},
      {TDMT_MND_STREAM_TASK_RESET, mndProcessResetStatusReq},
      {TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint},
      {TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport},
      {TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo},
      {TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp},
      {TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb},
      {TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq},
      {TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr},

      {TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq},
      {TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq},
      {TDMT_MND_RESET_STREAM, mndProcessResetStreamReq},
  };

  const int32_t numOfHandlers = (int32_t)(sizeof(msgHandlers) / sizeof(msgHandlers[0]));
  for (int32_t i = 0; i < numOfHandlers; ++i) {
    mndSetMsgHandle(pMnode, msgHandlers[i].msgType, msgHandlers[i].handler);
  }

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);

  // Each step runs only if everything before it succeeded.
  int32_t code = mndInitExecInfo();
  if (code == 0) {
    code = sdbSetTable(pMnode->pSdb, streamTable);
  }
  if (code == 0) {
    code = sdbSetTable(pMnode->pSdb, streamSeqTable);
  }
  return code;
}
154

155
// Tears down the global execInfo: destroys its arrays and hash tables, then
// its mutex. pMnode is not used here.
void mndCleanupStream(SMnode *pMnode) {
  // array members
  taosArrayDestroy(execInfo.pTaskList);
  taosArrayDestroy(execInfo.pNodeList);
  taosArrayDestroy(execInfo.pKilledChkptTrans);

  // hash-table members
  taosHashCleanup(execInfo.pTaskMap);
  taosHashCleanup(execInfo.transMgmt.pDBTrans);
  taosHashCleanup(execInfo.pTransferStateStreams);
  taosHashCleanup(execInfo.pChkptStreams);
  taosHashCleanup(execInfo.pStreamConsensus);

  // the result of destroying the mutex is deliberately ignored
  (void)taosThreadMutexDestroy(&execInfo.lock);

  mDebug("mnd stream exec info cleanup");
}
1,850✔
167

168
// Decodes a raw SDB record into a newly allocated row that holds an SStreamObj.
//
// Returns the row and sets terrno to 0 on success. On failure returns NULL,
// frees the row, and sets terrno to the failure code.
//
// A record whose soft version is outside [1, MND_STREAM_VER_NUMBER] is rejected
// with TSDB_CODE_SDB_INVALID_DATA_VER. Previously that path left `code` at 0,
// fell into the success branch with pStream == NULL, and dereferenced it.
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSdbRow    *pRow = NULL;
  SStreamObj *pStream = NULL;
  void       *buf = NULL;
  int8_t      sver = 0;
  int32_t     tlen = 0;
  int32_t     dataPos = 0;

  code = sdbGetRawSoftVer(pRaw, &sver);
  TSDB_CHECK_CODE(code, lino, _over);

  if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
    // must be a failure code, otherwise the success branch below runs with pStream == NULL
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    lino = __LINE__;
    goto _over;
  }

  pRow = sdbAllocRow(sizeof(SStreamObj));
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);

  pStream = sdbGetRowObj(pRow);
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);

  // NOTE(review): the SDB_GET_* macros jump to _over on failure; whether they
  // also assign `code` is not visible from here -- confirm against sdb.h.
  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);

  buf = taosMemoryMalloc(tlen + 1);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);

  SDecoder decoder;
  tDecoderInit(&decoder, buf, tlen + 1);
  code = tDecodeSStreamObj(&decoder, pStream, sver);
  tDecoderClear(&decoder);

  if (code < 0) {
    // release whatever the partial decode attached to the stream object
    tFreeStreamObj(pStream);
  }

_over:
  taosMemoryFreeClear(buf);

  if (code != TSDB_CODE_SUCCESS) {
    char *p = (pStream == NULL) ? "null" : pStream->name;
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
    taosMemoryFreeClear(pRow);

    terrno = code;
    return NULL;
  } else {
    mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
           pStream->checkpointId);

    terrno = 0;
    return pRow;
  }
}
226

227
// SDB insert callback for streams: only emits a trace line. Always returns 0.
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
  mTrace("stream:%s, perform insert action", pStream->name);

  return 0;
}
231

232
// SDB delete callback for streams: calls tFreeStreamObj() on the stream while
// holding the stream's write latch. Always returns 0.
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
  mTrace("stream:%s, perform delete action", pStream->name);

  taosWLockLatch(&pStream->lock);
  tFreeStreamObj(pStream);
  // the latch is still accessed here, after tFreeStreamObj() has run
  taosWUnLockLatch(&pStream->lock);

  return 0;
}
239

240
// SDB update callback for streams: copies version, status, updateTime,
// checkpointId and checkpointFreq from the new object into the existing one.
// Always returns 0.
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
  mTrace("stream:%s, perform update action", pOldStream->name);

  // the version is exchanged atomically, before the write latch is taken
  (void)atomic_exchange_32(&pOldStream->version, pNewStream->version);

  // the remaining fields are copied under the write latch
  taosWLockLatch(&pOldStream->lock);
  pOldStream->status = pNewStream->status;
  pOldStream->updateTime = pNewStream->updateTime;
  pOldStream->checkpointId = pNewStream->checkpointId;
  pOldStream->checkpointFreq = pNewStream->checkpointFreq;
  taosWUnLockLatch(&pOldStream->lock);

  return 0;
}
254

255
// Looks up a stream by name in the SDB and stores the result in *pStream.
//
// Returns TSDB_CODE_MND_STREAM_NOT_EXIST when the lookup yields NULL and terrno
// is TSDB_CODE_SDB_OBJ_NOT_THERE. Returns 0 in every other case, including a
// NULL result with a different terrno, so callers must also check *pStream.
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
  *pStream = sdbAcquire(pMnode->pSdb, SDB_STREAM, streamName);
  if (*pStream != NULL) {
    return 0;
  }

  return (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) ? TSDB_CODE_MND_STREAM_NOT_EXIST : 0;
}
264

265
// Hands a stream object back to the mnode's SDB via sdbRelease().
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
  sdbRelease(pMnode->pSdb, pStream);
}
14,968✔
269

270
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
271
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
272
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
273
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
274
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
275

276
// Checks the mandatory fields of a create-stream request.
//
// Returns TSDB_CODE_MND_INVALID_STREAM_OPTION when the stream name, the SQL
// text (NULL or empty), the source db name or the target stable name is
// missing; TSDB_CODE_SUCCESS otherwise.
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
  if (pCreate->name[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  // the NULL test must come before the first-character test
  if (pCreate->sql == NULL || pCreate->sql[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  if (pCreate->sourceDB[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  if (pCreate->targetStbFullName[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  return TSDB_CODE_SUCCESS;
}
283

284
// Builds pWrapper->pSchema from an array of SField, one schema column per field.
//
// Column ids are assigned as 1..nCols in array order. A field of type
// TSDB_DATA_TYPE_NULL is stored as a VARCHAR column of VARSTR_HEADER_SIZE bytes.
//
// Returns TSDB_CODE_SUCCESS, or terrno when the allocation or an element lookup
// fails. In the latter case pWrapper->pSchema stays allocated.
static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) {
  pWrapper->nCols = taosArrayGetSize(pFields);
  pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema));
  if (pWrapper->pSchema == NULL) {
    return terrno;
  }

  for (int32_t col = 0; col < pWrapper->nCols; ++col) {
    SField *pSrc = (SField *)taosArrayGet(pFields, col);
    if (pSrc == NULL) {
      return terrno;
    }

    SSchema *pDst = &pWrapper->pSchema[col];

    if (pSrc->type == TSDB_DATA_TYPE_NULL) {
      pDst->type = TSDB_DATA_TYPE_VARCHAR;
      pDst->bytes = VARSTR_HEADER_SIZE;
    } else {
      pDst->type = pSrc->type;
      pDst->bytes = pSrc->bytes;
    }

    pDst->colId = col + 1;
    tstrncpy(pDst->name, pSrc->name, sizeof(pDst->name));
    pDst->flags = pSrc->flags;
  }

  return TSDB_CODE_SUCCESS;
}
313

314
// Reports whether any column other than the first one carries COL_IS_KEY.
// Column 0 is never inspected, so a schema with fewer than two columns always
// yields false.
static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) {
  const int32_t numOfCols = pWrapper->nCols;

  // starting at index 1 also covers the "fewer than two columns" case
  for (int32_t col = 1; col < numOfCols; ++col) {
    const SSchema *pCol = &pWrapper->pSchema[col];
    if ((pCol->flags & COL_IS_KEY) != 0) {
      return true;
    }
  }

  return false;
}
325

326
// Fills a stream object from a create-stream request.
//
// Steps, in order:
//   1. copy name, timestamps, ids and the stream options from the request;
//   2. resolve the source db and the db of the target stable and record their uids;
//   3. take over pCreate->sql and pCreate->ast (the request's pointers are set
//      to NULL, so pObj owns both strings afterwards);
//   4. build the output schema from pCreate->pCols, inserting the columns
//      listed in pCreate->fillNullCols;
//   5. create the query plan from the AST and store it as a string in
//      pObj->physicalPlan;
//   6. build the tag schema from pCreate->pTags.
//
// Returns 0 on success or the failing step's error code. Whatever was already
// attached to pObj is left in place on failure.
//
// Changes from the previous version:
//   - hTaskUid is now seeded with the "<name>_fillhistory" string. That string
//     was built before but never used, so both uids were seeded with the same name.
//   - the "target db not exist" log printed pObj->targetDb before it was
//     populated; it now prints the target stable name.
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
  SNode      *pAst = NULL;
  SQueryPlan *pPlan = NULL;
  int32_t     code = 0;

  mInfo("stream:%s to create", pCreate->name);
  memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
  pObj->createTime = taosGetTimestampMs();
  pObj->updateTime = pObj->createTime;
  pObj->version = 1;

  if (pCreate->smaId > 0) {
    pObj->subTableWithoutMd5 = 1;
  }

  pObj->smaId = pCreate->smaId;
  pObj->indexForMultiAggBalance = -1;

  pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));

  // the fill-history uid gets its own seed string
  char p[TSDB_STREAM_FNAME_LEN + 32] = {0};
  snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory");
  pObj->hTaskUid = mndGenerateUid(p, strlen(p));

  pObj->status = 0;

  pObj->conf.igExpired = pCreate->igExpired;
  pObj->conf.trigger = pCreate->triggerType;
  pObj->conf.triggerParam = pCreate->maxDelay;
  pObj->conf.watermark = pCreate->watermark;
  pObj->conf.fillHistory = pCreate->fillHistory;
  pObj->deleteMark = pCreate->deleteMark;
  pObj->igCheckUpdate = pCreate->igUpdate;

  memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
  SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
  if (pSourceDb == NULL) {
    code = terrno;
    mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb,
          tstrerror(code));
    goto FAIL;
  }

  pObj->sourceDbUid = pSourceDb->uid;
  mndReleaseDb(pMnode, pSourceDb);

  memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);

  SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
  if (pTargetDb == NULL) {
    code = terrno;
    // pObj->targetDb is not set yet, so report the stable name the lookup used
    mError("stream:%s failed to create, target db of stable %s not exist since %s", pCreate->name,
           pObj->targetSTbName, tstrerror(code));
    goto FAIL;
  }

  tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);

  if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
    pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
  } else {
    pObj->targetStbUid = pCreate->targetStbUid;
  }
  pObj->targetDbUid = pTargetDb->uid;
  mndReleaseDb(pMnode, pTargetDb);

  // ownership of both strings moves from the request to the stream object
  pObj->sql = pCreate->sql;
  pObj->ast = pCreate->ast;

  pCreate->sql = NULL;
  pCreate->ast = NULL;

  // deserialize ast
  if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) {
    goto FAIL;
  }

  // create output schema
  if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) {
    goto FAIL;
  }

  int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
  if (numOfNULL > 0) {
    // Merge the data columns with the fill-null columns into one schema.
    // Each output slot takes either the next data column or the next null column.
    pObj->outputSchema.nCols += numOfNULL;
    SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
    if (!pFullSchema) {
      code = terrno;
      goto FAIL;
    }

    int32_t nullIndex = 0;
    int32_t dataIndex = 0;
    for (int32_t i = 0; i < pObj->outputSchema.nCols; i++) {
      SColLocation *pos = NULL;

      if (nullIndex < numOfNULL) {
        pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
        if (pos == NULL) {
          mError("invalid null column index, %d", nullIndex);
          continue;
        }
      }

      // a data column goes here when the null columns are exhausted or the
      // next null column belongs to a later slot
      if (pos == NULL || i < pos->slotId) {
        const SSchema *pSrc = &pObj->outputSchema.pSchema[dataIndex];

        pFullSchema[i].bytes = pSrc->bytes;
        pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
        pFullSchema[i].flags = pSrc->flags;
        tstrncpy(pFullSchema[i].name, pSrc->name, sizeof(pFullSchema[i].name));
        pFullSchema[i].type = pSrc->type;
        dataIndex++;
      } else {
        pFullSchema[i].bytes = 0;
        pFullSchema[i].colId = pos->colId;
        pFullSchema[i].flags = COL_SET_NULL;
        memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
        pFullSchema[i].type = pos->type;
        nullIndex++;
      }
    }

    taosMemoryFree(pObj->outputSchema.pSchema);
    pObj->outputSchema.pSchema = pFullSchema;
  }

  SPlanContext cxt = {
      .pAstRoot = pAst,
      .topicQuery = false,
      .streamQuery = true,
      // the planner is given WINDOW_CLOSE in place of MAX_DELAY
      .triggerType =
          (pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
      .watermark = pObj->conf.watermark,
      .igExpired = pObj->conf.igExpired,
      .deleteMark = pObj->deleteMark,
      .igCheckUpdate = pObj->igCheckUpdate,
      .destHasPrimaryKey = hasDestPrimaryKey(&pObj->outputSchema),
  };

  // using ast and param to build physical plan
  if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) {
    goto FAIL;
  }

  // save physical plan
  if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) {
    goto FAIL;
  }

  pObj->tagSchema.nCols = pCreate->numOfTags;
  if (pCreate->numOfTags) {
    pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
    if (pObj->tagSchema.pSchema == NULL) {
      code = terrno;
      goto FAIL;
    }
  }

  /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
  for (int32_t i = 0; i < pCreate->numOfTags; i++) {
    SField *pField = taosArrayGet(pCreate->pTags, i);
    if (pField == NULL) {
      // the slot keeps its zeroed contents from calloc
      continue;
    }

    // tag column ids continue after the output columns
    pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
    pObj->tagSchema.pSchema[i].bytes = pField->bytes;
    pObj->tagSchema.pSchema[i].flags = pField->flags;
    pObj->tagSchema.pSchema[i].type = pField->type;
    memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN);
  }

FAIL:
  // reached on success as well; only the temporary AST and plan are released
  if (pAst != NULL) nodesDestroyNode(pAst);
  if (pPlan != NULL) qDestroyQueryPlan(pPlan);
  return code;
}
511

512
// Serialises a stream task into a TDMT_STREAM_TASK_DEPLOY message and appends
// it to the transaction as an action for the task's endpoint set.
//
// The task is encoded twice: once with no output buffer to learn the payload
// size, then again into a buffer of sizeof(SMsgHead) + size bytes. A task whose
// ver is below SSTREAM_TASK_SUBTABLE_CHANGED_VER is first bumped to
// SSTREAM_TASK_VER.
//
// Returns 0 on success. The buffer is freed here whenever encoding or
// setTransAction() fails.
// NOTE(review): on success the buffer is presumably owned by the transaction --
// confirm against setTransAction().
//
// Change from the previous version: the sizing pass now fails on any non-zero
// code, matching the second pass. It used to check only for -1, so other error
// codes were ignored and a meaningless size was used. A -1 is still reported as
// TSDB_CODE_INVALID_MSG, as before.
int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
  SEncoder encoder;
  tEncoderInit(&encoder, NULL, 0);

  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  // pass 1: no buffer, only encoder.pos is of interest
  int32_t code = tEncodeStreamTask(&encoder, pTask);
  if (code != 0) {
    tEncoderClear(&encoder);
    return (code == -1) ? TSDB_CODE_INVALID_MSG : code;
  }

  int32_t size = encoder.pos;
  int32_t tlen = sizeof(SMsgHead) + size;
  tEncoderClear(&encoder);

  void *buf = taosMemoryCalloc(1, tlen);
  if (buf == NULL) {
    return terrno;
  }

  // the message head stores the vgroup id in network byte order
  ((SMsgHead *)buf)->vgId = htonl(pTask->info.nodeId);

  // pass 2: encode right behind the message head
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, size);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  if (code != 0) {
    mError("failed to encode stream task, code:%s", tstrerror(code));
    taosMemoryFree(buf);
    return code;
  }

  code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code) {
    taosMemoryFree(buf);
  }

  return code;
}
556

557
// Adds a deploy action to the transaction for every task of the stream. When
// fill-history is enabled, it does the same for every task in pHTasksList.
//
// Stops at the first failure and returns its code; returns 0 when every task
// was added.
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pTaskIter = NULL;

  int32_t code = createStreamTaskIter(pStream, &pTaskIter);
  if (code) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return code;
  }

  // tasks reachable through the task iterator
  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCurrent = NULL;

    code = streamTaskIterGetCurrent(pTaskIter, &pCurrent);
    if (code == 0) {
      code = mndPersistTaskDeployReq(pTrans, pCurrent);
    }

    if (code != 0) {
      break;
    }
  }

  // single cleanup point for the iterator, on success and on failure
  destroyStreamTaskIter(pTaskIter);
  if (code != 0) {
    return code;
  }

  // persistent stream task for already stored ts data
  if (!pStream->conf.fillHistory) {
    return code;
  }

  int32_t numOfLevels = taosArrayGetSize(pStream->pHTasksList);
  for (int32_t lv = 0; lv < numOfLevels; ++lv) {
    SArray *pTasksInLevel = taosArrayGetP(pStream->pHTasksList, lv);

    int32_t numInLevel = taosArrayGetSize(pTasksInLevel);
    for (int32_t t = 0; t < numInLevel; ++t) {
      SStreamTask *pHistTask = taosArrayGetP(pTasksInLevel, t);

      code = mndPersistTaskDeployReq(pTrans, pHistTask);
      if (code != 0) {
        return code;
      }
    }
  }

  return code;
}
602

603
// Adds the stream's task deploy actions to the transaction, then its
// transaction log with status SDB_STATUS_READY.
//
// A negative code from mndPersistStreamTasks() is returned as is; otherwise the
// result of mndPersistTransLog() is returned.
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
  int32_t code = mndPersistStreamTasks(pTrans, pStream);
  if (code < 0) {
    return code;
  }

  return mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
}
611

612
// Builds a create-stable request for the stream's target stable and adds the
// resulting stable object to the transaction.
//
// Columns come from pStream->outputSchema. Tags come from pStream->tagSchema,
// or, when that is empty, a single UBIGINT tag named "group_id" is used. The
// stable gets pStream->targetStbUid as its uid.
//
// Returns 0 on success. Fails with TSDB_CODE_MND_STB_ALREADY_EXIST when the
// stable already exists, TSDB_CODE_MND_DB_NOT_SELECTED when its db cannot be
// acquired, TSDB_CODE_MND_SINGLE_STB_MODE_DB when the db allows one stable and
// already has some, or the code reported by a failing helper.
//
// Change from the previous version: failures of mndGetNumOfStbs(),
// mndBuildStbFromReq() and mndAddStbToTrans() used to jump to the cleanup label
// with `code` still 0, so the caller was told the stable had been created.
// Their return values are now propagated.
//
// The `user` parameter is not used in this function.
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
  SStbObj *pStb = NULL;
  SDbObj  *pDb = NULL;
  int32_t  code = 0;
  int32_t  lino = 0;

  SMCreateStbReq createReq = {0};
  tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
  createReq.numOfColumns = pStream->outputSchema.nCols;
  createReq.numOfTags = 1;  // group id
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);

  // build fields
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    pField->flags = pStream->outputSchema.pSchema[i].flags;
    pField->type = pStream->outputSchema.pSchema[i].type;
    pField->bytes = pStream->outputSchema.pSchema[i].bytes;
    pField->compress = createDefaultColCmprByType(pField->type);
  }

  if (pStream->tagSchema.nCols == 0) {
    // no tags configured: fall back to the single group id tag
    createReq.numOfTags = 1;
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    // build tags
    SField *pField = taosArrayGet(createReq.pTags, 0);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, "group_id", sizeof(pField->name));
    pField->type = TSDB_DATA_TYPE_UBIGINT;
    pField->flags = 0;
    pField->bytes = 8;
  } else {
    createReq.numOfTags = pStream->tagSchema.nCols;
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    for (int32_t i = 0; i < createReq.numOfTags; i++) {
      SField *pField = taosArrayGet(createReq.pTags, i);
      if (pField == NULL) {
        continue;
      }

      pField->bytes = pStream->tagSchema.pSchema[i].bytes;
      pField->flags = pStream->tagSchema.pSchema[i].flags;
      pField->type = pStream->tagSchema.pSchema[i].type;
      tstrncpy(pField->name, pStream->tagSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    }
  }

  code = mndCheckCreateStbReq(&createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
    lino = __LINE__;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    lino = __LINE__;
    goto _OVER;
  }

  int32_t numOfStbs = -1;
  code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
  TSDB_CHECK_CODE(code, lino, _OVER);

  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    lino = __LINE__;
    goto _OVER;
  }

  SStbObj stbObj = {0};

  code = mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb);
  TSDB_CHECK_CODE(code, lino, _OVER);

  stbObj.uid = pStream->targetStbUid;

  // only a negative result counts as failure here; `code` stays 0 otherwise
  int32_t addCode = mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj);
  if (addCode < 0) {
    code = addCode;
    lino = __LINE__;
    mndFreeStb(&stbObj);
    goto _OVER;
  }

  tFreeSMCreateStbReq(&createReq);
  mndFreeStb(&stbObj);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);
  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols);
  return code;

_OVER:
  tFreeSMCreateStbReq(&createReq);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);

  mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->targetSTbName, lino,
         tstrerror(code));
  return code;
}
723

724
// 1. stream number check
725
// 2. target stable can not be target table of other existed streams.
726
static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
1,740✔
727
  int32_t     numOfStream = 0;
1,740✔
728
  SStreamObj *pStream = NULL;
1,740✔
729
  void       *pIter = NULL;
1,740✔
730

731
  while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
3,986✔
732
    if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
2,247✔
733
      ++numOfStream;
1,564✔
734
    }
735

736
    sdbRelease(pMnode->pSdb, pStream);
2,247✔
737

738
    if (numOfStream > MND_STREAM_MAX_NUM) {
2,247!
739
      mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
×
740
             pStreamObj->name);
741
      sdbCancelFetch(pMnode->pSdb, pIter);
×
742
      return TSDB_CODE_MND_TOO_MANY_STREAMS;
×
743
    }
744

745
    if (pStream->targetStbUid == pStreamObj->targetStbUid) {
2,247✔
746
      mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
1!
747
             pStreamObj->name);
748
      sdbCancelFetch(pMnode->pSdb, pIter);
1✔
749
      return TSDB_CODE_MND_INVALID_TARGET_TABLE;
1✔
750
    }
751
  }
752

753
  return TSDB_CODE_SUCCESS;
1,739✔
754
}
755

756
/**
 * Handle a create-stream request received by the mnode.
 *
 * Steps, in order: deserialize and validate the request, reject/ignore an already existing stream,
 * check the grant, check the snode of the source db, build the stream object, run doStreamCheck,
 * create the transaction, optionally create the target stable, schedule the stream tasks, persist
 * the stream into the transaction, check read/write db privileges, register the tasks in execInfo,
 * prepare the transaction and finally write an audit record.
 *
 * @param pReq  rpc message; pCont/contLen carry the serialized SCMCreateStreamReq
 * @return 0 when the stream already exists and igExists is set; TSDB_CODE_ACTION_IN_PROGRESS when the
 *         creation reached the end without error; otherwise the error code of the failed step
 */
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SStreamObj  streamObj = {0};
  char       *sql = NULL;
  int32_t     sqlLen = 0;
  const char *pMsg = "create stream tasks on dnodes";
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STrans     *pTrans = NULL;

  SCMCreateStreamReq createReq = {0};
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  goto _OVER;
#endif

  mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
  if ((code = mndCheckCreateStreamReq(&createReq)) != 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = mndAcquireStream(pMnode, createReq.name, &pStream);
  if (pStream != NULL && code == 0) {
    if (createReq.igExists) {
      mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
      mndReleaseStream(pMnode, pStream);
      tFreeSCMCreateStreamReq(&createReq);
      return code;
    } else {
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
      goto _OVER;
    }
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    goto _OVER;
  }

  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
    goto _OVER;
  }

  if (createReq.sql != NULL) {
    sql = taosStrdup(createReq.sql);
    TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
    // record the length, otherwise the audit record below never carries the sql statement
    sqlLen = (int32_t)strlen(sql);
  }

  SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB);
  if (pSourceDb == NULL) {
    code = terrno;
    mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB,
          tstrerror(code));
    goto _OVER;
  }

  code = mndCheckForSnode(pMnode, pSourceDb);
  mndReleaseDb(pMnode, pSourceDb);
  if (code != 0) {
    goto _OVER;
  }

  // build stream obj from request
  if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = doStreamCheck(pMnode, &streamObj);
  TSDB_CHECK_CODE(code, lino, _OVER);

  code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans);
  if (pTrans == NULL || code) {
    goto _OVER;
  }

  // create stb for stream
  if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) {
    if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) {
      mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code));
      mndTransDrop(pTrans);
      goto _OVER;
    }
  } else {
    mDebug("stream:%s no need create stable", createReq.name);
  }

  // schedule stream task for stream obj
  code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  // add stream to trans
  code = mndPersistStream(pTrans, &streamObj);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb)) != 0) {
    mndTransDrop(pTrans);
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb)) != 0) {
    mndTransDrop(pTrans);
    goto _OVER;
  }

  // add into buffer firstly
  // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
  streamMutexLock(&execInfo.lock);
  mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
  saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  // execute creation
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  mndTransDrop(pTrans);

  SName dbname = {0};
  code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code) {
    mError("invalid source dbname:%s in create stream, code:%s", createReq.sourceDB, tstrerror(code));
    goto _OVER;
  }

  SName name = {0};
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE);
  if (code) {
    mError("invalid stream name:%s in create strem, code:%s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  // reuse this function for stream
  if (sql != NULL && sqlLen > 0) {
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen);
  } else {
    // no sql text available, fall back to a short description of the operation
    char detail[1000] = {0};
    snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
  }

_OVER:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to create at line:%d since %s", createReq.name, lino, tstrerror(code));
  } else {
    mDebug("stream:%s create stream completed", createReq.name);
    // a successful creation is always reported as "in progress" to the caller
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  mndReleaseStream(pMnode, pStream);
  tFreeSCMCreateStreamReq(&createReq);
  tFreeStreamObj(&streamObj);

  if (sql != NULL) {
    taosMemoryFreeClear(sql);
  }

  return code;
}
929

930
/**
 * Handle a restart-stream request. The request body is deserialized as a SMPauseStreamReq.
 *
 * After the stream is acquired the function checks the write privilege on the target db, checks for
 * conflicting transactions, refuses to continue when a node update is detected, and then builds,
 * registers and prepares a restart transaction.
 *
 * @param pReq  rpc message carrying the serialized request
 * @return 0 when the stream does not exist and igNotExists is set; TSDB_CODE_ACTION_IN_PROGRESS when
 *         the restart transaction has been prepared; otherwise an error code
 */
static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SStreamObj      *pStream = NULL;
  int32_t          code = 0;
  SMPauseStreamReq pauseReq = {0};

  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (pauseReq.igNotExists) {
      mInfo("stream:%s, not exist, not restart stream", pauseReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to restart stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to restart stream", pauseReq.name, pStream->uid);
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESTART_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // if a node update happened, the restart trans is not created
  bool updated = mndStreamNodeIsUpdated(pMnode);
  if (updated) {
    mError("tasks are not ready for restart, node update detected");
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  STrans *pTrans = NULL;
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream",
                       &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s failed to restart stream since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESTART_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // append the restart action to the trans
  code = mndStreamSetRestartAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s, failed to restart task since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare restart stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1009

1010
/**
 * Generate the next checkpoint id.
 *
 * The result is one larger than the maximum of
 *   - the checkpointId of every stream object stored in the sdb, and
 *   - the latestId reported in execInfo.pTaskMap for every task whose checkpoint is not marked failed.
 *
 * @param pMnode  mnode whose sdb is scanned
 * @param lock    when true, execInfo.lock is taken while the task list is scanned
 * @return the new checkpoint id
 */
int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  SSdb       *pSdb = pMnode->pSdb;
  int64_t     maxChkptId = 0;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    maxChkptId = TMAX(maxChkptId, pStream->checkpointId);
    mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
           pStream->checkpointId);
    sdbRelease(pSdb, pStream);
  }

  {  // check the max checkpoint id from all vnodes.
    int64_t maxCheckpointId = -1;
    if (lock) {
      streamMutexLock(&execInfo.lock);
    }

    for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
      STaskId *p = taosArrayGet(execInfo.pTaskList, i);
      if (p == NULL) {  // validate the key before it is handed to the hash lookup
        continue;
      }

      STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
      if (pEntry == NULL) {
        continue;
      }

      // checkpoints flagged as failed do not contribute to the maximum
      if (pEntry->checkpointInfo.failed) {
        continue;
      }

      if (maxCheckpointId < pEntry->checkpointInfo.latestId) {
        maxCheckpointId = pEntry->checkpointInfo.latestId;
      }
    }

    if (lock) {
      streamMutexUnlock(&execInfo.lock);
    }

    if (maxCheckpointId > maxChkptId) {
      mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId,
             maxCheckpointId);
      maxChkptId = maxCheckpointId;
    }
  }

  mDebug("generate new checkpointId:%" PRId64, maxChkptId + 1);
  return maxChkptId + 1;
}
1062

1063
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
1,367✔
1064
                                               int8_t mndTrigger, bool lock) {
1065
  int32_t code = TSDB_CODE_SUCCESS;
1,367✔
1066
  bool    conflict = false;
1,367✔
1067
  int64_t ts = taosGetTimestampMs();
1,367✔
1068
  STrans *pTrans = NULL;
1,367✔
1069

1070
  if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
1,367!
1071
    return code;
×
1072
  }
1073

1074
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
1,367✔
1075
  if (code) {
1,367!
1076
    mWarn("checkpoint conflict with other trans in %s, code:%s ignore the checkpoint for stream:%s %" PRIx64,
×
1077
          pStream->sourceDb, tstrerror(code), pStream->name, pStream->uid);
1078
    goto _ERR;
×
1079
  }
1080

1081
  code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME,
1,367✔
1082
                       "gen checkpoint for stream", &pTrans);
1083
  if (code) {
1,367!
1084
    mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
×
1085
           tstrerror(code));
1086
    goto _ERR;
×
1087
  }
1088

1089
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
1,367✔
1090
  if (code) {
1,367!
1091
    mError("failed to register checkpoint trans for stream:%s, checkpointId:%" PRId64, pStream->name, checkpointId);
×
1092
    goto _ERR;
×
1093
  }
1094

1095
  mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);
1,367✔
1096

1097
  taosWLockLatch(&pStream->lock);
1,367✔
1098
  pStream->currentTick = 1;
1,367✔
1099

1100
  // 1. redo action: broadcast checkpoint source msg for all source vg
1101
  int32_t totalLevel = taosArrayGetSize(pStream->tasks);
1,367✔
1102
  for (int32_t i = 0; i < totalLevel; i++) {
4,133✔
1103
    SArray      *pLevel = taosArrayGetP(pStream->tasks, i);
2,766✔
1104
    SStreamTask *p = taosArrayGetP(pLevel, 0);
2,766✔
1105

1106
    if (p->info.taskLevel == TASK_LEVEL__SOURCE) {
2,766✔
1107
      int32_t sz = taosArrayGetSize(pLevel);
1,367✔
1108
      for (int32_t j = 0; j < sz; j++) {
4,565✔
1109
        SStreamTask *pTask = taosArrayGetP(pLevel, j);
3,198✔
1110
        code = mndStreamSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);
3,198✔
1111

1112
        if (code != TSDB_CODE_SUCCESS) {
3,198!
1113
          taosWUnLockLatch(&pStream->lock);
×
1114
          goto _ERR;
×
1115
        }
1116
      }
1117
    }
1118
  }
1119

1120
  // 2. reset tick
1121
  pStream->checkpointId = checkpointId;
1,367✔
1122
  pStream->checkpointFreq = taosGetTimestampMs();
1,367✔
1123
  pStream->currentTick = 0;
1,367✔
1124

1125
  // 3. commit log: stream checkpoint info
1126
  pStream->version = pStream->version + 1;
1,367✔
1127
  taosWUnLockLatch(&pStream->lock);
1,367✔
1128

1129
  if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY)) != TSDB_CODE_SUCCESS) {
1,367!
1130
    goto _ERR;
×
1131
  }
1132

1133
  code = mndTransPrepare(pMnode, pTrans);
1,367✔
1134
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1,367!
1135
    mError("failed to prepare checkpoint trans since %s", tstrerror(code));
×
1136
  } else {
1137
    code = TSDB_CODE_ACTION_IN_PROGRESS;
1,367✔
1138
  }
1139

1140
_ERR:
1,367✔
1141
  mndTransDrop(pTrans);
1,367✔
1142
  return code;
1,367✔
1143
}
1144

1145
/**
 * Return the number of entries in execInfo.pNodeList, filling the list from the existing streams
 * first when it is currently empty.
 *
 * @param pMnode  mnode handle passed to refreshNodeListFromExistedStreams
 * @return the size of execInfo.pNodeList, or the error code of the refresh when it fails
 */
int32_t extractStreamNodeList(SMnode *pMnode) {
  if (taosArrayGetSize(execInfo.pNodeList) != 0) {
    // list already populated, nothing to refresh
    return taosArrayGetSize(execInfo.pNodeList);
  }

  int32_t rc = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
  if (rc) {
    mError("Failed to extract node list from stream, code:%s", tstrerror(rc));
    return rc;
  }

  return taosArrayGetSize(execInfo.pNodeList);
}
1156

1157
/**
 * Check whether a checkpoint may be issued right now.
 *
 * @param pMnode  mnode handle
 * @return TSDB_CODE_STREAM_TASK_IVLD_STATUS when a node update is detected;
 *         TSDB_CODE_FAILED when the node list is empty but the task list is not;
 *         -1 when some task is not in ready status or still has a related fill-history task;
 *         0 otherwise
 */
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
  if (mndStreamNodeIsUpdated(pMnode)) {
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  int32_t result = 0;

  streamMutexLock(&execInfo.lock);
  if (taosArrayGetSize(execInfo.pNodeList) == 0) {
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
    if (taosArrayGetSize(execInfo.pTaskList) != 0) {
      streamMutexUnlock(&execInfo.lock);
      mError("stream task node change checking done, no vgroups exist, but task list is not empty");
      return TSDB_CODE_FAILED;
    }
  }

  for (int32_t idx = 0; idx < taosArrayGetSize(execInfo.pTaskList); ++idx) {
    STaskId *pId = taosArrayGet(execInfo.pTaskList, idx);

    // entries without an id or without a status record are skipped
    STaskStatusEntry *pStatus = (pId != NULL) ? taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)) : NULL;
    if (pStatus == NULL) {
      continue;
    }

    if (pStatus->status != TASK_STATUS__READY) {
      mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pStatus->id.streamId,
             (int32_t)pStatus->id.taskId, pStatus->nodeId, streamTaskGetStatusStr(pStatus->status));
      result = -1;
      break;
    }

    if (pStatus->hTaskId != 0) {
      mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
             " exists, checkpoint not issued",
             pStatus->id.streamId, (int32_t)pStatus->id.taskId, pStatus->nodeId,
             streamTaskGetStatusStr(pStatus->status), pStatus->hTaskId);
      result = -1;
      break;
    }
  }

  streamMutexUnlock(&execInfo.lock);
  return result;
}
1204

1205
/**
 * Find the largest startTime among the tasks of one stream that are in TASK_STATUS__READY.
 *
 * Task status entries are looked up in execInfo.pTaskMap; this function itself takes no lock.
 *
 * @param pTaskList  list of STaskId to inspect
 * @param streamId   only entries whose id.streamId equals this value are considered
 * @return the largest startTime of a ready task, or -1 if no ready task with a startTime above -1 was found
 */
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
  int64_t ts = -1;
  int32_t taskId = -1;

  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
    STaskId *p = taosArrayGet(pTaskList, i);
    if (p == NULL) {  // validate the key before it is handed to the hash lookup
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
    if (pEntry == NULL || pEntry->id.streamId != streamId) {
      continue;
    }

    if (pEntry->status == TASK_STATUS__READY && ts < pEntry->startTime) {
      ts = pEntry->startTime;
      taskId = pEntry->id.taskId;
    }
  }

  mDebug("stream:0x%" PRIx64 " last ready ts:%" PRId64 " s-task:0x%x", streamId, ts, taskId);
  return ts;
}
1225

1226
// Pairs a stream with the time elapsed since its last checkpoint.
typedef struct {
  int64_t streamId;  // uid of the stream
  int64_t duration;  // now - checkpointFreq of the stream, as computed by the caller
} SCheckpointInterval;

/**
 * Comparator over SCheckpointInterval that orders elements by duration, largest first.
 *
 * @return -1 when p1 has the longer duration, 1 when p2 has, 0 when both are equal
 */
static int32_t streamWaitComparFn(const void *p1, const void *p2) {
  const SCheckpointInterval *lhs = p1;
  const SCheckpointInterval *rhs = p2;

  if (lhs->duration > rhs->duration) {
    return -1;
  }

  if (lhs->duration < rhs->duration) {
    return 1;
  }

  return 0;
}
1240

1241
// all tasks of this stream should be ready, otherwise do nothing
1242
static bool isStreamReadyHelp(int64_t now, SStreamObj* pStream) {
1,710✔
1243
  bool ready = false;
1,710✔
1244

1245
  streamMutexLock(&execInfo.lock);
1,710✔
1246

1247
  int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
1,710✔
1248
  if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) {
1,710!
1249
    if (lastReadyTs != -1) {
436✔
1250
      mInfo("not start checkpoint, stream:0x%"PRIx64" last ready ts:%"PRId64" ready duration:%"PRId64" less than threshold",
434!
1251
            pStream->uid, lastReadyTs, now - lastReadyTs);
1252
    } else {
1253
      mInfo("not start checkpoint, stream:0x%"PRIx64" not ready now", pStream->uid);
2!
1254
    }
1255

1256
    ready = false;
436✔
1257
  } else {
1258
    ready = true;
1,274✔
1259
  }
1260

1261
  streamMutexUnlock(&execInfo.lock);
1,710✔
1262
  return ready;
1,710✔
1263
}
1264

1265
/**
 * Launch checkpoint transactions for the streams whose checkpoint interval has elapsed.
 *
 * Streams are collected when the time since their last checkpoint is at least
 * tsStreamCheckpointInterval seconds and isStreamReadyHelp() accepts them. The candidates are sorted
 * with streamWaitComparFn and checkpoint transactions are started until the number of active
 * checkpoint transactions reaches tsMaxConcurrentCheckpoint.
 *
 * @param pReq  rpc message; only info.node is used
 * @return TSDB_CODE_STREAM_TASK_IVLD_STATUS when tasks/nodes are not ready, terrno when the candidate
 *         list cannot be allocated, otherwise the code of the last executed step
 */
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  void       *pIter = NULL;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;
  int32_t     numOfCheckpointTrans = 0;

  if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));
  if (pList == NULL) {
    return terrno;
  }

  int64_t now = taosGetTimestampMs();

  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    int64_t duration = now - pStream->checkpointFreq;
    if (duration < tsStreamCheckpointInterval * 1000) {
      sdbRelease(pSdb, pStream);
      continue;
    }

    bool ready = isStreamReadyHelp(now, pStream);
    if (!ready) {
      sdbRelease(pSdb, pStream);
      continue;
    }

    SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
    void               *p = taosArrayPush(pList, &in);
    if (p) {
      int32_t currentSize = taosArrayGetSize(pList);
      mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chkpt interval threshold: %ds(%" PRId64
             "s), concurrently launch threshold:%d",
             pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
             tsMaxConcurrentCheckpoint);
    } else {
      mError("failed to record the checkpoint interval info, stream:0x%" PRIx64, pStream->uid);
    }
    sdbRelease(pSdb, pStream);
  }

  int32_t size = taosArrayGetSize(pList);
  if (size == 0) {
    taosArrayDestroy(pList);
    return code;
  }

  taosArraySort(pList, streamWaitComparFn);
  code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans);
  if (code) {
    mError("failed to clear finish trans, code:%s", tstrerror(code));
    taosArrayDestroy(pList);
    return code;
  }

  int32_t numOfQual = taosArrayGetSize(pList);
  // '>=': with exactly tsMaxConcurrentCheckpoint trans active there is no capacity left, and the
  // launch loop below would otherwise still start one more trans before noticing it.
  if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) {
    mDebug(
        "%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new "
        "checkpoint trans are not allowed, wait for 30s",
        numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint);
    taosArrayDestroy(pList);
    return code;
  }

  int32_t capacity = tsMaxConcurrentCheckpoint - numOfCheckpointTrans;
  mDebug(
      "%d stream(s) checkpoint interval longer than %ds, %d ongoing checkpoint trans, %d new checkpoint trans allowed, "
      "concurrent trans threshold:%d",
      numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, capacity, tsMaxConcurrentCheckpoint);

  int32_t started = 0;
  // the same checkpoint id is used for every trans launched in this round
  int64_t checkpointId = mndStreamGenChkptId(pMnode, true);

  for (int32_t i = 0; i < numOfQual; ++i) {
    SCheckpointInterval *pCheckpointInfo = taosArrayGet(pList, i);
    if (pCheckpointInfo == NULL) {
      continue;
    }

    SStreamObj *p = NULL;
    code = mndGetStreamObj(pMnode, pCheckpointInfo->streamId, &p);
    if (p != NULL && code == 0) {
      code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
      sdbRelease(pSdb, p);

      if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
        started += 1;

        if (started >= capacity) {
          mDebug("already start %d new checkpoint trans, current active checkpoint trans:%d", started,
                 (started + numOfCheckpointTrans));
          break;
        }
      } else {
        mError("failed to start checkpoint trans, code:%s", tstrerror(code));
      }
    }
  }

  taosArrayDestroy(pList);
  return code;
}
1373

1374
/**
 * Handle a drop-stream request.
 *
 * A stream that belongs to a sma (smaId != 0 and a matching SDB_SMA object exists) is refused with
 * TSDB_CODE_TSMA_MUST_BE_DROPPED. Otherwise the write privilege on the target db and transaction
 * conflicts are checked, a drop transaction is built and prepared, a related active transaction is
 * killed, the tasks are removed from execInfo and an audit record is written.
 *
 * @param pReq  rpc message carrying the serialized SMDropStreamReq
 * @return 0 when the stream does not exist and igNotExists is set; TSDB_CODE_ACTION_IN_PROGRESS when
 *         the drop transaction was prepared and the stream name was parsed for the audit record;
 *         otherwise an error code
 */
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  STrans     *pTrans = NULL;
  int32_t     code = 0;

  SMDropStreamReq dropReq = {0};
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
    mError("invalid drop stream msg recv, discarded");
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  mDebug("recv drop stream:%s msg", dropReq.name);

  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (dropReq.igNotExists) {
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      tFreeMDropStreamReq(&dropReq);
      return 0;
    } else {
      mError("stream:%s not exist failed to drop it", dropReq.name);
      // release here as well, mirroring the ignore-not-exists branch
      sdbRelease(pMnode->pSdb, pStream);
      tFreeMDropStreamReq(&dropReq);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  if (pStream->smaId != 0) {
    mDebug("stream:%s, uid:0x%" PRIx64 " try to drop sma related stream", dropReq.name, pStream->uid);

    void    *pIter = NULL;
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    while (pIter) {
      if (pSma && pSma->uid == pStream->smaId) {
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;

        // log while pStream and dropReq are still owned by this function, and report the real code
        mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma",
               dropReq.name, pStream->uid, tstrerror(code));

        sdbRelease(pMnode->pSdb, pSma);
        sdbCancelFetch(pMnode->pSdb, pIter);
        goto _OVER;
      }

      if (pSma) {
        sdbRelease(pMnode->pSdb, pSma);
      }

      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    }
  }

  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    // hand the privilege error to the caller instead of a bare -1
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    return code;
  }

  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code));
    goto _OVER;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
  if (code) {
    mError("failed to register drop stream trans, code:%s", tstrerror(code));
    goto _OVER;
  }

  // drop all tasks
  code = mndStreamSetDropAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, tstrerror(code));
    goto _OVER;
  }

  // drop stream
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
  if (code) {
    goto _OVER;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    goto _OVER;
  }

  // kill the related checkpoint trans
  int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
  if (transId != 0) {
    mDebug("drop active transId:%d due to stream:%s uid:0x%" PRIx64 " dropped", transId, pStream->name, pStream->uid);
    mndKillTransImpl(pMnode, transId, pStream->sourceDb);
  }

  mDebug("stream:%s uid:0x%" PRIx64 " transId:%d start to drop related task when dropping stream", dropReq.name,
         pStream->uid, transId);

  removeStreamTasksInBuf(pStream, &execInfo);

  // NOTE(review): the result of the name parsing replaces the result of mndTransPrepare, so a parse
  // failure is returned to the caller although the drop trans is already prepared -- confirm intent.
  SName name = {0};
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  auditRecord(pReq, pMnode->clusterId, "dropStream", "", name.dbname, dropReq.sql, dropReq.sqlLen);

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  tFreeMDropStreamReq(&dropReq);

  if (code == 0) {
    return TSDB_CODE_ACTION_IN_PROGRESS;
  } else {
    TAOS_RETURN(code);
  }

_OVER:
  // shared failure exit; pTrans is NULL when the trans was never created
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  tFreeMDropStreamReq(&dropReq);
  TAOS_RETURN(code);
}
1516

1517
/**
 * Append the drop of every stream that lives in the given db to an existing transaction.
 *
 * A stream is affected when its source db or its target db is pDb. If such a stream spans two
 * different dbs the whole operation is refused with TSDB_CODE_MND_STREAM_MUST_BE_DELETED. Otherwise
 * its related active transaction is killed, its tasks are removed from execInfo and the dropped
 * status is persisted into pTrans.
 *
 * @param pMnode  mnode handle
 * @param pTrans  transaction that receives the drop logs
 * @param pDb     database being dropped
 * @return 0 on success, otherwise an error code
 */
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
      if (pStream->sourceDbUid != pStream->targetDbUid) {
        // log before the object is released, its fields are read by the message
        mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
               pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(TSDB_CODE_MND_STREAM_MUST_BE_DELETED);
      } else {
        // kill the related checkpoint trans
        int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
        if (transId != 0) {
          mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
          mndKillTransImpl(pMnode, transId, pStream->sourceDb);
        }

        // drop the stream obj in execInfo
        removeStreamTasksInBuf(pStream, &execInfo);

        code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          sdbRelease(pSdb, pStream);
          sdbCancelFetch(pSdb, pIter);
          return code;
        }
      }
    }

    sdbRelease(pSdb, pStream);
  }

  return 0;
}
1559

1560
/*
 * SHOW handler: fill pBlock with one row per stream, at most `rows` rows.
 *
 * The scan position is kept in pShow->pIter so the next call resumes where
 * this one stopped. A stream whose row cannot be built is skipped.
 *
 * @return number of rows written; also added to pShow->numOfRows.
 */
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t filled = 0;

  while (filled < rows) {
    SStreamObj *pObj = NULL;

    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) {
      break;
    }

    // only count the row when it was actually written into the block
    if (setStreamAttrInResBlock(pObj, pBlock, filled) == 0) {
      filled += 1;
    }

    sdbRelease(pSdb, pObj);
  }

  pShow->numOfRows += filled;
  return filled;
}
1581

1582
/* Cancel an in-progress stream scan identified by pIter. */
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1586

1587
/*
 * SHOW handler: fill pBlock with one row per stream task.
 *
 * Streams are scanned through pShow->pIter. All tasks of one stream are
 * emitted together, so when a stream would overflow rowsCapacity the block is
 * grown with blockDataEnsureCapacity() instead of splitting the stream. A
 * stream is skipped when the block cannot be grown or its task iterator cannot
 * be created.
 *
 * @return number of rows written; also added to pShow->numOfRows.
 */
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  int32_t     numOfRows = 0;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) {
      break;
    }

    // lock
    taosRLockLatch(&pStream->lock);

    int32_t count = mndGetNumOfStreamTasks(pStream);
    if (numOfRows + count > rowsCapacity) {
      code = blockDataEnsureCapacity(pBlock, numOfRows + count);
      if (code) {
        mError("failed to prepare the result block buffer, quit return value");
        taosRUnLockLatch(&pStream->lock);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    // timestamps are rendered in the precision of the source db; fall back to ms when it is gone
    int32_t precision = TSDB_TIME_PRECISION_MILLI;
    SDbObj *pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
    if (pSourceDb != NULL) {
      precision = pSourceDb->cfg.precision;
      mndReleaseDb(pMnode, pSourceDb);
    }

    // add row for each task
    SStreamTaskIter *pIter = NULL;
    code = createStreamTaskIter(pStream, &pIter);
    if (code) {
      // log before releasing: the message reads pStream->name
      mError("failed to create task iter for stream:%s", pStream->name);
      taosRUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pIter, &pTask);
      if (code) {
        // the iterator is destroyed exactly once, right after this loop
        break;
      }

      code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows, precision);
      if (code == TSDB_CODE_SUCCESS) {
        numOfRows++;
      }
    }

    pBlock->info.rows = numOfRows;

    destroyStreamTaskIter(pIter);
    taosRUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1660

1661
/* Cancel an in-progress stream-task scan; tasks are enumerated by iterating SDB_STREAM objects. */
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1665

1666
/*
 * Handle a "pause stream" request.
 *
 * Steps: decode the request, look up the stream, check write privilege on the
 * target db, reject on a conflicting transaction or a pending node update,
 * verify every known task of the stream is in a pausable state, then build
 * and prepare the pause transaction.
 *
 * @return 0 when the stream does not exist and igNotExists is set,
 *         TSDB_CODE_ACTION_IN_PROGRESS when the transaction was prepared,
 *         otherwise an error code.
 */
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SStreamObj      *pStream = NULL;
  STrans          *pTrans = NULL;
  SMPauseStreamReq pauseReq = {0};
  int32_t          code = 0;

  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (code != 0 || pStream == NULL) {
    if (!pauseReq.igNotExists) {
      mError("stream:%s not exist, failed to pause stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }

    mInfo("stream:%s, not exist, not pause stream", pauseReq.name);
    return 0;
  }

  mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid);

  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(code);
  }

  if (mndStreamNodeIsUpdated(pMnode)) {
    mError("tasks are not ready for pause, node update detected");
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  // check for tasks, if tasks are not ready, not allowed to pause
  bool hasTask = false;
  bool pausable = true;

  streamMutexLock(&execInfo.lock);
  for (int32_t idx = 0; idx < taosArrayGetSize(execInfo.pTaskList); ++idx) {
    STaskId *pId = taosArrayGet(execInfo.pTaskList, idx);
    if (pId == NULL) {
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
    if (pEntry == NULL || pEntry->id.streamId != pStream->uid) {
      continue;  // unknown task, or a task of another stream
    }

    hasTask = true;

    if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) {
      mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name,
             pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status));
      pausable = false;
    }
  }
  streamMutexUnlock(&execInfo.lock);

  if (!hasTask) {
    mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  if (!pausable) {
    mError("stream:%s task not ready for pause yet", pauseReq.name);
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans);
  if (code || pTrans == NULL) {
    mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // if nodeUpdate happened, not send pause trans
  code = mndStreamSetPauseAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s, failed to pause task since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // pause stream: persist the stream object under its write latch
  taosWLockLatch(&pStream->lock);
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  taosWUnLockLatch(&pStream->lock);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1800

1801
/*
 * Handle a "resume stream" request.
 *
 * Steps: check the stream grant, decode the request, look up the stream,
 * check write privilege on the target db, reject on a conflicting
 * transaction, then build and prepare the resume transaction with the stream
 * status set back to STREAM_STATUS__NORMAL.
 *
 * @return 0 when the stream does not exist and igNotExists is set,
 *         TSDB_CODE_ACTION_IN_PROGRESS when the transaction was prepared,
 *         otherwise the error code of the step that failed.
 */
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResumeStreamReq resumeReq = {0};
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    // drop the reference in the unlikely case an object came back together with an error code
    if (pStream != NULL) {
      sdbRelease(pMnode->pSdb, pStream);
    }

    if (resumeReq.igNotExists) {
      mInfo("stream:%s not exist, not resume stream", resumeReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to resume stream", resumeReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);

  // report the real privilege error, as the pause handler does, instead of a bare -1
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  STrans *pTrans = NULL;
  code =
      doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // set the resume action
  code = mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated);
  if (code) {
    mError("stream:%s, failed to resume task since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // resume stream
  taosWLockLatch(&pStream->lock);
  pStream->status = STREAM_STATUS__NORMAL;
  // keep the result: returning the stale `code` here would report success on failure
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  taosWUnLockLatch(&pStream->lock);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare resume stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1890

1891
/*
 * Handle a "reset stream" request.
 *
 * Only the validation part is implemented: grant check, request decoding and
 * stream lookup. No transaction is created yet.
 *
 * @return 0 when the stream does not exist and igNotExists is set,
 *         TSDB_CODE_MND_STREAM_NOT_EXIST / TSDB_CODE_INVALID_MSG / grant error
 *         on failure, TSDB_CODE_ACTION_IN_PROGRESS otherwise.
 */
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResetStreamReq resetReq = {0};
  if (tDeserializeSMResetStreamReq(pReq->pCont, pReq->contLen, &resetReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  mDebug("recv reset stream req, stream:%s", resetReq.name);

  code = mndAcquireStream(pMnode, resetReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (pStream != NULL) {
      sdbRelease(pMnode->pSdb, pStream);
    }

    if (resetReq.igNotExists) {
      mInfo("stream:%s, not exist, not reset stream", resetReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to reset stream", resetReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  // todo(liao hao jun)
  // NOTE(review): no trans is created yet, so nothing here completes the request later — confirm how the
  // client gets its response once the reset logic is implemented.

  // the stream was acquired only to validate its existence; drop the reference so it is not leaked
  sdbRelease(pMnode->pSdb, pStream);
  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1921

1922
/*
 * Build and prepare a single "update task epsets" transaction covering every
 * stream affected by a vgroup change.
 *
 * Pass 1 aborts when any stream already has a conflicting transaction.
 * Pass 2 creates the transaction lazily on the first stream and, for each
 * involved stream, registers the trans, appends the epset-update actions and
 * persists the stream object. A stream whose actions cannot be built is
 * skipped.
 *
 * @param pChangeInfo     changed nodes and the databases they belong to
 * @param includeAllNodes when true every stream is updated, regardless of
 *                        whether its databases appear in pChangeInfo->pDBMap
 * @return 0 when there is no stream, otherwise the result of mndTransPrepare
 *         or the error code of the step that failed.
 */
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  STrans     *pTrans = NULL;
  int32_t     code = 0;

  // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool
  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
    sdbRelease(pSdb, pStream);

    if (code) {
      mError("nodeUpdate conflict with other trans, current nodeUpdate ignored, code:%s", tstrerror(code));
      sdbCancelFetch(pSdb, pIter);
      return code;
    }
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    // here create only one trans
    if (pTrans == NULL) {
      code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME,
                           "update task epsets", &pTrans);
      if (pTrans == NULL || code) {
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        return terrno = code;
      }
    }

    if (!includeAllNodes) {
      void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
      void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
      if (p1 == NULL && p2 == NULL) {
        mDebug("stream:0x%" PRIx64 " %s not involved nodeUpdate, ignore", pStream->uid, pStream->name);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
           pStream->name, pTrans->id);

    // NOTE: for each stream, we register one trans entry for task update
    code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
    if (code) {
      mError("failed to register trans, transId:%d, and continue", pTrans->id);
    }

    code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);

    // todo: not continue, drop all and retry again
    if (code != TSDB_CODE_SUCCESS) {
      mError("stream:0x%" PRIx64 " build nodeUpdate trans failed, ignore and continue, code:%s", pStream->uid,
             tstrerror(code));
      sdbRelease(pSdb, pStream);
      continue;
    }

    code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
    sdbRelease(pSdb, pStream);

    if (code != TSDB_CODE_SUCCESS) {
      sdbCancelFetch(pSdb, pIter);
      mndTransDrop(pTrans);  // the trans is abandoned here, do not leak it
      return code;
    }
  }

  // no need to build the trans to handle the vgroup update
  if (pTrans == NULL) {
    return 0;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
  }

  // every stream fetched above was already released inside the loop, only the trans is left to drop
  mndTransDrop(pTrans);
  return code;
}
2018

2019
/*
 * Rebuild pNodeList from the tasks of all existing streams.
 *
 * Every task contributes its nodeId/epset; entries are de-duplicated by nodeId
 * through a temporary hash table, then pNodeList is cleared and refilled. New
 * entries start with hbTimestamp and lastHbMsgId set to -1.
 *
 * @return 0 on success, terrno when the hash table cannot be created or an
 *         entry cannot be appended, or the last task-iterator error seen.
 */
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int32_t     code = 0;

  mDebug("start to refresh node list by existed streams");

  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    return terrno;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    taosWLockLatch(&pStream->lock);

    SStreamTaskIter *pTaskIter = NULL;
    code = createStreamTaskIter(pStream, &pTaskIter);
    if (code) {
      // log before releasing: the message reads pStream->name
      mError("failed to create task iter for stream:%s", pStream->name);
      taosWUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pTaskIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pTaskIter, &pTask);
      if (code) {
        break;
      }

      SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
      epsetAssign(&entry.epset, &pTask->info.epSet);

      // several tasks share a node, so a duplicated key is expected and not an error
      int32_t ret = taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
      if (ret != 0 && ret != TSDB_CODE_DUP_KEY) {
        mError("failed to put entry into hash map, nodeId:%d, code:%s", entry.nodeId, tstrerror(ret));
      }
    }

    destroyStreamTaskIter(pTaskIter);
    taosWUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  taosArrayClear(pNodeList);

  // convert to list
  pIter = NULL;
  while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
    SNodeEntry *pEntry = (SNodeEntry *)pIter;

    void *p = taosArrayPush(pNodeList, pEntry);
    if (p == NULL) {
      mError("failed to put entry into node list, nodeId:%d, code: out of memory", pEntry->nodeId);
      if (code == 0) {
        code = terrno;
      }
      continue;
    }

    char    buf[256] = {0};
    int32_t ret = epsetToStr(&pEntry->epset, buf, tListLen(buf));  // ignore this error since it is only for log file
    if (ret != 0) {                                                // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(ret));
    }

    mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
  }

  taosHashCleanup(pHash);

  mDebug("numOfNodes:%d for stream after extract nodeInfo from stream", (int32_t)taosArrayGetSize(pNodeList));
  return code;
}
2100

2101
/*
 * Insert the database name of every vgroup into pDBMap (name used as key, no
 * value). Insert failures are ignored.
 */
static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
  void   *pIter = NULL;
  int32_t code = 0;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    code = taosHashPut(pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
    if (code == 0) {
      int32_t size = taosHashGetSize(pDBMap);
      mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size);
    }

    // release only after the last use of pVgroup->dbName
    sdbRelease(pSdb, pVgroup);
  }
}
×
2120

2121
// this function runs by only one thread, so it is not multi-thread safe
2122
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
1,499✔
2123
  int32_t code = 0;
1,499✔
2124
  bool    allReady = true;
1,499✔
2125
  SArray *pNodeSnapshot = NULL;
1,499✔
2126
  SMnode *pMnode = pMsg->info.node;
1,499✔
2127
  int64_t ts = taosGetTimestampSec();
1,499✔
2128
  bool    updateAllVgroups = false;
1,499✔
2129

2130
  int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
1,499✔
2131
  if (old != 0) {
1,499!
2132
    mDebug("still in checking node change");
×
2133
    return 0;
×
2134
  }
2135

2136
  mDebug("start to do node changing check");
1,499✔
2137

2138
  streamMutexLock(&execInfo.lock);
1,499✔
2139
  int32_t numOfNodes = extractStreamNodeList(pMnode);
1,499✔
2140
  streamMutexUnlock(&execInfo.lock);
1,499✔
2141

2142
  if (numOfNodes == 0) {
1,499!
2143
    mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing");
×
2144
    execInfo.ts = ts;
×
2145
    atomic_store_32(&mndNodeCheckSentinel, 0);
×
2146
    return 0;
×
2147
  }
2148

2149
  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
1,499✔
2150
  if (code) {
1,499!
2151
    mError("failed to take the vgroup snapshot, ignore it and continue");
×
2152
  }
2153

2154
  if (!allReady) {
1,499✔
2155
    taosArrayDestroy(pNodeSnapshot);
34✔
2156
    atomic_store_32(&mndNodeCheckSentinel, 0);
34✔
2157
    mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
34!
2158
    return 0;
34✔
2159
  }
2160

2161
  streamMutexLock(&execInfo.lock);
1,465✔
2162

2163
  code = removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
1,465✔
2164
  if (code) {
1,465!
2165
    goto _end;
×
2166
  }
2167

2168
  SVgroupChangeInfo changeInfo = {0};
1,465✔
2169
  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo);
1,465✔
2170
  if (code) {
1,465!
2171
    goto _end;
×
2172
  }
2173

2174
  {
2175
    if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) {
1,465!
2176
      mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
×
2177
      updateAllVgroups = true;
×
2178
      execInfo.switchFromFollower = false;  // reset the flag
×
2179
      addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb);
×
2180
    }
2181
  }
2182

2183
  if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) {
1,465!
2184
    // kill current active checkpoint transaction, since the transaction is vnode wide.
2185
    killAllCheckpointTrans(pMnode, &changeInfo);
12✔
2186
    code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups);
12✔
2187

2188
    // keep the new vnode snapshot if success
2189
    if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
12!
2190
      code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
12✔
2191
      if (code) {
12!
2192
        mError("failed to extract node list from stream, code:%s", tstrerror(code));
×
2193
        goto _end;
×
2194
      }
2195

2196
      execInfo.ts = ts;
12✔
2197
      mDebug("create trans successfully, update cached node list, numOfNodes:%d",
12✔
2198
             (int)taosArrayGetSize(execInfo.pNodeList));
2199
    } else {
2200
      mError("unexpected code during create nodeUpdate trans, code:%s", tstrerror(code));
×
2201
    }
2202
  } else {
2203
    mDebug("no update found in nodeList");
1,453✔
2204
  }
2205

2206
  mndDestroyVgroupChangeInfo(&changeInfo);
1,465✔
2207

2208
_end:
1,465✔
2209
  streamMutexUnlock(&execInfo.lock);
1,465✔
2210
  taosArrayDestroy(pNodeSnapshot);
1,465✔
2211

2212
  mDebug("end to do stream task node change checking");
1,465✔
2213
  atomic_store_32(&mndNodeCheckSentinel, 0);
1,465✔
2214
  return 0;
1,465✔
2215
}
2216

2217
/*
 * Post a TDMT_MND_STREAM_NODECHANGE_CHECK message to the write queue.
 * Nothing is posted when no stream exists.
 *
 * @return 0 when there is no stream, terrno when the message buffer cannot be
 *         allocated, otherwise the result of tmsgPutToQueue.
 */
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;

  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) <= 0) {
    return 0;
  }

  int32_t               contLen = sizeof(SMStreamNodeCheckMsg);
  SMStreamNodeCheckMsg *pCont = rpcMallocCont(contLen);
  if (pCont == NULL) {
    return terrno;
  }

  SRpcMsg checkMsg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pCont, .contLen = contLen};
  return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &checkMsg);
}
2233

2234
/*
 * Register every task of pStream in the execution buffer pExecNode.
 *
 * A task not yet present in pTaskMap gets a freshly initialised status entry
 * in pTaskMap and its id appended to pTaskList; the node it runs on is
 * appended to pNodeList unless an entry with the same nodeId already exists.
 * Tasks that are already tracked are left untouched. Failures are logged.
 */
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
  SStreamTaskIter *pTaskIter = NULL;

  if (createStreamTaskIter(pStream, &pTaskIter) != 0) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pTask = NULL;
    if (streamTaskIterGetCurrent(pTaskIter, &pTask) != 0) {
      break;
    }

    STaskId taskKey = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    if (taosHashGet(pExecNode->pTaskMap, &taskKey, sizeof(taskKey)) != NULL) {
      continue;  // already tracked
    }

    STaskStatusEntry entry = {0};
    streamTaskStatusInit(&entry, pTask);

    if (taosHashPut(pExecNode->pTaskMap, &taskKey, sizeof(taskKey), &entry, sizeof(entry)) != 0) {
      mError("s-task:0x%x failed to add into task map, since out of memory", (int32_t)entry.id.taskId);
    } else {
      void   *pPushed = taosArrayPush(pExecNode->pTaskList, &taskKey);
      int32_t total = (int32_t)taosArrayGetSize(pExecNode->pTaskList);
      if (pPushed != NULL) {
        mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, total);
      } else {
        mError("s-task:0x%x failed to add into task buffer, total:%d", (int32_t)entry.id.taskId, total);
      }
    }

    // add the new vgroups if not added yet
    bool nodeKnown = false;
    for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pNodeList); ++k) {
      SNodeEntry *pNode = taosArrayGet(pExecNode->pNodeList, k);
      if (pNode != NULL && pNode->nodeId == pTask->info.nodeId) {
        nodeKnown = true;
        break;
      }
    }

    if (nodeKnown) {
      continue;
    }

    SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
    epsetAssign(&nodeEntry.epset, &pTask->info.epSet);

    if (taosArrayPush(pExecNode->pNodeList, &nodeEntry) != NULL) {
      mInfo("vgId:%d added into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList));
    } else {
      mError("vgId:%d failed to add into nodeList, total:%d", nodeEntry.nodeId,
             (int)taosArrayGetSize(pExecNode->pNodeList));
    }
  }

  destroyStreamTaskIter(pTaskIter);
}
2295

2296
/*
 * Append taskId to pList unless it is already present.
 *
 * @param uid         stream id, used for logging only
 * @param numOfTotal  total number of tasks of the stream, used for logging only
 */
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
  // number of ids held before this call; also the value reported in the log below
  int32_t numOfTasks = taosArrayGetSize(pList);

  for (int32_t k = 0; k < numOfTasks; ++k) {
    int32_t *pExisting = taosArrayGet(pList, k);
    if (pExisting != NULL && *pExisting == taskId) {
      return;  // duplicated request from the same task
    }
  }

  if (taosArrayPush(pList, &taskId) != NULL) {
    mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, numOfTasks, numOfTotal - numOfTasks);
  } else {
    mError("stream:0x%" PRIx64 " receive %d reqs for checkpoint, failed to added into task list, since out of memory",
           uid, numOfTasks);
  }
}
2318

2319
/*
 * Handle a checkpoint request sent by a stream task.
 *
 * The task id is recorded per stream in execInfo.pTransferStateStreams. Once
 * the number of recorded ids equals the number of tasks of the stream, a
 * checkpoint transaction is started and the per-stream record is removed.
 * A response carrying the node id is sent explicitly and the automatic
 * response is disabled.
 *
 * @return 0 on success, TSDB_CODE_INVALID_MSG for an undecodable request,
 *         TSDB_CODE_MND_STREAM_NOT_EXIST when the stream is neither in the
 *         sdb nor in the exec buffer, or an out-of-memory code.
 */
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
  SMnode                  *pMnode = pReq->info.node;
  SStreamTaskCheckpointReq req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskCheckpointReq(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint req msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);

  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf",
          req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the create stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  // list of task ids that already requested a checkpoint for this stream
  SArray  *pTaskIds = NULL;
  SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
  if (pReqTaskList != NULL) {
    pTaskIds = *pReqTaskList;
    doAddTaskId(pTaskIds, req.taskId, req.streamId, numOfTasks);
  } else {
    pTaskIds = taosArrayInit(4, sizeof(int32_t));
    if (pTaskIds == NULL) {
      code = terrno;
      mError("failed to create the checkpoint req task list, code: out of memory");
      goto _fail;
    }

    doAddTaskId(pTaskIds, req.taskId, req.streamId, numOfTasks);

    // the map stores a copy of the pointer, so pTaskIds remains the live list after a successful put
    code = taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pTaskIds, sizeof(void *));
    if (code) {
      mError("failed to put into transfer state stream map, code: out of memory");
      taosArrayDestroy(pTaskIds);  // not owned by the map, free it here
      goto _fail;
    }
  }

  int32_t total = taosArrayGetSize(pTaskIds);
  if (total == numOfTasks) {  // all tasks have sent the reqs
    int64_t checkpointId = mndStreamGenChkptId(pMnode, false);
    mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);

    if (pStream != NULL) {  // TODO:handle error
      code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
      if (code) {
        mError("failed to create checkpoint trans, code:%s", tstrerror(code));
      }
    } else {
      // todo: wait for the create stream trans completed, and launch the checkpoint trans
      // SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
      // sleep(500ms)
    }

    // remove this entry
    (void)taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));

    int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
    mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  {
    SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)};
    rsp.pCont = rpcMallocCont(rsp.contLen);
    if (rsp.pCont == NULL) {
      return terrno;
    }

    SMsgHead *pHead = rsp.pCont;
    pHead->vgId = htonl(req.nodeId);

    tmsgSendRsp(&rsp);
    pReq->info.handle = NULL;  // disable auto rsp
  }

  return 0;

_fail:
  // failed while recording the request: undo what this call holds and report the error
  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }
  streamMutexUnlock(&execInfo.lock);
  return code;
}
2418

2419
// valid the info according to the HbMsg
2420
static bool validateChkptReport(const SCheckpointReport *pReport, int64_t reportChkptId) {
6,214✔
2421
  STaskId           id = {.streamId = pReport->streamId, .taskId = pReport->taskId};
6,214✔
2422
  STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
6,214✔
2423
  if (pTaskEntry == NULL) {
6,214✔
2424
    mError("invalid checkpoint-report msg from task:0x%x, discard", pReport->taskId);
24!
2425
    return false;
24✔
2426
  }
2427

2428
  if (pTaskEntry->checkpointInfo.latestId >= pReport->checkpointId) {
6,190!
2429
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " saved checkpointId:%" PRId64 " discard",
×
2430
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2431
    return false;
×
2432
  }
2433

2434
  // now the task in checkpoint procedure
2435
  if ((pTaskEntry->checkpointInfo.activeId != 0) && (pTaskEntry->checkpointInfo.activeId > pReport->checkpointId)) {
6,190!
2436
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " active checkpointId:%" PRId64
×
2437
           " discard",
2438
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2439
    return false;
×
2440
  }
2441

2442
  if (reportChkptId >= pReport->checkpointId) {
6,190!
2443
    mError("s-task:0x%x expired checkpoint-report msg, checkpointId:%" PRId64 " already update checkpointId:%" PRId64
×
2444
           " discard",
2445
           pReport->taskId, pReport->checkpointId, reportChkptId);
2446
    return false;
×
2447
  }
2448

2449
  return true;
6,190✔
2450
}
2451

2452
// Record one task's checkpoint-report in pList.
//
// The report is first checked with validateChkptReport(); rejected reports are dropped.
// If the task already has an entry in pList, that entry is refreshed only when the new
// report carries a larger checkpointId; otherwise a message is logged and the list is
// left untouched. A task without an entry gets a new one appended.
static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SCheckpointReport *pReport) {
  if (!validateChkptReport(pReport, reportChkptId)) {
    return;
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pList)) {
    STaskChkptInfo *pExisting = taosArrayGet(pList, idx);
    idx += 1;

    if (pExisting == NULL || pExisting->taskId != pReport->taskId) {
      continue;
    }

    // the task is already registered: decide whether the stored info must be replaced
    if (pExisting->checkpointId > pReport->checkpointId) {
      mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
             pReport->taskId, pExisting->checkpointId, pReport->checkpointId);
    } else if (pExisting->checkpointId < pReport->checkpointId) {
      // the stored report is the expired one, overwrite it with the incoming report
      mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64,
             pReport->taskId, pExisting->checkpointId, pReport->checkpointId);

      pExisting->checkpointId = pReport->checkpointId;
      pExisting->ts = pReport->checkpointTs;
      pExisting->version = pReport->checkpointVer;
      pExisting->transId = pReport->transId;
      pExisting->dropHTask = pReport->dropHTask;
    } else {
      mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
    }

    return;
  }

  // first report of this task: build a fresh entry and append it
  STaskChkptInfo entry = {0};
  entry.streamId = pReport->streamId;
  entry.taskId = pReport->taskId;
  entry.transId = pReport->transId;
  entry.dropHTask = pReport->dropHTask;
  entry.version = pReport->checkpointVer;
  entry.ts = pReport->checkpointTs;
  entry.checkpointId = pReport->checkpointId;
  entry.nodeId = pReport->nodeId;

  if (taosArrayPush(pList, &entry) == NULL) {
    mError("failed to put into task list, taskId:0x%x", pReport->taskId);
    return;
  }

  int32_t numOfReported = taosArrayGetSize(pList);
  mDebug("stream:0x%" PRIx64 " %d tasks has send checkpoint-report", pReport->streamId, numOfReported);
}
2504

2505
// Handle a checkpoint-report message sent by a stream task.
//
// The message is decoded, the task is registered in the per-stream report list kept in
// execInfo.pChkptStreams (created on first report), and a quick response is sent back
// to the reporting vnode.
//
// Returns:
//   TSDB_CODE_INVALID_MSG           when the payload cannot be decoded;
//   TSDB_CODE_MND_STREAM_NOT_EXIST  when the stream is neither in the meta-store nor
//                                   in the exec buffer (no response is sent here);
//   otherwise the last status code produced while registering the report.
int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SCheckpointReport req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskChkptReport(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint-report msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64
         " checkpointVer:%" PRId64 " transId:%d",
         req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);

  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the create stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SChkptReportInfo *pInfo =
      (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
  if (pInfo == NULL) {
    SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId};
    if (info.pTaskList == NULL) {
      // allocation failed: nothing can be registered, pInfo stays NULL
      code = terrno;
      mError("stream:0x%" PRIx64 " failed to create checkpoint-report task list", req.streamId);
    } else {
      doAddReportStreamTask(info.pTaskList, info.reportChkpt, &req);
      code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &info, sizeof(info));
      if (code) {
        mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId);
        // the hash did not take the entry, so the list is still owned here
        taosArrayDestroy(info.pTaskList);
      } else {
        pInfo = (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
      }
    }
  } else {
    doAddReportStreamTask(pInfo->pTaskList, pInfo->reportChkpt, &req);
  }

  // pInfo is NULL when the report info could not be created; pStream is NULL when the
  // stream only exists in the exec buffer. Neither may be dereferenced in those cases.
  if (pInfo != NULL && pStream != NULL) {
    int32_t total = taosArrayGetSize(pInfo->pTaskList);
    if (total == numOfTasks) {  // all tasks has send the reqs
      mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
            " will be issued soon",
            req.streamId, pStream->name, total, req.checkpointId);
    }
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS);
  return code;
}
2584

2585
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t *pExistedTasks, bool *pAllSame) {
238✔
2586
  int32_t num = 0;
238✔
2587
  int64_t chkId = INT64_MAX;
238✔
2588
  *pExistedTasks = 0;
238✔
2589
  *pAllSame = true;
238✔
2590

2591
  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
6,846✔
2592
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
6,608✔
2593
    if (p == NULL) {
6,608!
2594
      continue;
×
2595
    }
2596

2597
    if (p->streamId != streamId) {
6,608✔
2598
      continue;
5,060✔
2599
    }
2600

2601
    num += 1;
1,548✔
2602
    STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
1,548✔
2603
    if (chkId > pe->checkpointInfo.latestId) {
1,548✔
2604
      if (chkId != INT64_MAX) {
246✔
2605
        *pAllSame = false;
8✔
2606
      }
2607
      chkId = pe->checkpointInfo.latestId;
246✔
2608
    }
2609
  }
2610

2611
  *pExistedTasks = num;
238✔
2612
  if (num < numOfTasks) {  // not all task send info to mnode through hbMsg, no valid checkpoint Id
238!
2613
    return -1;
×
2614
  }
2615

2616
  return chkId;
238✔
2617
}
2618

2619
// Send an immediate response for the request described by pInfo.
//
// A response buffer of msgSize bytes is allocated, its message head is stamped with
// vgId (network byte order) and it is sent with the given code. After sending, the
// handle in pInfo is cleared. If the buffer cannot be allocated nothing is sent and
// pInfo is left unchanged.
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) {
  SRpcMsg reply = {.code = code, .info = *pInfo, .contLen = msgSize};

  reply.pCont = rpcMallocCont(reply.contLen);
  if (reply.pCont == NULL) {
    return;
  }

  SMsgHead *pMsgHead = reply.pCont;
  pMsgHead->vgId = htonl(vgId);

  tmsgSendRsp(&reply);
  pInfo->handle = NULL;  // disable auto rsp
}
6,214✔
2630

2631
// Timer-driven handler that issues consensus-checkpointId transactions.
//
// For every stream registered in execInfo.pStreamConsensus, each pending task entry is
// inspected: once all tasks of the stream have a status entry and either all of them
// agree on the checkpoint id or the entry has waited at least 10 seconds, a
// set-consensus-checkpointId trans is created and the entry is removed. Streams whose
// pending list becomes empty are removed from execInfo.pStreamConsensus afterwards.
//
// Returns terrno when the working list cannot be allocated, 0 when not all vnodes are
// ready, TSDB_CODE_FAILED on an inconsistent entry, otherwise the last status code.
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  int64_t now = taosGetTimestampMs();
  SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
  if (pStreamList == NULL) {
    return terrno;
  }

  mDebug("start to process consensus-checkpointId in tmr");

  bool    allReady = true;
  SArray *pNodeSnapshot = NULL;

  int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
  taosArrayDestroy(pNodeSnapshot);
  if (code) {
    mError("failed to get the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
    taosArrayDestroy(pStreamList);
    return 0;
  }

  streamMutexLock(&execInfo.lock);

  void *pIter = NULL;
  while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
    SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;

    int64_t streamId = -1;
    int32_t num = taosArrayGetSize(pInfo->pTaskList);
    SArray *pList = taosArrayInit(4, sizeof(int32_t));  // task ids handled in this round
    if (pList == NULL) {
      continue;
    }

    SStreamObj *pStream = NULL;
    code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
    if (pStream == NULL || code != 0) {  // stream has been dropped already
      mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
      taosArrayDestroy(pList);
      continue;
    }

    for (int32_t j = 0; j < num; ++j) {
      SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
      if (pe == NULL) {
        continue;
      }

      streamId = pe->req.streamId;

      int32_t existed = 0;
      bool    allSame = true;
      int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame);
      if (chkId == -1) {
        mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed,
               pInfo->numOfTasks, pe->req.taskId);
        break;
      }

      if (((now - pe->ts) >= 10 * 1000) || allSame) {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
        if (chkId > pe->req.checkpointId) {
          // log while the entry is still protected by the lock, then give back
          // everything acquired in this iteration before bailing out
          mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
                 pe->req.checkpointId, chkId);
          taosArrayDestroy(pList);
          mndReleaseStream(pMnode, pStream);
          // NOTE(review): the hash iteration is abandoned here without an explicit
          // cancel call; confirm whether the hash API requires one on early exit.
          streamMutexUnlock(&execInfo.lock);
          taosArrayDestroy(pStreamList);
          return TSDB_CODE_FAILED;
        }
        code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
        }

        void *p = taosArrayPush(pList, &pe->req.taskId);
        if (p == NULL) {
          mError("failed to put into task list, taskId:0x%x", pe->req.taskId);
        }
      } else {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
      }
    }

    mndReleaseStream(pMnode, pStream);

    // drop the entries of all tasks handled in this round from the pending list
    for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
      int32_t *taskId = taosArrayGet(pList, i);
      if (taskId == NULL) {
        continue;
      }

      for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) {
        SCheckpointConsensusEntry *pEntry = taosArrayGet(pInfo->pTaskList, k);
        if ((pEntry != NULL) && (pEntry->req.taskId == *taskId)) {
          taosArrayRemove(pInfo->pTaskList, k);
          break;
        }
      }
    }

    taosArrayDestroy(pList);

    if (taosArrayGetSize(pInfo->pTaskList) == 0) {
      mndClearConsensusRspEntry(pInfo);
      if (streamId == -1) {
        streamMutexUnlock(&execInfo.lock);
        taosArrayDestroy(pStreamList);
        mError("streamId is -1, streamId:%" PRIx64, pInfo->streamId);
        return TSDB_CODE_FAILED;
      }
      void *p = taosArrayPush(pStreamList, &streamId);
      if (p == NULL) {
        mError("failed to put into stream list, stream:0x%" PRIx64, streamId);
      }
    }
  }

  // remove the streams that have no pending entry left; done after the iteration
  for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
    int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i);
    if (pStreamId == NULL) {
      continue;
    }

    code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId);
  }

  streamMutexUnlock(&execInfo.lock);

  taosArrayDestroy(pStreamList);
  mDebug("end to process consensus-checkpointId in tmr");
  return code;
}
2771

2772
// Wrapper around mndProcessCreateStreamReq() for requests handled through this entry.
//
// When the create request fails (any code other than 0 or
// TSDB_CODE_ACTION_IN_PROGRESS), a 1-byte response buffer is attached to the request,
// noResp is cleared and the failure code is stored in pReq->code.
// Returns the code of the create request, or terrno if the buffer allocation fails.
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t code = mndProcessCreateStreamReq(pReq);
  if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    return code;
  }

  // failure: prepare a minimal response carrying the error code
  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->info.rspLen = 1;
  pReq->info.noResp = false;
  pReq->code = code;
  return code;
}
2786

2787
// Wrapper around mndProcessDropStreamReq() for requests handled through this entry.
//
// When the drop request fails (any code other than 0 or
// TSDB_CODE_ACTION_IN_PROGRESS), a 1-byte response buffer is attached to the request,
// noResp is cleared and the failure code is stored in pReq->code.
// Returns the code of the drop request, or terrno if the buffer allocation fails.
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t code = mndProcessDropStreamReq(pReq);
  if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    return code;
  }

  // failure: prepare a minimal response carrying the error code
  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->info.rspLen = 1;
  pReq->info.noResp = false;
  pReq->code = code;
  return code;
}
2801

2802
// Load all stream tasks into pExecInfo once.
// Does nothing when the task list has already been loaded or pMnode is NULL;
// otherwise fills the buffer and marks it as loaded.
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  bool needLoad = (!pExecInfo->initTaskList) && (pMnode != NULL);
  if (needLoad) {
    addAllStreamTasksIntoBuf(pMnode, pExecInfo);
    pExecInfo->initTaskList = true;
  }
}
2810

2811
void mndStreamResetInitTaskListLoadFlag() {
1,616✔
2812
  mInfo("reset task list buffer init flag for leader");
1,616!
2813
  execInfo.initTaskList = false;
1,616✔
2814
}
1,616✔
2815

2816
// Track the role of this mnode in the global execInfo.
//
// switchFromFollower is reset on every call and set to true only for a
// follower -> leader transition. Any role other than NODE_ROLE_LEADER is handled as
// follower. pMnode is not used.
void mndUpdateStreamExecInfoRole(SMnode *pMnode, int32_t role) {
  const bool toLeader = (role == NODE_ROLE_LEADER);

  execInfo.switchFromFollower = false;

  // first role assignment after start
  if (execInfo.role == NODE_ROLE_UNINIT) {
    execInfo.role = role;
    if (toLeader) {
      mInfo("init mnode is set to leader");
    } else {
      mInfo("init mnode is set to follower");
    }
    return;
  }

  if (toLeader) {
    if (execInfo.role != NODE_ROLE_FOLLOWER) {
      mInfo("mnode remain to be leader, do nothing");
      return;
    }

    execInfo.role = role;
    execInfo.switchFromFollower = true;
    mInfo("mnode switch to be leader from follower");
    return;
  }

  // follower's
  if (execInfo.role != NODE_ROLE_LEADER) {
    mInfo("mnode remain to be follower, do nothing");
    return;
  }

  execInfo.role = role;
  mInfo("mnode switch to be follower from leader");
}
1,935✔
2845

2846
// Walk every SDB_STREAM object of the mnode's sdb and save its task and node info
// into pExecInfo. Each fetched stream object is released right after it is processed.
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pCurStream = NULL;

  for (void *pCursor = sdbFetch(pSdb, SDB_STREAM, NULL, (void **)&pCurStream); pCursor != NULL;
       pCursor = sdbFetch(pSdb, SDB_STREAM, pCursor, (void **)&pCurStream)) {
    saveTaskAndNodeInfoIntoBuf(pCurStream, pExecInfo);
    sdbRelease(pSdb, pCurStream);
  }
}
161✔
2861

2862
// Create and prepare the trans that updates the checkpoint info of pStream.
//
// pStream is released (sdbRelease) on every path, so the caller must not release it
// again. pChkptInfoList is not used by this function.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the trans has been prepared, otherwise the
// code of the step that failed (or the code of doCreateTrans when no trans was created).
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList) {
  STrans *pTrans = NULL;
  STrans *pCreated = NULL;  // set once the trans exists and must be dropped at exit

  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME,
                               "update checkpoint-info", &pTrans);
  if (pTrans == NULL || code) {
    goto _over;
  }
  pCreated = pTrans;

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid);
  if (code) {
    goto _over;
  }

  code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream);
  if (code) {
    goto _over;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    goto _over;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, tstrerror(code));
    goto _over;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_over:
  // same order on every path: release the stream first, then drop the trans
  sdbRelease(pMnode->pSdb, pStream);
  if (pCreated != NULL) {
    mndTransDrop(pCreated);
  }
  return code;
}
2905

2906
// Handle a drop-orphan-task request: build one trans that drops every task listed in
// the message.
//
// The streamId of the first non-NULL entry of the list is used for the conflict check,
// for the dummy stream object and for registering the trans. An empty list, or a list
// without any usable entry, creates no trans and leaves the code at 0.
//
// Returns the deserialization error, the code of the failing step, or the code of
// mndTransPrepare() (TSDB_CODE_SUCCESS / TSDB_CODE_ACTION_IN_PROGRESS on success).
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  SOrphanTask *pFirst = NULL;
  STrans      *pTrans = NULL;
  int32_t      numOfTasks = 0;

  SMStreamDropOrphanMsg msg = {0};

  int32_t code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg);
  if (code) {
    return code;
  }

  numOfTasks = taosArrayGetSize(msg.pList);
  if (numOfTasks == 0) {
    mDebug("no orphan tasks to drop, no need to create trans");
    goto _end;
  }

  mDebug("create trans to drop %d orphan tasks", numOfTasks);

  // pick the first entry that can be read from the list
  for (int32_t idx = 0; idx < numOfTasks; ++idx) {
    pFirst = taosArrayGet(msg.pList, idx);
    if (pFirst != NULL) {
      break;
    }
  }

  if (pFirst == NULL) {
    mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task");
    goto _end;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pFirst->streamId, MND_STREAM_DROP_NAME, false);
  if (code) {
    goto _end;
  }

  // placeholder stream object that only carries the stream uid
  SStreamObj dummyObj = {.uid = pFirst->streamId, .sourceDb = "", .targetSTbName = ""};

  code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code != 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _end;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pFirst->streamId);
  if (code) {
    goto _end;
  }

  // drop all tasks
  code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList);
  if (code < 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _end;
  }

  // drop stream
  code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED);
  if (code < 0) {
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    goto _end;
  }

_end:
  tDestroyDropOrphanTaskMsg(&msg);
  mndTransDrop(pTrans);

  if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    mDebug("create drop %d orphan tasks trans succ", numOfTasks);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc