• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3626

28 Feb 2025 03:34AM UTC coverage: 63.764% (+0.1%) from 63.633%
#3626

push

travis-ci

web-flow
Merge pull request #29961 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

149233 of 299935 branches covered (49.76%)

Branch coverage included in aggregate %.

53 of 91 new or added lines in 8 files covered. (58.24%)

3267 existing lines in 138 files now uncovered.

233601 of 300457 relevant lines covered (77.75%)

17374158.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.39
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "audit.h"
17
#include "mndDb.h"
18
#include "mndPrivilege.h"
19
#include "mndScheduler.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndStream.h"
23
#include "mndTrans.h"
24
#include "osMemory.h"
25
#include "parser.h"
26
#include "taoserror.h"
27
#include "tmisce.h"
28
#include "tname.h"
29

30
#define MND_STREAM_MAX_NUM 60
31

32
// Message body for the stream node-check request. It carries no payload.
typedef struct {
  int8_t placeHolder;  // placeholder member, defined to fix a Windows compile error
} SMStreamNodeCheckMsg;
35

36
static int32_t  mndNodeCheckSentinel = 0;
37
SStreamExecInfo execInfo;
38

39
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
41
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
42
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
43
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
44

45
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
46
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
47

48
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq);
49
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
50
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
51
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
52
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
53
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
54
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
55
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq);
56
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
57
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
58
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
59
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
60
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList);
61
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
62
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
63
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
64
static void    doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
65
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq);
66
static void    saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
67

68
static void     addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
69
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
70

71
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
72
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
73
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
74
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
75
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
76

77
/*
 * Initialize the stream module of the mnode.
 *
 * Registers every stream-related message handler and show/retrieve handler on
 * pMnode, initializes the execution info via mndInitExecInfo(), and installs
 * the SDB_STREAM and SDB_STREAM_SEQ tables into pMnode->pSdb.
 *
 * Returns 0 on success, or the first non-zero code reported by
 * mndInitExecInfo() / sdbSetTable().
 */
int32_t mndInitStream(SMnode *pMnode) {
  SSdbTable streamTable = {
      .sdbType = SDB_STREAM,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
  };
  SSdbTable streamSeqTable = {
      .sdbType = SDB_STREAM_SEQ,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamSeqActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamSeqActionDecode,
      .insertFp = (SdbInsertFp)mndStreamSeqActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamSeqActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamSeqActionDelete,
  };

  // create/drop requests and the node-check timer
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck);

  // task-level responses, all routed to the transaction response handler
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CONSEN_CHKPT_RSP, mndTransProcessRsp);

  // for msgs inside mnode
  // TODO change the name
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE, mndProcessCreateStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp);

  // checkpoint, heartbeat, node-change and consensus handling
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_DROP_ORPHANTASKS, mndProcessDropOrphanTaskReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_TASK_RESET, mndProcessResetStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr);

  // pause / resume / reset requests
  mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESET_STREAM, mndProcessResetStreamReq);

  // show-table retrieval and iterator cancellation
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);

  int32_t code = mndInitExecInfo();
  if (code != 0) {
    return code;
  }

  code = sdbSetTable(pMnode->pSdb, streamTable);
  if (code != 0) {
    return code;
  }

  return sdbSetTable(pMnode->pSdb, streamSeqTable);
}
152

153
/*
 * Release the containers held by the global execInfo and destroy its mutex.
 * pMnode is not used by this function.
 */
void mndCleanupStream(SMnode *pMnode) {
  // array containers
  taosArrayDestroy(execInfo.pTaskList);
  taosArrayDestroy(execInfo.pNodeList);
  taosArrayDestroy(execInfo.pKilledChkptTrans);

  // hash containers
  taosHashCleanup(execInfo.pTaskMap);
  taosHashCleanup(execInfo.transMgmt.pDBTrans);
  taosHashCleanup(execInfo.pTransferStateStreams);
  taosHashCleanup(execInfo.pChkptStreams);
  taosHashCleanup(execInfo.pStreamConsensus);

  // the result of destroying the mutex is deliberately ignored
  (void)taosThreadMutexDestroy(&execInfo.lock);

  mDebug("mnd stream exec info cleanup");
}
1,919✔
165

166
/*
 * Decode a raw sdb record into a freshly allocated row holding an SStreamObj.
 *
 * Returns the new row on success (terrno is reset to 0). On any failure the
 * row is released, terrno is set to the error code, and NULL is returned.
 *
 * Fix over the previous version: every early exit now carries a non-zero
 * error code. Previously an unsupported soft version (and any exit that
 * jumped to _over without assigning `code`) fell into the success branch,
 * dereferenced a NULL pStream while tracing, and returned NULL with
 * terrno == 0.
 */
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSdbRow    *pRow = NULL;
  SStreamObj *pStream = NULL;
  void       *buf = NULL;
  int8_t      sver = 0;
  int32_t     tlen = 0;
  int32_t     dataPos = 0;
  bool        finished = false;  // set only when the normal path ran to completion

  code = sdbGetRawSoftVer(pRaw, &sver);
  TSDB_CHECK_CODE(code, lino, _over);

  if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _over;
  }

  pRow = sdbAllocRow(sizeof(SStreamObj));
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);

  pStream = sdbGetRowObj(pRow);
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);

  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);

  buf = taosMemoryMalloc(tlen + 1);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);

  SDecoder decoder;
  tDecoderInit(&decoder, buf, tlen + 1);
  code = tDecodeSStreamObj(&decoder, pStream, sver);
  tDecoderClear(&decoder);

  if (code < 0) {
    // release whatever the decoder attached to the object before the row itself is freed below
    tFreeStreamObj(pStream);
  }

  finished = true;

_over:
  taosMemoryFreeClear(buf);

  if (!finished && code == TSDB_CODE_SUCCESS) {
    // NOTE(review): the SDB_GET_* macros jump here without assigning `code`; presumably they
    // leave the reason in terrno - confirm against sdb.h. Fall back to a generic error so a
    // failed decode can never be reported as success.
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_MSG;
  }

  if (code != TSDB_CODE_SUCCESS) {
    char *p = (pStream == NULL) ? "null" : pStream->name;
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
    taosMemoryFreeClear(pRow);

    terrno = code;
    return NULL;
  } else {
    mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
           pStream->checkpointId);

    terrno = 0;
    return pRow;
  }
}
224

225
/*
 * sdb insert callback for stream rows: only traces the event.
 * Always returns 0; pSdb is unused.
 */
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
  mTrace("stream:%s, perform insert action", pStream->name);

  return 0;
}
229

230
/*
 * sdb delete callback for stream rows: frees the stream object's resources
 * via tFreeStreamObj() while holding the object's write latch.
 * Always returns 0; pSdb is unused.
 */
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
  mTrace("stream:%s, perform delete action", pStream->name);

  // the latch is a member of pStream itself and is unlocked after the free call
  taosWLockLatch(&pStream->lock);
  tFreeStreamObj(pStream);
  taosWUnLockLatch(&pStream->lock);

  return 0;
}
237

238
/*
 * sdb update callback for stream rows: copies the mutable fields of
 * pNewStream into the stored pOldStream.
 *
 * The version is swapped atomically before the latch is taken; status,
 * updateTime, checkpointId and checkpointFreq are copied under the write
 * latch of pOldStream. Always returns 0; pSdb is unused.
 */
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
  mTrace("stream:%s, perform update action", pOldStream->name);

  (void)atomic_exchange_32(&pOldStream->version, pNewStream->version);

  taosWLockLatch(&pOldStream->lock);
  {
    pOldStream->status = pNewStream->status;
    pOldStream->updateTime = pNewStream->updateTime;
    pOldStream->checkpointId = pNewStream->checkpointId;
    pOldStream->checkpointFreq = pNewStream->checkpointFreq;
  }
  taosWUnLockLatch(&pOldStream->lock);

  return 0;
}
252

253
/*
 * Look up a stream by name in the sdb and store the result in *pStream.
 *
 * Returns TSDB_CODE_MND_STREAM_NOT_EXIST when the lookup yields NULL and
 * terrno is TSDB_CODE_SDB_OBJ_NOT_THERE; returns 0 in every other case,
 * including a NULL result with a different terrno, so callers must also
 * check *pStream.
 */
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
  *pStream = sdbAcquire(pMnode->pSdb, SDB_STREAM, streamName);
  if (*pStream != NULL) {
    return 0;
  }

  return (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) ? TSDB_CODE_MND_STREAM_NOT_EXIST : 0;
}
262

263
/*
 * Hand a stream object back to the sdb of pMnode via sdbRelease().
 */
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
  sdbRelease(pMnode->pSdb, pStream);
}
14,766✔
267

268
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
269
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
270
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
271
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
272
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
273

274
/*
 * Validate the mandatory fields of a create-stream request.
 *
 * Returns TSDB_CODE_MND_INVALID_STREAM_OPTION when the stream name, the sql
 * text, the source db name or the target stable name is empty (or sql is
 * NULL); TSDB_CODE_SUCCESS otherwise.
 */
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
  if (pCreate->name[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  if (pCreate->sql == NULL || pCreate->sql[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  if (pCreate->sourceDB[0] == 0 || pCreate->targetStbFullName[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  return TSDB_CODE_SUCCESS;
}
281

282
/*
 * Fill pWrapper with one SSchema entry per SField in pFields.
 *
 * Column ids are assigned sequentially starting at 1. A field whose type is
 * TSDB_DATA_TYPE_NULL is stored as a VARCHAR of VARSTR_HEADER_SIZE bytes.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the schema array cannot be
 * allocated or a field cannot be fetched. In the latter case the array
 * already stored in pWrapper->pSchema is left in place.
 */
static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) {
  pWrapper->nCols = taosArrayGetSize(pFields);
  pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema));
  if (pWrapper->pSchema == NULL) {
    return terrno;
  }

  for (int32_t i = 0; i < pWrapper->nCols; i++) {
    SField *pSrc = (SField *)taosArrayGet(pFields, i);
    if (pSrc == NULL) {
      return terrno;
    }

    SSchema *pDst = &pWrapper->pSchema[i];

    const bool isNullType = (TSDB_DATA_TYPE_NULL == pSrc->type);
    pDst->type = isNullType ? TSDB_DATA_TYPE_VARCHAR : pSrc->type;
    pDst->bytes = isNullType ? VARSTR_HEADER_SIZE : pSrc->bytes;

    pDst->colId = i + 1;
    tstrncpy(pDst->name, pSrc->name, sizeof(pDst->name));
    pDst->flags = pSrc->flags;
  }

  return TSDB_CODE_SUCCESS;
}
311

312
/*
 * Report whether any column other than the first one carries COL_IS_KEY.
 * A schema with fewer than two columns therefore always yields false.
 */
static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) {
  int32_t col = 1;  // column 0 is never inspected

  while (col < pWrapper->nCols) {
    if ((pWrapper->pSchema[col].flags & COL_IS_KEY) != 0) {
      return true;
    }
    col++;
  }

  return false;
}
323

324
/*
 * Populate a stream object from a create-stream request.
 *
 * Steps, in order:
 *   1. copy name/config fields and generate the uids;
 *   2. resolve the source db and the db owning the target stable;
 *   3. take ownership of pCreate->sql and pCreate->ast (both are set to NULL
 *      in the request afterwards);
 *   4. build the output schema from pCreate->pCols and, when
 *      pCreate->fillNullCols is non-empty, merge the null-filled columns in;
 *   5. build the physical plan from the AST and store its string form;
 *   6. build the tag schema from pCreate->pTags.
 *
 * Returns 0 on success or the failing step's error code. Members already
 * attached to pObj are NOT released here on failure.
 *
 * Changes over the previous version:
 *   - the "target db not exist" log printed pObj->targetDb, which this
 *     function has not assigned at that point; it now prints the target
 *     stable name that was actually used for the lookup;
 *   - an unused local buffer ("<name>_fillhistory") was removed.
 */
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
  SNode      *pAst = NULL;
  SQueryPlan *pPlan = NULL;
  int32_t     code = 0;

  mInfo("stream:%s to create", pCreate->name);
  memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
  pObj->createTime = taosGetTimestampMs();
  pObj->updateTime = pObj->createTime;
  pObj->version = 1;

  if (pCreate->smaId > 0) {
    pObj->subTableWithoutMd5 = 1;
  }

  pObj->smaId = pCreate->smaId;
  pObj->indexForMultiAggBalance = -1;

  pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));

  // NOTE(review): the fill-history task uid is generated from the same input as uid; the removed
  // "<name>_fillhistory" buffer suggests that string was meant to be used here - confirm.
  pObj->hTaskUid = mndGenerateUid(pObj->name, strlen(pObj->name));
  pObj->status = 0;

  pObj->conf.igExpired = pCreate->igExpired;
  pObj->conf.trigger = pCreate->triggerType;
  pObj->conf.triggerParam = pCreate->maxDelay;
  pObj->conf.watermark = pCreate->watermark;
  pObj->conf.fillHistory = pCreate->fillHistory;
  pObj->deleteMark = pCreate->deleteMark;
  pObj->igCheckUpdate = pCreate->igUpdate;

  // resolve the source db
  memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
  SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
  if (pSourceDb == NULL) {
    code = terrno;
    mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb,
          tstrerror(code));
    goto FAIL;
  }

  pObj->sourceDbUid = pSourceDb->uid;
  mndReleaseDb(pMnode, pSourceDb);

  // resolve the db that owns the target stable
  memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);

  SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
  if (pTargetDb == NULL) {
    code = terrno;
    mError("stream:%s failed to create, target db of stable %s not exist since %s", pCreate->name,
           pObj->targetSTbName, tstrerror(code));
    goto FAIL;
  }

  tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);

  if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
    pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
  } else {
    pObj->targetStbUid = pCreate->targetStbUid;
  }
  pObj->targetDbUid = pTargetDb->uid;
  mndReleaseDb(pMnode, pTargetDb);

  // ownership of the sql and ast strings moves from the request to the stream object
  pObj->sql = pCreate->sql;
  pObj->ast = pCreate->ast;

  pCreate->sql = NULL;
  pCreate->ast = NULL;

  // deserialize ast
  if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) {
    goto FAIL;
  }

  // create output schema
  if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) {
    goto FAIL;
  }

  // merge the null-filled columns into the output schema at their slot positions
  int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
  if (numOfNULL > 0) {
    pObj->outputSchema.nCols += numOfNULL;
    SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
    if (!pFullSchema) {
      code = terrno;
      goto FAIL;
    }

    int32_t nullIndex = 0;  // next entry of fillNullCols to place
    int32_t dataIndex = 0;  // next entry of the original (data) schema to place
    for (int32_t i = 0; i < pObj->outputSchema.nCols; i++) {
      SColLocation *pos = NULL;

      if (nullIndex < numOfNULL) {
        pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
        if (pos == NULL) {
          // the slot stays zero-initialized
          mError("invalid null column index, %d", nullIndex);
          continue;
        }
      }

      if (pos == NULL || i < pos->slotId) {
        // all null columns are placed, or this slot precedes the next one: copy a data column
        const SSchema *pSrc = &pObj->outputSchema.pSchema[dataIndex];

        pFullSchema[i].bytes = pSrc->bytes;
        pFullSchema[i].colId = i + 1;  // renumbered by position, not pSrc->colId
        pFullSchema[i].flags = pSrc->flags;
        tstrncpy(pFullSchema[i].name, pSrc->name, sizeof(pFullSchema[i].name));
        pFullSchema[i].type = pSrc->type;
        dataIndex++;
      } else {
        pFullSchema[i].bytes = 0;
        pFullSchema[i].colId = pos->colId;
        pFullSchema[i].flags = COL_SET_NULL;
        memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
        pFullSchema[i].type = pos->type;
        nullIndex++;
      }
    }

    taosMemoryFree(pObj->outputSchema.pSchema);
    pObj->outputSchema.pSchema = pFullSchema;
  }

  SPlanContext cxt = {
      .pAstRoot = pAst,
      .topicQuery = false,
      .streamQuery = true,
      // the planner is given WINDOW_CLOSE in place of the MAX_DELAY trigger
      .triggerType =
          (pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
      .watermark = pObj->conf.watermark,
      .igExpired = pObj->conf.igExpired,
      .deleteMark = pObj->deleteMark,
      .igCheckUpdate = pObj->igCheckUpdate,
      .destHasPrimaryKey = hasDestPrimaryKey(&pObj->outputSchema),
  };

  // using ast and param to build physical plan
  if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) {
    goto FAIL;
  }

  // save physical plan
  if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) {
    goto FAIL;
  }

  pObj->tagSchema.nCols = pCreate->numOfTags;
  if (pCreate->numOfTags) {
    pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
    if (pObj->tagSchema.pSchema == NULL) {
      code = terrno;
      goto FAIL;
    }
  }

  /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
  for (int32_t i = 0; i < pCreate->numOfTags; i++) {
    SField *pField = taosArrayGet(pCreate->pTags, i);
    if (pField == NULL) {
      continue;  // the tag slot stays zero-initialized
    }

    // tag column ids continue after the output columns
    pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
    pObj->tagSchema.pSchema[i].bytes = pField->bytes;
    pObj->tagSchema.pSchema[i].flags = pField->flags;
    pObj->tagSchema.pSchema[i].type = pField->type;
    memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN);
  }

FAIL:
  // reached on success as well: the AST and the plan are temporaries of this function
  if (pAst != NULL) nodesDestroyNode(pAst);
  if (pPlan != NULL) qDestroyQueryPlan(pPlan);
  return code;
}
509

510
/*
 * Serialize a stream task and append it to pTrans as a
 * TDMT_STREAM_TASK_DEPLOY action addressed to pTask->info.epSet.
 *
 * The task is encoded twice: once with a NULL buffer to learn the encoded
 * size, then into a buffer of sizeof(SMsgHead) + size bytes whose header
 * carries the task's node id in network byte order.
 *
 * Returns 0 on success. The buffer is freed here when encoding or
 * setTransAction() fails; otherwise it is left with the transaction action.
 *
 * Fix over the previous version: the sizing pass only recognized -1 as a
 * failure while the second pass treats any non-zero code as one; any
 * negative code from the sizing pass is now rejected, so a failed sizing
 * pass can no longer feed a bogus size to the allocation.
 */
int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
  SEncoder encoder;
  tEncoderInit(&encoder, NULL, 0);

  // tasks older than the sub-table change are upgraded to the current version before encoding
  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  // pass 1: size only
  int32_t code = tEncodeStreamTask(&encoder, pTask);
  if (code < 0) {
    tEncoderClear(&encoder);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t size = encoder.pos;
  int32_t tlen = sizeof(SMsgHead) + size;
  tEncoderClear(&encoder);

  void *buf = taosMemoryCalloc(1, tlen);
  if (buf == NULL) {
    return terrno;
  }

  ((SMsgHead *)buf)->vgId = htonl(pTask->info.nodeId);

  // pass 2: encode right after the message header
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, size);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  if (code != 0) {
    mError("failed to encode stream task, code:%s", tstrerror(code));
    taosMemoryFree(buf);
    return code;
  }

  code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code) {
    taosMemoryFree(buf);
  }

  return code;
}
554

555
/*
 * Append a deploy action to pTrans for every task of pStream: first the tasks
 * yielded by the stream task iterator, then - when fill-history is enabled -
 * every task in pStream->pHTasksList.
 *
 * Returns 0 on success or the first non-zero code encountered; processing
 * stops at the first failure.
 */
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pIter = NULL;

  int32_t code = createStreamTaskIter(pStream, &pIter);
  if (code != 0) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return code;
  }

  while (streamTaskIterNextTask(pIter)) {
    SStreamTask *pCurrent = NULL;

    code = streamTaskIterGetCurrent(pIter, &pCurrent);
    if (code == 0) {
      code = mndPersistTaskDeployReq(pTrans, pCurrent);
    }

    if (code != 0) {
      break;
    }
  }

  // the iterator is destroyed exactly once, on both the success and the failure path
  destroyStreamTaskIter(pIter);
  if (code != 0) {
    return code;
  }

  // persistent stream task for already stored ts data
  if (!pStream->conf.fillHistory) {
    return code;
  }

  int32_t numOfLevels = taosArrayGetSize(pStream->pHTasksList);
  for (int32_t lv = 0; lv < numOfLevels; lv++) {
    SArray *pLevelTasks = taosArrayGetP(pStream->pHTasksList, lv);
    int32_t numOfTasks = taosArrayGetSize(pLevelTasks);

    for (int32_t t = 0; t < numOfTasks; t++) {
      SStreamTask *pHTask = taosArrayGetP(pLevelTasks, t);

      code = mndPersistTaskDeployReq(pTrans, pHTask);
      if (code != 0) {
        return code;
      }
    }
  }

  return code;
}
600

601
/*
 * Persist a stream into pTrans: the deploy actions of all its tasks first,
 * then the stream object's transaction log with SDB_STATUS_READY.
 *
 * A negative code from mndPersistStreamTasks() is returned immediately;
 * otherwise the result of mndPersistTransLog() is returned.
 */
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
  int32_t code = mndPersistStreamTasks(pTrans, pStream);
  if (code < 0) {
    return code;
  }

  return mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
}
609

610
/*
 * Add the creation of the stream's target super table to pTrans.
 *
 * Columns come from pStream->outputSchema. Tags come from pStream->tagSchema,
 * or - when it is empty - a single UBIGINT tag named "group_id". The new
 * stable's uid is forced to pStream->targetStbUid.
 *
 * Fails with TSDB_CODE_MND_STB_ALREADY_EXIST when the stable already exists,
 * TSDB_CODE_MND_DB_NOT_SELECTED when its db cannot be acquired, and
 * TSDB_CODE_MND_SINGLE_STB_MODE_DB when the db allows one stable and already
 * has one. The `user` argument is not used.
 *
 * Fix over the previous version: failures of mndGetNumOfStbs(),
 * mndBuildStbFromReq() and mndAddStbToTrans() jumped to _OVER with
 * code == 0, so the function reported success although no stable was added
 * to the transaction. Their return codes are now propagated.
 */
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
  SStbObj *pStb = NULL;
  SDbObj  *pDb = NULL;
  int32_t  code = 0;
  int32_t  lino = 0;

  SMCreateStbReq createReq = {0};
  tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
  createReq.numOfColumns = pStream->outputSchema.nCols;
  createReq.numOfTags = 1;  // group id
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);

  // build fields
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    pField->flags = pStream->outputSchema.pSchema[i].flags;
    pField->type = pStream->outputSchema.pSchema[i].type;
    pField->bytes = pStream->outputSchema.pSchema[i].bytes;
    pField->compress = createDefaultColCmprByType(pField->type);
  }

  if (pStream->tagSchema.nCols == 0) {
    createReq.numOfTags = 1;
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    // build tags
    SField *pField = taosArrayGet(createReq.pTags, 0);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, "group_id", sizeof(pField->name));
    pField->type = TSDB_DATA_TYPE_UBIGINT;
    pField->flags = 0;
    pField->bytes = 8;
  } else {
    createReq.numOfTags = pStream->tagSchema.nCols;
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    for (int32_t i = 0; i < createReq.numOfTags; i++) {
      SField *pField = taosArrayGet(createReq.pTags, i);
      if (pField == NULL) {
        continue;
      }

      pField->bytes = pStream->tagSchema.pSchema[i].bytes;
      pField->flags = pStream->tagSchema.pSchema[i].flags;
      pField->type = pStream->tagSchema.pSchema[i].type;
      tstrncpy(pField->name, pStream->tagSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    }
  }

  if ((code = mndCheckCreateStbReq(&createReq)) != 0) {
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  int32_t numOfStbs = -1;
  code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
  if (code != 0) {
    goto _OVER;
  }

  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    goto _OVER;
  }

  SStbObj stbObj = {0};

  code = mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb);
  if (code != 0) {
    goto _OVER;
  }

  // the stable uid was fixed when the stream object was built
  stbObj.uid = pStream->targetStbUid;

  code = mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj);
  if (code < 0) {
    mndFreeStb(&stbObj);
    goto _OVER;
  }

  tFreeSMCreateStbReq(&createReq);
  mndFreeStb(&stbObj);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);
  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols);
  return code;

_OVER:
  tFreeSMCreateStbReq(&createReq);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);

  mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->targetSTbName, lino,
         tstrerror(code));
  return code;
}
721

722
// 1. stream number check
723
// 2. target stable can not be target table of other existed streams.
724
static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
1,757✔
725
  int32_t     numOfStream = 0;
1,757✔
726
  SStreamObj *pStream = NULL;
1,757✔
727
  void       *pIter = NULL;
1,757✔
728

729
  while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
4,069✔
730
    if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
2,313✔
731
      ++numOfStream;
1,630✔
732
    }
733

734
    sdbRelease(pMnode->pSdb, pStream);
2,313✔
735

736
    if (numOfStream > MND_STREAM_MAX_NUM) {
2,313!
737
      mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
×
738
             pStreamObj->name);
739
      sdbCancelFetch(pMnode->pSdb, pIter);
×
740
      return TSDB_CODE_MND_TOO_MANY_STREAMS;
×
741
    }
742

743
    if (pStream->targetStbUid == pStreamObj->targetStbUid) {
2,313✔
744
      mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
1!
745
             pStreamObj->name);
746
      sdbCancelFetch(pMnode->pSdb, pIter);
1✔
747
      return TSDB_CODE_MND_INVALID_TARGET_TABLE;
1✔
748
    }
749
  }
750

751
  return TSDB_CODE_SUCCESS;
1,756✔
752
}
753

754
// Element-copy callback handed to taosArrayDup(): duplicates the string p points to with taosStrdup().
static void *notifyAddrDup(void *p) {
  char *addr = (char *)p;
  return taosStrdup(addr);
}
×
755

756
static int32_t addStreamTaskNotifyInfo(const SCMCreateStreamReq *createReq, const SStreamObj *pStream,
×
757
                                       SStreamTask *pTask) {
758
  int32_t code = TSDB_CODE_SUCCESS;
×
759
  int32_t lino = 0;
×
760

761
  TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
762
  TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
763

764
  pTask->notifyInfo.pNotifyAddrUrls = taosArrayDup(createReq->pNotifyAddrUrls, notifyAddrDup);
×
765
  TSDB_CHECK_NULL(pTask->notifyInfo.pNotifyAddrUrls, code, lino, _end, terrno);
×
766
  pTask->notifyInfo.notifyEventTypes = createReq->notifyEventTypes;
×
767
  pTask->notifyInfo.notifyErrorHandle = createReq->notifyErrorHandle;
×
768
  pTask->notifyInfo.streamName = taosStrdup(mndGetDbStr(createReq->name));
×
769
  TSDB_CHECK_NULL(pTask->notifyInfo.streamName, code, lino, _end, terrno);
×
770
  pTask->notifyInfo.stbFullName = taosStrdup(createReq->targetStbFullName);
×
771
  TSDB_CHECK_NULL(pTask->notifyInfo.stbFullName, code, lino, _end, terrno);
×
772
  pTask->notifyInfo.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
×
773
  TSDB_CHECK_NULL(pTask->notifyInfo.pSchemaWrapper, code, lino, _end, terrno);
×
774

775
_end:
×
776
  if (code != TSDB_CODE_SUCCESS) {
×
777
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
778
  }
779
  return code;
×
780
}
781

782
/**
 * Attach the notify settings of a create-stream request to every task of the stream.
 *
 * Nothing is done when the request carries no notify address. Otherwise every task in
 * pStream->tasks is updated, and, when fill-history is enabled on the stream and the request
 * asks for history notification, every task in pStream->pHTasksList as well.
 *
 * @param createReq  create-stream request
 * @param pStream    stream object whose tasks are updated
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a NULL argument, or the first error
 *         returned by addStreamTaskNotifyInfo()
 */
static int32_t addStreamNotifyInfo(SCMCreateStreamReq *createReq, SStreamObj *pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t level = 0;
  int32_t nTasks = 0;
  SArray *pLevel = NULL;

  TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pStream, code, lino, _end, TSDB_CODE_INVALID_PARA);

  if (taosArrayGetSize(createReq->pNotifyAddrUrls) == 0) {
    goto _end;
  }

  level = taosArrayGetSize(pStream->tasks);
  for (int32_t i = 0; i < level; ++i) {
    pLevel = taosArrayGetP(pStream->tasks, i);
    nTasks = taosArrayGetSize(pLevel);
    for (int32_t j = 0; j < nTasks; ++j) {
      code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
      TSDB_CHECK_CODE(code, lino, _end);
    }
  }

  if (pStream->conf.fillHistory && createReq->notifyHistory) {
    level = taosArrayGetSize(pStream->pHTasksList);
    for (int32_t i = 0; i < level; ++i) {
      pLevel = taosArrayGetP(pStream->pHTasksList, i);
      nTasks = taosArrayGetSize(pLevel);
      for (int32_t j = 0; j < nTasks; ++j) {
        code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
        TSDB_CHECK_CODE(code, lino, _end);
      }
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // pStream is NULL when its own parameter check above failed, do not dereference it for the log
    mError("%s for stream %s failed at line %d since %s", __func__, (pStream != NULL) ? pStream->name : "(null)", lino,
           tstrerror(code));
  }
  return code;
}
824

825
/**
 * Handle a create-stream request on the mnode.
 *
 * Steps, in order:
 *   - deserialize and validate the request, reject an already existing stream (or return
 *     success right away when igExists is set);
 *   - check the grant, an active node-update trans and the snode of the source db;
 *   - build the stream object, run doStreamCheck(), create the transaction;
 *   - optionally create the target stable, schedule the tasks, attach the notify info and
 *     persist the stream into the transaction;
 *   - check read privilege on the source db and write privilege on the target db;
 *   - register the tasks in execInfo, prepare the transaction and write the audit record.
 *
 * @param pReq  rpc message holding the serialized SCMCreateStreamReq
 * @return 0 when the stream already exists and igExists is set,
 *         TSDB_CODE_ACTION_IN_PROGRESS when the creation has been submitted,
 *         otherwise the error code of the step that failed
 */
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SStreamObj  streamObj = {0};
  char       *sql = NULL;
  int32_t     sqlLen = 0;
  const char *pMsg = "create stream tasks on dnodes";
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STrans     *pTrans = NULL;

  SCMCreateStreamReq createReq = {0};
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  goto _OVER;
#endif

  mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
  if ((code = mndCheckCreateStreamReq(&createReq)) != 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = mndAcquireStream(pMnode, createReq.name, &pStream);
  if (pStream != NULL && code == 0) {
    if (createReq.igExists) {
      mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
      mndReleaseStream(pMnode, pStream);
      tFreeSCMCreateStreamReq(&createReq);
      return code;
    } else {
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
      goto _OVER;
    }
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    goto _OVER;
  }

  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
    goto _OVER;
  }

  if (createReq.sql != NULL) {
    sql = taosStrdup(createReq.sql);
    TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
    // remember the length, otherwise the audit record below never carries the sql text
    sqlLen = (int32_t)strlen(sql);
  }

  // check for the taskEp update trans
  if (isNodeUpdateTransActive()) {
    mError("stream:%s failed to create stream, node update trans is active", createReq.name);
    code = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
    goto _OVER;
  }

  SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB);
  if (pSourceDb == NULL) {
    code = terrno;
    mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB,
          tstrerror(code));
    goto _OVER;
  }

  code = mndCheckForSnode(pMnode, pSourceDb);
  mndReleaseDb(pMnode, pSourceDb);
  if (code != 0) {
    goto _OVER;
  }

  // build stream obj from request
  if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = doStreamCheck(pMnode, &streamObj);
  TSDB_CHECK_CODE(code, lino, _OVER);

  code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans);
  if (pTrans == NULL || code) {
    goto _OVER;
  }

  // create stb for stream
  if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) {
    if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) {
      mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code));
      mndTransDrop(pTrans);
      goto _OVER;
    }
  } else {
    mDebug("stream:%s no need create stable", createReq.name);
  }

  // schedule stream task for stream obj
  code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  // add notify info into all stream tasks
  code = addStreamNotifyInfo(&createReq, &streamObj);
  if (code != TSDB_CODE_SUCCESS) {
    mError("stream:%s failed to add stream notify info since %s", createReq.name, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  // add stream to trans
  code = mndPersistStream(pTrans, &streamObj);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb)) != 0) {
    mndTransDrop(pTrans);
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb)) != 0) {
    mndTransDrop(pTrans);
    goto _OVER;
  }

  // add into buffer firstly
  // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
  streamMutexLock(&execInfo.lock);
  mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
  saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  // execute creation
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  mndTransDrop(pTrans);

  SName dbname = {0};
  code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code) {
    mError("invalid source dbname:%s in create stream, code:%s", createReq.sourceDB, tstrerror(code));
    goto _OVER;
  }

  SName name = {0};
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE);
  if (code) {
    mError("invalid stream name:%s in create strem, code:%s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  // reuse this function for stream
  if (sql != NULL && sqlLen > 0) {
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen);
  } else {
    // no sql text available, record a short description instead
    char detail[1000] = {0};
    snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
  }

_OVER:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to create at line:%d since %s", createReq.name, lino, tstrerror(code));
  } else {
    mDebug("stream:%s create stream completed", createReq.name);
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  mndReleaseStream(pMnode, pStream);
  tFreeSCMCreateStreamReq(&createReq);
  tFreeStreamObj(&streamObj);

  if (sql != NULL) {
    taosMemoryFreeClear(sql);
  }

  return code;
}
1013

1014
/**
 * Handle a restart-stream request on the mnode.
 *
 * The request reuses the SMPauseStreamReq layout. After the stream is acquired, the caller's
 * write privilege on the target db is checked, conflicting transactions and pending node
 * updates are ruled out, and a restart transaction is created, registered, filled with the
 * restart action and prepared. The stream reference and the transaction handle are released
 * on every exit path.
 *
 * @param pReq  rpc message holding the serialized SMPauseStreamReq
 * @return 0 when the stream does not exist and igNotExists is set,
 *         TSDB_CODE_ACTION_IN_PROGRESS when the restart trans has been submitted,
 *         otherwise the error code of the step that failed
 */
static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SStreamObj      *pStream = NULL;
  int32_t          code = 0;
  SMPauseStreamReq pauseReq = {0};

  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (pauseReq.igNotExists) {
      mInfo("stream:%s, not exist, not restart stream", pauseReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to restart stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to restart stream", pauseReq.name, pStream->uid);
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESTART_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  bool updated = mndStreamNodeIsUpdated(pMnode);
  if (updated) {
    mError("tasks are not ready for restart, node update detected");
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  STrans *pTrans = NULL;
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream",
                       &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s failed to restart stream since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESTART_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // add the restart action for the tasks of this stream into the trans
  code = mndStreamSetRestartAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s, failed to restart task since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare restart stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1093

1094
/**
 * Generate the next checkpoint id.
 *
 * The result is one more than the largest checkpoint id found in either
 *   - the checkpointId of every stream object stored in sdb, or
 *   - checkpointInfo.latestId of every task entry in execInfo whose checkpoint has not failed.
 *
 * @param pMnode  mnode handle that owns the sdb to iterate
 * @param lock    when true, execInfo.lock is taken around the scan of the task list.
 *                NOTE(review): callers passing false presumably hold the lock already - confirm.
 * @return the new checkpoint id
 */
int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  SSdb       *pSdb = pMnode->pSdb;
  int64_t     maxChkptId = 0;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    maxChkptId = TMAX(maxChkptId, pStream->checkpointId);
    mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
           pStream->checkpointId);
    sdbRelease(pSdb, pStream);
  }

  {  // check the max checkpoint id from all vnodes.
    int64_t maxCheckpointId = -1;
    if (lock) {
      streamMutexLock(&execInfo.lock);
    }

    for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
      STaskId *p = taosArrayGet(execInfo.pTaskList, i);
      if (p == NULL) {  // validate the key before it is handed to the hash lookup
        continue;
      }

      STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
      if (pEntry == NULL) {
        continue;
      }

      // the id reported by a failed checkpoint is not taken into account
      if (pEntry->checkpointInfo.failed) {
        continue;
      }

      if (maxCheckpointId < pEntry->checkpointInfo.latestId) {
        maxCheckpointId = pEntry->checkpointInfo.latestId;
      }
    }

    if (lock) {
      streamMutexUnlock(&execInfo.lock);
    }

    if (maxCheckpointId > maxChkptId) {
      mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId,
             maxCheckpointId);
      maxChkptId = maxCheckpointId;
    }
  }

  mDebug("generate new checkpointId:%" PRId64, maxChkptId + 1);
  return maxChkptId + 1;
}
1146

1147
/**
 * Create and submit one checkpoint transaction for a stream.
 *
 * @param pMnode        mnode handle
 * @param pStream       stream to checkpoint
 * @param checkpointId  id assigned to this checkpoint
 * @param mndTrigger    when 1, the checkpoint is skipped (TSDB_CODE_SUCCESS is returned) if less
 *                      than tsStreamCheckpointInterval seconds elapsed since pStream->checkpointFreq
 * @param lock          forwarded to mndStreamTransConflictCheck()
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the trans has been prepared, TSDB_CODE_SUCCESS when
 *         skipped, otherwise the error code of the step that failed
 */
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
                                               int8_t mndTrigger, bool lock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t ts = taosGetTimestampMs();
  STrans *pTrans = NULL;

  if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
    return code;
  }

  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
  if (code) {
    mWarn("checkpoint conflict with other trans in %s, code:%s ignore the checkpoint for stream:%s %" PRIx64,
          pStream->sourceDb, tstrerror(code), pStream->name, pStream->uid);
    goto _ERR;
  }

  code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME,
                       "gen checkpoint for stream", &pTrans);
  if (code) {
    mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
           tstrerror(code));
    goto _ERR;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
  if (code) {
    mError("failed to register checkpoint trans for stream:%s, checkpointId:%" PRId64, pStream->name, checkpointId);
    goto _ERR;
  }

  mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);

  taosWLockLatch(&pStream->lock);
  pStream->currentTick = 1;

  // 1. redo action: broadcast checkpoint source msg for all source vg
  int32_t totalLevel = taosArrayGetSize(pStream->tasks);
  for (int32_t i = 0; i < totalLevel; i++) {
    SArray      *pLevel = taosArrayGetP(pStream->tasks, i);
    SStreamTask *p = taosArrayGetP(pLevel, 0);

    // the first task of a level decides whether the level holds source tasks
    if (p->info.taskLevel == TASK_LEVEL__SOURCE) {
      int32_t sz = taosArrayGetSize(pLevel);
      for (int32_t j = 0; j < sz; j++) {
        SStreamTask *pTask = taosArrayGetP(pLevel, j);
        code = mndStreamSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);

        if (code != TSDB_CODE_SUCCESS) {
          taosWUnLockLatch(&pStream->lock);
          goto _ERR;
        }
      }
    }
  }

  // 2. reset tick
  pStream->checkpointId = checkpointId;
  pStream->checkpointFreq = taosGetTimestampMs();
  pStream->currentTick = 0;

  // 3. commit log: stream checkpoint info
  pStream->version = pStream->version + 1;
  taosWUnLockLatch(&pStream->lock);

  if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY)) != TSDB_CODE_SUCCESS) {
    goto _ERR;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to prepare checkpoint trans since %s", tstrerror(code));
  } else {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_ERR:
  // reached on success as well; pTrans is still NULL when the conflict check or trans creation failed
  mndTransDrop(pTrans);
  return code;
}
1228

1229
/**
 * Make sure execInfo.pNodeList is populated and report its size.
 *
 * An empty node list is rebuilt from the existing streams first.
 *
 * @param pMnode  mnode handle passed to refreshNodeListFromExistedStreams()
 * @return the error code of the refresh when it fails, otherwise the number of entries in
 *         execInfo.pNodeList
 */
int32_t extractStreamNodeList(SMnode *pMnode) {
  bool needRefresh = (taosArrayGetSize(execInfo.pNodeList) == 0);

  if (needRefresh) {
    int32_t ret = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
    if (ret) {
      mError("Failed to extract node list from stream, code:%s", tstrerror(ret));
      return ret;
    }
  }

  return taosArrayGetSize(execInfo.pNodeList);
}
1240

1241
/**
 * Check that the task/node bookkeeping in execInfo is consistent.
 *
 * @param pMnode  mnode handle
 * @return TSDB_CODE_STREAM_TASK_IVLD_STATUS when a node update is detected, or when the node
 *         list is empty while the task list is not; 0 otherwise
 */
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
  if (mndStreamNodeIsUpdated(pMnode)) {
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  int32_t status = 0;

  streamMutexLock(&execInfo.lock);

  bool noNodes = (taosArrayGetSize(execInfo.pNodeList) == 0);
  if (noNodes) {
    mDebug("stream task node change checking done, no vgroups exist, do nothing");

    bool hasTasks = (taosArrayGetSize(execInfo.pTaskList) != 0);
    if (hasTasks) {
      mError("stream task node change checking done, no vgroups exist, but task list is not empty");
      status = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
    }
  }

  streamMutexUnlock(&execInfo.lock);
  return status;
}
1259

1260
/**
 * Find the latest start time among the tasks of one stream, provided all of them are ready.
 *
 * Every id in pTaskList is looked up in execInfo.pTaskMap; entries that belong to another
 * stream are skipped.
 *
 * @param pTaskList  list of STaskId to scan
 * @param streamId   stream whose tasks are examined
 * @return -1 when a task of the stream still has a related fill-history task, when a task is
 *         not in TASK_STATUS__READY, or when no task of the stream is found; otherwise the
 *         largest startTime of the stream's tasks
 */
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
  int64_t ts = -1;
  int32_t taskId = -1;

  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
    STaskId *p = taosArrayGet(pTaskList, i);
    if (p == NULL) {  // validate the key before it is handed to the hash lookup
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
    if (pEntry == NULL || pEntry->id.streamId != streamId) {
      continue;
    }

    // -1 denote not ready now or never ready till now
    if (pEntry->hTaskId != 0) {
      mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
            " exists, checkpoint not issued",
            pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
            pEntry->hTaskId);
      return -1;
    }

    if (pEntry->status != TASK_STATUS__READY) {
      mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, not ready for checkpoint", pEntry->id.streamId,
            (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
      return -1;
    }

    // keep the most recent start time and the task it belongs to (for the debug log only)
    if (ts < pEntry->startTime) {
      ts = pEntry->startTime;
      taskId = pEntry->id.taskId;
    }
  }

  mDebug("stream:0x%" PRIx64 " last ready ts:%" PRId64 " s-task:0x%x", streamId, ts, taskId);
  return ts;
}
1295

1296
// One candidate stream for a new checkpoint, together with the time since its last checkpoint.
typedef struct {
  int64_t streamId;  // uid of the stream
  int64_t duration;  // time elapsed since the last checkpoint of the stream
} SCheckpointInterval;

// Sort comparator for SCheckpointInterval: orders by duration, longest first.
// Returns -1 when p1 waited longer than p2, 1 when it waited less, and 0 on a tie.
static int32_t streamWaitComparFn(const void *p1, const void *p2) {
  const int64_t lhs = ((const SCheckpointInterval *)p1)->duration;
  const int64_t rhs = ((const SCheckpointInterval *)p2)->duration;

  return (int32_t)(rhs > lhs) - (int32_t)(rhs < lhs);
}
1310

1311
// all tasks of this stream should be ready, otherwise do nothing
1312
static bool isStreamReadyHelp(int64_t now, SStreamObj* pStream) {
1,674✔
1313
  bool ready = false;
1,674✔
1314

1315
  streamMutexLock(&execInfo.lock);
1,674✔
1316

1317
  int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
1,674✔
1318
  if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) {
1,674!
1319

1320
    if (lastReadyTs != -1) {
565✔
1321
      mInfo("not start checkpoint, stream:0x%" PRIx64 " last ready ts:%" PRId64 " ready duration:%" PRId64
467!
1322
            "ms less than threshold",
1323
            pStream->uid, lastReadyTs, (now - lastReadyTs));
1324
    }
1325

1326
    ready = false;
565✔
1327
  } else {
1328
    ready = true;
1,109✔
1329
  }
1330

1331
  streamMutexUnlock(&execInfo.lock);
1,674✔
1332
  return ready;
1,674✔
1333
}
1334

1335
/**
 * Periodic checkpoint driver: start checkpoint transactions for the streams that are due.
 *
 * Steps, in order:
 *   - bail out when the task/node status is not valid;
 *   - clear finished transactions; when long-running checkpoint transactions are reported,
 *     kill them, reset the related tasks and return;
 *   - collect every stream whose last checkpoint is older than tsStreamCheckpointInterval
 *     seconds and whose tasks are all ready;
 *   - sort the candidates (longest wait first) and start at most
 *     tsMaxConcurrentCheckpoint - numOfCheckpointTrans new checkpoint transactions, all of
 *     them sharing one freshly generated checkpoint id.
 *
 * @param pReq  rpc message, only pReq->info.node is used
 * @return TSDB_CODE_STREAM_TASK_IVLD_STATUS when the status check fails, the error code of a
 *         failed preparation step, otherwise the code left by the last processed candidate
 *         (0 when there was none)
 */
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  void       *pIter = NULL;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;
  int32_t     numOfCheckpointTrans = 0;
  SArray     *pLongChkpts = NULL;
  SArray     *pList = NULL;
  int64_t     now = taosGetTimestampMs();

  if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  pList = taosArrayInit(4, sizeof(SCheckpointInterval));
  if (pList == NULL) {
    code = terrno;  // capture before any other call gets a chance to overwrite terrno
    mError("failed to init chkptInterval info, not handle stream checkpoint, code:%s", tstrerror(code));
    return code;
  }

  pLongChkpts = taosArrayInit(4, sizeof(SStreamTransInfo));
  if (pLongChkpts == NULL) {
    code = terrno;  // capture before logging and cleanup
    mError("failed to init long checkpoint list, not handle stream checkpoint, code:%s", tstrerror(code));
    taosArrayDestroy(pList);
    return code;
  }

  // check if ongoing checkpoint trans or long chkpt trans exist.
  code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans, pLongChkpts);
  if (code) {
    mError("failed to clear finish trans, code:%s", tstrerror(code));

    taosArrayDestroy(pList);
    taosArrayDestroy(pLongChkpts);
    return code;
  }

  // kill long exec checkpoint and set task status
  if (taosArrayGetSize(pLongChkpts) > 0) {
    killChkptAndResetStreamTask(pMnode, pLongChkpts);

    taosArrayDestroy(pList);
    taosArrayDestroy(pLongChkpts);
    return TSDB_CODE_SUCCESS;
  }

  taosArrayDestroy(pLongChkpts);

  // collect the streams that are due for a checkpoint
  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    int64_t duration = now - pStream->checkpointFreq;
    if (duration < tsStreamCheckpointInterval * 1000) {
      sdbRelease(pSdb, pStream);
      continue;
    }

    bool ready = isStreamReadyHelp(now, pStream);
    if (!ready) {
      sdbRelease(pSdb, pStream);
      continue;
    }

    SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
    void               *p = taosArrayPush(pList, &in);
    if (p) {
      int32_t currentSize = taosArrayGetSize(pList);
      mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chkpt interval threshold: %ds(%" PRId64
             "s), concurrently launch threshold:%d",
             pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
             tsMaxConcurrentCheckpoint);
    } else {
      mError("failed to record the checkpoint interval info, stream:0x%" PRIx64, pStream->uid);
    }
    sdbRelease(pSdb, pStream);
  }

  int32_t numOfQual = taosArrayGetSize(pList);
  if (numOfQual == 0) {
    taosArrayDestroy(pList);
    return code;
  }

  // streams that waited the longest are served first
  taosArraySort(pList, streamWaitComparFn);

  if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) {
    mDebug(
        "%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new "
        "checkpoint trans are not allowed, wait for 30s",
        numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint);
    taosArrayDestroy(pList);
    return code;
  }

  int32_t capacity = tsMaxConcurrentCheckpoint - numOfCheckpointTrans;
  mDebug(
      "%d stream(s) checkpoint interval longer than %ds, %d ongoing checkpoint trans, %d new checkpoint trans allowed, "
      "concurrent trans threshold:%d",
      numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, capacity, tsMaxConcurrentCheckpoint);

  int32_t started = 0;
  int64_t checkpointId = mndStreamGenChkptId(pMnode, true);

  for (int32_t i = 0; i < numOfQual; ++i) {
    SCheckpointInterval *pCheckpointInfo = taosArrayGet(pList, i);
    if (pCheckpointInfo == NULL) {
      continue;
    }

    SStreamObj *p = NULL;
    code = mndGetStreamObj(pMnode, pCheckpointInfo->streamId, &p);
    if (p != NULL && code == 0) {
      code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
      sdbRelease(pSdb, p);

      if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
        started += 1;

        if (started >= capacity) {
          mDebug("already start %d new checkpoint trans, current active checkpoint trans:%d", started,
                 (started + numOfCheckpointTrans));
          break;
        }
      } else {
        mError("failed to start checkpoint trans, code:%s", tstrerror(code));
      }
    }
  }

  taosArrayDestroy(pList);
  return code;
}
1467

1468
// Handle a "drop stream" request.
//
// Steps: deserialize the request, acquire the stream object, refuse to drop a stream that is
// still referenced by an SMA, check the write privilege on the target db, check for conflicting
// stream transactions, then build/register/prepare the drop transaction and remove the stream
// tasks from the in-memory execInfo buffer.
//
// Returns 0 when the stream does not exist and igNotExists is set, TSDB_CODE_ACTION_IN_PROGRESS
// when the drop transaction has been prepared, otherwise an error code.
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  SMDropStreamReq dropReq = {0};
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
    mError("invalid drop stream msg recv, discarded");
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  mDebug("recv drop stream:%s msg", dropReq.name);

  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (dropReq.igNotExists) {
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      tFreeMDropStreamReq(&dropReq);
      return 0;
    } else {
      mError("stream:%s not exist failed to drop it", dropReq.name);
      tFreeMDropStreamReq(&dropReq);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  if (pStream->smaId != 0) {
    mDebug("stream:%s, uid:0x%" PRIx64 " try to drop sma related stream", dropReq.name, pStream->uid);

    void    *pIter = NULL;
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    while (pIter) {
      if (pSma && pSma->uid == pStream->smaId) {
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;

        // log while pStream and dropReq are still valid, and report the code that is returned
        mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma",
               dropReq.name, pStream->uid, tstrerror(code));

        sdbRelease(pMnode->pSdb, pSma);
        sdbRelease(pMnode->pSdb, pStream);

        sdbCancelFetch(pMnode->pSdb, pIter);
        tFreeMDropStreamReq(&dropReq);
        TAOS_RETURN(code);
      }

      if (pSma) {
        sdbRelease(pMnode->pSdb, pSma);
      }

      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    }
  }

  // propagate the real privilege-check code, same as the pause handler does
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    return code;
  }

  STrans *pTrans = NULL;
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
  if (code) {
    mError("failed to register drop stream trans, code:%s", tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // drop all tasks
  code = mndStreamSetDropAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // drop stream
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // kill the related checkpoint trans
  int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
  if (transId != 0) {
    mDebug("drop active transId:%d due to stream:%s uid:0x%" PRIx64 " dropped", transId, pStream->name, pStream->uid);
    mndKillTransImpl(pMnode, transId, pStream->sourceDb);
  }

  mDebug("stream:%s uid:0x%" PRIx64 " transId:%d start to drop related task when dropping stream", dropReq.name,
         pStream->uid, transId);

  removeStreamTasksInBuf(pStream, &execInfo);

  // NOTE(review): a name-parsing failure here is returned to the caller even though the drop
  // trans has already been prepared above -- confirm this is intended.
  SName name = {0};
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  auditRecord(pReq, pMnode->clusterId, "dropStream", "", name.dbname, dropReq.sql, dropReq.sqlLen);

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  tFreeMDropStreamReq(&dropReq);

  if (code == 0) {
    return TSDB_CODE_ACTION_IN_PROGRESS;
  } else {
    TAOS_RETURN(code);
  }
}
1610

1611
// Append "drop stream" log entries to pTrans for every stream that uses pDb as its source or
// target db.
//
// A stream whose source and target dbs differ cannot be dropped this way; in that case the scan
// is cancelled and TSDB_CODE_MND_STREAM_MUST_BE_DELETED is returned. For every other matching
// stream the related active trans (if any) is killed, its tasks are removed from execInfo and
// the dropped status is persisted into pTrans.
//
// Returns 0 on success, otherwise an error code.
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
      if (pStream->sourceDbUid != pStream->targetDbUid) {
        // log before releasing: the message reads fields of pStream
        mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
               pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(TSDB_CODE_MND_STREAM_MUST_BE_DELETED);
      } else {
        // kill the related checkpoint trans
        int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
        if (transId != 0) {
          mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
          mndKillTransImpl(pMnode, transId, pStream->sourceDb);
        }

        // drop the stream obj in execInfo
        removeStreamTasksInBuf(pStream, &execInfo);

        code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          sdbRelease(pSdb, pStream);
          sdbCancelFetch(pSdb, pIter);
          return code;
        }
      }
    }

    sdbRelease(pSdb, pStream);
  }

  return 0;
}
1653

1654
// Fill pBlock with up to `rows` stream rows, continuing the sdb scan stored in pShow->pIter.
// Returns the number of rows written in this call and adds it to pShow->numOfRows.
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pObj = NULL;
  int32_t     filled = 0;

  for (; filled < rows;) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) {
      break;
    }

    // the row is counted only when setStreamAttrInResBlock reports success
    if (setStreamAttrInResBlock(pObj, pBlock, filled) == 0) {
      filled += 1;
    }

    sdbRelease(pSdb, pObj);
  }

  pShow->numOfRows += filled;
  return filled;
}
1675

1676
// Cancel an in-progress SDB_STREAM scan identified by pIter.
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1680

1681
// Fill pBlock with one row per stream task, continuing the stream scan stored in pShow->pIter.
//
// All tasks of one stream are emitted together; when they do not fit into rowsCapacity the block
// is grown with blockDataEnsureCapacity. A stream is skipped when the block cannot be grown or
// when its task iterator cannot be created.
//
// Returns the number of rows written in this call and adds it to pShow->numOfRows.
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  int32_t     numOfRows = 0;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) {
      break;
    }

    // lock
    taosRLockLatch(&pStream->lock);

    int32_t count = mndGetNumOfStreamTasks(pStream);
    if (numOfRows + count > rowsCapacity) {
      code = blockDataEnsureCapacity(pBlock, numOfRows + count);
      if (code) {
        mError("failed to prepare the result block buffer, quit return value");
        taosRUnLockLatch(&pStream->lock);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    // fall back to millisecond precision when the source db cannot be acquired
    int32_t precision = TSDB_TIME_PRECISION_MILLI;
    SDbObj *pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
    if (pSourceDb != NULL) {
      precision = pSourceDb->cfg.precision;
      mndReleaseDb(pMnode, pSourceDb);
    }

    // add row for each task
    SStreamTaskIter *pIter = NULL;
    code = createStreamTaskIter(pStream, &pIter);
    if (code) {
      // log before releasing: the message reads pStream->name
      mError("failed to create task iter for stream:%s", pStream->name);
      taosRUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pIter, &pTask);
      if (code) {
        // the iterator is destroyed exactly once, right after this loop
        break;
      }

      code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows, precision);
      if (code == TSDB_CODE_SUCCESS) {
        numOfRows++;
      }
    }

    pBlock->info.rows = numOfRows;

    destroyStreamTaskIter(pIter);
    taosRUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1754

1755
// Cancel an in-progress SDB_STREAM scan (used by the stream-task listing) identified by pIter.
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1759

1760
// Handle a "pause stream" request.
//
// The request is rejected when the stream does not exist (unless igNotExists is set), the user
// lacks write privilege on the target db, a conflicting stream trans is active, a node update is
// pending, or any task of the stream has not reported a status yet or is in UNINIT/CK status.
// Otherwise a pause transaction is created, registered, filled and prepared.
//
// Returns 0 for an ignored missing stream, TSDB_CODE_ACTION_IN_PROGRESS once the trans has been
// prepared, otherwise an error code.
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  STrans     *pTrans = NULL;
  int32_t     code = 0;

  SMPauseStreamReq pauseReq = {0};
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (code != 0 || pStream == NULL) {
    if (!pauseReq.igNotExists) {
      mError("stream:%s not exist, failed to pause stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }

    mInfo("stream:%s, not exist, not pause stream", pauseReq.name);
    return 0;
  }

  SSdb *pSdb = pMnode->pSdb;
  mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid);

  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    sdbRelease(pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
  if (code) {
    sdbRelease(pSdb, pStream);
    TAOS_RETURN(code);
  }

  if (mndStreamNodeIsUpdated(pMnode)) {
    mError("tasks are not ready for pause, node update detected");
    sdbRelease(pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  // inspect the cached task status: every task of this stream must be known and pausable
  bool hasEntry = false;
  bool pausable = true;

  streamMutexLock(&execInfo.lock);
  for (int32_t idx = 0; idx < taosArrayGetSize(execInfo.pTaskList); ++idx) {
    STaskId *pId = taosArrayGet(execInfo.pTaskList, idx);
    if (pId == NULL) {
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
    if (pEntry == NULL || pEntry->id.streamId != pStream->uid) {
      continue;
    }

    hasEntry = true;
    if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) {
      mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name,
             pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status));
      pausable = false;
    }
  }
  streamMutexUnlock(&execInfo.lock);

  if (!hasEntry) {
    mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
    sdbRelease(pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  if (!pausable) {
    mError("stream:%s task not ready for pause yet", pauseReq.name);
    sdbRelease(pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pSdb, pStream);
    return code;
  }

  // from here on every exit releases the stream and drops the trans handle
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
  if (code) {
    goto _over;
  }

  // if nodeUpdate happened, not send pause trans
  code = mndStreamSetPauseAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s, failed to pause task since %s", pauseReq.name, tstrerror(code));
    goto _over;
  }

  // pause stream
  taosWLockLatch(&pStream->lock);
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  taosWUnLockLatch(&pStream->lock);
  if (code) {
    goto _over;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
    goto _over;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_over:
  sdbRelease(pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
1894

1895
// Handle a "resume stream" request.
//
// After the grant check the stream is acquired, the write privilege on the target db and
// conflicting stream transactions are checked, and a resume transaction is created, registered,
// filled with the resume action and prepared. The stream status is set back to
// STREAM_STATUS__NORMAL before it is persisted into the trans.
//
// Returns 0 for an ignored missing stream, TSDB_CODE_ACTION_IN_PROGRESS once the trans has been
// prepared, otherwise an error code.
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResumeStreamReq resumeReq = {0};
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resumeReq.igNotExists) {
      mInfo("stream:%s not exist, not resume stream", resumeReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      return 0;
    } else {
      mError("stream:%s not exist, failed to resume stream", resumeReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);

  // propagate the real privilege-check code, same as the pause handler does
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  STrans *pTrans = NULL;
  code =
      doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // set the resume action
  code = mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated);
  if (code) {
    mError("stream:%s, failed to resume task since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // resume stream
  taosWLockLatch(&pStream->lock);
  pStream->status = STREAM_STATUS__NORMAL;
  // keep the result in code so that a persist failure is not reported as success
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  taosWUnLockLatch(&pStream->lock);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare resume stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1984

1985
// Handle a "reset stream" request.
//
// Only the request validation is implemented so far: grant check, deserialization and stream
// lookup. The actual reset logic is still a TODO.
//
// Returns 0 for an ignored missing stream, TSDB_CODE_ACTION_IN_PROGRESS when the stream exists,
// otherwise an error code.
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResetStreamReq resetReq = {0};
  if (tDeserializeSMResetStreamReq(pReq->pCont, pReq->contLen, &resetReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  mDebug("recv reset stream req, stream:%s", resetReq.name);

  code = mndAcquireStream(pMnode, resetReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resetReq.igNotExists) {
      mInfo("stream:%s, not exist, not reset stream", resetReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to reset stream", resetReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  // the stream object is not used yet; give back the reference taken by mndAcquireStream
  sdbRelease(pMnode->pSdb, pStream);

  // todo(liao hao jun)
  // NOTE(review): no trans is created here, yet ACTION_IN_PROGRESS is returned -- confirm that
  // the caller does not wait for a trans completion that never arrives.
  return TSDB_CODE_ACTION_IN_PROGRESS;
}
2015

2016
// Build and prepare a single "update task epsets" transaction covering every stream affected by
// the vgroup changes described in pChangeInfo.
//
// Pass 1 runs the trans conflict check for each stream and aborts on the first conflict.
// Pass 2 creates the trans lazily on the first stream, then for every involved stream (all
// streams when includeAllNodes is set, otherwise those whose source or target db is in
// pChangeInfo->pDBMap) registers the trans, appends the epset-update actions and persists the
// stream into the trans.
//
// Returns 0 when there is nothing to do, otherwise the result of mndTransPrepare or the first
// fatal error code.
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  STrans     *pTrans = NULL;
  int32_t     code = 0;

  // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool
  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
    sdbRelease(pSdb, pStream);

    if (code) {
      mError("nodeUpdate conflict with other trans, current nodeUpdate ignored, code:%s", tstrerror(code));
      sdbCancelFetch(pSdb, pIter);
      return code;
    }
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    // here create only one trans
    if (pTrans == NULL) {
      code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME,
                           "update task epsets", &pTrans);
      if (pTrans == NULL || code) {
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        return terrno = code;
      }
    }

    if (!includeAllNodes) {
      void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
      void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
      if (p1 == NULL && p2 == NULL) {
        mDebug("stream:0x%" PRIx64 " %s not involved nodeUpdate, ignore", pStream->uid, pStream->name);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
           pStream->name, pTrans->id);

    // NOTE: for each stream, we register one trans entry for task update
    code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
    if (code) {
      mError("failed to register trans, transId:%d, and continue", pTrans->id);
    }

    code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);

    // todo: not continue, drop all and retry again
    if (code != TSDB_CODE_SUCCESS) {
      mError("stream:0x%" PRIx64 " build nodeUpdate trans failed, ignore and continue, code:%s", pStream->uid,
             tstrerror(code));
      sdbRelease(pSdb, pStream);
      continue;
    }

    code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
    sdbRelease(pSdb, pStream);

    if (code != TSDB_CODE_SUCCESS) {
      sdbCancelFetch(pSdb, pIter);
      // the trans handle is owned by this function; do not leak it on the error path
      mndTransDrop(pTrans);
      return code;
    }
  }

  // no need to build the trans to handle the vgroup update
  if (pTrans == NULL) {
    return 0;
  }

  // every stream fetched above has already been released inside the loop, so only the trans
  // handle is left to clean up from here on
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
    mndTransDrop(pTrans);
    return code;
  }

  mndTransDrop(pTrans);
  return code;
}
2112

2113
// Rebuild pNodeList from the tasks of all existing streams.
//
// Each task contributes an SNodeEntry (nodeId + epset) that is de-duplicated by nodeId through a
// temporary hash map; pNodeList is then cleared and refilled from that map.
//
// Returns 0 on success, otherwise an error code (terrno when the hash map cannot be created or
// when an entry cannot be pushed into pNodeList).
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int32_t     code = 0;

  mDebug("start to refresh node list by existed streams");

  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    return terrno;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    taosWLockLatch(&pStream->lock);

    SStreamTaskIter *pTaskIter = NULL;
    code = createStreamTaskIter(pStream, &pTaskIter);
    if (code) {
      // log before releasing: the message reads pStream->name
      mError("failed to create task iter for stream:%s", pStream->name);
      taosWUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pTaskIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pTaskIter, &pTask);
      if (code) {
        break;
      }

      SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
      epsetAssign(&entry.epset, &pTask->info.epSet);

      // several tasks may live on the same node; a duplicated key is expected and not an error
      int32_t ret = taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
      if (ret != 0 && ret != TSDB_CODE_DUP_KEY) {
        mError("failed to put entry into hash map, nodeId:%d, code:%s", entry.nodeId, tstrerror(ret));
      }
    }

    destroyStreamTaskIter(pTaskIter);
    taosWUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  taosArrayClear(pNodeList);

  // convert to list
  pIter = NULL;
  while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
    SNodeEntry *pEntry = (SNodeEntry *)pIter;

    void *p = taosArrayPush(pNodeList, pEntry);
    if (p == NULL) {
      mError("failed to put entry into node list, nodeId:%d, code: out of memory", pEntry->nodeId);
      if (code == 0) {
        code = terrno;
      }
      continue;
    }

    char    buf[256] = {0};
    int32_t ret = epsetToStr(&pEntry->epset, buf, tListLen(buf));  // ignore this error since it is only for log file
    if (ret != 0) {                                                // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(ret));
    }

    mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
  }

  taosHashCleanup(pHash);

  mDebug("numOfNodes:%d for stream after extract nodeInfo from stream", (int32_t)taosArrayGetSize(pNodeList));
  return code;
}
2194

2195
// Insert the db name of every vgroup found in pSdb into pDBMap (key only, no value).
// Failures of taosHashPut are ignored; successful insertions are logged.
static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
  void   *pIter = NULL;
  int32_t code = 0;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    code = taosHashPut(pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
    if (code == 0) {
      int32_t size = taosHashGetSize(pDBMap);
      mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size);
    }

    // release only after the last read of pVgroup->dbName
    sdbRelease(pSdb, pVgroup);
  }
}
×
2214

2215
// this function runs by only one thread, so it is not multi-thread safe
2216
// Periodic node-change check for stream tasks.
//
// Guarded by mndNodeCheckSentinel so that only one check runs at a time. The current vgroup
// snapshot is compared with the cached execInfo.pNodeList; when nodes changed (or all vgroups
// must be refreshed after a follower->leader switch), active checkpoint transactions are killed,
// a nodeUpdate trans is created through mndProcessVgroupChange, and the cached node list is
// refreshed from the existing streams.
//
// Always returns 0; failures are logged.
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
  int32_t code = 0;
  bool    allReady = true;
  SArray *pNodeSnapshot = NULL;
  SMnode *pMnode = pMsg->info.node;
  int64_t ts = taosGetTimestampSec();
  bool    updateAllVgroups = false;

  int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
  if (old != 0) {
    mDebug("still in checking node change");
    return 0;
  }

  mDebug("start to do node changing check");

  streamMutexLock(&execInfo.lock);
  int32_t numOfNodes = extractStreamNodeList(pMnode);
  streamMutexUnlock(&execInfo.lock);

  if (numOfNodes == 0) {
    mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing");
    execInfo.ts = ts;
    atomic_store_32(&mndNodeCheckSentinel, 0);
    return 0;
  }

  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
  if (code) {
    mError("failed to take the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    taosArrayDestroy(pNodeSnapshot);
    atomic_store_32(&mndNodeCheckSentinel, 0);
    mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
    return 0;
  }

  streamMutexLock(&execInfo.lock);

  code = removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
  if (code) {
    goto _end;
  }

  SVgroupChangeInfo changeInfo = {0};
  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo);
  if (code) {
    // NOTE(review): changeInfo is not destroyed on this path -- confirm that
    // mndFindChangedNodeInfo releases any partially built content when it fails.
    goto _end;
  }

  {
    if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) {
      mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
      updateAllVgroups = true;
      execInfo.switchFromFollower = false;  // reset the flag
      addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb);
    }
  }

  if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) {
    // kill current active checkpoint transaction, since the transaction is vnode wide.
    killAllCheckpointTrans(pMnode, &changeInfo);
    code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups);

    // keep the new vnode snapshot if success
    if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
      code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
      if (code) {
        // fall through to the common cleanup below so that changeInfo is always destroyed
        mError("failed to extract node list from stream, code:%s", tstrerror(code));
      } else {
        execInfo.ts = ts;
        mDebug("create trans successfully, update cached node list, numOfNodes:%d",
               (int)taosArrayGetSize(execInfo.pNodeList));
      }
    } else {
      mError("unexpected code during create nodeUpdate trans, code:%s", tstrerror(code));
    }
  } else {
    mDebug("no update found in nodeList");
  }

  mndDestroyVgroupChangeInfo(&changeInfo);

_end:
  streamMutexUnlock(&execInfo.lock);
  taosArrayDestroy(pNodeSnapshot);

  mDebug("end to do stream task node change checking");
  atomic_store_32(&mndNodeCheckSentinel, 0);
  return 0;
}
2310

2311
// Handler that schedules a stream node-change check.
// When at least one stream exists in the sdb, an SMStreamNodeCheckMsg body is allocated and queued on
// the write queue as a TDMT_MND_STREAM_NODECHANGE_CHECK message.
// Returns 0 if there is no stream, terrno if the message body cannot be allocated, otherwise the
// result of tmsgPutToQueue().
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;

  // no stream exists, nothing to check
  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) <= 0) {
    return 0;
  }

  int32_t contLen = sizeof(SMStreamNodeCheckMsg);
  void   *pCont = rpcMallocCont(contLen);
  if (pCont == NULL) {
    return terrno;
  }

  SRpcMsg checkMsg = {0};
  checkMsg.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK;
  checkMsg.pCont = pCont;
  checkMsg.contLen = contLen;

  return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &checkMsg);
}
2327

2328
// Walk all tasks of pStream and register every task that is not yet present in pExecNode->pTaskMap:
// a fresh status entry is put into pTaskMap, the task id is appended to pTaskList, and the node that
// the task runs on is appended to pNodeList when no entry with the same nodeId exists there.
// Failures are only logged; the function returns nothing.
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
  SStreamTaskIter *pTaskIter = NULL;
  if (createStreamTaskIter(pStream, &pTaskIter) != 0) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pTask = NULL;
    if (streamTaskIterGetCurrent(pTaskIter, &pTask) != 0) {
      break;
    }

    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    if (taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)) != NULL) {
      continue;  // this task is already in the buffer
    }

    STaskStatusEntry entry = {0};
    streamTaskStatusInit(&entry, pTask);

    if (taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry)) != 0) {
      mError("s-task:0x%x failed to add into task map, since out of memory", (int32_t)entry.id.taskId);
    } else {
      void   *pItem = taosArrayPush(pExecNode->pTaskList, &id);
      int32_t total = (int32_t)taosArrayGetSize(pExecNode->pTaskList);
      if (pItem != NULL) {
        mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, total);
      } else {
        mError("s-task:0x%x failed to add into task buffer, total:%d", (int32_t)entry.id.taskId, total);
      }
    }

    // add the new vgroups if not added yet
    int32_t numOfNodes = (int32_t)taosArrayGetSize(pExecNode->pNodeList);
    bool    found = false;
    for (int32_t k = 0; (k < numOfNodes) && (!found); ++k) {
      SNodeEntry *pNode = taosArrayGet(pExecNode->pNodeList, k);
      found = (pNode != NULL) && (pNode->nodeId == pTask->info.nodeId);
    }

    if (found) {
      continue;
    }

    SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
    epsetAssign(&nodeEntry.epset, &pTask->info.epSet);

    if (taosArrayPush(pExecNode->pNodeList, &nodeEntry) != NULL) {
      mInfo("vgId:%d added into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList));
    } else {
      mError("vgId:%d failed to add into nodeList, total:%d", nodeEntry.nodeId,
             (int)taosArrayGetSize(pExecNode->pNodeList));
    }
  }

  destroyStreamTaskIter(pTaskIter);
}
2389

2390
// Append taskId to pList unless it is already there.
// uid and numOfTotal are used for logging only; the logged count is the list size before the append.
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
  int32_t numOfTasks = taosArrayGetSize(pList);

  // the task has sent the request before, ignore the duplicate
  for (int32_t idx = 0; idx < numOfTasks; ++idx) {
    int32_t *pExisted = taosArrayGet(pList, idx);
    if ((pExisted != NULL) && (*pExisted == taskId)) {
      return;
    }
  }

  if (taosArrayPush(pList, &taskId) != NULL) {
    mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, numOfTasks, numOfTotal - numOfTasks);
  } else {
    mError("stream:0x%" PRIx64 " receive %d reqs for checkpoint, failed to added into task list, since out of memory",
           uid, numOfTasks);
  }
}
2412

2413
// Handle a checkpoint request sent by one stream task.
// The task id is recorded in execInfo.pTransferStateStreams under its stream id. Once the number of
// recorded tasks equals the number of tasks of the stream, a checkpoint trans is started and the
// record of the stream is removed from the map.
//
// Returns TSDB_CODE_INVALID_MSG if the request cannot be decoded, TSDB_CODE_MND_STREAM_NOT_EXIST if
// the stream is found neither in the meta-store nor in the task buffer, an error code if the
// per-stream task list cannot be created/registered or the response cannot be allocated, else 0.
// On the success path the response is sent here and the auto response is disabled.
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
  SMnode                  *pMnode = pReq->info.node;
  SStreamTaskCheckpointReq req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskCheckpointReq(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint req msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);

  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf",
          req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the create stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SArray  *pTaskIdList = NULL;
  SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
  if (pReqTaskList != NULL) {
    pTaskIdList = *pReqTaskList;
    doAddTaskId(pTaskIdList, req.taskId, req.streamId, numOfTasks);
  } else {
    // first request of this stream, create the list and register it in the map
    pTaskIdList = taosArrayInit(4, sizeof(int32_t));
    if (pTaskIdList == NULL) {
      code = terrno;
      mError("stream:0x%" PRIx64 " failed to create the checkpoint req task list, code:%s", req.streamId,
             tstrerror(code));

      if (pStream != NULL) {
        mndReleaseStream(pMnode, pStream);
      }
      streamMutexUnlock(&execInfo.lock);
      return code;
    }

    doAddTaskId(pTaskIdList, req.taskId, req.streamId, numOfTasks);
    code = taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pTaskIdList, sizeof(void *));
    if (code) {
      mError("failed to put into transfer state stream map, code: out of memory");

      // the map does not own the list, release it here and stop instead of reading a missing map entry
      taosArrayDestroy(pTaskIdList);
      if (pStream != NULL) {
        mndReleaseStream(pMnode, pStream);
      }
      streamMutexUnlock(&execInfo.lock);
      return code;
    }
  }

  int32_t total = taosArrayGetSize(pTaskIdList);
  if (total == numOfTasks) {  // all tasks have sent the reqs
    int64_t checkpointId = mndStreamGenChkptId(pMnode, false);
    mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);

    if (pStream != NULL) {  // TODO:handle error
      code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
      if (code) {
        mError("failed to create checkpoint trans, code:%s", tstrerror(code));
      }
    } else {
      // todo: wait for the create stream trans completed, and launch the checkpoint trans
      // SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
      // sleep(500ms)
    }

    // remove this entry
    (void)taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));

    int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
    mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  {
    SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)};
    rsp.pCont = rpcMallocCont(rsp.contLen);
    if (rsp.pCont == NULL) {
      return terrno;
    }

    SMsgHead *pHead = rsp.pCont;
    pHead->vgId = htonl(req.nodeId);

    tmsgSendRsp(&rsp);
    pReq->info.handle = NULL;  // disable auto rsp
  }

  return 0;
}
2512

2513
// validate the checkpoint-report info according to the HbMsg
2514
static bool validateChkptReport(const SCheckpointReport *pReport, int64_t reportChkptId) {
6,066✔
2515
  STaskId           id = {.streamId = pReport->streamId, .taskId = pReport->taskId};
6,066✔
2516
  STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
6,066✔
2517
  if (pTaskEntry == NULL) {
6,066✔
2518
    mError("invalid checkpoint-report msg from task:0x%x, discard", pReport->taskId);
15!
2519
    return false;
15✔
2520
  }
2521

2522
  if (pTaskEntry->checkpointInfo.latestId >= pReport->checkpointId) {
6,051!
2523
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " saved checkpointId:%" PRId64 " discard",
×
2524
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2525
    return false;
×
2526
  }
2527

2528
  // now the task in checkpoint procedure
2529
  if ((pTaskEntry->checkpointInfo.activeId != 0) && (pTaskEntry->checkpointInfo.activeId > pReport->checkpointId)) {
6,051!
2530
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " active checkpointId:%" PRId64
×
2531
           " discard",
2532
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2533
    return false;
×
2534
  }
2535

2536
  if (reportChkptId >= pReport->checkpointId) {
6,051!
2537
    mError("s-task:0x%x expired checkpoint-report msg, checkpointId:%" PRId64 " already update checkpointId:%" PRId64
×
2538
           " discard",
2539
           pReport->taskId, pReport->checkpointId, reportChkptId);
2540
    return false;
×
2541
  }
2542

2543
  return true;
6,051✔
2544
}
2545

2546
// Record a checkpoint-report in pList after it passed validateChkptReport().
// If the task already has an item in pList, the item is refreshed only when the new report carries a
// greater checkpointId; otherwise a new item built from the report is appended.
static void doAddReportStreamTask(SArray *pList, int64_t reportedChkptId, const SCheckpointReport *pReport) {
  if (!validateChkptReport(pReport, reportedChkptId)) {
    return;
  }

  int32_t numOfReports = taosArrayGetSize(pList);
  for (int32_t idx = 0; idx < numOfReports; ++idx) {
    STaskChkptInfo *pExisted = taosArrayGet(pList, idx);
    if ((pExisted == NULL) || (pExisted->taskId != pReport->taskId)) {
      continue;
    }

    if (pExisted->checkpointId == pReport->checkpointId) {
      mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
    } else if (pExisted->checkpointId > pReport->checkpointId) {
      mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
             pReport->taskId, pExisted->checkpointId, pReport->checkpointId);
    } else {  // expired checkpoint-report msg, update it
      mInfo("s-task:0x%x expired checkpoint-report info in checkpoint-report list update from %" PRId64 "->%" PRId64,
            pReport->taskId, pExisted->checkpointId, pReport->checkpointId);

      // update the checkpoint report info
      pExisted->checkpointId = pReport->checkpointId;
      pExisted->ts = pReport->checkpointTs;
      pExisted->version = pReport->checkpointVer;
      pExisted->transId = pReport->transId;
      pExisted->dropHTask = pReport->dropHTask;
    }

    // at most one item exists for a task
    return;
  }

  STaskChkptInfo info = {0};
  info.streamId = pReport->streamId;
  info.taskId = pReport->taskId;
  info.transId = pReport->transId;
  info.dropHTask = pReport->dropHTask;
  info.version = pReport->checkpointVer;
  info.ts = pReport->checkpointTs;
  info.checkpointId = pReport->checkpointId;
  info.nodeId = pReport->nodeId;

  if (taosArrayPush(pList, &info) == NULL) {
    mError("failed to put into task list, taskId:0x%x", pReport->taskId);
    return;
  }

  int32_t size = taosArrayGetSize(pList);
  mDebug("stream:0x%" PRIx64 " taskId:0x%x checkpoint-report recv, %d tasks has send checkpoint-report",
         pReport->streamId, pReport->taskId, size);
}
2599

2600
// Handle a checkpoint-report sent by one stream task.
// The report is recorded in the per-stream report info kept in execInfo.pChkptStreams; the info is
// created on the first report of a stream. When the number of recorded reports equals the number of
// tasks of the stream, this is only logged here.
//
// Returns TSDB_CODE_INVALID_MSG if the message cannot be decoded, TSDB_CODE_MND_STREAM_NOT_EXIST if
// the stream is found neither in the meta-store nor in the task buffer; otherwise a quick response
// carrying TSDB_CODE_SUCCESS is sent and the last recorded code is returned.
int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SCheckpointReport req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskChkptReport(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint-report msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64
         " checkpointVer:%" PRId64 " transId:%d",
         req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);

  // register to the stream task done map, if all tasks has sent these kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the creation of stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SChkptReportInfo *pInfo =
      (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
  if (pInfo == NULL) {
    SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId};
    if (info.pTaskList == NULL) {
      code = terrno;
      mError("stream:0x%" PRIx64 " failed to create the checkpoint-report task list, code:%s", req.streamId,
             tstrerror(code));
    } else {
      doAddReportStreamTask(info.pTaskList, info.reportChkpt, &req);
      code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &info, sizeof(info));
      if (code) {
        mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId);
        // nothing was stored in the map, so the list is still owned here
        taosArrayDestroy(info.pTaskList);
      } else {
        pInfo = (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
      }
    }
  } else {
    doAddReportStreamTask(pInfo->pTaskList, pInfo->reportChkpt, &req);
  }

  // pInfo is NULL if the report info could not be created; pStream is NULL if the stream only exists
  // in the task buffer. Neither may be dereferenced in those cases.
  if ((pInfo != NULL) && (pStream != NULL)) {
    int32_t total = taosArrayGetSize(pInfo->pTaskList);
    if (total == numOfTasks) {  // all tasks have sent the reqs
      mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
            " will be issued soon",
            req.streamId, pStream->name, total, req.checkpointId);
    }
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS);
  return code;
}
2679

2680
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t *pExistedTasks, bool *pAllSame) {
207✔
2681
  int32_t num = 0;
207✔
2682
  int64_t chkId = INT64_MAX;
207✔
2683
  *pExistedTasks = 0;
207✔
2684
  *pAllSame = true;
207✔
2685

2686
  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
6,632✔
2687
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
6,425✔
2688
    if (p == NULL) {
6,425!
2689
      continue;
×
2690
    }
2691

2692
    if (p->streamId != streamId) {
6,425✔
2693
      continue;
5,060✔
2694
    }
2695

2696
    num += 1;
1,365✔
2697
    STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
1,365✔
2698
    if (chkId > pe->checkpointInfo.latestId) {
1,365✔
2699
      if (chkId != INT64_MAX) {
215✔
2700
        *pAllSame = false;
8✔
2701
      }
2702
      chkId = pe->checkpointInfo.latestId;
215✔
2703
    }
2704
  }
2705

2706
  *pExistedTasks = num;
207✔
2707
  if (num < numOfTasks) {  // not all task send info to mnode through hbMsg, no valid checkpoint Id
207!
2708
    return -1;
×
2709
  }
2710

2711
  return chkId;
207✔
2712
}
2713

2714
// Send a response of msgSize bytes with the given code right away and disable the auto response of
// the request described by pInfo. The vgId is written, in network byte order, into the message head.
// Nothing is sent (and the auto response stays enabled) when the response body cannot be allocated.
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) {
  SRpcMsg rsp = {.code = code, .info = *pInfo, .contLen = msgSize};

  rsp.pCont = rpcMallocCont(rsp.contLen);
  if (rsp.pCont == NULL) {
    return;
  }

  ((SMsgHead *)rsp.pCont)->vgId = htonl(vgId);
  tmsgSendRsp(&rsp);

  pInfo->handle = NULL;  // disable auto rsp
}
6,066✔
2725

2726
// Remove from pInfo->pTaskList the first entry matching each task id listed in pList.
// Returns the number of task ids in pList.
static int32_t doCleanReqList(SArray* pList, SCheckpointConsensusInfo* pInfo) {
  int32_t numOfSent = taosArrayGetSize(pList);

  for (int32_t idx = 0; idx < numOfSent; ++idx) {
    int32_t *pTaskId = taosArrayGet(pList, idx);
    if (pTaskId == NULL) {
      continue;
    }

    // the list only shrinks right before leaving the inner loop, so its size can be read once
    int32_t numOfEntries = taosArrayGetSize(pInfo->pTaskList);
    for (int32_t pos = 0; pos < numOfEntries; ++pos) {
      SCheckpointConsensusEntry *pEntry = taosArrayGet(pInfo->pTaskList, pos);
      if ((pEntry == NULL) || (pEntry->req.taskId != *pTaskId)) {
        continue;
      }

      taosArrayRemove(pInfo->pTaskList, pos);
      break;
    }
  }

  return numOfSent;
}
2746

2747
// Timer handler that walks execInfo.pStreamConsensus and, for every pending consensus-checkpointId
// request, creates the set-consensus-checkpointId trans once the checkpointId of the stream can be
// decided (all tasks report the same id, or the request has waited for at least 10s).
// Handled requests are removed from the per-stream list; streams that are dropped or have no pending
// request left are removed from the map after the iteration.
//
// Returns terrno if the working lists cannot be allocated, 0 if not all vnodes are ready,
// TSDB_CODE_FAILED if a request carries a checkpointId smaller than the computed one, otherwise the
// code of the last operation.
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  int64_t now = taosGetTimestampMs();
  bool    allReady = true;
  SArray *pNodeSnapshot = NULL;
  int32_t maxAllowedTrans = 50;
  int32_t numOfTrans = 0;
  int32_t code = 0;
  void   *pIter = NULL;

  SArray *pList = taosArrayInit(4, sizeof(int32_t));
  if (pList == NULL) {
    return terrno;
  }

  SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
  if (pStreamList == NULL) {
    code = terrno;  // read the error before any further call can touch it
    taosArrayDestroy(pList);
    return code;
  }

  mDebug("start to process consensus-checkpointId in tmr");

  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
  taosArrayDestroy(pNodeSnapshot);
  if (code) {
    mError("failed to get the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
    taosArrayDestroy(pStreamList);
    taosArrayDestroy(pList);
    return 0;
  }

  streamMutexLock(&execInfo.lock);

  while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
    SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;

    taosArrayClear(pList);

    int64_t     streamId = -1;
    int32_t     num = taosArrayGetSize(pInfo->pTaskList);
    SStreamObj *pStream = NULL;

    code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
    if (pStream == NULL || code != 0) {  // stream has been dropped already
      mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
      void *p = taosArrayPush(pStreamList, &pInfo->streamId);
      if (p == NULL) {
        mError("failed to record the missing stream id in concensus-stream list, streamId:%" PRId64
               " code:%s, continue",
               pInfo->streamId, tstrerror(terrno));
      }
      continue;
    }

    for (int32_t j = 0; j < num; ++j) {
      SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
      if (pe == NULL) {
        continue;
      }

      if (streamId == -1) {
        streamId = pe->req.streamId;
      }

      int32_t existed = 0;
      bool    allSame = true;
      int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame);
      if (chkId == -1) {
        mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed,
               pInfo->numOfTasks, pe->req.taskId);
        break;
      }

      if (((now - pe->ts) >= 10 * 1000) || allSame) {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
        if (chkId > pe->req.checkpointId) {
          mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
                 pe->req.checkpointId, chkId);

          // finish every access to the shared consensus map and its entries before the lock is
          // released, then free both working lists
          mndReleaseStream(pMnode, pStream);
          taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
          streamMutexUnlock(&execInfo.lock);

          taosArrayDestroy(pStreamList);
          taosArrayDestroy(pList);
          return TSDB_CODE_FAILED;
        }

        code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
        }

        void *p = taosArrayPush(pList, &pe->req.taskId);
        if (p == NULL) {
          mError("failed to put into task list, taskId:0x%x", pe->req.taskId);
        }
      } else {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
      }
    }

    mndReleaseStream(pMnode, pStream);

    int32_t alreadySend = doCleanReqList(pList, pInfo);

    // clear request stream item with empty task list
    if (taosArrayGetSize(pInfo->pTaskList) == 0) {
      mndClearConsensusRspEntry(pInfo);
      if (streamId == -1) {
        mError("streamId is -1, streamId:%" PRIx64" in consensus-checkpointId hashMap, cont", pInfo->streamId);
        // no request was visited, use the id kept in the info so that the entry can still be removed
        streamId = pInfo->streamId;
      }

      void *p = taosArrayPush(pStreamList, &streamId);
      if (p == NULL) {
        mError("failed to put into stream list, stream:0x%" PRIx64 " not remove it in consensus-chkpt list", streamId);
      }
    }

    numOfTrans += alreadySend;
    if (numOfTrans > maxAllowedTrans) {
      mInfo("already send consensus-checkpointId trans:%d, try next time", alreadySend);
      taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
      break;
    }
  }

  // the map must not be modified while it is iterated, so the removal happens here
  for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
    int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i);
    if (pStreamId == NULL) {
      continue;
    }

    code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId);
  }

  streamMutexUnlock(&execInfo.lock);

  taosArrayDestroy(pStreamList);
  taosArrayDestroy(pList);

  mDebug("end to process consensus-checkpointId in tmr, send consensus-checkpoint trans:%d", numOfTrans);
  return code;
}
2896

2897
// Wrapper of mndProcessCreateStreamReq(): when the request fails, a one-byte response body is attached
// to the request and the failure code is stored in it, so that a response is delivered.
// Returns terrno if the response body cannot be allocated, otherwise the code of the wrapped handler.
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t code = mndProcessCreateStreamReq(pReq);
  if ((code == 0) || (code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    return code;
  }

  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->code = code;
  pReq->info.noResp = false;
  pReq->info.rspLen = 1;
  return code;
}
2911

2912
// Wrapper of mndProcessDropStreamReq(): when the request fails, a one-byte response body is attached
// to the request and the failure code is stored in it, so that a response is delivered.
// Returns terrno if the response body cannot be allocated, otherwise the code of the wrapped handler.
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t code = mndProcessDropStreamReq(pReq);
  if ((code == 0) || (code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    return code;
  }

  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->code = code;
  pReq->info.noResp = false;
  pReq->info.rspLen = 1;
  return code;
}
2926

2927
// Load the tasks of all streams into pExecInfo once: nothing is done when the task list is already
// initialized or when pMnode is NULL.
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  if (!pExecInfo->initTaskList && (pMnode != NULL)) {
    addAllStreamTasksIntoBuf(pMnode, pExecInfo);
    pExecInfo->initTaskList = true;
  }
}
2935

2936
void mndStreamResetInitTaskListLoadFlag() {
1,687✔
2937
  mInfo("reset task list buffer init flag for leader");
1,687!
2938
  execInfo.initTaskList = false;
1,687✔
2939
}
1,687✔
2940

2941
// Record the role of this mnode in execInfo.
// execInfo.switchFromFollower is reset first and is set to true only when the recorded role was
// follower and the new role is leader. The recorded role is replaced on the first call (role not
// initialized yet) and on a leader<->follower transition; otherwise only a log line is written.
// pMnode is not used.
void mndUpdateStreamExecInfoRole(SMnode *pMnode, int32_t role) {
  bool toLeader = (role == NODE_ROLE_LEADER);

  execInfo.switchFromFollower = false;

  // first time the role is reported
  if (execInfo.role == NODE_ROLE_UNINIT) {
    execInfo.role = role;
    if (toLeader) {
      mInfo("init mnode is set to leader");
    } else {
      mInfo("init mnode is set to follower");
    }
    return;
  }

  if (toLeader) {
    if (execInfo.role != NODE_ROLE_FOLLOWER) {
      mInfo("mnode remain to be leader, do nothing");
      return;
    }

    execInfo.role = role;
    execInfo.switchFromFollower = true;
    mInfo("mnode switch to be leader from follower");
    return;
  }

  // follower's
  if (execInfo.role != NODE_ROLE_LEADER) {
    mInfo("mnode remain to be follower, do nothing");
    return;
  }

  execInfo.role = role;
  mInfo("mnode switch to be follower from leader");
}
2,001✔
2970

2971
// Fetch every stream object from the sdb and save its tasks and nodes into pExecInfo.
// Each fetched stream is released after it has been processed.
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;

  for (void *pIter = sdbFetch(pSdb, SDB_STREAM, NULL, (void **)&pStream); pIter != NULL;
       pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) {
    saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo);
    sdbRelease(pSdb, pStream);
  }
}
162✔
2986

2987
// Create and prepare the trans that updates the checkpoint-info of pStream.
// pStream is released through sdbRelease() on every path, and the trans is dropped on every path
// after it has been created. pChkptInfoList is not used.
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the trans is prepared, otherwise the code of the step
// that failed.
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList) {
  STrans *pTrans = NULL;
  bool    transCreated = false;

  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME,
                               "update checkpoint-info", &pTrans);
  if (pTrans == NULL || code) {
    goto _done;
  }
  transCreated = true;

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid);
  if (code) {
    goto _done;
  }

  code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream);
  if (code) {
    goto _done;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    goto _done;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, tstrerror(code));
    goto _done;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_done:
  sdbRelease(pMnode->pSdb, pStream);
  if (transCreated) {
    mndTransDrop(pTrans);
  }
  return code;
}
3030

3031
// Handle a "drop orphan tasks" request by building one transaction that drops every task in the
// list carried by the message.
//
// Steps: deserialize the message from pReq->pCont, take the stream id from the first non-NULL
// list entry, run the trans conflict check for that stream, create and register the trans, add
// the drop actions for the whole list, persist the trans log with SDB_STATUS_DROPPED and prepare
// the trans.
//
// Returns the code of the first failing step, or the result of mndTransPrepare(). An empty list,
// or a list with no usable entry, leaves the function without creating a trans and returns the
// code held at that point (0).
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  int32_t      code = 0;
  SOrphanTask *pTask = NULL;
  STrans      *pTrans = NULL;
  int32_t      numOfTasks = 0;

  SMStreamDropOrphanMsg msg = {0};
  code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg);
  if (code) {
    // NOTE(review): msg is not destroyed on this path; confirm the deserializer cleans up after
    // itself when it fails.
    return code;
  }

  numOfTasks = taosArrayGetSize(msg.pList);
  if (numOfTasks == 0) {
    mDebug("no orphan tasks to drop, no need to create trans");
    goto _err;
  }

  mDebug("create trans to drop %d orphan tasks", numOfTasks);

  // the first retrievable entry supplies the stream id used for the conflict check and the trans
  for (int32_t idx = 0; idx < numOfTasks; ++idx) {
    pTask = taosArrayGet(msg.pList, idx);
    if (pTask != NULL) {
      break;
    }
  }

  if (pTask == NULL) {
    // NOTE(review): code is still 0 here, so the caller sees success although nothing was done.
    mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task");
    goto _err;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
  if (code) {
    goto _err;
  }

  // stand-in stream object: only the uid is meaningful, the name fields are empty
  SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};

  code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code != 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _err;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
  if (code) {
    goto _err;
  }

  // drop all tasks
  if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _err;
  }

  // drop stream
  if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) {
    goto _err;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    goto _err;
  }

  // Logged here rather than after the label so that the early exits above, which never created
  // a trans, do not report a successful trans creation.
  mDebug("create drop %d orphan tasks trans succ", numOfTasks);

_err:
  // shared exit for success and failure; pTrans may still be NULL when we arrive here
  tDestroyDropOrphanTaskMsg(&msg);
  mndTransDrop(pTrans);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc