• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3572

02 Jan 2025 08:57AM UTC coverage: 63.077% (-0.2%) from 63.276%
#3572

push

travis-ci

web-flow
Merge pull request #29450 from taosdata/fix/TS-5651-skip-sync-heartbeat

fix:[TS-5651]skip-sync-heartbeat

139525 of 284348 branches covered (49.07%)

Branch coverage included in aggregate %.

217427 of 281548 relevant lines covered (77.23%)

18571459.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.11
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "audit.h"
17
#include "mndDb.h"
18
#include "mndPrivilege.h"
19
#include "mndScheduler.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndStream.h"
23
#include "mndTrans.h"
24
#include "osMemory.h"
25
#include "parser.h"
26
#include "taoserror.h"
27
#include "tmisce.h"
28
#include "tname.h"
29

30
#define MND_STREAM_MAX_NUM 60
31

32
typedef struct {
33
  int8_t placeHolder;  // placeholder member, defined only to fix a Windows compile error
34
} SMStreamNodeCheckMsg;
35

36
static int32_t  mndNodeCheckSentinel = 0;
37
SStreamExecInfo execInfo;
38

39
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
41
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
42
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
43
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
44

45
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
46
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
47

48
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq);
49
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
50
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
51
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
52
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
53
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
54
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
55
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq);
56
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
57
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
58
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
59
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
60
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList);
61
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
62
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
63
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
64
static void    doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
65
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq);
66
static void    saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
67

68
static void     addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
69
static void     removeExpiredNodeInfo(const SArray *pNodeSnapshot);
70
static int32_t  doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
71
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
72

73
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
74
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
75
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
76
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
77
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
78

79
// Initialize the stream module of the mnode.
//
// Registers all stream related message handlers and show/retrieve callbacks,
// initializes the global exec info, and installs the SDB_STREAM and
// SDB_STREAM_SEQ tables into the sdb.
//
// Returns 0 on success, otherwise the first non-zero code reported by
// mndInitExecInfo() or sdbSetTable().
int32_t mndInitStream(SMnode *pMnode) {
  SSdbTable streamTable = {
      .sdbType = SDB_STREAM,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
  };

  SSdbTable streamSeqTable = {
      .sdbType = SDB_STREAM_SEQ,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamSeqActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamSeqActionDecode,
      .insertFp = (SdbInsertFp)mndStreamSeqActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamSeqActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamSeqActionDelete,
  };

  // user requests and the node-check timer
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck);

  // responses of task level actions, all routed to the transaction module
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CONSEN_CHKPT_RSP, mndTransProcessRsp);

  // for msgs inside mnode
  // TODO change the name
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE, mndProcessCreateStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp);

  // checkpoint, heartbeat and consensus related messages
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_DROP_ORPHANTASKS, mndProcessDropOrphanTaskReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_TASK_RESET, mndProcessResetStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr);

  // pause/resume/reset requests
  mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESET_STREAM, mndProcessResetStreamReq);

  // show streams / show stream tasks
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);

  // each step below runs only when all previous steps succeeded
  int32_t code = mndInitExecInfo();
  if (code == 0) {
    code = sdbSetTable(pMnode->pSdb, streamTable);
  }

  if (code == 0) {
    code = sdbSetTable(pMnode->pSdb, streamSeqTable);
  }

  return code;
}
154

155
// Release every container held by the global execInfo and destroy its mutex.
// pMnode is not used by this function.
void mndCleanupStream(SMnode *pMnode) {
  SStreamExecInfo *pInfo = &execInfo;

  // array containers
  taosArrayDestroy(pInfo->pTaskList);
  taosArrayDestroy(pInfo->pNodeList);
  taosArrayDestroy(pInfo->pKilledChkptTrans);

  // hash containers
  taosHashCleanup(pInfo->pTaskMap);
  taosHashCleanup(pInfo->transMgmt.pDBTrans);
  taosHashCleanup(pInfo->pTransferStateStreams);
  taosHashCleanup(pInfo->pChkptStreams);
  taosHashCleanup(pInfo->pStreamConsensus);

  // the result of destroying the mutex is deliberately ignored
  (void)taosThreadMutexDestroy(&pInfo->lock);
  mDebug("mnd stream exec info cleanup");
}
1,731✔
167

168
// Decode a raw sdb record into a newly allocated row that holds a SStreamObj.
//
// Steps: read the soft version, validate it against MND_STREAM_VER_NUMBER,
// allocate the row, copy the serialized payload into a temporary buffer and
// decode it with tDecodeSStreamObj().
//
// Returns the decoded row and sets terrno to 0 on success. On any failure the
// row is freed, terrno is set to the error code and NULL is returned.
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSdbRow    *pRow = NULL;
  SStreamObj *pStream = NULL;
  void       *buf = NULL;
  int8_t      sver = 0;
  int32_t     tlen = 0;
  int32_t     dataPos = 0;
  bool        decoded = false;  // set once tDecodeSStreamObj() has been invoked

  code = sdbGetRawSoftVer(pRaw, &sver);
  TSDB_CHECK_CODE(code, lino, _over);

  if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
    // an error code must be set here, otherwise the success branch below would
    // dereference the still-NULL pStream
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    lino = __LINE__;
    goto _over;
  }

  pRow = sdbAllocRow(sizeof(SStreamObj));
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);

  pStream = sdbGetRowObj(pRow);
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);

  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);

  buf = taosMemoryMalloc(tlen + 1);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);

  SDecoder decoder;
  tDecoderInit(&decoder, buf, tlen + 1);
  code = tDecodeSStreamObj(&decoder, pStream, sver);
  tDecoderClear(&decoder);
  decoded = true;

  if (code < 0) {
    tFreeStreamObj(pStream);
  }

_over:
  taosMemoryFreeClear(buf);

  // Defensive: if a jump reached this label before the object was decoded and
  // without an error code being assigned, never report success.
  if (code == TSDB_CODE_SUCCESS && !decoded) {
    code = (terrno != 0) ? terrno : TSDB_CODE_INVALID_MSG;
  }

  if (code != TSDB_CODE_SUCCESS) {
    char *p = (pStream == NULL) ? "null" : pStream->name;
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
    taosMemoryFreeClear(pRow);

    terrno = code;
    return NULL;
  } else {
    mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
           pStream->checkpointId);

    terrno = 0;
    return pRow;
  }
}
226

227
// sdb insert callback for stream objects: only traces the event.
// Always returns TSDB_CODE_SUCCESS.
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
  mTrace("stream:%s, perform insert action", pStream->name);

  return TSDB_CODE_SUCCESS;
}
231

232
// sdb delete callback for stream objects: releases the resources owned by the
// stream object while holding its write latch. Always returns TSDB_CODE_SUCCESS.
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
  mTrace("stream:%s, perform delete action", pStream->name);

  // the latch is taken around the release of the stream contents
  taosWLockLatch(&pStream->lock);
  tFreeStreamObj(pStream);
  taosWUnLockLatch(&pStream->lock);

  return TSDB_CODE_SUCCESS;
}
239

240
// sdb update callback for stream objects.
//
// Publishes the new version atomically, then copies the mutable fields
// (checkpoint info, status, update time) from pNewStream into pOldStream under
// the write latch of the old object. Always returns TSDB_CODE_SUCCESS.
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
  mTrace("stream:%s, perform update action", pOldStream->name);

  // the version is swapped atomically, outside of the latch
  (void)atomic_exchange_32(&pOldStream->version, pNewStream->version);

  taosWLockLatch(&pOldStream->lock);
  {
    pOldStream->checkpointId = pNewStream->checkpointId;
    pOldStream->checkpointFreq = pNewStream->checkpointFreq;
    pOldStream->status = pNewStream->status;
    pOldStream->updateTime = pNewStream->updateTime;
  }
  taosWUnLockLatch(&pOldStream->lock);

  return TSDB_CODE_SUCCESS;
}
254

255
// Acquire a reference to the stream named streamName from the sdb.
//
// On success *pStream points to the acquired object and 0 is returned; the
// caller must release it with mndReleaseStream().
// On failure *pStream is NULL and a non-zero code is returned:
//   - TSDB_CODE_MND_STREAM_NOT_EXIST when the sdb reports the object is not there
//   - otherwise the terrno left by sdbAcquire(), so that a NULL stream is not
//     reported together with a success code.
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
  SSdb *pSdb = pMnode->pSdb;

  (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
  if ((*pStream) != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    return TSDB_CODE_MND_STREAM_NOT_EXIST;
  }

  // any other acquire failure: propagate the reason recorded in terrno
  return terrno;
}
264

265
// Hand a stream object obtained from the sdb of this mnode back to the sdb.
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
  sdbRelease(pMnode->pSdb, pStream);
}
14,588✔
269

270
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
271
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
272
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
273
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
274
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
275

276
// Validate the mandatory fields of a create-stream request.
//
// The stream name, the sql text, the source db and the full name of the target
// stable must all be non-empty; the sql pointer must also be non-NULL.
// Returns TSDB_CODE_MND_INVALID_STREAM_OPTION if any of them is missing,
// TSDB_CODE_SUCCESS otherwise.
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
  if (pCreate->name[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  // the NULL test has to come before the first character is inspected
  if (pCreate->sql == NULL || pCreate->sql[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  if (pCreate->sourceDB[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  if (pCreate->targetStbFullName[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  return TSDB_CODE_SUCCESS;
}
283

284
// Build the column schema of pWrapper from the field list pFields.
//
// One SSchema entry is produced per field, with column ids assigned as
// 1..nCols in list order. A field of type TSDB_DATA_TYPE_NULL is stored as a
// VARCHAR column whose size is VARSTR_HEADER_SIZE.
//
// Returns TSDB_CODE_SUCCESS, or terrno when the schema array cannot be
// allocated or a field cannot be fetched. In the latter case the schema array
// already allocated stays attached to pWrapper.
static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) {
  pWrapper->nCols = taosArrayGetSize(pFields);
  pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema));
  if (pWrapper->pSchema == NULL) {
    return terrno;
  }

  for (int32_t col = 0; col < pWrapper->nCols; ++col) {
    SField *pSrc = (SField *)taosArrayGet(pFields, col);
    if (pSrc == NULL) {
      return terrno;
    }

    SSchema *pDst = &pWrapper->pSchema[col];

    const bool isNullType = (TSDB_DATA_TYPE_NULL == pSrc->type);
    pDst->type = isNullType ? TSDB_DATA_TYPE_VARCHAR : pSrc->type;
    pDst->bytes = isNullType ? VARSTR_HEADER_SIZE : pSrc->bytes;

    pDst->colId = col + 1;
    tstrncpy(pDst->name, pSrc->name, sizeof(pDst->name));
    pDst->flags = pSrc->flags;
  }

  return TSDB_CODE_SUCCESS;
}
313

314
// Tell whether any column other than the first one carries the COL_IS_KEY
// flag. A schema with fewer than two columns never qualifies.
static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) {
  const int32_t numOfCols = pWrapper->nCols;
  if (numOfCols < 2) {
    return false;
  }

  // column 0 is skipped on purpose; scanning starts from the second column
  int32_t col = 1;
  while (col < numOfCols) {
    if ((pWrapper->pSchema[col].flags & COL_IS_KEY) != 0) {
      return true;
    }
    ++col;
  }

  return false;
}
325

326
// Fill the stream object pObj from the create-stream request pCreate.
//
// Work done, in order:
//   1. copy name/options, generate the stream uid and the fill-history task uid
//   2. resolve the source db and the target db (via the target stable name)
//   3. take over the sql and ast strings of the request (the request pointers
//      are set to NULL, so pObj owns them afterwards)
//   4. build the output schema from pCreate->pCols, merging in the columns
//      listed in pCreate->fillNullCols when present
//   5. create the physical plan from the ast and serialize it into pObj
//   6. build the tag schema from pCreate->pTags
//
// Returns 0 on success or an error code. On failure pObj may be partially
// filled; nothing attached to pObj is released here, only the temporary ast
// node and query plan are destroyed.
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
  SNode      *pAst = NULL;
  SQueryPlan *pPlan = NULL;
  int32_t     code = 0;

  mInfo("stream:%s to create", pCreate->name);
  memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
  pObj->createTime = taosGetTimestampMs();
  pObj->updateTime = pObj->createTime;
  pObj->version = 1;

  if (pCreate->smaId > 0) {
    pObj->subTableWithoutMd5 = 1;
  }

  pObj->smaId = pCreate->smaId;
  pObj->indexForMultiAggBalance = -1;

  pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));

  // The uid of the fill-history task is derived from "<stream name>_fillhistory".
  // The original code built this name but then hashed the plain stream name.
  char p[TSDB_STREAM_FNAME_LEN + 32] = {0};
  snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory");

  pObj->hTaskUid = mndGenerateUid(p, strlen(p));
  pObj->status = 0;

  pObj->conf.igExpired = pCreate->igExpired;
  pObj->conf.trigger = pCreate->triggerType;
  pObj->conf.triggerParam = pCreate->maxDelay;
  pObj->conf.watermark = pCreate->watermark;
  pObj->conf.fillHistory = pCreate->fillHistory;
  pObj->deleteMark = pCreate->deleteMark;
  pObj->igCheckUpdate = pCreate->igUpdate;

  memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
  SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
  if (pSourceDb == NULL) {
    code = terrno;
    mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb,
          tstrerror(code));
    goto FAIL;
  }

  pObj->sourceDbUid = pSourceDb->uid;
  mndReleaseDb(pMnode, pSourceDb);

  memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);

  SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
  if (pTargetDb == NULL) {
    code = terrno;
    // pObj->targetDb is not filled yet at this point, so report the stable name
    mError("stream:%s failed to create, target db of stable %s not exist since %s", pCreate->name,
           pObj->targetSTbName, tstrerror(code));
    goto FAIL;
  }

  tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);

  if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
    pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
  } else {
    pObj->targetStbUid = pCreate->targetStbUid;
  }
  pObj->targetDbUid = pTargetDb->uid;
  mndReleaseDb(pMnode, pTargetDb);

  // ownership of the sql and ast strings moves from the request to the stream object
  pObj->sql = pCreate->sql;
  pObj->ast = pCreate->ast;

  pCreate->sql = NULL;
  pCreate->ast = NULL;

  // deserialize ast
  if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) {
    goto FAIL;
  }

  // create output schema
  if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) {
    goto FAIL;
  }

  // merge the fill-null columns into the output schema, if there are any
  int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
  if (numOfNULL > 0) {
    pObj->outputSchema.nCols += numOfNULL;
    SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
    if (!pFullSchema) {
      code = terrno;
      goto FAIL;
    }

    int32_t nullIndex = 0;  // next entry of fillNullCols to place
    int32_t dataIndex = 0;  // next entry of the original output schema to place
    for (int32_t i = 0; i < pObj->outputSchema.nCols; i++) {
      if (nullIndex >= numOfNULL) {
        // all null columns are placed, the remaining slots take data columns
        pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
        pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
        pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
        tstrncpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name, sizeof(pFullSchema[i].name));
        pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
        dataIndex++;
      } else {
        SColLocation *pos = NULL;
        if (nullIndex < taosArrayGetSize(pCreate->fillNullCols)) {
          pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
        }

        if (pos == NULL) {
          mError("invalid null column index, %d", nullIndex);
          continue;
        }

        if (i < pos->slotId) {
          // slot in front of the next null column: take a data column
          pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
          pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
          pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
          tstrncpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name, sizeof(pFullSchema[i].name));
          pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
          dataIndex++;
        } else {
          // slot of a null column: no name, zero bytes, flagged COL_SET_NULL
          pFullSchema[i].bytes = 0;
          pFullSchema[i].colId = pos->colId;
          pFullSchema[i].flags = COL_SET_NULL;
          memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
          pFullSchema[i].type = pos->type;
          nullIndex++;
        }
      }
    }

    taosMemoryFree(pObj->outputSchema.pSchema);
    pObj->outputSchema.pSchema = pFullSchema;
  }

  SPlanContext cxt = {
      .pAstRoot = pAst,
      .topicQuery = false,
      .streamQuery = true,
      .triggerType =
          (pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
      .watermark = pObj->conf.watermark,
      .igExpired = pObj->conf.igExpired,
      .deleteMark = pObj->deleteMark,
      .igCheckUpdate = pObj->igCheckUpdate,
      .destHasPrimaryKey = hasDestPrimaryKey(&pObj->outputSchema),
  };

  // using ast and param to build physical plan
  if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) {
    goto FAIL;
  }

  // save physical plan
  if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) {
    goto FAIL;
  }

  pObj->tagSchema.nCols = pCreate->numOfTags;
  if (pCreate->numOfTags) {
    pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
    if (pObj->tagSchema.pSchema == NULL) {
      code = terrno;
      goto FAIL;
    }
  }

  /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
  for (int32_t i = 0; i < pCreate->numOfTags; i++) {
    SField *pField = taosArrayGet(pCreate->pTags, i);
    if (pField == NULL) {
      continue;
    }

    // tag column ids continue after the ids of the output columns
    pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
    pObj->tagSchema.pSchema[i].bytes = pField->bytes;
    pObj->tagSchema.pSchema[i].flags = pField->flags;
    pObj->tagSchema.pSchema[i].type = pField->type;
    memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN);
  }

  // reached on both the success and the failure path
FAIL:
  if (pAst != NULL) nodesDestroyNode(pAst);
  if (pPlan != NULL) qDestroyQueryPlan(pPlan);
  return code;
}
511

512
// Serialize pTask into a TDMT_STREAM_TASK_DEPLOY message and append it to pTrans.
//
// The task is encoded twice: a first pass with no buffer to learn the encoded
// size, then a second pass into a buffer of sizeof(SMsgHead) + size bytes whose
// head carries the vgroup id of the task in network byte order.
// A task whose version is older than SSTREAM_TASK_SUBTABLE_CHANGED_VER is
// bumped to SSTREAM_TASK_VER before encoding.
//
// Returns 0 on success or an error code. The buffer is freed here when the
// encoding or setTransAction() fails; on success it is not freed by this
// function (presumably it is then owned by the transaction - verify against
// setTransAction()).
int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
  SEncoder encoder;
  tEncoderInit(&encoder, NULL, 0);

  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  // first pass: compute the encoded size only
  int32_t code = tEncodeStreamTask(&encoder, pTask);
  if (code != 0) {
    // any non-zero result is a failure, consistent with the second pass below;
    // the legacy -1 is still mapped to TSDB_CODE_INVALID_MSG
    tEncoderClear(&encoder);
    return (code == -1) ? TSDB_CODE_INVALID_MSG : code;
  }

  int32_t size = encoder.pos;
  int32_t tlen = sizeof(SMsgHead) + size;
  tEncoderClear(&encoder);

  void *buf = taosMemoryCalloc(1, tlen);
  if (buf == NULL) {
    return terrno;
  }

  ((SMsgHead *)buf)->vgId = htonl(pTask->info.nodeId);

  // second pass: encode the task right behind the message head
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, size);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  if (code != 0) {
    mError("failed to encode stream task, code:%s", tstrerror(code));
    taosMemoryFree(buf);
    return code;
  }

  code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code) {
    taosMemoryFree(buf);
  }

  return code;
}
556

557
// Append a deploy action to pTrans for every task of pStream.
//
// The tasks reachable through the stream task iterator are handled first. When
// the stream has fill-history enabled, every task found in
// pStream->pHTasksList (an array of per-level task arrays) is added as well.
//
// Returns 0 on success, or the first non-zero code encountered; processing
// stops at the first failure.
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pTaskIter = NULL;

  int32_t code = createStreamTaskIter(pStream, &pTaskIter);
  if (code != 0) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return code;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCurrent = NULL;

    code = streamTaskIterGetCurrent(pTaskIter, &pCurrent);
    if (code == 0) {
      code = mndPersistTaskDeployReq(pTrans, pCurrent);
    }

    if (code != 0) {
      break;
    }
  }

  // the iterator is destroyed no matter how the loop ended
  destroyStreamTaskIter(pTaskIter);
  if (code != 0) {
    return code;
  }

  if (!pStream->conf.fillHistory) {
    return code;
  }

  // persistent stream task for already stored ts data
  int32_t numOfLevels = taosArrayGetSize(pStream->pHTasksList);
  for (int32_t lv = 0; lv < numOfLevels; ++lv) {
    SArray *pTasksInLevel = taosArrayGetP(pStream->pHTasksList, lv);
    int32_t numInLevel = taosArrayGetSize(pTasksInLevel);

    for (int32_t k = 0; k < numInLevel; ++k) {
      SStreamTask *pHTask = taosArrayGetP(pTasksInLevel, k);

      code = mndPersistTaskDeployReq(pTrans, pHTask);
      if (code != 0) {
        return code;
      }
    }
  }

  return code;
}
602

603
// Persist a stream through the transaction pTrans: first the deploy actions of
// all its tasks, then the stream object itself with status SDB_STATUS_READY.
//
// A negative result of the task step is returned as is; otherwise the result
// of mndPersistTransLog() is returned.
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
  int32_t taskCode = mndPersistStreamTasks(pTrans, pStream);
  if (taskCode >= 0) {
    return mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  }

  return taskCode;
}
611

612
// Add the creation of the target stable of pStream to the transaction pTrans.
//
// A create-stable request is assembled from the output schema of the stream
// (columns) and its tag schema; when the stream has no tag schema a single
// UBIGINT tag named "group_id" is used. The request is validated, the stable
// must not exist yet, and a database with cfg.numOfStables == 1 must not
// contain any stable already. The uid of the new stable is taken from
// pStream->targetStbUid.
//
// Returns 0 on success or an error code. Every failing step now reports its
// error code: previously a failure of mndGetNumOfStbs(), mndBuildStbFromReq()
// or mndAddStbToTrans() left code at 0 and the function returned success.
// The user parameter is not used by this function.
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
  SStbObj *pStb = NULL;
  SDbObj  *pDb = NULL;
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  numOfStbs = -1;
  SStbObj  stbObj = {0};

  SMCreateStbReq createReq = {0};
  tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
  createReq.numOfColumns = pStream->outputSchema.nCols;
  createReq.numOfTags = 1;  // group id
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);

  // build fields
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    pField->flags = pStream->outputSchema.pSchema[i].flags;
    pField->type = pStream->outputSchema.pSchema[i].type;
    pField->bytes = pStream->outputSchema.pSchema[i].bytes;
    pField->compress = createDefaultColCmprByType(pField->type);
  }

  if (pStream->tagSchema.nCols == 0) {
    createReq.numOfTags = 1;
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    // build tags
    SField *pField = taosArrayGet(createReq.pTags, 0);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, "group_id", sizeof(pField->name));
    pField->type = TSDB_DATA_TYPE_UBIGINT;
    pField->flags = 0;
    pField->bytes = 8;
  } else {
    createReq.numOfTags = pStream->tagSchema.nCols;
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    for (int32_t i = 0; i < createReq.numOfTags; i++) {
      SField *pField = taosArrayGet(createReq.pTags, i);
      if (pField == NULL) {
        continue;
      }

      pField->bytes = pStream->tagSchema.pSchema[i].bytes;
      pField->flags = pStream->tagSchema.pSchema[i].flags;
      pField->type = pStream->tagSchema.pSchema[i].type;
      tstrncpy(pField->name, pStream->tagSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    }
  }

  code = mndCheckCreateStbReq(&createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
    lino = __LINE__;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    lino = __LINE__;
    goto _OVER;
  }

  // propagate the failure instead of leaving code at 0
  code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
  TSDB_CHECK_CODE(code, lino, _OVER);

  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    lino = __LINE__;
    goto _OVER;
  }

  // propagate the failure instead of leaving code at 0
  code = mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb);
  TSDB_CHECK_CODE(code, lino, _OVER);

  stbObj.uid = pStream->targetStbUid;

  // only a negative result is a failure here; code is left untouched on
  // success so that the function keeps returning 0
  int32_t addCode = mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj);
  if (addCode < 0) {
    code = addCode;
    lino = __LINE__;
    mndFreeStb(&stbObj);
    goto _OVER;
  }

  tFreeSMCreateStbReq(&createReq);
  mndFreeStb(&stbObj);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);
  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols);
  return code;

_OVER:
  tFreeSMCreateStbReq(&createReq);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);

  mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->targetSTbName, lino,
         tstrerror(code));
  return code;
}
723

724
// 1. stream number check
725
// 2. target stable can not be target table of other existed streams.
726
static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
1,741✔
727
  int32_t     numOfStream = 0;
1,741✔
728
  SStreamObj *pStream = NULL;
1,741✔
729
  void       *pIter = NULL;
1,741✔
730

731
  while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
3,987✔
732
    if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
2,247✔
733
      ++numOfStream;
1,564✔
734
    }
735

736
    sdbRelease(pMnode->pSdb, pStream);
2,247✔
737

738
    if (numOfStream > MND_STREAM_MAX_NUM) {
2,247!
739
      mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
×
740
             pStreamObj->name);
741
      sdbCancelFetch(pMnode->pSdb, pIter);
×
742
      return TSDB_CODE_MND_TOO_MANY_STREAMS;
×
743
    }
744

745
    if (pStream->targetStbUid == pStreamObj->targetStbUid) {
2,247✔
746
      mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
1!
747
             pStreamObj->name);
748
      sdbCancelFetch(pMnode->pSdb, pIter);
1✔
749
      return TSDB_CODE_MND_INVALID_TARGET_TABLE;
1✔
750
    }
751
  }
752

753
  return TSDB_CODE_SUCCESS;
1,740✔
754
}
755

756
// Handle a create-stream request.
//
// Steps: deserialize and validate the request, make sure the stream does not exist yet (or honour
// igExists), check the grant and the snode of the source db, build the stream object, create the
// transaction, optionally create the target stable, schedule and persist the stream, check the
// read/write privileges, register the tasks in execInfo, prepare the transaction and finally write
// an audit record.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the creation went through, 0 when the stream already
// exists and igExists is set, otherwise the error code of the failing step.
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SStreamObj  streamObj = {0};
  char       *sql = NULL;
  int32_t     sqlLen = 0;
  const char *pMsg = "create stream tasks on dnodes";
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STrans     *pTrans = NULL;

  SCMCreateStreamReq createReq = {0};
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  goto _OVER;
#endif

  mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
  if ((code = mndCheckCreateStreamReq(&createReq)) != 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = mndAcquireStream(pMnode, createReq.name, &pStream);
  if (pStream != NULL && code == 0) {
    if (createReq.igExists) {
      mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
      mndReleaseStream(pMnode, pStream);
      tFreeSCMCreateStreamReq(&createReq);
      return code;
    } else {
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
      goto _OVER;
    }
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    goto _OVER;
  }

  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
    goto _OVER;
  }

  if (createReq.sql != NULL) {
    sql = taosStrdup(createReq.sql);
    TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
    // keep the length next to the copy, otherwise the audit record below never carries the sql text
    sqlLen = (int32_t)strlen(sql);
  }

  SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB);
  if (pSourceDb == NULL) {
    code = terrno;
    mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB,
          tstrerror(code));
    goto _OVER;
  }

  code = mndCheckForSnode(pMnode, pSourceDb);
  mndReleaseDb(pMnode, pSourceDb);
  if (code != 0) {
    goto _OVER;
  }

  // build stream obj from request
  if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = doStreamCheck(pMnode, &streamObj);
  TSDB_CHECK_CODE(code, lino, _OVER);

  // from here on pTrans may be set; it is dropped exactly once at _OVER
  code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans);
  if (pTrans == NULL || code) {
    goto _OVER;
  }

  // create stb for stream
  if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) {
    if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) {
      mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code));
      goto _OVER;
    }
  } else {
    mDebug("stream:%s no need create stable", createReq.name);
  }

  // schedule stream task for stream obj
  code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  // add stream to trans
  code = mndPersistStream(pTrans, &streamObj);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb)) != 0) {
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb)) != 0) {
    goto _OVER;
  }

  // add into buffer firstly
  // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
  streamMutexLock(&execInfo.lock);
  mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
  saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  // execute creation
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    goto _OVER;
  }

  SName dbname = {0};
  code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code) {
    mError("invalid source dbname:%s in create stream, code:%s", createReq.sourceDB, tstrerror(code));
    goto _OVER;
  }

  SName name = {0};
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE);
  if (code) {
    mError("invalid stream name:%s in create strem, code:%s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  // reuse this function for stream
  if (sql != NULL && sqlLen > 0) {
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen);
  } else {
    char detail[1000] = {0};
    snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
  }

_OVER:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to create at line:%d since %s", createReq.name, lino, tstrerror(code));
  } else {
    mDebug("stream:%s create stream completed", createReq.name);
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  // single release point for the transaction object; pTrans is NULL when no trans was created
  mndTransDrop(pTrans);
  mndReleaseStream(pMnode, pStream);
  tFreeSCMCreateStreamReq(&createReq);
  tFreeStreamObj(&streamObj);

  if (sql != NULL) {
    taosMemoryFreeClear(sql);
  }

  return code;
}
929

930
// Handle a restart-stream request (the request body reuses SMPauseStreamReq).
//
// Looks up the stream, checks the write privilege on its target db, refuses to run when a
// conflicting transaction or a pending node update exists, then builds and prepares the restart
// transaction.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared, 0 when the stream does not
// exist and igNotExists is set, otherwise the error code of the failing step.
static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SStreamObj      *pStream = NULL;
  STrans          *pTrans = NULL;
  int32_t          code = 0;
  SMPauseStreamReq pauseReq = {0};

  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (pauseReq.igNotExists) {
      mInfo("stream:%s, not exist, not restart stream", pauseReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to restart stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to restart stream", pauseReq.name, pStream->uid);
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
    goto _OVER;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESTART_NAME, true);
  if (code) {
    goto _OVER;
  }

  bool updated = mndStreamNodeIsUpdated(pMnode);
  if (updated) {
    mError("tasks are not ready for restart, node update detected");
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream",
                       &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s failed to restart stream since %s", pauseReq.name, tstrerror(code));
    goto _OVER;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESTART_NAME, pStream->uid);
  if (code) {
    goto _OVER;
  }

  // if nodeUpdate happened, not send pause trans
  code = mndStreamSetRestartAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s, failed to restart task since %s", pauseReq.name, tstrerror(code));
    goto _OVER;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare restart stream trans since %s", pTrans->id, tstrerror(code));
    goto _OVER;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  // common exit: the stream reference is always held here, pTrans may still be NULL
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
1009

1010
// Generate the next checkpoint id.
//
// The result is one more than the larger of:
//   - the largest checkpointId stored in any stream object in sdb, and
//   - the largest checkpointInfo.latestId among the task entries in execInfo whose last
//     checkpoint is not marked as failed.
//
// lock: when true, execInfo.lock is taken while the task entries are scanned; pass false when the
//       caller already holds that lock.
int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  SSdb       *pSdb = pMnode->pSdb;
  int64_t     maxChkptId = 0;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    maxChkptId = TMAX(maxChkptId, pStream->checkpointId);
    mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
           pStream->checkpointId);
    sdbRelease(pSdb, pStream);
  }

  {  // check the max checkpoint id from all vnodes.
    int64_t maxCheckpointId = -1;
    if (lock) {
      streamMutexLock(&execInfo.lock);
    }

    for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
      STaskId *p = taosArrayGet(execInfo.pTaskList, i);
      if (p == NULL) {  // validate the key before it is handed to the hash lookup
        continue;
      }

      STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
      if (pEntry == NULL) {
        continue;
      }

      if (pEntry->checkpointInfo.failed) {
        continue;
      }

      if (maxCheckpointId < pEntry->checkpointInfo.latestId) {
        maxCheckpointId = pEntry->checkpointInfo.latestId;
      }
    }

    if (lock) {
      streamMutexUnlock(&execInfo.lock);
    }

    if (maxCheckpointId > maxChkptId) {
      mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId,
             maxCheckpointId);
      maxChkptId = maxCheckpointId;
    }
  }

  mDebug("generate new checkpointId:%" PRId64, maxChkptId + 1);
  return maxChkptId + 1;
}
1062

1063
// Build and prepare the checkpoint transaction of one stream.
//
// pStream:      stream to checkpoint; its checkpointId/checkpointFreq/version are updated under
//               pStream->lock once all source-task actions have been added.
// checkpointId: id to assign to this checkpoint.
// mndTrigger:   when 1, nothing is done (and 0 is returned) if the time since
//               pStream->checkpointFreq is still below tsStreamCheckpointInterval seconds.
// lock:         forwarded to mndStreamTransConflictCheck.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was prepared, otherwise the code of the
// failing step.
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
                                               int8_t mndTrigger, bool lock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t ts = taosGetTimestampMs();
  STrans *pTrans = NULL;

  if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
    return code;
  }

  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
  if (code) {
    mWarn("checkpoint conflict with other trans in %s, code:%s ignore the checkpoint for stream:%s %" PRIx64,
          pStream->sourceDb, tstrerror(code), pStream->name, pStream->uid);
    goto _ERR;
  }

  code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME,
                       "gen checkpoint for stream", &pTrans);
  // a missing trans object is treated like a failure, as the other callers of doCreateTrans do
  if (pTrans == NULL || code) {
    mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
           tstrerror(code));
    goto _ERR;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
  if (code) {
    mError("failed to register checkpoint trans for stream:%s, checkpointId:%" PRId64, pStream->name, checkpointId);
    goto _ERR;
  }

  mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);

  taosWLockLatch(&pStream->lock);
  pStream->currentTick = 1;

  // 1. redo action: broadcast checkpoint source msg for all source vg
  int32_t totalLevel = taosArrayGetSize(pStream->tasks);
  for (int32_t i = 0; i < totalLevel; i++) {
    SArray *pLevel = taosArrayGetP(pStream->tasks, i);
    if (pLevel == NULL) {
      continue;
    }

    // the first task decides the level type; an empty level has nothing to checkpoint
    SStreamTask *p = taosArrayGetP(pLevel, 0);
    if (p == NULL) {
      continue;
    }

    if (p->info.taskLevel == TASK_LEVEL__SOURCE) {
      int32_t sz = taosArrayGetSize(pLevel);
      for (int32_t j = 0; j < sz; j++) {
        SStreamTask *pTask = taosArrayGetP(pLevel, j);
        if (pTask == NULL) {
          continue;
        }

        code = mndStreamSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);

        if (code != TSDB_CODE_SUCCESS) {
          taosWUnLockLatch(&pStream->lock);
          goto _ERR;
        }
      }
    }
  }

  // 2. reset tick
  pStream->checkpointId = checkpointId;
  pStream->checkpointFreq = taosGetTimestampMs();
  pStream->currentTick = 0;

  // 3. commit log: stream checkpoint info
  pStream->version = pStream->version + 1;
  taosWUnLockLatch(&pStream->lock);

  if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY)) != TSDB_CODE_SUCCESS) {
    goto _ERR;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to prepare checkpoint trans since %s", tstrerror(code));
  } else {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_ERR:
  // reached on success as well; pTrans is NULL when no trans was created
  mndTransDrop(pTrans);
  return code;
}
1144

1145
// Make sure execInfo.pNodeList is populated and report how many entries it holds.
// When the list is empty it is refreshed from the existing streams first; if that refresh fails
// its error code is returned instead of a size.
int32_t extractStreamNodeList(SMnode *pMnode) {
  bool needRefresh = (taosArrayGetSize(execInfo.pNodeList) == 0);

  if (needRefresh) {
    int32_t ret = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
    if (ret != 0) {
      mError("Failed to extract node list from stream, code:%s", tstrerror(ret));
      return ret;
    }
  }

  return taosArrayGetSize(execInfo.pNodeList);
}
1156

1157
// Decide whether a checkpoint may be issued right now.
//
// Returns 0 when no node update is pending and every task entry in execInfo is in
// TASK_STATUS__READY with no related fill-history task; TSDB_CODE_STREAM_TASK_IVLD_STATUS when a
// node update is detected; TSDB_CODE_FAILED when the node list is empty but tasks exist;
// TSDB_CODE_OUT_OF_MEMORY when the scratch list cannot be allocated; -1 when some task is not ready.
// execInfo.lock is held while the task entries are scanned.
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
  bool ready = true;
  if (mndStreamNodeIsUpdated(pMnode)) {
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  streamMutexLock(&execInfo.lock);
  if (taosArrayGetSize(execInfo.pNodeList) == 0) {
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
    if (taosArrayGetSize(execInfo.pTaskList) != 0) {
      streamMutexUnlock(&execInfo.lock);
      mError("stream task node change checking done, no vgroups exist, but task list is not empty");
      return TSDB_CODE_FAILED;
    }
  }

  SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId));
  if (pInvalidList == NULL) {
    // do not walk the task list or call removeTasksInBuf without a valid scratch list
    streamMutexUnlock(&execInfo.lock);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
    if (p == NULL) {
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
    if (pEntry == NULL) {
      continue;
    }

    if (pEntry->status == TASK_STATUS__STOP) {
      // NOTE(review): an id is pushed only when an entry with the same streamId is already in
      // pInvalidList, and the list starts out empty, so nothing is ever collected here. Confirm
      // the intended condition before changing it; enabling it makes removeTasksInBuf drop tasks.
      for (int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) {
        STaskId *pId = taosArrayGet(pInvalidList, j);
        if (pId == NULL) {
          continue;
        }

        if (pEntry->id.streamId == pId->streamId) {
          void *px = taosArrayPush(pInvalidList, &pEntry->id);
          if (px == NULL) {
            mError("failed to put stream into invalid list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
          }
          break;
        }
      }
    }

    if (pEntry->status != TASK_STATUS__READY) {
      mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId,
             (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
      ready = false;
      break;
    }

    if (pEntry->hTaskId != 0) {
      mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
             " exists, checkpoint not issued",
             pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
             pEntry->hTaskId);
      ready = false;
      break;
    }
  }

  removeTasksInBuf(pInvalidList, &execInfo);
  taosArrayDestroy(pInvalidList);

  streamMutexUnlock(&execInfo.lock);
  return ready ? 0 : -1;
}
1226

1227
// Return the largest startTime among the tasks of stream `streamId` that are currently in
// TASK_STATUS__READY, or -1 when no such task is found.
// pTaskList holds STaskId keys; the status entries are looked up in execInfo.pTaskMap.
// NOTE(review): the visible caller holds execInfo.lock around this call; the function itself takes
// no lock.
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
  int64_t ts = -1;
  int32_t taskId = -1;

  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
    STaskId *p = taosArrayGet(pTaskList, i);
    if (p == NULL) {  // validate the key before it is handed to the hash lookup
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
    if (pEntry == NULL || pEntry->id.streamId != streamId) {
      continue;
    }

    if (pEntry->status == TASK_STATUS__READY && ts < pEntry->startTime) {
      ts = pEntry->startTime;
      taskId = pEntry->id.taskId;
    }
  }

  mDebug("stream:0x%" PRIx64 " last ready ts:%" PRId64 " s-task:0x%x", streamId, ts, taskId);
  return ts;
}
1247

1248
// One candidate stream for a checkpoint together with the time elapsed since its last checkpoint.
typedef struct {
  int64_t streamId;
  int64_t duration;
} SCheckpointInterval;

// Ordering callback over SCheckpointInterval: entries with a larger duration sort first
// (returns -1 when p1 waited longer, 1 when p2 waited longer, 0 on a tie).
static int32_t streamWaitComparFn(const void *p1, const void *p2) {
  const int64_t lhs = ((const SCheckpointInterval *)p1)->duration;
  const int64_t rhs = ((const SCheckpointInterval *)p2)->duration;

  if (lhs > rhs) {
    return -1;
  }

  if (lhs < rhs) {
    return 1;
  }

  return 0;
}
1262

1263
// Message handler that starts checkpoint transactions for the streams that are due.
//
// A stream is due when both the time since its last checkpoint and the time since its most
// recently started ready task are at least tsStreamCheckpointInterval seconds. Due streams are
// handled longest-waiting first, and new transactions are started only while the number of active
// checkpoint transactions stays within tsMaxConcurrentCheckpoint.
//
// Returns TSDB_CODE_STREAM_TASK_IVLD_STATUS when tasks/nodes are not ready, terrno when the scratch
// list cannot be allocated, otherwise the code of the last step that ran.
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  void       *pIter = NULL;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;
  int32_t     numOfCheckpointTrans = 0;

  if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));
  if (pList == NULL) {
    return terrno;
  }

  int64_t now = taosGetTimestampMs();

  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    int64_t duration = now - pStream->checkpointFreq;
    if (duration < tsStreamCheckpointInterval * 1000) {
      sdbRelease(pSdb, pStream);
      continue;
    }

    streamMutexLock(&execInfo.lock);
    int64_t startTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
    if (startTs != -1 && (now - startTs) < tsStreamCheckpointInterval * 1000) {
      streamMutexUnlock(&execInfo.lock);
      sdbRelease(pSdb, pStream);
      continue;
    }
    streamMutexUnlock(&execInfo.lock);

    SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
    void               *p = taosArrayPush(pList, &in);
    if (p) {
      int32_t currentSize = taosArrayGetSize(pList);
      mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chpt interval threshold: %ds(%" PRId64
             "s), concurrently launch threshold:%d",
             pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
             tsMaxConcurrentCheckpoint);
    } else {
      mError("failed to record the checkpoint interval info, stream:0x%" PRIx64, pStream->uid);
    }
    sdbRelease(pSdb, pStream);
  }

  int32_t size = taosArrayGetSize(pList);
  if (size == 0) {
    taosArrayDestroy(pList);
    return code;
  }

  taosArraySort(pList, streamWaitComparFn);
  code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans);
  if (code) {
    mError("failed to clear finish trans, code:%s", tstrerror(code));
    taosArrayDestroy(pList);
    return code;
  }

  int32_t numOfQual = taosArrayGetSize(pList);
  // ">=": when the limit is already reached the remaining capacity is 0 and the loop below, which
  // checks the capacity only after starting a trans, would still launch one more.
  if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) {
    mDebug(
        "%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new "
        "checkpoint trans are not allowed, wait for 30s",
        numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint);
    taosArrayDestroy(pList);
    return code;
  }

  int32_t capacity = tsMaxConcurrentCheckpoint - numOfCheckpointTrans;
  mDebug(
      "%d stream(s) checkpoint interval longer than %ds, %d ongoing checkpoint trans, %d new checkpoint trans allowed, "
      "concurrent trans threshold:%d",
      numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, capacity, tsMaxConcurrentCheckpoint);

  int32_t started = 0;
  int64_t checkpointId = mndStreamGenChkptId(pMnode, true);

  for (int32_t i = 0; i < numOfQual; ++i) {
    SCheckpointInterval *pCheckpointInfo = taosArrayGet(pList, i);
    if (pCheckpointInfo == NULL) {
      continue;
    }

    SStreamObj *p = NULL;
    code = mndGetStreamObj(pMnode, pCheckpointInfo->streamId, &p);
    if (p != NULL && code == 0) {
      code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
      sdbRelease(pSdb, p);

      if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
        started += 1;

        if (started >= capacity) {
          mDebug("already start %d new checkpoint trans, current active checkpoint trans:%d", started,
                 (started + numOfCheckpointTrans));
          break;
        }
      } else {
        mError("failed to start checkpoint trans, code:%s", tstrerror(code));
      }
    }
  }

  taosArrayDestroy(pList);
  return code;
}
1374

1375
// Handle a drop-stream request.
//
// Looks up the stream, refuses to drop a stream that still belongs to an existing sma, checks the
// write privilege on the target db and conflicting transactions, then builds and prepares the drop
// transaction, kills a related active transaction, removes the stream tasks from execInfo and
// writes an audit record.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the drop went through, 0 when the stream does not exist
// and igNotExists is set, otherwise the error code of the failing step.
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  SMDropStreamReq dropReq = {0};
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
    mError("invalid drop stream msg recv, discarded");
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  mDebug("recv drop stream:%s msg", dropReq.name);

  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (dropReq.igNotExists) {
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      tFreeMDropStreamReq(&dropReq);
      return 0;
    } else {
      mError("stream:%s not exist failed to drop it", dropReq.name);
      tFreeMDropStreamReq(&dropReq);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  if (pStream->smaId != 0) {
    mDebug("stream:%s, uid:0x%" PRIx64 " try to drop sma related stream", dropReq.name, pStream->uid);

    void    *pIter = NULL;
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    while (pIter) {
      if (pSma && pSma->uid == pStream->smaId) {
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;

        // log while pStream and dropReq are still valid, and report the code that is returned
        mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma",
               dropReq.name, pStream->uid, tstrerror(code));

        sdbRelease(pMnode->pSdb, pSma);
        sdbRelease(pMnode->pSdb, pStream);

        sdbCancelFetch(pMnode->pSdb, pIter);
        tFreeMDropStreamReq(&dropReq);
        TAOS_RETURN(code);
      }

      if (pSma) {
        sdbRelease(pMnode->pSdb, pSma);
      }

      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    }
  }

  // propagate the code reported by the privilege check, as the restart handler does
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    return code;
  }

  STrans *pTrans = NULL;
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
  if (code) {
    mError("failed to register drop stream trans, code:%s", tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // drop all tasks
  code = mndStreamSetDropAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // drop stream
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // kill the related checkpoint trans
  int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
  if (transId != 0) {
    mDebug("drop active transId:%d due to stream:%s uid:0x%" PRIx64 " dropped", transId, pStream->name, pStream->uid);
    mndKillTransImpl(pMnode, transId, pStream->sourceDb);
  }

  mDebug("stream:%s uid:0x%" PRIx64 " transId:%d start to drop related task when dropping stream", dropReq.name,
         pStream->uid, transId);

  removeStreamTasksInBuf(pStream, &execInfo);

  // NOTE(review): the result of this name parse becomes the return code although the drop trans
  // has already been prepared; confirm whether a parse failure should really fail the request.
  SName name = {0};
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  auditRecord(pReq, pMnode->clusterId, "dropStream", "", name.dbname, dropReq.sql, dropReq.sqlLen);

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  tFreeMDropStreamReq(&dropReq);

  if (code == 0) {
    return TSDB_CODE_ACTION_IN_PROGRESS;
  } else {
    TAOS_RETURN(code);
  }
}
1517

1518
// Add the removal of every stream that reads from or writes to pDb to the transaction pTrans.
//
// A stream whose source db and target db differ cannot be dropped this way; the call then fails
// with TSDB_CODE_MND_STREAM_MUST_BE_DELETED. For every other matching stream a related active
// transaction is killed, its tasks are removed from execInfo and a DROPPED log is appended to
// pTrans.
//
// Returns 0 on success, otherwise the code of the failing step.
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
      if (pStream->sourceDbUid != pStream->targetDbUid) {
        // log first: the message reads pStream fields, so the object must still be held
        mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
               pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(TSDB_CODE_MND_STREAM_MUST_BE_DELETED);
      } else {
        // kill the related checkpoint trans
        int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
        if (transId != 0) {
          mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
          mndKillTransImpl(pMnode, transId, pStream->sourceDb);
        }

        // drop the stream obj in execInfo
        removeStreamTasksInBuf(pStream, &execInfo);

        code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          sdbRelease(pSdb, pStream);
          sdbCancelFetch(pSdb, pIter);
          return code;
        }
      }
    }

    sdbRelease(pSdb, pStream);
  }

  return 0;
}
1560

1561
// Write up to `rows` stream rows into pBlock, resuming from the cursor stored in pShow->pIter.
// A stream whose attributes cannot be written is skipped. Returns the number of rows written.
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  int32_t     filled = 0;

  while (filled < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) {
      break;
    }

    // only advance the row index when the row was actually written
    if (setStreamAttrInResBlock(pStream, pBlock, filled) == 0) {
      filled += 1;
    }

    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += filled;
  return filled;
}
1582

1583
// Cancel an in-flight SDB_STREAM fetch identified by pIter.
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1587

1588
// Write one row per stream task into pBlock, walking the streams through the cursor stored in
// pShow->pIter. The block is grown when a stream has more tasks than the remaining capacity.
// Returns the number of rows written by this call.
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  int32_t     numOfRows = 0;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) {
      break;
    }

    // lock
    taosRLockLatch(&pStream->lock);

    int32_t count = mndGetNumOfStreamTasks(pStream);
    if (numOfRows + count > rowsCapacity) {
      code = blockDataEnsureCapacity(pBlock, numOfRows + count);
      if (code) {
        mError("failed to prepare the result block buffer, quit return value");
        taosRUnLockLatch(&pStream->lock);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    int32_t precision = TSDB_TIME_PRECISION_MILLI;
    SDbObj *pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
    if (pSourceDb != NULL) {
      precision = pSourceDb->cfg.precision;
      mndReleaseDb(pMnode, pSourceDb);
    }

    // add row for each task
    SStreamTaskIter *pIter = NULL;
    code = createStreamTaskIter(pStream, &pIter);
    if (code) {
      // log while the stream reference is still held
      mError("failed to create task iter for stream:%s", pStream->name);
      taosRUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pIter, &pTask);
      if (code) {
        // do not destroy the iterator here: it is destroyed exactly once right after this loop
        break;
      }

      code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows, precision);
      if (code == TSDB_CODE_SUCCESS) {
        numOfRows++;
      }
    }

    pBlock->info.rows = numOfRows;

    destroyStreamTaskIter(pIter);
    taosRUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1661

1662
// Cancel an in-flight SDB_STREAM fetch (used by the stream-task retrieval) identified by pIter.
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1666

1667
// Handle a pause-stream request.
// Steps: decode the request, acquire the stream, check the write privilege on the target db, check for
// conflicting transactions and pending node updates, verify that every task of the stream has reported a
// pausable status, then build and prepare the pause transaction.
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared, 0 when the stream does not exist
// and igNotExists is set, otherwise an error code.
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SStreamObj      *pStream = NULL;
  STrans          *pTrans = NULL;
  SMPauseStreamReq pauseReq = {0};
  int32_t          code = 0;

  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (code != 0 || pStream == NULL) {
    if (!pauseReq.igNotExists) {
      mError("stream:%s not exist, failed to pause stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }

    mInfo("stream:%s, not exist, not pause stream", pauseReq.name);
    return 0;
  }

  mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid);

  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(code);
  }

  if (mndStreamNodeIsUpdated(pMnode)) {
    mError("tasks are not ready for pause, node update detected");
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  // check for tasks, if tasks are not ready, not allowed to pause
  bool reported = false;  // at least one task of this stream has a status entry
  bool pausable = true;   // no task of this stream is in UNINIT or CK status

  streamMutexLock(&execInfo.lock);
  for (int32_t idx = 0; idx < taosArrayGetSize(execInfo.pTaskList); ++idx) {
    STaskId *pId = taosArrayGet(execInfo.pTaskList, idx);
    if (pId == NULL) {
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
    if (pEntry == NULL || pEntry->id.streamId != pStream->uid) {
      continue;
    }

    reported = true;
    if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) {
      mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name,
             pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status));
      pausable = false;
    }
  }
  streamMutexUnlock(&execInfo.lock);

  if (!reported) {
    mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  if (!pausable) {
    mError("stream:%s task not ready for pause yet", pauseReq.name);
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // from here on every failure path releases the stream and drops the trans
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
  if (code) {
    goto _dropTrans;
  }

  // if nodeUpdate happened, not send pause trans
  code = mndStreamSetPauseAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s, failed to pause task since %s", pauseReq.name, tstrerror(code));
    goto _dropTrans;
  }

  // pause stream
  taosWLockLatch(&pStream->lock);
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  taosWUnLockLatch(&pStream->lock);
  if (code) {
    goto _dropTrans;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
    goto _dropTrans;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;

_dropTrans:
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
1801

1802
// Handle a resume-stream request.
// Steps: check the stream grant, decode the request, acquire the stream, check the write privilege on the
// target db, check for conflicting transactions, then build and prepare the resume transaction.
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared, 0 when the stream does not exist
// and igNotExists is set, otherwise an error code.
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResumeStreamReq resumeReq = {0};
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resumeReq.igNotExists) {
      mInfo("stream:%s not exist, not resume stream", resumeReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      return 0;
    } else {
      mError("stream:%s not exist, failed to resume stream", resumeReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);

  // report the real privilege error (same as the pause handler) instead of a bare -1
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  STrans *pTrans = NULL;
  code =
      doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // set the resume action
  code = mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated);
  if (code) {
    mError("stream:%s, failed to resume task since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // resume stream
  taosWLockLatch(&pStream->lock);
  pStream->status = STREAM_STATUS__NORMAL;
  // keep the result in `code`: the failure branch must not return the stale 0 from the previous step
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code < 0) {
    taosWUnLockLatch(&pStream->lock);

    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  taosWUnLockLatch(&pStream->lock);
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare resume stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1891

1892
// Handle a reset-stream request. Only the validation part is implemented: the grant is checked, the
// request is decoded and the stream is looked up; the reset itself is still a todo.
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResetStreamReq resetReq = {0};
  if (tDeserializeSMResetStreamReq(pReq->pCont, pReq->contLen, &resetReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  mDebug("recv reset stream req, stream:%s", resetReq.name);

  code = mndAcquireStream(pMnode, resetReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resetReq.igNotExists) {
      mInfo("stream:%s, not exist, not reset stream", resetReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to reset stream", resetReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  // the stream is not used any further yet, so give back the reference taken by mndAcquireStream
  sdbRelease(pMnode->pSdb, pStream);

  // todo(liao hao jun)
  // NOTE(review): no transaction is created here, yet ACTION_IN_PROGRESS is returned; confirm that the
  // caller still gets a response for this request.
  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1922

1923
// Build and prepare a single "update task epsets" transaction covering every stream affected by the
// vgroup changes in pChangeInfo. When includeAllNodes is true every stream is included, otherwise only
// streams whose source or target db appears in pChangeInfo->pDBMap.
// Returns the result of mndTransPrepare, 0 when there is no stream at all, or an error code.
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  STrans     *pTrans = NULL;
  int32_t     code = 0;

  // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool
  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
    sdbRelease(pSdb, pStream);

    if (code) {
      mError("nodeUpdate conflict with other trans, current nodeUpdate ignored, code:%s", tstrerror(code));
      sdbCancelFetch(pSdb, pIter);
      return code;
    }
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    // here create only one trans
    if (pTrans == NULL) {
      code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME,
                           "update task epsets", &pTrans);
      if (pTrans == NULL || code) {
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        return terrno = code;
      }
    }

    if (!includeAllNodes) {
      void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
      void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
      if (p1 == NULL && p2 == NULL) {
        mDebug("stream:0x%" PRIx64 " %s not involved nodeUpdate, ignore", pStream->uid, pStream->name);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
           pStream->name, pTrans->id);

    // NOTE: for each stream, we register one trans entry for task update
    code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
    if (code) {
      mError("failed to register trans, transId:%d, and continue", pTrans->id);
    }

    code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);

    // todo: not continue, drop all and retry again
    if (code != TSDB_CODE_SUCCESS) {
      mError("stream:0x%" PRIx64 " build nodeUpdate trans failed, ignore and continue, code:%s", pStream->uid,
             tstrerror(code));
      sdbRelease(pSdb, pStream);
      continue;
    }

    code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
    sdbRelease(pSdb, pStream);

    if (code != TSDB_CODE_SUCCESS) {
      sdbCancelFetch(pSdb, pIter);
      // the trans is owned by this function, drop it so it does not leak on this early exit
      mndTransDrop(pTrans);
      return code;
    }
  }

  // no need to build the trans to handle the vgroup update
  if (pTrans == NULL) {
    return 0;
  }

  // every stream fetched above was already released inside the loops, so nothing is released here
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
  }

  mndTransDrop(pTrans);
  return code;
}
2019

2020
// Rebuild pNodeList from the tasks of all existing streams: one SNodeEntry per distinct nodeId.
// pNodeList is cleared before it is refilled. Returns the last error code recorded while iterating,
// or 0 when nothing failed.
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int32_t     code = 0;

  mDebug("start to refresh node list by existed streams");

  // keyed by nodeId, used to de-duplicate the nodes referenced by the tasks
  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    return terrno;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    taosWLockLatch(&pStream->lock);

    SStreamTaskIter *pTaskIter = NULL;
    code = createStreamTaskIter(pStream, &pTaskIter);
    if (code) {
      // log while the stream reference is still held
      mError("failed to create task iter for stream:%s", pStream->name);
      taosWUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pTaskIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pTaskIter, &pTask);
      if (code) {
        break;
      }

      SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
      epsetAssign(&entry.epset, &pTask->info.epSet);
      int32_t ret = taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
      if (ret != 0 && ret != TSDB_CODE_DUP_KEY) {
        // report the hash-put result, not the unrelated iterator code
        mError("failed to put entry into hash map, nodeId:%d, code:%s", entry.nodeId, tstrerror(ret));
      }
    }

    destroyStreamTaskIter(pTaskIter);
    taosWUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  taosArrayClear(pNodeList);

  // convert to list
  pIter = NULL;
  while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
    SNodeEntry *pEntry = (SNodeEntry *)pIter;

    void *p = taosArrayPush(pNodeList, pEntry);
    if (p == NULL) {
      mError("failed to put entry into node list, nodeId:%d, code: out of memory", pEntry->nodeId);
      if (code == 0) {
        code = terrno;
      }
      continue;
    }

    char    buf[256] = {0};
    int32_t ret = epsetToStr(&pEntry->epset, buf, tListLen(buf));  // ignore this error since it is only for log file
    if (ret != 0) {                                                // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(ret));
    }

    mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
  }

  taosHashCleanup(pHash);

  mDebug("numOfNodes:%d for stream after extract nodeInfo from stream", (int32_t)taosArrayGetSize(pNodeList));
  return code;
}
2101

2102
// Insert the db name of every vgroup into pDBMap (key only, no value).
static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
  void   *pIter = NULL;
  int32_t code = 0;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    code = taosHashPut(pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
    if (code == 0) {
      int32_t size = taosHashGetSize(pDBMap);
      mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size);
    }

    // release only after the last read of pVgroup
    sdbRelease(pSdb, pVgroup);
  }
}
×
2121

2122
// this function runs by only one thread, so it is not multi-thread safe
2123
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
1,360✔
2124
  int32_t code = 0;
1,360✔
2125
  bool    allReady = true;
1,360✔
2126
  SArray *pNodeSnapshot = NULL;
1,360✔
2127
  SMnode *pMnode = pMsg->info.node;
1,360✔
2128
  int64_t ts = taosGetTimestampSec();
1,360✔
2129
  bool    updateAllVgroups = false;
1,360✔
2130

2131
  int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
1,360✔
2132
  if (old != 0) {
1,360!
2133
    mDebug("still in checking node change");
×
2134
    return 0;
×
2135
  }
2136

2137
  mDebug("start to do node changing check");
1,360✔
2138

2139
  streamMutexLock(&execInfo.lock);
1,360✔
2140
  int32_t numOfNodes = extractStreamNodeList(pMnode);
1,360✔
2141
  streamMutexUnlock(&execInfo.lock);
1,360✔
2142

2143
  if (numOfNodes == 0) {
1,360!
2144
    mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing");
×
2145
    execInfo.ts = ts;
×
2146
    atomic_store_32(&mndNodeCheckSentinel, 0);
×
2147
    return 0;
×
2148
  }
2149

2150
  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
1,360✔
2151
  if (code) {
1,360!
2152
    mError("failed to take the vgroup snapshot, ignore it and continue");
×
2153
  }
2154

2155
  if (!allReady) {
1,360✔
2156
    taosArrayDestroy(pNodeSnapshot);
42✔
2157
    atomic_store_32(&mndNodeCheckSentinel, 0);
42✔
2158
    mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
42!
2159
    return 0;
42✔
2160
  }
2161

2162
  streamMutexLock(&execInfo.lock);
1,318✔
2163

2164
  code = removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
1,318✔
2165
  if (code) {
1,318!
2166
    goto _end;
×
2167
  }
2168

2169
  SVgroupChangeInfo changeInfo = {0};
1,318✔
2170
  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo);
1,318✔
2171
  if (code) {
1,318!
2172
    goto _end;
×
2173
  }
2174

2175
  {
2176
    if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) {
1,318!
2177
      mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
×
2178
      updateAllVgroups = true;
×
2179
      execInfo.switchFromFollower = false;  // reset the flag
×
2180
      addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb);
×
2181
    }
2182
  }
2183

2184
  if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) {
1,318!
2185
    // kill current active checkpoint transaction, since the transaction is vnode wide.
2186
    killAllCheckpointTrans(pMnode, &changeInfo);
9✔
2187
    code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups);
9✔
2188

2189
    // keep the new vnode snapshot if success
2190
    if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
9!
2191
      code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
9✔
2192
      if (code) {
9!
2193
        mError("failed to extract node list from stream, code:%s", tstrerror(code));
×
2194
        goto _end;
×
2195
      }
2196

2197
      execInfo.ts = ts;
9✔
2198
      mDebug("create trans successfully, update cached node list, numOfNodes:%d",
9✔
2199
             (int)taosArrayGetSize(execInfo.pNodeList));
2200
    } else {
2201
      mError("unexpected code during create nodeUpdate trans, code:%s", tstrerror(code));
×
2202
    }
2203
  } else {
2204
    mDebug("no update found in nodeList");
1,309✔
2205
  }
2206

2207
  mndDestroyVgroupChangeInfo(&changeInfo);
1,318✔
2208

2209
_end:
1,318✔
2210
  streamMutexUnlock(&execInfo.lock);
1,318✔
2211
  taosArrayDestroy(pNodeSnapshot);
1,318✔
2212

2213
  mDebug("end to do stream task node change checking");
1,318✔
2214
  atomic_store_32(&mndNodeCheckSentinel, 0);
1,318✔
2215
  return 0;
1,318✔
2216
}
2217

2218
// Queue a TDMT_MND_STREAM_NODECHANGE_CHECK message on the write queue, unless no stream exists.
// Returns 0 when there is nothing to check, terrno when the message buffer cannot be allocated,
// otherwise the result of tmsgPutToQueue.
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;

  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) <= 0) {
    return 0;
  }

  int32_t               contLen = sizeof(SMStreamNodeCheckMsg);
  SMStreamNodeCheckMsg *pCheckMsg = rpcMallocCont(contLen);
  if (pCheckMsg == NULL) {
    return terrno;
  }

  SRpcMsg checkReq = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pCheckMsg, .contLen = contLen};
  return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &checkReq);
}
2234

2235
// Register every task of pStream that is not yet tracked in pExecNode: add its status entry to
// pTaskMap/pTaskList and, when the task's node is new, append a node entry to pNodeList.
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
  SStreamTaskIter *pTaskIter = NULL;
  if (createStreamTaskIter(pStream, &pTaskIter) != 0) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pTask = NULL;
    if (streamTaskIterGetCurrent(pTaskIter, &pTask) != 0) {
      break;
    }

    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    if (taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)) != NULL) {
      continue;  // task already tracked
    }

    STaskStatusEntry entry = {0};
    streamTaskStatusInit(&entry, pTask);

    if (taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry)) != 0) {
      mError("s-task:0x%x failed to add into task map, since out of memory", (int32_t)entry.id.taskId);
    } else {
      void   *pPushed = taosArrayPush(pExecNode->pTaskList, &id);
      int32_t total = (int32_t)taosArrayGetSize(pExecNode->pTaskList);
      if (pPushed != NULL) {
        mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, total);
      } else {
        mError("s-task:0x%x failed to add into task buffer, total:%d", (int32_t)entry.id.taskId, total);
      }
    }

    // add the new vgroups if not added yet
    bool nodeKnown = false;
    for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pNodeList); ++k) {
      SNodeEntry *pNode = taosArrayGet(pExecNode->pNodeList, k);
      if (pNode != NULL && pNode->nodeId == pTask->info.nodeId) {
        nodeKnown = true;
        break;
      }
    }

    if (nodeKnown) {
      continue;
    }

    SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
    epsetAssign(&nodeEntry.epset, &pTask->info.epSet);

    if (taosArrayPush(pExecNode->pNodeList, &nodeEntry) != NULL) {
      mInfo("vgId:%d added into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList));
    } else {
      mError("vgId:%d failed to add into nodeList, total:%d", nodeEntry.nodeId,
             (int)taosArrayGetSize(pExecNode->pNodeList));
    }
  }

  destroyStreamTaskIter(pTaskIter);
}
2296

2297
// Append taskId to pList unless it is already present, and log the checkpoint-request progress of the
// stream identified by uid. numOfTotal is the total number of tasks expected for that stream.
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
  int32_t num = taosArrayGetSize(pList);
  for (int32_t i = 0; i < num; ++i) {
    int32_t *pId = taosArrayGet(pList, i);
    if (pId == NULL) {
      continue;
    }

    if (taskId == *pId) {
      return;
    }
  }

  void *p = taosArrayPush(pList, &taskId);
  if (p) {
    // count after the push so that the log includes the request that was just recorded
    int32_t numOfTasks = taosArrayGetSize(pList);
    mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, numOfTasks, numOfTotal - numOfTasks);
  } else {
    mError("stream:0x%" PRIx64 " receive %d reqs for checkpoint, failed to added into task list, since out of memory",
           uid, num);
  }
}
2319

2320
// Handle a checkpoint request sent by a stream task.
// The task id is recorded in execInfo.pTransferStateStreams under its stream id; once every task of the
// stream has sent its request, the checkpoint transaction is launched and the entry is removed.
// On success the response is sent from here and auto-response is disabled. Returns 0 on success,
// otherwise an error code.
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
  SMnode                  *pMnode = pReq->info.node;
  SStreamTaskCheckpointReq req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskCheckpointReq(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint req msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);

  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf",
          req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the create stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
  if (pReqTaskList == NULL) {
    SArray *pList = taosArrayInit(4, sizeof(int32_t));
    if (pList == NULL) {
      code = terrno;
      mError("failed to create the checkpoint req task list, code:%s", tstrerror(code));
      goto _err;
    }

    doAddTaskId(pList, req.taskId, req.streamId, numOfTasks);
    code = taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
    if (code) {
      mError("failed to put into transfer state stream map, code: out of memory");
      // the map did not take the list, so free it here and stop: there is no entry to work on
      taosArrayDestroy(pList);
      goto _err;
    }

    pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
    if (pReqTaskList == NULL) {
      // defensive: the entry was just inserted under the lock and should always be found
      mError("failed to find the stream:0x%" PRIx64 " in transfer state stream map", req.streamId);
      code = TSDB_CODE_MND_STREAM_NOT_EXIST;
      goto _err;
    }
  } else {
    doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks);
  }

  int32_t total = taosArrayGetSize(*pReqTaskList);
  if (total == numOfTasks) {  // all tasks have sent the reqs
    int64_t checkpointId = mndStreamGenChkptId(pMnode, false);
    mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);

    if (pStream != NULL) {  // TODO:handle error
      code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
      if (code) {
        mError("failed to create checkpoint trans, code:%s", tstrerror(code));
      }
    } else {
      // todo: wait for the create stream trans completed, and launch the checkpoint trans
      // SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
      // sleep(500ms)
    }

    // remove this entry
    (void) taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));

    int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
    mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  {
    SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)};
    rsp.pCont = rpcMallocCont(rsp.contLen);
    if (rsp.pCont == NULL) {
      return terrno;
    }

    SMsgHead *pHead = rsp.pCont;
    pHead->vgId = htonl(req.nodeId);

    tmsgSendRsp(&rsp);
    pReq->info.handle = NULL;  // disable auto rsp
  }

  return 0;

_err:
  // failure while the lock is held: give back the stream reference and the lock, then report the error
  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);
  return code;
}
2419

2420
// valid the info according to the HbMsg
2421
static bool validateChkptReport(const SCheckpointReport *pReport, int64_t reportChkptId) {
5,983✔
2422
  STaskId           id = {.streamId = pReport->streamId, .taskId = pReport->taskId};
5,983✔
2423
  STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
5,983✔
2424
  if (pTaskEntry == NULL) {
5,983✔
2425
    mError("invalid checkpoint-report msg from task:0x%x, discard", pReport->taskId);
35!
2426
    return false;
35✔
2427
  }
2428

2429
  if (pTaskEntry->checkpointInfo.latestId >= pReport->checkpointId) {
5,948!
2430
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " saved checkpointId:%" PRId64 " discard",
×
2431
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2432
    return false;
×
2433
  }
2434

2435
  // now the task in checkpoint procedure
2436
  if ((pTaskEntry->checkpointInfo.activeId != 0) && (pTaskEntry->checkpointInfo.activeId > pReport->checkpointId)) {
5,948!
2437
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " active checkpointId:%" PRId64
×
2438
           " discard",
2439
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2440
    return false;
×
2441
  }
2442

2443
  if (reportChkptId >= pReport->checkpointId) {
5,948!
2444
    mError("s-task:0x%x expired checkpoint-report msg, checkpointId:%" PRId64 " already update checkpointId:%" PRId64
×
2445
           " discard",
2446
           pReport->taskId, pReport->checkpointId, reportChkptId);
2447
    return false;
×
2448
  }
2449

2450
  return true;
5,948✔
2451
}
2452

2453
// Record one task's checkpoint-report in pList (an array of STaskChkptInfo).
//
// The report is first checked with validateChkptReport(); rejected reports are dropped silently here
// (the validator logs the reason). If the task already has an entry in the list:
//   - an entry with a larger checkpoint id wins and the report is discarded,
//   - an entry with a smaller checkpoint id is overwritten with the reported values,
//   - an entry with the same checkpoint id is left untouched.
// Otherwise a new entry is appended.
static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SCheckpointReport *pReport) {
  if (!validateChkptReport(pReport, reportChkptId)) {
    return;
  }

  int32_t numOfEntries = taosArrayGetSize(pList);
  for (int32_t idx = 0; idx < numOfEntries; ++idx) {
    STaskChkptInfo *pExisting = taosArrayGet(pList, idx);
    if (pExisting == NULL || pExisting->taskId != pReport->taskId) {
      continue;
    }

    // the task is already registered: decide by comparing the checkpoint ids
    if (pExisting->checkpointId == pReport->checkpointId) {
      mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
    } else if (pExisting->checkpointId > pReport->checkpointId) {
      mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
             pReport->taskId, pExisting->checkpointId, pReport->checkpointId);
    } else {  // expired checkpoint-report msg, update it
      mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64,
             pReport->taskId, pExisting->checkpointId, pReport->checkpointId);

      // refresh the stored entry with the newer report
      pExisting->checkpointId = pReport->checkpointId;
      pExisting->ts = pReport->checkpointTs;
      pExisting->version = pReport->checkpointVer;
      pExisting->transId = pReport->transId;
      pExisting->dropHTask = pReport->dropHTask;
    }
    return;
  }

  // first report from this task: append a new entry
  STaskChkptInfo info = {
      .streamId = pReport->streamId,
      .taskId = pReport->taskId,
      .transId = pReport->transId,
      .dropHTask = pReport->dropHTask,
      .version = pReport->checkpointVer,
      .ts = pReport->checkpointTs,
      .checkpointId = pReport->checkpointId,
      .nodeId = pReport->nodeId,
  };

  if (taosArrayPush(pList, &info) == NULL) {
    mError("failed to put into task list, taskId:0x%x", pReport->taskId);
    return;
  }

  int32_t size = taosArrayGetSize(pList);
  mDebug("stream:0x%" PRIx64 " %d tasks has send checkpoint-report", pReport->streamId, size);
}
2505

2506
// Handle a checkpoint-report message sent by a stream task.
//
// The message is decoded into an SCheckpointReport and registered, under execInfo.lock, in the
// per-stream report list kept in execInfo.pChkptStreams. When the number of registered reports
// equals the number of tasks of the stream, an info log announces that the checkpoint meta-info
// will be issued.
//
// A quick response carrying TSDB_CODE_SUCCESS is sent for every decodable message whose stream (or
// task) is known; after that pReq->info.handle is cleared by doSendQuickRsp() so that no automatic
// response follows.
//
// Returns:
//   TSDB_CODE_INVALID_MSG           - the message cannot be decoded (no quick rsp sent here)
//   TSDB_CODE_MND_STREAM_NOT_EXIST  - neither the stream nor the task is known (no quick rsp sent here)
//   otherwise the last status code produced while registering the report (0 on success).
int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SCheckpointReport req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskChkptReport(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint-report msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64
         " checkpointVer:%" PRId64 " transId:%d",
         req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);

  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the create stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SChkptReportInfo *pInfo =
      (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
  if (pInfo == NULL) {
    // first report for this stream: create the report info and publish it in the hash
    SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId};
    if (info.pTaskList == NULL) {
      code = terrno;
      mError("stream:0x%" PRIx64 " failed to create checkpoint-report task list, code:%s", req.streamId,
             tstrerror(code));
    } else {
      doAddReportStreamTask(info.pTaskList, info.reportChkpt, &req);
      code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &info, sizeof(info));
      if (code) {
        mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId);
        // the list never became reachable through the hash, release it here to avoid a leak
        taosArrayDestroy(info.pTaskList);
      } else {
        pInfo = (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
      }
    }
  } else {
    doAddReportStreamTask(pInfo->pTaskList, pInfo->reportChkpt, &req);
  }

  // pInfo is NULL when the report info could not be created/stored; pStream is NULL when the stream
  // is only known in the exec buffer. Neither may be dereferenced in those cases.
  if (pInfo != NULL && pStream != NULL) {
    int32_t total = taosArrayGetSize(pInfo->pTaskList);
    if (total == numOfTasks) {  // all tasks has send the reqs
      mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
            " will be issued soon",
            req.streamId, pStream->name, total, req.checkpointId);
    }
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  // NOTE(review): the quick rsp always reports success, even if the report could not be registered
  // above; kept as in the original protocol - confirm whether the sender should be told to retry.
  doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS);
  return code;
}
2585

2586
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t *pExistedTasks, bool *pAllSame) {
183✔
2587
  int32_t num = 0;
183✔
2588
  int64_t chkId = INT64_MAX;
183✔
2589
  *pExistedTasks = 0;
183✔
2590
  *pAllSame = true;
183✔
2591

2592
  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
6,464✔
2593
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
6,281✔
2594
    if (p == NULL) {
6,281!
2595
      continue;
×
2596
    }
2597

2598
    if (p->streamId != streamId) {
6,281✔
2599
      continue;
5,060✔
2600
    }
2601

2602
    num += 1;
1,221✔
2603
    STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
1,221✔
2604
    if (chkId > pe->checkpointInfo.latestId) {
1,221✔
2605
      if (chkId != INT64_MAX) {
187✔
2606
        *pAllSame = false;
4✔
2607
      }
2608
      chkId = pe->checkpointInfo.latestId;
187✔
2609
    }
2610
  }
2611

2612
  *pExistedTasks = num;
183✔
2613
  if (num < numOfTasks) {  // not all task send info to mnode through hbMsg, no valid checkpoint Id
183!
2614
    return -1;
×
2615
  }
2616

2617
  return chkId;
183✔
2618
}
2619

2620
// Send an immediate response for a request and suppress the automatic one.
//
// pInfo   - rpc handle info of the request; its handle is set to NULL once the rsp has been sent
// msgSize - size in bytes of the response payload to allocate
// vgId    - written (in network byte order) into the SMsgHead at the start of the payload
// code    - status code carried by the response
//
// If the payload cannot be allocated nothing is sent and pInfo is left untouched.
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) {
  SRpcMsg rsp = {.code = code, .info = *pInfo, .contLen = msgSize};

  rsp.pCont = rpcMallocCont(rsp.contLen);
  if (rsp.pCont == NULL) {
    return;
  }

  SMsgHead *pMsgHead = rsp.pCont;
  pMsgHead->vgId = htonl(vgId);

  tmsgSendRsp(&rsp);
  pInfo->handle = NULL;  // disable auto rsp
}
5,983✔
2631

2632
// Timer-driven handler that resolves pending consensus-checkpointId requests.
//
// For every stream registered in execInfo.pStreamConsensus, each pending task request is answered
// with a "set consensus checkpoint id" transaction once either all tasks of the stream report the
// same checkpoint id or the request has waited at least 10 seconds. Answered requests are removed
// from the stream's pending list; streams whose list becomes empty are removed from the hash.
//
// Nothing is processed when not all vnodes are ready (returns 0).
//
// Returns terrno if the working list cannot be allocated, TSDB_CODE_FAILED when an inconsistent
// entry is met (consensus id newer than the requested one, or no stream id could be determined),
// otherwise the last status code produced while processing.
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  int64_t now = taosGetTimestampMs();
  SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
  if (pStreamList == NULL) {
    return terrno;
  }

  mDebug("start to process consensus-checkpointId in tmr");

  bool    allReady = true;
  SArray *pNodeSnapshot = NULL;

  // only the readiness flag is needed here, the snapshot itself is discarded right away
  int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
  taosArrayDestroy(pNodeSnapshot);
  if (code) {
    mError("failed to get the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
    taosArrayDestroy(pStreamList);
    return 0;
  }

  streamMutexLock(&execInfo.lock);

  void *pIter = NULL;
  while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
    SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;

    int64_t streamId = -1;
    int32_t num = taosArrayGetSize(pInfo->pTaskList);
    SArray *pList = taosArrayInit(4, sizeof(int32_t));  // task ids answered in this round
    if (pList == NULL) {
      continue;
    }

    SStreamObj *pStream = NULL;
    code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
    if (pStream == NULL || code != 0) {  // stream has been dropped already
      mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
      taosArrayDestroy(pList);
      continue;
    }

    for (int32_t j = 0; j < num; ++j) {
      SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
      if (pe == NULL) {
        continue;
      }

      streamId = pe->req.streamId;

      int32_t existed = 0;
      bool    allSame = true;
      int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame);
      if (chkId == -1) {
        mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed,
               pInfo->numOfTasks, pe->req.taskId);
        break;
      }

      if (((now - pe->ts) >= 10 * 1000) || allSame) {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
        if (chkId > pe->req.checkpointId) {
          // log while the entry is still protected by the lock, then release everything acquired
          // in this iteration (stream ref, work list, hash iterator) before bailing out
          mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
                 pe->req.checkpointId, chkId);
          mndReleaseStream(pMnode, pStream);
          taosArrayDestroy(pList);
          taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
          streamMutexUnlock(&execInfo.lock);
          taosArrayDestroy(pStreamList);
          return TSDB_CODE_FAILED;
        }
        code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
        }

        void *p = taosArrayPush(pList, &pe->req.taskId);
        if (p == NULL) {
          mError("failed to put into task list, taskId:0x%x", pe->req.taskId);
        }
        streamId = pe->req.streamId;
      } else {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
      }
    }

    mndReleaseStream(pMnode, pStream);

    // drop the answered requests from the stream's pending list
    if (taosArrayGetSize(pList) > 0) {
      for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
        int32_t *taskId = taosArrayGet(pList, i);
        if (taskId == NULL) {
          continue;
        }

        for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) {
          SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, k);
          if ((pe != NULL) && (pe->req.taskId == *taskId)) {
            taosArrayRemove(pInfo->pTaskList, k);
            break;
          }
        }
      }
    }

    taosArrayDestroy(pList);

    if (taosArrayGetSize(pInfo->pTaskList) == 0) {
      mndClearConsensusRspEntry(pInfo);
      if (streamId == -1) {
        // log while pInfo is still protected by the lock, and close the iterator before leaving
        mError("streamId is -1, streamId:%" PRIx64, pInfo->streamId);
        taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
        streamMutexUnlock(&execInfo.lock);
        taosArrayDestroy(pStreamList);
        return TSDB_CODE_FAILED;
      }
      void *p = taosArrayPush(pStreamList, &streamId);
      if (p == NULL) {
        mError("failed to put into stream list, stream:0x%" PRIx64, streamId);
      }
    }
  }

  // remove the streams that have no pending request left; done after the iteration is over
  for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
    int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i);
    if (pStreamId == NULL) {
      continue;
    }

    code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId);
  }

  streamMutexUnlock(&execInfo.lock);

  taosArrayDestroy(pStreamList);
  mDebug("end to process consensus-checkpointId in tmr");
  return code;
}
2772

2773
// Run mndProcessCreateStreamReq() for a request and, when it fails, attach a 1-byte response buffer
// to the request and store the failure code in it.
//
// Returns the code of mndProcessCreateStreamReq(), or terrno if the response buffer cannot be
// allocated.
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t code = mndProcessCreateStreamReq(pReq);
  if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    return code;
  }

  // failure: prepare an explicit response carrying the error code
  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->info.rspLen = 1;
  pReq->info.noResp = false;
  pReq->code = code;
  return code;
}
2787

2788
// Run mndProcessDropStreamReq() for a request and, when it fails, attach a 1-byte response buffer
// to the request and store the failure code in it.
//
// Returns the code of mndProcessDropStreamReq(), or terrno if the response buffer cannot be
// allocated.
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t code = mndProcessDropStreamReq(pReq);
  if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    return code;
  }

  // failure: prepare an explicit response carrying the error code
  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->info.rspLen = 1;
  pReq->info.noResp = false;
  pReq->code = code;
  return code;
}
2802

2803
// Populate pExecInfo with the tasks of all streams, once.
// Does nothing when the task list has already been initialized or when pMnode is NULL; otherwise
// loads the tasks via addAllStreamTasksIntoBuf() and marks the list as initialized.
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  if (!pExecInfo->initTaskList && pMnode != NULL) {
    addAllStreamTasksIntoBuf(pMnode, pExecInfo);
    pExecInfo->initTaskList = true;
  }
}
2811

2812
void mndStreamResetInitTaskListLoadFlag() {
1,499✔
2813
  mInfo("reset task list buffer init flag for leader");
1,499!
2814
  execInfo.initTaskList = false;
1,499✔
2815
}
1,499✔
2816

2817
// Track the role (leader/follower) of this mnode in the global execInfo.
//
// execInfo.switchFromFollower is reset on every call and set to true only for a
// follower -> leader transition. execInfo.role is updated on the first call (role uninitialized)
// and whenever the role actually changes between leader and follower.
// pMnode is not used by this function.
void mndUpdateStreamExecInfoRole(SMnode *pMnode, int32_t role) {
  const bool becomeLeader = (role == NODE_ROLE_LEADER);

  execInfo.switchFromFollower = false;

  // first role assignment after start
  if (execInfo.role == NODE_ROLE_UNINIT) {
    execInfo.role = role;
    if (becomeLeader) {
      mInfo("init mnode is set to leader");
    } else {
      mInfo("init mnode is set to follower");
    }
    return;
  }

  if (becomeLeader) {
    if (execInfo.role != NODE_ROLE_FOLLOWER) {
      mInfo("mnode remain to be leader, do nothing");
      return;
    }

    execInfo.role = role;
    execInfo.switchFromFollower = true;
    mInfo("mnode switch to be leader from follower");
    return;
  }

  // follower's
  if (execInfo.role != NODE_ROLE_LEADER) {
    mInfo("mnode remain to be follower, do nothing");
    return;
  }

  execInfo.role = role;
  mInfo("mnode switch to be follower from leader");
}
1,791✔
2846

2847
// Walk all SDB_STREAM objects of the mnode and hand each one to saveTaskAndNodeInfoIntoBuf() so
// that its task/node information is stored in pExecInfo. Every fetched stream object is released
// again after it has been processed.
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;

  for (void *pCursor = sdbFetch(pSdb, SDB_STREAM, NULL, (void **)&pStream); pCursor != NULL;
       pCursor = sdbFetch(pSdb, SDB_STREAM, pCursor, (void **)&pStream)) {
    saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo);
    sdbRelease(pSdb, pStream);
  }
}
159✔
2862

2863
// Create and prepare the transaction that updates the checkpoint info of a stream.
//
// Steps: create the trans, register it for the stream, add the update-checkpoint actions, persist
// the stream object with SDB_STATUS_READY, and prepare the trans. The first failing step aborts
// the sequence.
//
// The stream object is released with sdbRelease() on every path, so the caller's reference is
// consumed. pChkptInfoList is not used by this function.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the trans has been prepared, otherwise the code of the
// failing step.
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList) {
  STrans *pTrans = NULL;
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME,
                               "update checkpoint-info", &pTrans);
  if (pTrans == NULL || code) {
    // no trans cleanup on this path, only the stream reference is given back
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid);

  if (code == 0) {
    code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream);
  }

  if (code == 0) {
    code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  }

  if (code == 0) {
    code = mndTransPrepare(pMnode, pTrans);
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
      mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, tstrerror(code));
    } else {
      code = TSDB_CODE_ACTION_IN_PROGRESS;
    }
  }

  // shared exit for success and for every failure after the trans has been created
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
2906

2907
// Handle a "drop orphan tasks" request: build one transaction that drops every task listed in the
// message.
//
// The stream id of the first retrievable entry of the list is used for the conflict check, for
// registering the trans and for the placeholder stream object that is persisted with
// SDB_STATUS_DROPPED.
//
// Returns the deserialization error if the message cannot be decoded; 0 when the list is empty or
// holds no retrievable entry (no trans is created in that case); otherwise the code of the first
// failing step, or the result of mndTransPrepare().
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  int32_t      code = 0;
  SOrphanTask *pTask = NULL;
  int32_t      i = 0;
  STrans      *pTrans = NULL;
  int32_t      numOfTasks = 0;
  bool         transPrepared = false;  // true only after mndTransPrepare() accepted the trans

  SMStreamDropOrphanMsg msg = {0};
  code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg);
  if (code) {
    return code;
  }

  numOfTasks = taosArrayGetSize(msg.pList);
  if (numOfTasks == 0) {
    mDebug("no orphan tasks to drop, no need to create trans");
    goto _err;
  }

  mDebug("create trans to drop %d orphan tasks", numOfTasks);

  // locate the first retrievable entry, it provides the stream id used below
  i = 0;
  while (i < numOfTasks && ((pTask = taosArrayGet(msg.pList, i)) == NULL)) {
    i += 1;
  }

  if (pTask == NULL) {
    mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task");
    goto _err;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
  if (code) {
    goto _err;
  }

  // placeholder stream object: only the uid is meaningful
  SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};

  code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code != 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _err;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
  if (code) {
    goto _err;
  }

  // drop all tasks
  if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _err;
  }

  // drop stream
  if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) {
    goto _err;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    goto _err;
  }

  transPrepared = true;

_err:
  // shared exit: pTrans may still be NULL here when no trans was created
  tDestroyDropOrphanTaskMsg(&msg);
  mndTransDrop(pTrans);

  // report success only when a trans was really created and prepared; the paths that leave early
  // with code == 0 (empty list, no retrievable entry) must not claim it
  if (transPrepared) {
    mDebug("create drop %d orphan tasks trans succ", numOfTasks);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc