• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3629

04 Mar 2025 01:45PM UTC coverage: 63.692% (-0.1%) from 63.79%
#3629

push

travis-ci

web-flow
Merge pull request #30007 from taosdata/revert-29951-docs/update-exception-handling-strategy

Revert "docs: update exception handling strategy"

149369 of 300378 branches covered (49.73%)

Branch coverage included in aggregate %.

233614 of 300930 relevant lines covered (77.63%)

18792670.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.48
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "audit.h"
17
#include "mndDb.h"
18
#include "mndPrivilege.h"
19
#include "mndScheduler.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndStream.h"
23
#include "mndTrans.h"
24
#include "osMemory.h"
25
#include "parser.h"
26
#include "taoserror.h"
27
#include "tmisce.h"
28
#include "tname.h"
29

30
#define MND_STREAM_MAX_NUM 60
31

32
typedef struct {
33
  int8_t placeHolder;  // // to fix windows compile error, define place holder
34
} SMStreamNodeCheckMsg;
35

36
static int32_t  mndNodeCheckSentinel = 0;
37
SStreamExecInfo execInfo;
38

39
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
41
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
42
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
43
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
44

45
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
46
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
47

48
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq);
49
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
50
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
51
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
52
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
53
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
54
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
55
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq);
56
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
57
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
58
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
59
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
60
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList);
61
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
62
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
63
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
64
static void    doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
65
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq);
66
static void    saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
67

68
static void     addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
69
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
70

71
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
72
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
73
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
74
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
75
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
76

77
/*
 * Register the stream module with the mnode:
 *   - message handlers for client requests, vnode responses and mnode-internal stream messages,
 *   - show/retrieve handlers for the streams and stream-tasks system tables,
 *   - the SDB_STREAM and SDB_STREAM_SEQ table descriptors,
 * after initializing the global execInfo through mndInitExecInfo().
 *
 * Returns 0 on success, otherwise the first non-zero code from mndInitExecInfo() or sdbSetTable().
 */
int32_t mndInitStream(SMnode *pMnode) {
  SSdbTable streamTable = {0};
  streamTable.sdbType = SDB_STREAM;
  streamTable.keyType = SDB_KEY_BINARY;
  streamTable.encodeFp = (SdbEncodeFp)mndStreamActionEncode;
  streamTable.decodeFp = (SdbDecodeFp)mndStreamActionDecode;
  streamTable.insertFp = (SdbInsertFp)mndStreamActionInsert;
  streamTable.updateFp = (SdbUpdateFp)mndStreamActionUpdate;
  streamTable.deleteFp = (SdbDeleteFp)mndStreamActionDelete;

  SSdbTable seqTable = {0};
  seqTable.sdbType = SDB_STREAM_SEQ;
  seqTable.keyType = SDB_KEY_BINARY;
  seqTable.encodeFp = (SdbEncodeFp)mndStreamSeqActionEncode;
  seqTable.decodeFp = (SdbDecodeFp)mndStreamSeqActionDecode;
  seqTable.insertFp = (SdbInsertFp)mndStreamSeqActionInsert;
  seqTable.updateFp = (SdbUpdateFp)mndStreamSeqActionUpdate;
  seqTable.deleteFp = (SdbDeleteFp)mndStreamSeqActionDelete;

  // client requests and the node-check timer
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck);

  // responses to transaction actions sent to stream tasks
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CONSEN_CHKPT_RSP, mndTransProcessRsp);

  // messages exchanged inside the mnode
  // TODO change the name
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE, mndProcessCreateStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp);

  // checkpoint, heartbeat, consensus and orphan-task handling
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_ALL_STOP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_DROP_ORPHANTASKS, mndProcessDropOrphanTaskReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_TASK_RESET, mndProcessResetStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr);

  // pause / resume / reset requests
  mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESET_STREAM, mndProcessResetStreamReq);

  // system tables
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);

  // Each step runs only if every previous one succeeded; the first failure code is returned.
  int32_t code = mndInitExecInfo();
  if (code == 0) {
    code = sdbSetTable(pMnode->pSdb, streamTable);
  }
  if (code == 0) {
    code = sdbSetTable(pMnode->pSdb, seqTable);
  }
  return code;
}
153

154
/*
 * Release the containers and the mutex held by the global stream execution info (execInfo).
 * The containers are presumably created by mndInitExecInfo(), called from mndInitStream() — confirm.
 *
 * Every pointer is reset to NULL right after its container is destroyed. Without this, execInfo would keep
 * dangling pointers to freed memory, and any late access would be a use-after-free instead of a detectable NULL.
 */
void mndCleanupStream(SMnode *pMnode) {
  taosArrayDestroy(execInfo.pTaskList);
  execInfo.pTaskList = NULL;

  taosArrayDestroy(execInfo.pNodeList);
  execInfo.pNodeList = NULL;

  taosArrayDestroy(execInfo.pKilledChkptTrans);
  execInfo.pKilledChkptTrans = NULL;

  taosHashCleanup(execInfo.pTaskMap);
  execInfo.pTaskMap = NULL;

  taosHashCleanup(execInfo.transMgmt.pDBTrans);
  execInfo.transMgmt.pDBTrans = NULL;

  taosHashCleanup(execInfo.pTransferStateStreams);
  execInfo.pTransferStateStreams = NULL;

  taosHashCleanup(execInfo.pChkptStreams);
  execInfo.pChkptStreams = NULL;

  taosHashCleanup(execInfo.pStreamConsensus);
  execInfo.pStreamConsensus = NULL;

  (void)taosThreadMutexDestroy(&execInfo.lock);
  mDebug("mnd stream exec info cleanup");
}
166

167
/*
 * SDB decode callback for SDB_STREAM: build a newly allocated SSdbRow whose object is an SStreamObj.
 *
 * The raw record holds an int32 length followed by that many bytes. The bytes are decoded with
 * tDecodeSStreamObj() using the record's soft version, which must be within 1..MND_STREAM_VER_NUMBER.
 *
 * Returns the row and sets terrno to 0 on success. On failure returns NULL and sets terrno to the error code.
 */
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSdbRow    *pRow = NULL;
  SStreamObj *pStream = NULL;
  void       *buf = NULL;
  int8_t      sver = 0;
  int32_t     tlen = 0;
  int32_t     dataPos = 0;

  code = sdbGetRawSoftVer(pRaw, &sver);
  TSDB_CHECK_CODE(code, lino, _over);

  if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
    // An explicit error code is required here. With code == 0, the cleanup below would take the success path:
    // it would trace through the NULL pStream and return a NULL row with terrno cleared.
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    lino = __LINE__;
    goto _over;
  }

  pRow = sdbAllocRow(sizeof(SStreamObj));
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);

  pStream = sdbGetRowObj(pRow);
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);

  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);

  buf = taosMemoryMalloc(tlen + 1);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);

  SDecoder decoder;
  tDecoderInit(&decoder, buf, tlen + 1);
  code = tDecodeSStreamObj(&decoder, pStream, sver);
  tDecoderClear(&decoder);

  if (code != TSDB_CODE_SUCCESS) {
    // Release whatever the partial decode attached to the stream object; the row itself is freed below.
    tFreeStreamObj(pStream);
    lino = __LINE__;
    goto _over;
  }

_over:
  taosMemoryFreeClear(buf);

  if (code != TSDB_CODE_SUCCESS) {
    char *p = (pStream == NULL) ? "null" : pStream->name;
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
    taosMemoryFreeClear(pRow);

    terrno = code;
    return NULL;
  } else {
    mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
           pStream->checkpointId);

    terrno = 0;
    return pRow;
  }
}
225

226
// SDB insert callback for SDB_STREAM rows. There is no per-row work beyond a trace log; always returns 0.
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
  (void)pSdb;  // unused by this callback

  mTrace("stream:%s, perform insert action", pStream->name);
  return 0;
}
230

231
// SDB delete callback for SDB_STREAM rows: frees what the stream object owns (via tFreeStreamObj) while
// holding the object's write latch. Always returns 0.
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
  (void)pSdb;  // unused by this callback

  mTrace("stream:%s, perform delete action", pStream->name);

  taosWLockLatch(&pStream->lock);
  tFreeStreamObj(pStream);  // the latch is a field of the object itself, so it stays valid for the unlock
  taosWUnLockLatch(&pStream->lock);

  return 0;
}
238

239
// SDB update callback for SDB_STREAM rows.
// The version is swapped in atomically. status, updateTime, checkpointId and checkpointFreq are copied from the
// new object under the stored object's write latch. Other fields of the stored object are left as they are.
// Always returns 0.
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
  SStreamObj       *pDst = pOldStream;
  const SStreamObj *pSrc = pNewStream;
  (void)pSdb;  // unused by this callback

  mTrace("stream:%s, perform update action", pDst->name);
  (void)atomic_exchange_32(&pDst->version, pSrc->version);

  taosWLockLatch(&pDst->lock);
  pDst->status = pSrc->status;
  pDst->updateTime = pSrc->updateTime;
  pDst->checkpointId = pSrc->checkpointId;
  pDst->checkpointFreq = pSrc->checkpointFreq;
  taosWUnLockLatch(&pDst->lock);

  return 0;
}
253

254
/*
 * Look up a stream by its full name and take a reference on it.
 *
 * On success *pStream holds the acquired object, which must be released with mndReleaseStream(), and 0 is
 * returned. On failure *pStream is NULL and a non-zero code is returned:
 *   - TSDB_CODE_MND_STREAM_NOT_EXIST when the stream is not in the SDB;
 *   - otherwise the error sdbAcquire() left in terrno.
 * Previously only the "not there" case was reported, so other lookup failures returned 0 with a NULL object.
 */
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
  SSdb *pSdb = pMnode->pSdb;

  (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
  if ((*pStream) != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = terrno;
  if (code == TSDB_CODE_SDB_OBJ_NOT_THERE || code == TSDB_CODE_SUCCESS) {
    // Also map an unset terrno here, so a NULL object is never paired with a success code.
    code = TSDB_CODE_MND_STREAM_NOT_EXIST;
  }
  return code;
}
263

264
// Drop a reference taken by mndAcquireStream() (or another SDB_STREAM acquire) through sdbRelease().
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) { sdbRelease(pMnode->pSdb, pStream); }
268

269
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
270
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
271
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
272
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
273
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
274

275
// Basic sanity check of a CREATE STREAM request.
// The stream name, SQL text, source database and target super table name must all be non-empty.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_MND_INVALID_STREAM_OPTION if any of them is missing.
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
  bool hasName = (pCreate->name[0] != 0);
  bool hasSql = (pCreate->sql != NULL) && (pCreate->sql[0] != 0);
  bool hasSourceDb = (pCreate->sourceDB[0] != 0);
  bool hasTarget = (pCreate->targetStbFullName[0] != 0);

  return (hasName && hasSql && hasSourceDb && hasTarget) ? TSDB_CODE_SUCCESS : TSDB_CODE_MND_INVALID_STREAM_OPTION;
}
282

283
/*
 * Fill pWrapper with one SSchema per SField in pFields, in the same order.
 * colId is the 1-based position. Fields of type TSDB_DATA_TYPE_NULL become VARCHAR columns of
 * VARSTR_HEADER_SIZE bytes.
 *
 * pWrapper->nCols and pWrapper->pSchema are set before any failure can occur. Returns terrno if the allocation
 * or an array access fails.
 */
static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) {
  int32_t numOfCols = taosArrayGetSize(pFields);
  pWrapper->nCols = numOfCols;

  SSchema *pSchemas = taosMemoryCalloc(numOfCols, sizeof(SSchema));
  pWrapper->pSchema = pSchemas;
  if (pSchemas == NULL) {
    return terrno;
  }

  for (int32_t col = 0; col < numOfCols; ++col) {
    const SField *pSrc = taosArrayGet(pFields, col);
    if (pSrc == NULL) {
      return terrno;
    }

    SSchema *pDst = &pSchemas[col];
    if (pSrc->type == TSDB_DATA_TYPE_NULL) {
      pDst->type = TSDB_DATA_TYPE_VARCHAR;
      pDst->bytes = VARSTR_HEADER_SIZE;
    } else {
      pDst->type = pSrc->type;
      pDst->bytes = pSrc->bytes;
    }
    pDst->colId = col + 1;
    tstrncpy(pDst->name, pSrc->name, sizeof(pDst->name));
    pDst->flags = pSrc->flags;
  }

  return TSDB_CODE_SUCCESS;
}
312

313
// True when any column after the first one carries the COL_IS_KEY flag. Column 0 is never inspected, so a
// schema with fewer than two columns always yields false.
static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) {
  for (int32_t col = 1; col < pWrapper->nCols; ++col) {
    if ((pWrapper->pSchema[col].flags & COL_IS_KEY) != 0) {
      return true;
    }
  }
  return false;
}
324

325
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
1,747✔
326
  SNode      *pAst = NULL;
1,747✔
327
  SQueryPlan *pPlan = NULL;
1,747✔
328
  int32_t     code = 0;
1,747✔
329

330
  mInfo("stream:%s to create", pCreate->name);
1,747!
331
  memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
1,747✔
332
  pObj->createTime = taosGetTimestampMs();
1,747✔
333
  pObj->updateTime = pObj->createTime;
1,747✔
334
  pObj->version = 1;
1,747✔
335

336
  if (pCreate->smaId > 0) {
1,747✔
337
    pObj->subTableWithoutMd5 = 1;
259✔
338
  }
339

340
  pObj->smaId = pCreate->smaId;
1,747✔
341
  pObj->indexForMultiAggBalance = -1;
1,747✔
342

343
  pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
1,747✔
344

345
  char p[TSDB_STREAM_FNAME_LEN + 32] = {0};
1,747✔
346
  snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory");
1,747✔
347

348
  pObj->hTaskUid = mndGenerateUid(pObj->name, strlen(pObj->name));
1,747✔
349
  pObj->status = 0;
1,747✔
350

351
  pObj->conf.igExpired = pCreate->igExpired;
1,747✔
352
  pObj->conf.trigger = pCreate->triggerType;
1,747✔
353
  pObj->conf.triggerParam = pCreate->maxDelay;
1,747✔
354
  pObj->conf.watermark = pCreate->watermark;
1,747✔
355
  pObj->conf.fillHistory = pCreate->fillHistory;
1,747✔
356
  pObj->deleteMark = pCreate->deleteMark;
1,747✔
357
  pObj->igCheckUpdate = pCreate->igUpdate;
1,747✔
358

359
  memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
1,747✔
360
  SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
1,747✔
361
  if (pSourceDb == NULL) {
1,747!
362
    code = terrno;
×
363
    mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb,
×
364
          tstrerror(code));
365
    goto FAIL;
×
366
  }
367

368
  pObj->sourceDbUid = pSourceDb->uid;
1,747✔
369
  mndReleaseDb(pMnode, pSourceDb);
1,747✔
370

371
  memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
1,747✔
372

373
  SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
1,747✔
374
  if (pTargetDb == NULL) {
1,747!
375
    code = terrno;
×
376
    mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb,
×
377
           tstrerror(code));
378
    goto FAIL;
×
379
  }
380

381
  tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
1,747✔
382

383
  if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
1,747✔
384
    pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
1,594✔
385
  } else {
386
    pObj->targetStbUid = pCreate->targetStbUid;
153✔
387
  }
388
  pObj->targetDbUid = pTargetDb->uid;
1,747✔
389
  mndReleaseDb(pMnode, pTargetDb);
1,747✔
390

391
  pObj->sql = pCreate->sql;
1,747✔
392
  pObj->ast = pCreate->ast;
1,747✔
393

394
  pCreate->sql = NULL;
1,747✔
395
  pCreate->ast = NULL;
1,747✔
396

397
  // deserialize ast
398
  if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) {
1,747!
399
    goto FAIL;
×
400
  }
401

402
  // create output schema
403
  if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) {
1,747!
404
    goto FAIL;
×
405
  }
406

407
  int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
1,747✔
408
  if (numOfNULL > 0) {
1,747✔
409
    pObj->outputSchema.nCols += numOfNULL;
26✔
410
    SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
26!
411
    if (!pFullSchema) {
26!
412
      code = terrno;
×
413
      goto FAIL;
×
414
    }
415

416
    int32_t nullIndex = 0;
26✔
417
    int32_t dataIndex = 0;
26✔
418
    for (int32_t i = 0; i < pObj->outputSchema.nCols; i++) {
332✔
419
      if (nullIndex >= numOfNULL) {
306!
420
        pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
×
421
        pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
×
422
        pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
×
423
        tstrncpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name, sizeof(pFullSchema[i].name));
×
424
        pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
×
425
        dataIndex++;
×
426
      } else {
427
        SColLocation *pos = NULL;
306✔
428
        if (nullIndex < taosArrayGetSize(pCreate->fillNullCols)) {
306!
429
          pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
306✔
430
        }
431

432
        if (pos == NULL) {
306!
433
          mError("invalid null column index, %d", nullIndex);
×
434
          continue;
×
435
        }
436

437
        if (i < pos->slotId) {
306✔
438
          pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
79✔
439
          pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
79✔
440
          pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
79✔
441
          tstrncpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name, sizeof(pFullSchema[i].name));
79✔
442
          pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
79✔
443
          dataIndex++;
79✔
444
        } else {
445
          pFullSchema[i].bytes = 0;
227✔
446
          pFullSchema[i].colId = pos->colId;
227✔
447
          pFullSchema[i].flags = COL_SET_NULL;
227✔
448
          memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
227✔
449
          pFullSchema[i].type = pos->type;
227✔
450
          nullIndex++;
227✔
451
        }
452
      }
453
    }
454

455
    taosMemoryFree(pObj->outputSchema.pSchema);
26!
456
    pObj->outputSchema.pSchema = pFullSchema;
26✔
457
  }
458

459
  SPlanContext cxt = {
1,747✔
460
      .pAstRoot = pAst,
461
      .topicQuery = false,
462
      .streamQuery = true,
463
      .triggerType =
464
          (pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
1,747✔
465
      .watermark = pObj->conf.watermark,
1,747✔
466
      .igExpired = pObj->conf.igExpired,
1,747✔
467
      .deleteMark = pObj->deleteMark,
1,747✔
468
      .igCheckUpdate = pObj->igCheckUpdate,
1,747✔
469
      .destHasPrimaryKey = hasDestPrimaryKey(&pObj->outputSchema),
1,747✔
470
  };
471

472
  // using ast and param to build physical plan
473
  if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) {
1,747!
474
    goto FAIL;
×
475
  }
476

477
  // save physcial plan
478
  if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) {
1,747!
479
    goto FAIL;
×
480
  }
481

482
  pObj->tagSchema.nCols = pCreate->numOfTags;
1,747✔
483
  if (pCreate->numOfTags) {
1,747✔
484
    pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
284!
485
    if (pObj->tagSchema.pSchema == NULL) {
284!
486
      code = terrno;
×
487
      goto FAIL;
×
488
    }
489
  }
490

491
  /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
492
  for (int32_t i = 0; i < pCreate->numOfTags; i++) {
3,337✔
493
    SField *pField = taosArrayGet(pCreate->pTags, i);
1,590✔
494
    if (pField == NULL) {
1,590!
495
      continue;
×
496
    }
497

498
    pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
1,590✔
499
    pObj->tagSchema.pSchema[i].bytes = pField->bytes;
1,590✔
500
    pObj->tagSchema.pSchema[i].flags = pField->flags;
1,590✔
501
    pObj->tagSchema.pSchema[i].type = pField->type;
1,590✔
502
    memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN);
1,590✔
503
  }
504

505
FAIL:
1,747✔
506
  if (pAst != NULL) nodesDestroyNode(pAst);
1,747!
507
  if (pPlan != NULL) qDestroyQueryPlan(pPlan);
1,747!
508
  return code;
1,747✔
509
}
510

511
/*
 * Serialize one stream task and append it to pTrans as a TDMT_STREAM_TASK_DEPLOY action.
 * The message head's vgId is the task's nodeId and the action targets the task's epSet.
 *
 * The task is encoded twice:
 *   1. with a NULL buffer, only to measure the encoded size;
 *   2. into a buffer of sizeof(SMsgHead) + size.
 * Tasks older than SSTREAM_TASK_SUBTABLE_CHANGED_VER are upgraded to SSTREAM_TASK_VER first (this mutates pTask).
 *
 * The message buffer is freed here on every failure. It is not freed on success (presumably it is then owned by
 * the transaction). Returns 0 or an error code.
 */
int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
  SEncoder encoder;
  tEncoderInit(&encoder, NULL, 0);

  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  // Size pass. Any non-zero result is a failure (the encode pass below uses the same convention); previously
  // only -1 was caught, and other error codes continued with an unreliable size.
  int32_t code = tEncodeStreamTask(&encoder, pTask);
  if (code != 0) {
    tEncoderClear(&encoder);
    mError("failed to calculate encoded size of stream task, code:%s", tstrerror(code));
    return (code == -1) ? TSDB_CODE_INVALID_MSG : code;
  }

  int32_t size = encoder.pos;
  int32_t tlen = sizeof(SMsgHead) + size;
  tEncoderClear(&encoder);

  void *buf = taosMemoryCalloc(1, tlen);
  if (buf == NULL) {
    return terrno;
  }

  ((SMsgHead *)buf)->vgId = htonl(pTask->info.nodeId);

  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, size);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  if (code != 0) {
    mError("failed to encode stream task, code:%s", tstrerror(code));
    taosMemoryFree(buf);
    return code;
  }

  code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code) {
    taosMemoryFree(buf);
  }

  return code;
}
555

556
/*
 * Append a deploy action to pTrans for every task of pStream.
 * First come all tasks yielded by the stream task iterator. Then, if fill-history is enabled, every task in
 * each level of pStream->pHTasksList.
 * Stops at the first failure and returns its code; returns 0 when all tasks were added.
 */
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pIter = NULL;

  int32_t code = createStreamTaskIter(pStream, &pIter);
  if (code != 0) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return code;
  }

  // The iterator is advanced only while everything so far has succeeded.
  while (code == 0 && streamTaskIterNextTask(pIter)) {
    SStreamTask *pTask = NULL;
    code = streamTaskIterGetCurrent(pIter, &pTask);
    if (code == 0) {
      code = mndPersistTaskDeployReq(pTrans, pTask);
    }
  }
  destroyStreamTaskIter(pIter);

  if (code != 0) {
    return code;
  }

  // persistent stream task for already stored ts data
  if (!pStream->conf.fillHistory) {
    return code;
  }

  int32_t numOfLevels = taosArrayGetSize(pStream->pHTasksList);
  for (int32_t lv = 0; lv < numOfLevels; ++lv) {
    SArray *pTasksInLevel = taosArrayGetP(pStream->pHTasksList, lv);
    int32_t numOfTasks = taosArrayGetSize(pTasksInLevel);

    for (int32_t k = 0; k < numOfTasks; ++k) {
      code = mndPersistTaskDeployReq(pTrans, taosArrayGetP(pTasksInLevel, k));
      if (code) {
        return code;
      }
    }
  }

  return code;
}
601

602
// Add all task deploy actions of pStream to pTrans. If that does not fail with a negative code, record the
// stream itself in the transaction log as SDB_STATUS_READY.
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
  int32_t code = mndPersistStreamTasks(pTrans, pStream);
  return (code < 0) ? code : mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
}
610

611
/*
 * Add the creation of the stream's destination super table to pTrans.
 *
 * Columns come from pStream->outputSchema. Tags come from pStream->tagSchema, or, when the stream has no tags,
 * from a single UBIGINT tag named "group_id". The table is given uid pStream->targetStbUid.
 *
 * Fails when:
 *   - the request is rejected by mndCheckCreateStbReq();
 *   - the super table already exists;
 *   - its database cannot be acquired;
 *   - the database is in single-stable mode and already has a super table;
 *   - building the stable object or adding it to the transaction fails.
 * `user` is not used here.
 *
 * Returns 0 on success or the failing step's error code. Previously several of these failures jumped to
 * cleanup with code still 0, reporting success without creating the table.
 */
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
  SStbObj *pStb = NULL;
  SDbObj  *pDb = NULL;
  int32_t  code = 0;
  int32_t  lino = 0;

  SMCreateStbReq createReq = {0};
  tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
  createReq.numOfColumns = pStream->outputSchema.nCols;
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);

  // build fields from the stream output schema
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    pField->flags = pStream->outputSchema.pSchema[i].flags;
    pField->type = pStream->outputSchema.pSchema[i].type;
    pField->bytes = pStream->outputSchema.pSchema[i].bytes;
    pField->compress = createDefaultColCmprByType(pField->type);
  }

  if (pStream->tagSchema.nCols == 0) {
    // no user-defined tags: a single "group_id" tag
    createReq.numOfTags = 1;
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    SField *pField = taosArrayGet(createReq.pTags, 0);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, "group_id", sizeof(pField->name));
    pField->type = TSDB_DATA_TYPE_UBIGINT;
    pField->flags = 0;
    pField->bytes = 8;
  } else {
    createReq.numOfTags = pStream->tagSchema.nCols;
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    for (int32_t i = 0; i < createReq.numOfTags; i++) {
      SField *pField = taosArrayGet(createReq.pTags, i);
      if (pField == NULL) {
        continue;
      }

      pField->bytes = pStream->tagSchema.pSchema[i].bytes;
      pField->flags = pStream->tagSchema.pSchema[i].flags;
      pField->type = pStream->tagSchema.pSchema[i].type;
      tstrncpy(pField->name, pStream->tagSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    }
  }

  code = mndCheckCreateStbReq(&createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
    lino = __LINE__;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    lino = __LINE__;
    goto _OVER;
  }

  int32_t numOfStbs = -1;
  code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
  TSDB_CHECK_CODE(code, lino, _OVER);

  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    lino = __LINE__;
    goto _OVER;
  }

  SStbObj stbObj = {0};

  code = mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb);
  TSDB_CHECK_CODE(code, lino, _OVER);

  stbObj.uid = pStream->targetStbUid;

  code = mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj);
  if (code != 0) {
    mndFreeStb(&stbObj);
    lino = __LINE__;
    goto _OVER;
  }

  tFreeSMCreateStbReq(&createReq);
  mndFreeStb(&stbObj);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);
  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols);
  return code;

_OVER:
  tFreeSMCreateStbReq(&createReq);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);

  mError("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->targetSTbName, lino,
         tstrerror(code));
  return code;
}
722

723
// 1. stream number check
724
// 2. target stable can not be target table of other existed streams.
725
static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
1,747✔
726
  int32_t     numOfStream = 0;
1,747✔
727
  SStreamObj *pStream = NULL;
1,747✔
728
  void       *pIter = NULL;
1,747✔
729

730
  while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
4,023✔
731
    if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
2,277✔
732
      ++numOfStream;
1,615✔
733
    }
734

735
    sdbRelease(pMnode->pSdb, pStream);
2,277✔
736

737
    if (numOfStream > MND_STREAM_MAX_NUM) {
2,277!
738
      mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
×
739
             pStreamObj->name);
740
      sdbCancelFetch(pMnode->pSdb, pIter);
×
741
      return TSDB_CODE_MND_TOO_MANY_STREAMS;
×
742
    }
743

744
    if (pStream->targetStbUid == pStreamObj->targetStbUid) {
2,277✔
745
      mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
1!
746
             pStreamObj->name);
747
      sdbCancelFetch(pMnode->pSdb, pIter);
1✔
748
      return TSDB_CODE_MND_INVALID_TARGET_TABLE;
1✔
749
    }
750
  }
751

752
  return TSDB_CODE_SUCCESS;
1,746✔
753
}
754

755
// Element duplicator passed to taosArrayDup(): returns a freshly allocated copy of one notify address string.
static void *notifyAddrDup(void *p) {
  char *pUrl = (char *)p;
  return taosStrdup(pUrl);
}
756

757
static int32_t addStreamTaskNotifyInfo(const SCMCreateStreamReq *createReq, const SStreamObj *pStream,
×
758
                                       SStreamTask *pTask) {
759
  int32_t code = TSDB_CODE_SUCCESS;
×
760
  int32_t lino = 0;
×
761

762
  TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
763
  TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
764

765
  pTask->notifyInfo.pNotifyAddrUrls = taosArrayDup(createReq->pNotifyAddrUrls, notifyAddrDup);
×
766
  TSDB_CHECK_NULL(pTask->notifyInfo.pNotifyAddrUrls, code, lino, _end, terrno);
×
767
  pTask->notifyInfo.notifyEventTypes = createReq->notifyEventTypes;
×
768
  pTask->notifyInfo.notifyErrorHandle = createReq->notifyErrorHandle;
×
769
  pTask->notifyInfo.streamName = taosStrdup(mndGetDbStr(createReq->name));
×
770
  TSDB_CHECK_NULL(pTask->notifyInfo.streamName, code, lino, _end, terrno);
×
771
  pTask->notifyInfo.stbFullName = taosStrdup(createReq->targetStbFullName);
×
772
  TSDB_CHECK_NULL(pTask->notifyInfo.stbFullName, code, lino, _end, terrno);
×
773
  pTask->notifyInfo.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
×
774
  TSDB_CHECK_NULL(pTask->notifyInfo.pSchemaWrapper, code, lino, _end, terrno);
×
775

776
_end:
×
777
  if (code != TSDB_CODE_SUCCESS) {
×
778
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
779
  }
780
  return code;
×
781
}
782

783
static int32_t addStreamNotifyInfo(SCMCreateStreamReq *createReq, SStreamObj *pStream) {
1,746✔
784
  int32_t code = TSDB_CODE_SUCCESS;
1,746✔
785
  int32_t lino = 0;
1,746✔
786
  int32_t level = 0;
1,746✔
787
  int32_t nTasks = 0;
1,746✔
788
  SArray *pLevel = NULL;
1,746✔
789

790
  TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
1,746!
791
  TSDB_CHECK_NULL(pStream, code, lino, _end, TSDB_CODE_INVALID_PARA);
1,746!
792

793
  if (taosArrayGetSize(createReq->pNotifyAddrUrls) == 0) {
1,746!
794
    goto _end;
1,746✔
795
  }
796

797
  level = taosArrayGetSize(pStream->tasks);
×
798
  for (int32_t i = 0; i < level; ++i) {
×
799
    pLevel = taosArrayGetP(pStream->tasks, i);
×
800
    nTasks = taosArrayGetSize(pLevel);
×
801
    for (int32_t j = 0; j < nTasks; ++j) {
×
802
      code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
×
803
      TSDB_CHECK_CODE(code, lino, _end);
×
804
    }
805
  }
806

807
  if (pStream->conf.fillHistory && createReq->notifyHistory) {
×
808
    level = taosArrayGetSize(pStream->pHTasksList);
×
809
    for (int32_t i = 0; i < level; ++i) {
×
810
      pLevel = taosArrayGetP(pStream->pHTasksList, i);
×
811
      nTasks = taosArrayGetSize(pLevel);
×
812
      for (int32_t j = 0; j < nTasks; ++j) {
×
813
        code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
×
814
        TSDB_CHECK_CODE(code, lino, _end);
×
815
      }
816
    }
817
  }
818

819
_end:
×
820
  if (code != TSDB_CODE_SUCCESS) {
1,746!
821
    mError("%s for stream %s failed at line %d since %s", __func__, pStream->name, lino, tstrerror(code));
×
822
  }
823
  return code;
1,746✔
824
}
825

826
// Process a create-stream request.
//
// Steps:
// 1. Deserialize and validate the request.
// 2. Reject it if the stream already exists (or return 0 when igExists is set).
// 3. Check the stream grant, that no node-update trans is active, and mndCheckForSnode() for the source db.
// 4. Build the stream object and run doStreamCheck().
// 5. Create a single trans that:
//    - creates the target stable (when requested),
//    - schedules the stream tasks,
//    - attaches notify info,
//    - persists the stream.
// 6. Before preparing the trans, verify the caller's read privilege on the source db and write privilege on
//    the target db.
// 7. Register the new tasks in execInfo before the trans is prepared.
// 8. Write an audit record: the original SQL text when the request carried one, a short summary otherwise.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the trans has been prepared, 0 when the stream already exists and
// igExists is set, otherwise an error code.
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SStreamObj  streamObj = {0};
  char       *sql = NULL;
  int32_t     sqlLen = 0;
  const char *pMsg = "create stream tasks on dnodes";
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STrans     *pTrans = NULL;

  SCMCreateStreamReq createReq = {0};
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  goto _OVER;
#endif

  mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
  if ((code = mndCheckCreateStreamReq(&createReq)) != 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = mndAcquireStream(pMnode, createReq.name, &pStream);
  if (pStream != NULL && code == 0) {
    if (createReq.igExists) {
      mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
      mndReleaseStream(pMnode, pStream);
      tFreeSCMCreateStreamReq(&createReq);
      return code;
    } else {
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
      goto _OVER;
    }
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    goto _OVER;
  }

  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
    goto _OVER;
  }

  if (createReq.sql != NULL) {
    sql = taosStrdup(createReq.sql);
    TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
    // Record the length so the audit below can log the original SQL text; previously sqlLen stayed 0 and
    // the "sql != NULL && sqlLen > 0" branch could never be taken.
    sqlLen = (int32_t)strlen(sql);
  }

  // check for the taskEp update trans
  if (isNodeUpdateTransActive()) {
    mError("stream:%s failed to create stream, node update trans is active", createReq.name);
    code = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
    goto _OVER;
  }

  SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB);
  if (pSourceDb == NULL) {
    code = terrno;
    mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB,
          tstrerror(code));
    goto _OVER;
  }

  code = mndCheckForSnode(pMnode, pSourceDb);
  mndReleaseDb(pMnode, pSourceDb);
  if (code != 0) {
    goto _OVER;
  }

  // build stream obj from request
  if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = doStreamCheck(pMnode, &streamObj);
  TSDB_CHECK_CODE(code, lino, _OVER);

  code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans);
  if (pTrans == NULL || code) {
    goto _OVER;
  }

  // create stb for stream
  if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) {
    if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) {
      mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code));
      mndTransDrop(pTrans);
      goto _OVER;
    }
  } else {
    mDebug("stream:%s no need create stable", createReq.name);
  }

  // schedule stream task for stream obj
  code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  // add notify info into all stream tasks
  code = addStreamNotifyInfo(&createReq, &streamObj);
  if (code != TSDB_CODE_SUCCESS) {
    mError("stream:%s failed to add stream notify info since %s", createReq.name, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  // add stream to trans
  code = mndPersistStream(pTrans, &streamObj);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb)) != 0) {
    mndTransDrop(pTrans);
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb)) != 0) {
    mndTransDrop(pTrans);
    goto _OVER;
  }

  // add into buffer firstly
  // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
  streamMutexLock(&execInfo.lock);
  mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
  saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  // execute creation
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  mndTransDrop(pTrans);

  SName dbname = {0};
  code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code) {
    mError("invalid source dbname:%s in create stream, code:%s", createReq.sourceDB, tstrerror(code));
    goto _OVER;
  }

  SName name = {0};
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE);
  if (code) {
    mError("invalid stream name:%s in create strem, code:%s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  // reuse this function for stream
  if (sql != NULL && sqlLen > 0) {
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen);
  } else {
    char detail[1000] = {0};
    snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
  }

_OVER:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to create at line:%d since %s", createReq.name, lino, tstrerror(code));
  } else {
    mDebug("stream:%s create stream completed", createReq.name);
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  mndReleaseStream(pMnode, pStream);
  tFreeSCMCreateStreamReq(&createReq);
  tFreeStreamObj(&streamObj);

  if (sql != NULL) {
    taosMemoryFreeClear(sql);
  }

  return code;
}
1014

1015
// Process a restart-stream request. The payload is decoded as an SMPauseStreamReq, which carries the stream name
// and igNotExists.
//
// The request is rejected when:
// - the user lacks write privilege on the stream's target db;
// - mndStreamTransConflictCheck() reports a conflicting trans;
// - a node update is detected.
// Otherwise a restart trans is created, registered and prepared.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the trans is prepared, 0 when the stream does not exist and
// igNotExists is set, or an error code.
static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SStreamObj      *pStream = NULL;
  int32_t          code = 0;
  SMPauseStreamReq pauseReq = {0};

  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (pauseReq.igNotExists) {
      mInfo("stream:%s, not exist, not restart stream", pauseReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to restart stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to restart stream", pauseReq.name, pStream->uid);
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESTART_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  bool updated = mndStreamNodeIsUpdated(pMnode);
  if (updated) {
    mError("tasks are not ready for restart, node update detected");
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  STrans *pTrans = NULL;
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream",
                       &pTrans);
  if (pTrans == NULL || code) {
    // this handler creates a restart trans; the previous "failed to pause stream" text was misleading
    mError("stream:%s failed to restart stream since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESTART_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // set the restart action(s) of this stream into the trans
  code = mndStreamSetRestartAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s, failed to restart task since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare restart stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1094

1095
// Generate the next checkpoint id.
//
// The result is one past the larger of:
//  - the biggest checkpointId stored in the mnode stream objects, and
//  - the biggest latest checkpoint id reported by any task entry in execInfo whose last checkpoint did not fail.
// When `lock` is true, execInfo.lock is held while the task map is scanned; otherwise the caller is presumably
// already holding it (TODO confirm at call sites).
int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
  SSdb       *pSdb = pMnode->pSdb;
  void       *pIter = NULL;
  SStreamObj *pObj = NULL;
  int64_t     mnodeMaxId = 0;

  // 1. checkpoint ids persisted in the mnode stream objects
  for (pIter = sdbFetch(pSdb, SDB_STREAM, NULL, (void **)&pObj); pIter != NULL;
       pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pObj)) {
    mnodeMaxId = TMAX(mnodeMaxId, pObj->checkpointId);
    mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64 "", pObj, pObj->name, pObj->uid,
           pObj->checkpointId);
    sdbRelease(pSdb, pObj);
  }

  // 2. checkpoint ids reported by the tasks on the vnodes
  int64_t vnodeMaxId = -1;

  if (lock) {
    streamMutexLock(&execInfo.lock);
  }

  for (int32_t idx = 0; idx < taosArrayGetSize(execInfo.pTaskList); ++idx) {
    STaskId          *pId = taosArrayGet(execInfo.pTaskList, idx);
    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));

    bool usable = (pId != NULL) && (pEntry != NULL) && !pEntry->checkpointInfo.failed;
    if (usable && (pEntry->checkpointInfo.latestId > vnodeMaxId)) {
      vnodeMaxId = pEntry->checkpointInfo.latestId;
    }
  }

  if (lock) {
    streamMutexUnlock(&execInfo.lock);
  }

  if (vnodeMaxId > mnodeMaxId) {
    mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, mnodeMaxId,
           vnodeMaxId);
    mnodeMaxId = vnodeMaxId;
  }

  int64_t nextId = mnodeMaxId + 1;
  mDebug("generate new checkpointId:%" PRId64, nextId);
  return nextId;
}
1147

1148
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
1,389✔
1149
                                               int8_t mndTrigger, bool lock) {
1150
  int32_t code = TSDB_CODE_SUCCESS;
1,389✔
1151
  bool    conflict = false;
1,389✔
1152
  int64_t ts = taosGetTimestampMs();
1,389✔
1153
  STrans *pTrans = NULL;
1,389✔
1154

1155
  if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
1,389!
1156
    return code;
×
1157
  }
1158

1159
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
1,389✔
1160
  if (code) {
1,389✔
1161
    mWarn("checkpoint conflict with other trans in %s, code:%s ignore the checkpoint for stream:%s %" PRIx64,
1!
1162
          pStream->sourceDb, tstrerror(code), pStream->name, pStream->uid);
1163
    goto _ERR;
1✔
1164
  }
1165

1166
  code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME,
1,388✔
1167
                       "gen checkpoint for stream", &pTrans);
1168
  if (code) {
1,388!
1169
    mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
×
1170
           tstrerror(code));
1171
    goto _ERR;
×
1172
  }
1173

1174
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
1,388✔
1175
  if (code) {
1,388!
1176
    mError("failed to register checkpoint trans for stream:%s, checkpointId:%" PRId64, pStream->name, checkpointId);
×
1177
    goto _ERR;
×
1178
  }
1179

1180
  mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);
1,388✔
1181

1182
  taosWLockLatch(&pStream->lock);
1,388✔
1183
  pStream->currentTick = 1;
1,388✔
1184

1185
  // 1. redo action: broadcast checkpoint source msg for all source vg
1186
  int32_t totalLevel = taosArrayGetSize(pStream->tasks);
1,388✔
1187
  for (int32_t i = 0; i < totalLevel; i++) {
4,195✔
1188
    SArray      *pLevel = taosArrayGetP(pStream->tasks, i);
2,807✔
1189
    SStreamTask *p = taosArrayGetP(pLevel, 0);
2,807✔
1190

1191
    if (p->info.taskLevel == TASK_LEVEL__SOURCE) {
2,807✔
1192
      int32_t sz = taosArrayGetSize(pLevel);
1,388✔
1193
      for (int32_t j = 0; j < sz; j++) {
4,650✔
1194
        SStreamTask *pTask = taosArrayGetP(pLevel, j);
3,262✔
1195
        code = mndStreamSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);
3,262✔
1196

1197
        if (code != TSDB_CODE_SUCCESS) {
3,262!
1198
          taosWUnLockLatch(&pStream->lock);
×
1199
          goto _ERR;
×
1200
        }
1201
      }
1202
    }
1203
  }
1204

1205
  // 2. reset tick
1206
  pStream->checkpointId = checkpointId;
1,388✔
1207
  pStream->checkpointFreq = taosGetTimestampMs();
1,388✔
1208
  pStream->currentTick = 0;
1,388✔
1209

1210
  // 3. commit log: stream checkpoint info
1211
  pStream->version = pStream->version + 1;
1,388✔
1212
  taosWUnLockLatch(&pStream->lock);
1,388✔
1213

1214
  if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY)) != TSDB_CODE_SUCCESS) {
1,388!
1215
    goto _ERR;
×
1216
  }
1217

1218
  code = mndTransPrepare(pMnode, pTrans);
1,388✔
1219
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1,388!
1220
    mError("failed to prepare checkpoint trans since %s", tstrerror(code));
×
1221
  } else {
1222
    code = TSDB_CODE_ACTION_IN_PROGRESS;
1,388✔
1223
  }
1224

1225
_ERR:
1,389✔
1226
  mndTransDrop(pTrans);
1,389✔
1227
  return code;
1,389✔
1228
}
1229

1230
// Return the number of entries in execInfo.pNodeList.
// When the list is empty it is first rebuilt from the existing streams. If that rebuild fails, the error code
// is returned instead of a count.
int32_t extractStreamNodeList(SMnode *pMnode) {
  if (taosArrayGetSize(execInfo.pNodeList) != 0) {
    return taosArrayGetSize(execInfo.pNodeList);
  }

  int32_t rebuildCode = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
  if (rebuildCode != 0) {
    mError("Failed to extract node list from stream, code:%s", tstrerror(rebuildCode));
    return rebuildCode;
  }

  return taosArrayGetSize(execInfo.pNodeList);
}
1241

1242
// Precondition check before launching stream checkpoints. Returns TSDB_CODE_STREAM_TASK_IVLD_STATUS when a
// node update has been detected, or when execInfo records no nodes while still holding stream tasks;
// otherwise 0. execInfo is inspected under execInfo.lock.
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
  if (mndStreamNodeIsUpdated(pMnode)) {
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  int32_t result = 0;

  streamMutexLock(&execInfo.lock);

  bool noNodes = (taosArrayGetSize(execInfo.pNodeList) == 0);
  if (noNodes) {
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
  }

  if (noNodes && (taosArrayGetSize(execInfo.pTaskList) != 0)) {
    mError("stream task node change checking done, no vgroups exist, but task list is not empty");
    result = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  streamMutexUnlock(&execInfo.lock);
  return result;
}
1260

1261
// Scan pTaskList for the tasks of stream `streamId` (looked up in execInfo.pTaskMap) and return the latest
// startTime among them.
//
// Returns -1 when:
// - any task of the stream has a related fill-history task (hTaskId != 0);
// - any task of the stream is not in TASK_STATUS__READY;
// - no entry of the stream is found.
// NOTE(review): execInfo.pTaskMap is read without locking here; isStreamReadyHelp holds execInfo.lock around
// the call — confirm for other callers.
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
  int64_t latestTs = -1;
  int32_t latestTaskId = -1;

  for (int32_t idx = 0; idx < taosArrayGetSize(pTaskList); idx++) {
    STaskId          *pId = taosArrayGet(pTaskList, idx);
    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));

    bool ownedByStream = (pId != NULL) && (pEntry != NULL) && (pEntry->id.streamId == streamId);
    if (!ownedByStream) {
      continue;
    }

    // -1 denote not ready now or never ready till now
    if (pEntry->hTaskId != 0) {
      mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
            " exists, checkpoint not issued",
            pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
            pEntry->hTaskId);
      return -1;
    }

    if (pEntry->status != TASK_STATUS__READY) {
      mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, not ready for checkpoint", pEntry->id.streamId,
            (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
      return -1;
    }

    if (pEntry->startTime > latestTs) {
      latestTs = pEntry->startTime;
      latestTaskId = pEntry->id.taskId;
    }
  }

  mDebug("stream:0x%" PRIx64 " last ready ts:%" PRId64 " s-task:0x%x", streamId, latestTs, latestTaskId);
  return latestTs;
}
1296

1297
// A stream that is due for a checkpoint, together with how long it has waited.
typedef struct {
  int64_t streamId;
  int64_t duration;  // now - checkpointFreq of the stream, in milliseconds
} SCheckpointInterval;

// Sort comparator for SCheckpointInterval: orders entries by duration in descending order, so the stream that
// has waited longest comes first; equal durations compare as 0.
static int32_t streamWaitComparFn(const void *p1, const void *p2) {
  int64_t lhs = ((const SCheckpointInterval *)p1)->duration;
  int64_t rhs = ((const SCheckpointInterval *)p2)->duration;

  return (rhs > lhs) - (rhs < lhs);
}
1311

1312
// all tasks of this stream should be ready, otherwise do nothing
1313
static bool isStreamReadyHelp(int64_t now, SStreamObj* pStream) {
1,908✔
1314
  bool ready = false;
1,908✔
1315

1316
  streamMutexLock(&execInfo.lock);
1,908✔
1317

1318
  int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
1,908✔
1319
  if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) {
1,908!
1320

1321
    if (lastReadyTs != -1) {
561✔
1322
      mInfo("not start checkpoint, stream:0x%" PRIx64 " last ready ts:%" PRId64 " ready duration:%" PRId64
430!
1323
            "ms less than threshold",
1324
            pStream->uid, lastReadyTs, (now - lastReadyTs));
1325
    }
1326

1327
    ready = false;
561✔
1328
  } else {
1329
    ready = true;
1,347✔
1330
  }
1331

1332
  streamMutexUnlock(&execInfo.lock);
1,908✔
1333
  return ready;
1,908✔
1334
}
1335

1336
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
1,802✔
1337
  SMnode     *pMnode = pReq->info.node;
1,802✔
1338
  SSdb       *pSdb = pMnode->pSdb;
1,802✔
1339
  void       *pIter = NULL;
1,802✔
1340
  SStreamObj *pStream = NULL;
1,802✔
1341
  int32_t     code = 0;
1,802✔
1342
  int32_t     numOfCheckpointTrans = 0;
1,802✔
1343
  SArray     *pLongChkpts = NULL;
1,802✔
1344
  SArray     *pList = NULL;
1,802✔
1345
  int64_t     now = taosGetTimestampMs();
1,802✔
1346

1347
  if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
1,802✔
1348
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
31✔
1349
  }
1350

1351
  pList = taosArrayInit(4, sizeof(SCheckpointInterval));
1,771✔
1352
  if (pList == NULL) {
1,771!
1353
    mError("failed to init chkptInterval info, not handle stream checkpoint, code:%s", tstrerror(terrno));
×
1354
    return terrno;
×
1355
  }
1356

1357
  pLongChkpts = taosArrayInit(4, sizeof(SStreamTransInfo));
1,771✔
1358
  if (pLongChkpts == NULL) {
1,771!
1359
    mError("failed to init long checkpoint list, not handle stream checkpoint, code:%s", tstrerror(terrno));
×
1360
    taosArrayDestroy(pList);
×
1361
    return terrno;
×
1362
  }
1363

1364
  // check if ongong checkpoint trans or long chkpt trans exist.
1365
  code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans, pLongChkpts);
1,771✔
1366
  if (code) {
1,771!
1367
    mError("failed to clear finish trans, code:%s", tstrerror(code));
×
1368

1369
    taosArrayDestroy(pList);
×
1370
    taosArrayDestroy(pLongChkpts);
×
1371
    return code;
×
1372
  }
1373

1374
  // kill long exec checkpoint and set task status
1375
  if (taosArrayGetSize(pLongChkpts) > 0) {
1,771✔
1376
    killChkptAndResetStreamTask(pMnode, pLongChkpts);
1✔
1377

1378
    taosArrayDestroy(pList);
1✔
1379
    taosArrayDestroy(pLongChkpts);
1✔
1380
    return TSDB_CODE_SUCCESS;
1✔
1381
  }
1382

1383
  taosArrayDestroy(pLongChkpts);
1,770✔
1384

1385
  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
4,580✔
1386
    int64_t duration = now - pStream->checkpointFreq;
2,810✔
1387
    if (duration < tsStreamCheckpointInterval * 1000) {
2,810✔
1388
      sdbRelease(pSdb, pStream);
902✔
1389
      continue;
1,463✔
1390
    }
1391

1392
    bool ready = isStreamReadyHelp(now, pStream);
1,908✔
1393
    if (!ready) {
1,908✔
1394
      sdbRelease(pSdb, pStream);
561✔
1395
      continue;
561✔
1396
    }
1397

1398
    SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
1,347✔
1399
    void               *p = taosArrayPush(pList, &in);
1,347✔
1400
    if (p) {
1,347!
1401
      int32_t currentSize = taosArrayGetSize(pList);
1,347✔
1402
      mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chkpt interval threshold: %ds(%" PRId64
1,347✔
1403
             "s), concurrently launch threshold:%d",
1404
             pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
1405
             tsMaxConcurrentCheckpoint);
1406
    } else {
1407
      mError("failed to record the checkpoint interval info, stream:0x%" PRIx64, pStream->uid);
×
1408
    }
1409
    sdbRelease(pSdb, pStream);
1,347✔
1410
  }
1411

1412
  int32_t size = taosArrayGetSize(pList);
1,770✔
1413
  if (size == 0) {
1,770✔
1414
    taosArrayDestroy(pList);
1,102✔
1415
    return code;
1,102✔
1416
  }
1417

1418
  taosArraySort(pList, streamWaitComparFn);
668✔
1419

1420
  int32_t numOfQual = taosArrayGetSize(pList);
668✔
1421
  if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) {
668✔
1422
    mDebug(
19!
1423
        "%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new "
1424
        "checkpoint trans are not allowed, wait for 30s",
1425
        numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint);
1426
    taosArrayDestroy(pList);
19✔
1427
    return code;
19✔
1428
  }
1429

1430
  int32_t capacity = tsMaxConcurrentCheckpoint - numOfCheckpointTrans;
649✔
1431
  mDebug(
649✔
1432
      "%d stream(s) checkpoint interval longer than %ds, %d ongoing checkpoint trans, %d new checkpoint trans allowed, "
1433
      "concurrent trans threshold:%d",
1434
      numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, capacity, tsMaxConcurrentCheckpoint);
1435

1436
  int32_t started = 0;
649✔
1437
  int64_t checkpointId = mndStreamGenChkptId(pMnode, true);
649✔
1438

1439
  for (int32_t i = 0; i < numOfQual; ++i) {
653✔
1440
    SCheckpointInterval *pCheckpointInfo = taosArrayGet(pList, i);
651✔
1441
    if (pCheckpointInfo == NULL) {
651!
1442
      continue;
×
1443
    }
1444

1445
    SStreamObj *p = NULL;
651✔
1446
    code = mndGetStreamObj(pMnode, pCheckpointInfo->streamId, &p);
651✔
1447
    if (p != NULL && code == 0) {
651!
1448
      code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
651✔
1449
      sdbRelease(pSdb, p);
651✔
1450

1451
      if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
651!
1452
        started += 1;
651✔
1453

1454
        if (started >= capacity) {
651✔
1455
          mDebug("already start %d new checkpoint trans, current active checkpoint trans:%d", started,
647✔
1456
                 (started + numOfCheckpointTrans));
1457
          break;
647✔
1458
        }
1459
      } else {
1460
        mError("failed to start checkpoint trans, code:%s", tstrerror(code));
×
1461
      }
1462
    }
1463
  }
1464

1465
  taosArrayDestroy(pList);
649✔
1466
  return code;
649✔
1467
}
1468

1469
/*
 * Handle the DROP STREAM request.
 *
 * Steps:
 *  - refuse to drop a stream that belongs to a tsma (it may only be dropped along with the tsma);
 *  - check write privilege on the target db and conflicts with other stream transactions;
 *  - build and prepare a transaction that drops all stream tasks and the stream object;
 *  - kill a still-active related transaction of this stream;
 *  - remove the stream's tasks from execInfo and write an audit record.
 *
 * Returns:
 *   TSDB_CODE_ACTION_IN_PROGRESS  the drop transaction has been prepared
 *   0                             the stream does not exist and "ignore not exist" is set
 *   other                         error code; no drop transaction was prepared
 *
 * All resources (stream reference, transaction object, request buffers) are released in one place at _OVER.
 */
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  SStreamObj     *pStream = NULL;
  STrans         *pTrans = NULL;
  int32_t         code = 0;
  int32_t         transId = 0;
  SName           name = {0};
  SMDropStreamReq dropReq = {0};

  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
    mError("invalid drop stream msg recv, discarded");
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mDebug("recv drop stream:%s msg", dropReq.name);

  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (dropReq.igNotExists) {
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
      code = 0;
    } else {
      mError("stream:%s not exist failed to drop it", dropReq.name);
      code = TSDB_CODE_MND_STREAM_NOT_EXIST;
    }
    goto _OVER;
  }

  if (pStream->smaId != 0) {
    mDebug("stream:%s, uid:0x%" PRIx64 " try to drop sma related stream", dropReq.name, pStream->uid);

    void    *pIter = NULL;
    SSmaObj *pSma = NULL;
    while ((pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma)) != NULL) {
      bool ownedBySma = (pSma != NULL) && (pSma->uid == pStream->smaId);
      sdbRelease(pMnode->pSdb, pSma);
      pSma = NULL;

      if (ownedBySma) {
        sdbCancelFetch(pMnode->pSdb, pIter);
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;
        // pStream is still referenced here; it is released at _OVER, after this log line.
        mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma",
               dropReq.name, pStream->uid, tstrerror(code));
        goto _OVER;
      }
    }
  }

  // report the real privilege error code instead of a bare -1
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code));
    goto _OVER;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
  if (code != 0) {
    goto _OVER;
  }

  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code != 0) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code));
    goto _OVER;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
  if (code != 0) {
    mError("failed to register drop stream trans, code:%s", tstrerror(code));
    goto _OVER;
  }

  // drop all tasks
  code = mndStreamSetDropAction(pMnode, pTrans, pStream);
  if (code != 0) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, tstrerror(code));
    goto _OVER;
  }

  // drop stream
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
  if (code != 0) {
    goto _OVER;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    goto _OVER;
  }

  // kill the related checkpoint trans
  transId = mndStreamGetRelTrans(pMnode, pStream->uid);
  if (transId != 0) {
    mDebug("drop active transId:%d due to stream:%s uid:0x%" PRIx64 " dropped", transId, pStream->name, pStream->uid);
    mndKillTransImpl(pMnode, transId, pStream->sourceDb);
  }

  mDebug("stream:%s uid:0x%" PRIx64 " transId:%d start to drop related task when dropping stream", dropReq.name,
         pStream->uid, transId);

  removeStreamTasksInBuf(pStream, &execInfo);

  // The drop transaction is already prepared. A failure to parse the name, which is needed only for the audit
  // record, must not turn the result into an error; before this fix it replaced the in-progress result.
  if (tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE) != 0) {
    mWarn("stream:%s failed to parse stream name for audit record, db name may be missing", dropReq.name);
  }
  auditRecord(pReq, pMnode->clusterId, "dropStream", "", name.dbname, dropReq.sql, dropReq.sqlLen);

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  sdbRelease(pMnode->pSdb, pStream);
  if (pTrans != NULL) {
    mndTransDrop(pTrans);
  }
  tFreeMDropStreamReq(&dropReq);

  if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    return code;
  }
  TAOS_RETURN(code);
}
1611

1612
/*
 * Add drop actions for every stream that reads from or writes into db pDb to pTrans
 * (presumably the drop-db transaction; confirm against callers).
 *
 * A stream whose source db and target db differ, and which touches pDb, blocks the operation:
 * TSDB_CODE_MND_STREAM_MUST_BE_DELETED is returned and the stream must be dropped explicitly first.
 * For each stream that is dropped along with the db:
 *  - a still-active related transaction is killed;
 *  - its tasks are removed from execInfo;
 *  - a DROPPED commit log is appended to pTrans.
 *
 * Returns 0 on success, or the first error encountered.
 */
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
      if (pStream->sourceDbUid != pStream->targetDbUid) {
        // log first: pStream must not be dereferenced after its reference has been released
        mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
               pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(TSDB_CODE_MND_STREAM_MUST_BE_DELETED);
      }

      // kill the related checkpoint trans
      int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
      if (transId != 0) {
        mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
        mndKillTransImpl(pMnode, transId, pStream->sourceDb);
      }

      // drop the stream obj in execInfo
      removeStreamTasksInBuf(pStream, &execInfo);

      code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
      if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        return code;
      }
    }

    sdbRelease(pSdb, pStream);
  }

  return 0;
}
1654

1655
/*
 * Fill pBlock with at most `rows` stream rows for the stream listing.
 * The sdb iterator is kept in pShow->pIter, so a later call continues where this one stopped.
 * A row is counted only when setStreamAttrInResBlock succeeds for it.
 * Returns the number of rows written in this call; pShow->numOfRows is advanced by the same amount.
 */
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t filled = 0;

  while (filled < rows) {
    SStreamObj *pObj = NULL;

    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) {
      break;  // no more streams
    }

    if (setStreamAttrInResBlock(pObj, pBlock, filled) == 0) {
      filled += 1;
    }

    sdbRelease(pSdb, pObj);
  }

  pShow->numOfRows += filled;
  return filled;
}
1676

1677
/* Abort an unfinished stream listing by cancelling its SDB_STREAM fetch iterator. */
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
1681

1682
/*
 * Fill pBlock with one row per task of every stream for the stream task listing.
 * Iteration resumes from pShow->pIter across calls.
 * If a stream has more tasks than the remaining capacity, the block is grown so all tasks of that stream fit.
 * Timestamps use the source db precision; the default is milliseconds when the db cannot be acquired.
 * Returns the number of rows written; pShow->numOfRows is advanced by the same amount.
 */
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  int32_t     numOfRows = 0;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) {
      break;
    }

    // lock
    taosRLockLatch(&pStream->lock);

    int32_t count = mndGetNumOfStreamTasks(pStream);
    if (numOfRows + count > rowsCapacity) {
      code = blockDataEnsureCapacity(pBlock, numOfRows + count);
      if (code) {
        mError("failed to prepare the result block buffer, quit return value");
        taosRUnLockLatch(&pStream->lock);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    int32_t precision = TSDB_TIME_PRECISION_MILLI;
    SDbObj *pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
    if (pSourceDb != NULL) {
      precision = pSourceDb->cfg.precision;
      mndReleaseDb(pMnode, pSourceDb);
    }

    // add row for each task
    SStreamTaskIter *pIter = NULL;
    code = createStreamTaskIter(pStream, &pIter);
    if (code) {
      // log before giving the reference back: pStream must not be used after sdbRelease
      mError("failed to create task iter for stream:%s", pStream->name);
      taosRUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pIter, &pTask);
      if (code) {
        // do not destroy the iterator here: it is destroyed exactly once right after this loop
        break;
      }

      code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows, precision);
      if (code == TSDB_CODE_SUCCESS) {
        numOfRows++;
      }
    }

    pBlock->info.rows = numOfRows;

    destroyStreamTaskIter(pIter);
    taosRUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1755

1756
/* Abort an unfinished stream-task listing; its iterator walks SDB_STREAM, so that fetch type is cancelled. */
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
1760

1761
/*
 * Handle a PAUSE STREAM request.
 *
 * The pause is refused when any of the following holds:
 *  - the caller lacks write privilege on the target db;
 *  - a conflicting stream transaction exists;
 *  - a node update is pending;
 *  - no task of the stream has reported its status yet;
 *  - some task still reports UNINIT or CK.
 * Otherwise a pause transaction is built, its commit log persisted, and the transaction prepared.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS after the transaction is prepared, 0 when the stream does not exist and
 * "ignore not exist" is set, or an error code.
 */
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  SStreamObj      *pStream = NULL;
  STrans          *pTrans = NULL;
  int32_t          code = 0;
  SMPauseStreamReq pauseReq = {0};

  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (code != 0 || pStream == NULL) {
    if (!pauseReq.igNotExists) {
      mError("stream:%s not exist, failed to pause stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
    mInfo("stream:%s, not exist, not pause stream", pauseReq.name);
    return 0;
  }

  mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid);

  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    sdbRelease(pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
  if (code != 0) {
    sdbRelease(pSdb, pStream);
    TAOS_RETURN(code);
  }

  if (mndStreamNodeIsUpdated(pMnode)) {
    mError("tasks are not ready for pause, node update detected");
    sdbRelease(pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  {  // every task of this stream must have reported, and none may still be UNINIT or CK
    bool reported = false;
    bool allReady = true;

    streamMutexLock(&execInfo.lock);

    size_t numOfTasks = taosArrayGetSize(execInfo.pTaskList);
    for (size_t idx = 0; idx < numOfTasks; ++idx) {
      STaskId *pId = taosArrayGet(execInfo.pTaskList, idx);
      if (pId == NULL) {
        continue;
      }

      STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
      if (pEntry == NULL || pEntry->id.streamId != pStream->uid) {
        continue;
      }

      bool busy = (pEntry->status == TASK_STATUS__UNINIT) || (pEntry->status == TASK_STATUS__CK);
      if (busy) {
        mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name,
               pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status));
        allReady = false;
      }

      reported = true;
    }

    streamMutexUnlock(&execInfo.lock);

    if (!reported) {
      mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
      sdbRelease(pSdb, pStream);
      TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
    }

    if (!allReady) {
      mError("stream:%s task not ready for pause yet", pauseReq.name);
      sdbRelease(pSdb, pStream);
      TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
    }
  }

  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans);
  if (code != 0 || pTrans == NULL) {
    mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
  if (code != 0) {
    sdbRelease(pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // if nodeUpdate happened, not send pause trans
  code = mndStreamSetPauseAction(pMnode, pTrans, pStream);
  if (code != 0) {
    mError("stream:%s, failed to pause task since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // persist the stream object while holding its write latch
  taosWLockLatch(&pStream->lock);
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  taosWUnLockLatch(&pStream->lock);

  if (code != 0) {
    sdbRelease(pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1895

1896
/*
 * Handle a RESUME STREAM request.
 *
 * Steps:
 *  - check the stream grant, the write privilege on the target db, and conflicts with other stream transactions;
 *  - build a resume transaction (honouring resumeReq.igUntreated);
 *  - set the stream status to STREAM_STATUS__NORMAL and persist the stream object into the transaction;
 *  - prepare the transaction.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS after the transaction is prepared, 0 when the stream does not exist and
 * "ignore not exist" is set, or an error code.
 */
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResumeStreamReq resumeReq = {0};
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    if (resumeReq.igNotExists) {
      mInfo("stream:%s not exist, not resume stream", resumeReq.name);
      return 0;
    }
    mError("stream:%s not exist, failed to resume stream", resumeReq.name);
    TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
  }

  mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);

  // return the real privilege error code instead of a bare -1
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  STrans *pTrans = NULL;
  code =
      doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // set the resume action
  code = mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated);
  if (code) {
    mError("stream:%s, failed to set resume action since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // resume stream
  taosWLockLatch(&pStream->lock);
  pStream->status = STREAM_STATUS__NORMAL;
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  taosWUnLockLatch(&pStream->lock);

  if (code != TSDB_CODE_SUCCESS) {
    // keep the persist error: code must not fall back to the previous (successful) value here
    mError("stream:%s, failed to persist resume stream trans log since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare resume stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1985

1986
/*
 * Handle a RESET STREAM request (not implemented yet).
 *
 * The code checks the stream grant, decodes the request and verifies the stream exists.
 * It then returns TSDB_CODE_ACTION_IN_PROGRESS without building any transaction.
 * The stream reference taken for the existence check is released before returning.
 */
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResetStreamReq resetReq = {0};
  if (tDeserializeSMResetStreamReq(pReq->pCont, pReq->contLen, &resetReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  mDebug("recv reset stream req, stream:%s", resetReq.name);

  code = mndAcquireStream(pMnode, resetReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resetReq.igNotExists) {
      mInfo("stream:%s, not exist, not reset stream", resetReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to reset stream", resetReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  // todo(liao hao jun): build the reset transaction.
  // The stream object is not used yet; give the reference back instead of leaking it.
  sdbRelease(pMnode->pSdb, pStream);

  // NOTE(review): no transaction is created, yet ACTION_IN_PROGRESS is what the other handlers return when a
  // transaction will answer the request later — confirm how the client gets its reply before enabling this.
  return TSDB_CODE_ACTION_IN_PROGRESS;
}
2016

2017
/*
 * Build a single nodeUpdate transaction that updates the task epsets of the streams affected by pChangeInfo.
 * A stream is affected when its source or target db appears in pChangeInfo->pDBMap; when includeAllNodes is true,
 * every stream is included.
 *
 * Every stream is first checked for a conflicting transaction; any conflict aborts the whole update.
 * On return *pUpdateTrans holds the transaction, or NULL when none was built or an abort happened.
 * Presumably the caller prepares and drops it; confirm against the caller.
 * Streams whose epset action cannot be built are skipped.
 * The return value is the code of the last processed step.
 */
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes, STrans** pUpdateTrans) {
  SSdb       *pSdb = pMnode->pSdb;
  void       *pIter = NULL;
  STrans     *pTrans = NULL;
  int32_t     code = 0;
  *pUpdateTrans = NULL;

  // conflict check for nodeUpdate trans: every existing stream is checked before anything is built
  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
    sdbRelease(pSdb, pStream);

    if (code) {
      mError("nodeUpdate conflict with other trans, current nodeUpdate ignored, code:%s", tstrerror(code));
      sdbCancelFetch(pSdb, pIter);
      return code;
    }
  }

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    // here create only one trans
    if (pTrans == NULL) {
      code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME,
                           "update task epsets", &pTrans);
      if (pTrans == NULL || code) {
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        return terrno = code;
      }
    }

    if (!includeAllNodes) {
      void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
      void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
      if (p1 == NULL && p2 == NULL) {
        mDebug("stream:0x%" PRIx64 " %s not involved in nodeUpdate, ignore", pStream->uid, pStream->name);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
           pStream->name, pTrans->id);

    // NOTE: for each stream, we register one trans entry for task update
    code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
    if (code) {
      mError("failed to register trans, transId:%d, and continue", pTrans->id);
    }

    code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);

    // todo: not continue, drop all and retry again
    if (code != TSDB_CODE_SUCCESS) {
      mError("stream:0x%" PRIx64 " build nodeUpdate trans failed, ignore and continue, code:%s", pStream->uid,
             tstrerror(code));
      sdbRelease(pSdb, pStream);
      continue;
    }

    code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
    sdbRelease(pSdb, pStream);

    if (code != TSDB_CODE_SUCCESS) {
      sdbCancelFetch(pSdb, pIter);
      // the trans has not been handed to the caller (*pUpdateTrans is still NULL): free it here to avoid a leak
      mndTransDrop(pTrans);
      return code;
    }
  }

  // no need to build the trans to handle the vgroup update
  *pUpdateTrans = pTrans;
  return code;
}
2102

2103
/*
 * Rebuild pNodeList from the tasks of all existing streams.
 * Entries are collected in a hash keyed by nodeId, so each node appears at most once.
 * Each entry starts with hbTimestamp and lastHbMsgId set to -1 and carries the epset of the task it came from.
 * pNodeList is cleared before the new entries are appended.
 * Returns 0, or an error code if an iterator or array push failed; entries collected so far are still kept.
 */
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int32_t     code = 0;

  mDebug("start to refresh node list by existed streams");

  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    return terrno;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    taosWLockLatch(&pStream->lock);

    SStreamTaskIter *pTaskIter = NULL;
    code = createStreamTaskIter(pStream, &pTaskIter);
    if (code) {
      // log before releasing: pStream must not be dereferenced after sdbRelease
      mError("failed to create task iter for stream:%s", pStream->name);
      taosWUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pTaskIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pTaskIter, &pTask);
      if (code) {
        break;
      }

      SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
      epsetAssign(&entry.epset, &pTask->info.epSet);
      int32_t ret = taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
      if (ret != 0 && ret != TSDB_CODE_DUP_KEY) {
        // report the error of this put, not the unrelated outer code
        mError("failed to put entry into hash map, nodeId:%d, code:%s", entry.nodeId, tstrerror(ret));
      }
    }

    destroyStreamTaskIter(pTaskIter);
    taosWUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  taosArrayClear(pNodeList);

  // convert to list
  pIter = NULL;
  while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
    SNodeEntry *pEntry = (SNodeEntry *)pIter;

    void *p = taosArrayPush(pNodeList, pEntry);
    if (p == NULL) {
      mError("failed to put entry into node list, nodeId:%d, code: out of memory", pEntry->nodeId);
      if (code == 0) {
        code = terrno;
      }
      continue;
    }

    char    buf[256] = {0};
    int32_t ret = epsetToStr(&pEntry->epset, buf, tListLen(buf));  // ignore this error since it is only for log file
    if (ret != 0) {                                                // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(ret));
    }

    mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
  }

  taosHashCleanup(pHash);

  mDebug("numOfvNodes:%d get after extracting nodeInfo from all streams", (int32_t)taosArrayGetSize(pNodeList));
  return code;
}
2184

2185
/*
 * Insert the db name of every vgroup in the sdb into pDBMap (key only, no value).
 * The caller uses it to mark all dbs as changed; the log text mentions killing checkpoint trans.
 */
static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
  void *pIter = NULL;

  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    int32_t code = taosHashPut(pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
    if (code == 0) {
      int32_t size = taosHashGetSize(pDBMap);
      mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size);
    }

    // release only after the last access to pVgroup->dbName
    sdbRelease(pSdb, pVgroup);
  }
}
2204

2205
/*
 * Compare the vnode snapshot with the node list cached in execInfo and fill pChangeInfo with the differences.
 *
 * When this mnode is leader and has just switched from follower:
 *  - *pUpdateAllVgroups is set;
 *  - the switch flag is cleared;
 *  - every db is added to pChangeInfo->pDBMap.
 * If any change was found, or all vgroups must be updated, all checkpoint transactions are killed.
 *
 * The caller holds execInfo.lock while calling this.
 * Returns 0, or the error of the first failing step.
 */
static int32_t doProcessNodeCheckHelp(SArray *pNodeSnapshot, SMnode *pMnode, SVgroupChangeInfo *pChangeInfo,
                                      bool *pUpdateAllVgroups) {
  int32_t code = removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
  if (code != TSDB_CODE_SUCCESS) {
    mDebug("failed to remove expired node entry in buf, code:%s", tstrerror(code));
    return code;
  }

  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, pChangeInfo);
  if (code != TSDB_CODE_SUCCESS) {
    mDebug("failed to find changed vnode(s) during vnode(s) check, code:%s", tstrerror(code));
    return code;
  }

  bool justBecameLeader = (execInfo.role == NODE_ROLE_LEADER) && execInfo.switchFromFollower;
  if (justBecameLeader) {
    mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
    *pUpdateAllVgroups = true;
    execInfo.switchFromFollower = false;  // reset the flag
    addAllDbsIntoHashmap(pChangeInfo->pDBMap, pMnode->pSdb);
  }

  bool nodesChanged = (taosArrayGetSize(pChangeInfo->pUpdateNodeList) > 0) || (*pUpdateAllVgroups);
  if (!nodesChanged) {
    mDebug("no update found in vnode(s) list");
    return code;
  }

  // kill current active checkpoint transaction, since the transaction is vnode wide.
  killAllCheckpointTrans(pMnode, pChangeInfo);
  return code;
}
2237

2238
// this function runs by only one thread, so it is not multi-thread safe
2239
/*
 * Node-change check, presumably the handler of TDMT_MND_STREAM_NODECHANGE_CHECK (queued by mndProcessNodeCheck).
 *
 * Takes a vgroup snapshot and compares it with the node list cached in execInfo. If any vnode changed, or all
 * vgroups must be refreshed after a follower->leader switch, it builds and prepares a nodeUpdate transaction and
 * then refreshes the cached node list. Concurrent runs are rejected through the mndNodeCheckSentinel CAS flag.
 *
 * Always returns 0. Failures are only logged, and the check runs again on the next round.
 */
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
  int32_t           code = 0;
  bool              allReady = true;
  SArray           *pNodeSnapshot = NULL;
  SMnode           *pMnode = pMsg->info.node;
  int64_t           tsms = taosGetTimestampMs();
  int64_t           ts = tsms / 1000;
  bool              updateAllVgroups = false;
  SVgroupChangeInfo changeInfo = {0};

  int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
  if (old != 0) {
    mDebug("still in checking node change");
    return 0;
  }

  mDebug("start to do node changing check, ts:%" PRId64, tsms);

  streamMutexLock(&execInfo.lock);
  int32_t numOfNodes = extractStreamNodeList(pMnode);
  streamMutexUnlock(&execInfo.lock);

  if (numOfNodes == 0) {
    mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing");
    execInfo.ts = ts;
    atomic_store_32(&mndNodeCheckSentinel, 0);
    return 0;
  }

  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
  if (code) {
    mError("failed to take the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    taosArrayDestroy(pNodeSnapshot);
    atomic_store_32(&mndNodeCheckSentinel, 0);
    mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
    return 0;
  }

  streamMutexLock(&execInfo.lock);
  code = doProcessNodeCheckHelp(pNodeSnapshot, pMnode, &changeInfo, &updateAllVgroups);
  streamMutexUnlock(&execInfo.lock);

  if (code) {
    goto _end;
  }

  if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) {
    mDebug("vnode(s) change detected, build trans to update stream task epsets");

    STrans *pTrans = NULL;

    streamMutexLock(&execInfo.lock);
    code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups, &pTrans);
    streamMutexUnlock(&execInfo.lock);

    // NOTE: sync trans out of lock
    if (code == 0 && pTrans != NULL) {
      code = mndTransPrepare(pMnode, pTrans);
      if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
        mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
      }

      mndTransDrop(pTrans);
    }

    // keep the new vnode snapshot if success
    if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
      streamMutexLock(&execInfo.lock);

      code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
      int32_t num = (int)taosArrayGetSize(execInfo.pNodeList);
      if (code == 0) {
        execInfo.ts = ts;
        mDebug("create trans successfully, update cached node list, numOfNodes:%d", num);
      }

      streamMutexUnlock(&execInfo.lock);

      if (code) {
        mError("failed to extract node list from stream, code:%s", tstrerror(code));
      }
    }
  }

_end:
  // Release the change info on every path. The error gotos used to jump past this call and leak it.
  mndDestroyVgroupChangeInfo(&changeInfo);
  taosArrayDestroy(pNodeSnapshot);

  mDebug("end to do stream task node change checking, elapsed time:%" PRId64 "ms", taosGetTimestampMs() - tsms);
  atomic_store_32(&mndNodeCheckSentinel, 0);

  return 0;
}
2337

2338
/*
 * Timer entry of the node-change check.
 *
 * If at least one stream exists in the sdb, it enqueues a TDMT_MND_STREAM_NODECHANGE_CHECK message on the write
 * queue. Otherwise it does nothing and returns 0. Returns terrno if the message body cannot be allocated, else
 * the result of tmsgPutToQueue().
 */
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;

  bool noStream = (sdbGetSize(pMnode->pSdb, SDB_STREAM) <= 0);
  if (noStream) {
    return 0;
  }

  int32_t               contLen = (int32_t)sizeof(SMStreamNodeCheckMsg);
  SMStreamNodeCheckMsg *pCont = rpcMallocCont(contLen);
  if (pCont == NULL) {
    return terrno;
  }

  SRpcMsg checkMsg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pCont, .contLen = contLen};
  return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &checkMsg);
}
2354

2355
/*
 * Registers the tasks of pStream in the exec-info buffers.
 *
 * For every task not yet present in pExecNode->pTaskMap (keyed by STaskId):
 *  - a freshly initialised STaskStatusEntry is put into pTaskMap and the task id is appended to pTaskList;
 *  - if the task's nodeId is not yet in pExecNode->pNodeList, a node entry built from the task's epset is appended.
 * Tasks that are already registered are skipped. Failures are logged and not propagated.
 */
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
  SStreamTaskIter *pIter = NULL;
  int32_t          code = createStreamTaskIter(pStream, &pIter);
  if (code) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return;
  }

  while (streamTaskIterNextTask(pIter)) {
    SStreamTask *pTask = NULL;
    code = streamTaskIterGetCurrent(pIter, &pTask);
    if (code) {
      break;
    }

    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    void   *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
    if (p != NULL) {
      continue;  // already registered
    }

    STaskStatusEntry entry = {0};
    streamTaskStatusInit(&entry, pTask);

    code = taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
    if (code == 0) {
      void   *px = taosArrayPush(pExecNode->pTaskList, &id);
      int32_t num = (int32_t)taosArrayGetSize(pExecNode->pTaskList);
      if (px) {
        mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
      } else {
        mError("s-task:0x%x failed to add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
      }
    } else {
      mError("s-task:0x%x failed to add into task map, since out of memory", (int32_t)entry.id.taskId);
    }

    // add the new vgroups if not added yet
    bool exist = false;
    for (int32_t j = 0; j < taosArrayGetSize(pExecNode->pNodeList); ++j) {
      SNodeEntry *pEntry = taosArrayGet(pExecNode->pNodeList, j);
      if ((pEntry != NULL) && (pEntry->nodeId == pTask->info.nodeId)) {
        exist = true;
        break;
      }
    }

    if (!exist) {
      SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
      epsetAssign(&nodeEntry.epset, &pTask->info.epSet);

      void *px = taosArrayPush(pExecNode->pNodeList, &nodeEntry);
      if (px) {
        mInfo("vgId:%d added into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList));
      } else {
        // statement terminator was missing here; it only compiled due to the macro's expansion shape
        mError("vgId:%d failed to add into nodeList, total:%d", nodeEntry.nodeId,
               (int)taosArrayGetSize(pExecNode->pNodeList));
      }
    }
  }

  destroyStreamTaskIter(pIter);
}
2416

2417
/*
 * Appends taskId to pList (an array of int32_t) unless it is already there.
 *
 * uid is the stream id and numOfTotal the expected number of tasks; both are only used for logging. The logged
 * count is the list size before this task is appended.
 */
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
  int32_t size = taosArrayGetSize(pList);

  for (int32_t idx = 0; idx < size; ++idx) {
    int32_t *pExisted = taosArrayGet(pList, idx);
    if (pExisted != NULL && *pExisted == taskId) {
      return;  // duplicated request from the same task
    }
  }

  // the list was not modified by the scan above, so size is still the count before the push
  if (taosArrayPush(pList, &taskId) == NULL) {
    mError("stream:0x%" PRIx64 " receive %d reqs for checkpoint, failed to added into task list, since out of memory",
           uid, size);
  } else {
    mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, size, numOfTotal - size);
  }
}
2439

2440
/*
 * Handles a checkpoint request sent by a stream task.
 *
 * The requesting task id is recorded per stream in execInfo.pTransferStateStreams. Once every task of the stream
 * has sent the request, a new checkpoint id is generated and the checkpoint trans is launched (only if the stream
 * object exists in the meta-store). The stream entry is then removed from the map.
 *
 * The response (vgId = req.nodeId) is sent directly and the automatic response is disabled.
 * Returns:
 *  - TSDB_CODE_INVALID_MSG on a decode failure;
 *  - TSDB_CODE_MND_STREAM_NOT_EXIST if the stream is unknown both in the meta-store and in the task buffer;
 *  - the allocation/bookkeeping error if the request could not be recorded;
 *  - otherwise 0.
 */
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
  SMnode                  *pMnode = pReq->info.node;
  SStreamTaskCheckpointReq req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskCheckpointReq(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint req msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);

  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf",
          req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the create stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
  if (pReqTaskList == NULL) {
    SArray *pList = taosArrayInit(4, sizeof(int32_t));
    if (pList == NULL) {
      code = terrno;
      mError("stream:0x%" PRIx64 " failed to create checkpoint req task list, code:%s", req.streamId,
             tstrerror(code));
    } else {
      doAddTaskId(pList, req.taskId, req.streamId, numOfTasks);
      code = taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
      if (code) {
        mError("failed to put into transfer state stream map, code: out of memory");
        taosArrayDestroy(pList);  // the map did not take ownership, avoid leaking the list
      } else {
        pReqTaskList =
            (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
      }
    }
  } else {
    doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks);
  }

  if (pReqTaskList == NULL) {
    // The request could not be recorded. Bail out instead of dereferencing a NULL list below.
    if (pStream != NULL) {
      mndReleaseStream(pMnode, pStream);
    }
    streamMutexUnlock(&execInfo.lock);
    return (code != 0) ? code : TSDB_CODE_FAILED;
  }

  int32_t total = taosArrayGetSize(*pReqTaskList);
  if (total == numOfTasks) {  // all tasks have sent the reqs
    int64_t checkpointId = mndStreamGenChkptId(pMnode, false);
    mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);

    if (pStream != NULL) {  // TODO:handle error
      code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
      if (code) {
        mError("failed to create checkpoint trans, code:%s", tstrerror(code));
      }
    } else {
      // todo: wait for the create stream trans completed, and launch the checkpoint trans
      // SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
      // sleep(500ms)
    }

    // remove this entry
    (void)taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));

    int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
    mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  {
    SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)};
    rsp.pCont = rpcMallocCont(rsp.contLen);
    if (rsp.pCont == NULL) {
      return terrno;
    }

    SMsgHead *pHead = rsp.pCont;
    pHead->vgId = htonl(req.nodeId);

    tmsgSendRsp(&rsp);
    pReq->info.handle = NULL;  // disable auto rsp
  }

  return 0;
}
2539

2540
// validate the checkpoint-report against the task status info collected from the HbMsg
2541
static bool validateChkptReport(const SCheckpointReport *pReport, int64_t reportChkptId) {
6,469✔
2542
  STaskId           id = {.streamId = pReport->streamId, .taskId = pReport->taskId};
6,469✔
2543
  STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
6,469✔
2544
  if (pTaskEntry == NULL) {
6,469✔
2545
    mError("invalid checkpoint-report msg from task:0x%x, discard", pReport->taskId);
16!
2546
    return false;
16✔
2547
  }
2548

2549
  if (pTaskEntry->checkpointInfo.latestId >= pReport->checkpointId) {
6,453!
2550
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " saved checkpointId:%" PRId64 " discard",
×
2551
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2552
    return false;
×
2553
  }
2554

2555
  // now the task in checkpoint procedure
2556
  if ((pTaskEntry->checkpointInfo.activeId != 0) && (pTaskEntry->checkpointInfo.activeId > pReport->checkpointId)) {
6,453!
2557
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " active checkpointId:%" PRId64
×
2558
           " discard",
2559
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2560
    return false;
×
2561
  }
2562

2563
  if (reportChkptId >= pReport->checkpointId) {
6,453!
2564
    mError("s-task:0x%x expired checkpoint-report msg, checkpointId:%" PRId64 " already update checkpointId:%" PRId64
×
2565
           " discard",
2566
           pReport->taskId, pReport->checkpointId, reportChkptId);
2567
    return false;
×
2568
  }
2569

2570
  return true;
6,453✔
2571
}
2572

2573
/*
 * Records a validated checkpoint-report into pList (an array of STaskChkptInfo).
 *
 * Reports rejected by validateChkptReport() are ignored. If the task already has an item in the list, the item is
 * replaced by the report's info only when the report carries a larger checkpoint id; smaller or equal ids are just
 * logged. Otherwise a new item is appended.
 */
static void doAddReportStreamTask(SArray *pList, int64_t reportedChkptId, const SCheckpointReport *pReport) {
  if (!validateChkptReport(pReport, reportedChkptId)) {
    return;
  }

  int32_t numOfReported = (int32_t)taosArrayGetSize(pList);
  for (int32_t k = 0; k < numOfReported; ++k) {
    STaskChkptInfo *pExist = taosArrayGet(pList, k);
    if (pExist == NULL || pExist->taskId != pReport->taskId) {
      continue;
    }

    if (pExist->checkpointId > pReport->checkpointId) {
      mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
             pReport->taskId, pExist->checkpointId, pReport->checkpointId);
    } else if (pExist->checkpointId < pReport->checkpointId) {
      mInfo("s-task:0x%x expired checkpoint-report info in checkpoint-report list update from %" PRId64 "->%" PRId64,
            pReport->taskId, pExist->checkpointId, pReport->checkpointId);

      // overwrite the stale item with the newer report
      pExist->checkpointId = pReport->checkpointId;
      pExist->ts = pReport->checkpointTs;
      pExist->version = pReport->checkpointVer;
      pExist->transId = pReport->transId;
      pExist->dropHTask = pReport->dropHTask;
    } else {
      mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
    }
    return;
  }

  STaskChkptInfo newItem = {
      .streamId = pReport->streamId,
      .taskId = pReport->taskId,
      .transId = pReport->transId,
      .dropHTask = pReport->dropHTask,
      .version = pReport->checkpointVer,
      .ts = pReport->checkpointTs,
      .checkpointId = pReport->checkpointId,
      .nodeId = pReport->nodeId,
  };

  if (taosArrayPush(pList, &newItem) == NULL) {
    mError("failed to put into task list, taskId:0x%x", pReport->taskId);
    return;
  }

  int32_t size = taosArrayGetSize(pList);
  mDebug("stream:0x%" PRIx64 " taskId:0x%x checkpoint-report recv, %d tasks has send checkpoint-report",
         pReport->streamId, pReport->taskId, size);
}
2626

2627
/*
 * Handles a checkpoint-report message sent by a stream task.
 *
 * The report is validated and recorded in execInfo.pChkptStreams (one SChkptReportInfo per stream). When all
 * tasks of the stream have reported, this is logged. A quick TSDB_CODE_SUCCESS response is sent to the reporting
 * vnode unless decoding fails or the stream is unknown both in the meta-store and in the task buffer.
 *
 * Returns:
 *  - TSDB_CODE_INVALID_MSG on a decode failure;
 *  - TSDB_CODE_MND_STREAM_NOT_EXIST for an unknown stream;
 *  - otherwise the last internal code.
 */
int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SCheckpointReport req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskChkptReport(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint-report msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64
         " checkpointVer:%" PRId64 " transId:%d",
         req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);

  // register to the stream task done map, if all tasks has sent these kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the creation of stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SChkptReportInfo *pInfo =
      (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
  if (pInfo == NULL) {
    SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId};
    if (info.pTaskList == NULL) {
      code = terrno;
      mError("stream:0x%" PRIx64 " failed to create checkpoint-report task list, code:%s", req.streamId,
             tstrerror(code));
    } else {
      doAddReportStreamTask(info.pTaskList, info.reportChkpt, &req);
      code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &info, sizeof(info));
      if (code) {
        mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId);
        taosArrayDestroy(info.pTaskList);  // the map did not take ownership, avoid leaking the list
      } else {
        pInfo = (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
      }
    }
  } else {
    doAddReportStreamTask(pInfo->pTaskList, pInfo->reportChkpt, &req);
  }

  // pInfo is NULL when the bookkeeping above failed. pStream is NULL while the create-stream trans is still
  // running; numOfTasks is then 0, and an empty list would otherwise lead to a pStream->name NULL dereference.
  if (pInfo != NULL && pStream != NULL) {
    int32_t total = taosArrayGetSize(pInfo->pTaskList);
    if (total == numOfTasks) {  // all tasks have sent the reqs
      mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
            " will be issued soon",
            req.streamId, pStream->name, total, req.checkpointId);
    }
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS);
  return code;
}
2706

2707
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t *pExistedTasks, bool *pAllSame) {
127✔
2708
  int32_t num = 0;
127✔
2709
  int64_t chkId = INT64_MAX;
127✔
2710
  *pExistedTasks = 0;
127✔
2711
  *pAllSame = true;
127✔
2712

2713
  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
1,720✔
2714
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
1,593✔
2715
    if (p == NULL) {
1,593!
2716
      continue;
×
2717
    }
2718

2719
    if (p->streamId != streamId) {
1,593✔
2720
      continue;
880✔
2721
    }
2722

2723
    num += 1;
713✔
2724
    STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
713✔
2725
    if (chkId > pe->checkpointInfo.latestId) {
713✔
2726
      if (chkId != INT64_MAX) {
127!
2727
        *pAllSame = false;
×
2728
      }
2729
      chkId = pe->checkpointInfo.latestId;
127✔
2730
    }
2731
  }
2732

2733
  *pExistedTasks = num;
127✔
2734
  if (num < numOfTasks) {  // not all task send info to mnode through hbMsg, no valid checkpoint Id
127!
2735
    return -1;
×
2736
  }
2737

2738
  return chkId;
127✔
2739
}
2740

2741
/*
 * Sends an immediate response of msgSize bytes, whose SMsgHead carries vgId (network byte order), with the given
 * code.
 *
 * On success pInfo->handle is cleared to disable the automatic response. If the body cannot be allocated, nothing
 * is sent and pInfo is left untouched.
 */
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) {
  void *pBody = rpcMallocCont(msgSize);
  if (pBody == NULL) {
    return;
  }

  SMsgHead *pHead = pBody;
  pHead->vgId = htonl(vgId);

  SRpcMsg quickRsp = {.code = code, .info = *pInfo, .pCont = pBody, .contLen = msgSize};
  tmsgSendRsp(&quickRsp);
  pInfo->handle = NULL;  // disable auto rsp
}
6,469✔
2752

2753
/*
 * Removes from pInfo->pTaskList the first consensus entry of every task id contained in pList (an array of
 * int32_t task ids whose consensus trans has been issued).
 * Returns the number of ids in pList.
 */
static int32_t doCleanReqList(SArray *pList, SCheckpointConsensusInfo *pInfo) {
  int32_t numOfSent = taosArrayGetSize(pList);

  for (int32_t i = 0; i < numOfSent; ++i) {
    int32_t *pTaskId = taosArrayGet(pList, i);
    if (pTaskId == NULL) {
      continue;
    }

    // locate the matching entry, then drop it (at most one per id)
    int32_t numOfEntries = taosArrayGetSize(pInfo->pTaskList);
    int32_t pos = 0;
    while (pos < numOfEntries) {
      SCheckpointConsensusEntry *pEntry = taosArrayGet(pInfo->pTaskList, pos);
      if (pEntry != NULL && pEntry->req.taskId == *pTaskId) {
        break;
      }
      ++pos;
    }

    if (pos < numOfEntries) {
      taosArrayRemove(pInfo->pTaskList, pos);
    }
  }

  return numOfSent;
}
2773

2774
/*
 * Timer-driven processing of the pending checkpoint-id consensus requests in execInfo.pStreamConsensus.
 *
 * Does nothing when not all vnodes are ready. For each stream entry:
 *  - the candidate id is computed with getConsensusId();
 *  - once it is available and either all tasks share it or the request has waited at least 10s, a
 *    consensus-checkpoint trans is created for that request;
 *  - handled requests are removed from the entry.
 * Entries that become empty, or whose stream no longer exists, are removed from the map at the end. Processing
 * stops once more than maxAllowedTrans requests were handled.
 *
 * Returns:
 *  - 0 when not all vnodes are ready;
 *  - TSDB_CODE_FAILED when a request carries a checkpoint id older than the computed one;
 *  - otherwise the code of the last operation.
 */
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  int64_t now = taosGetTimestampMs();
  bool    allReady = true;
  SArray *pNodeSnapshot = NULL;
  int32_t maxAllowedTrans = 20;
  int32_t numOfTrans = 0;
  int32_t code = 0;
  void   *pIter = NULL;

  SArray *pList = taosArrayInit(4, sizeof(int32_t));
  if (pList == NULL) {
    return terrno;
  }

  SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
  if (pStreamList == NULL) {
    taosArrayDestroy(pList);
    return terrno;
  }

  mDebug("start to process consensus-checkpointId in tmr");

  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
  taosArrayDestroy(pNodeSnapshot);
  if (code) {
    mError("failed to get the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
    taosArrayDestroy(pStreamList);
    taosArrayDestroy(pList);
    return 0;
  }

  streamMutexLock(&execInfo.lock);

  while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
    SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;

    taosArrayClear(pList);

    int64_t     streamId = -1;
    int32_t     num = taosArrayGetSize(pInfo->pTaskList);
    SStreamObj *pStream = NULL;

    code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
    if (pStream == NULL || code != 0) {  // stream has been dropped already
      mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
      void *p = taosArrayPush(pStreamList, &pInfo->streamId);
      if (p == NULL) {
        mError("failed to record the missing stream id in concensus-stream list, streamId:%" PRId64
               " code:%s, continue",
               pInfo->streamId, tstrerror(terrno));
      }
      continue;
    }

    for (int32_t j = 0; j < num; ++j) {
      SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
      if (pe == NULL) {
        continue;
      }

      if (streamId == -1) {
        streamId = pe->req.streamId;
      }

      int32_t existed = 0;
      bool    allSame = true;
      int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame);
      if (chkId == -1) {
        mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed,
               pInfo->numOfTasks, pe->req.taskId);
        break;
      }

      if (((now - pe->ts) >= 10 * 1000) || allSame) {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
        if (chkId > pe->req.checkpointId) {
          mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
                 pe->req.checkpointId, chkId);

          // Release the stream and the hash iterator while still holding the lock, then free both temporary
          // lists. pList used to leak on this path.
          mndReleaseStream(pMnode, pStream);
          taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
          streamMutexUnlock(&execInfo.lock);

          taosArrayDestroy(pStreamList);
          taosArrayDestroy(pList);
          return TSDB_CODE_FAILED;
        }

        // todo: check for redundant consensus-checkpoint trans, if this kinds of trans repeatly failed.
        code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
        }

        void *p = taosArrayPush(pList, &pe->req.taskId);
        if (p == NULL) {
          mError("failed to put into task list, taskId:0x%x", pe->req.taskId);
        }
      } else {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
      }
    }

    mndReleaseStream(pMnode, pStream);

    int32_t alreadySend = doCleanReqList(pList, pInfo);

    // clear request stream item with empty task list
    if (taosArrayGetSize(pInfo->pTaskList) == 0) {
      mndClearConsensusRspEntry(pInfo);
      if (streamId == -1) {
        mError("streamId is -1, streamId:%" PRIx64 " in consensus-checkpointId hashMap, cont", pInfo->streamId);
      }

      void *p = taosArrayPush(pStreamList, &streamId);
      if (p == NULL) {
        mError("failed to put into stream list, stream:0x%" PRIx64 " not remove it in consensus-chkpt list", streamId);
      }
    }

    numOfTrans += alreadySend;
    if (numOfTrans > maxAllowedTrans) {
      // report the accumulated number of trans, which is what the limit is checked against
      mInfo("already send consensus-checkpointId trans:%d, try next time", numOfTrans);
      taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
      break;
    }
  }

  for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
    int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i);
    if (pStreamId == NULL) {
      continue;
    }

    code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId);
  }

  streamMutexUnlock(&execInfo.lock);

  taosArrayDestroy(pStreamList);
  taosArrayDestroy(pList);

  mDebug("end to process consensus-checkpointId in tmr, send consensus-checkpoint trans:%d", numOfTrans);
  return code;
}
2924

2925
/*
 * Runs mndProcessCreateStreamReq() for a request originating from an mnode.
 *
 * On a synchronous failure (neither 0 nor TSDB_CODE_ACTION_IN_PROGRESS), a 1-byte response body is attached, the
 * response is enabled and pReq->code is set to the error. Returns terrno if that body cannot be allocated,
 * otherwise the code of mndProcessCreateStreamReq().
 */
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t ret = mndProcessCreateStreamReq(pReq);

  bool pendingOrDone = (ret == 0) || (ret == TSDB_CODE_ACTION_IN_PROGRESS);
  if (pendingOrDone) {
    return ret;
  }

  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->info.rspLen = 1;
  pReq->info.noResp = false;
  pReq->code = ret;
  return ret;
}
2939

2940
/*
 * Runs mndProcessDropStreamReq() for a request originating from an mnode.
 *
 * On a synchronous failure (neither 0 nor TSDB_CODE_ACTION_IN_PROGRESS), a 1-byte response body is attached, the
 * response is enabled and pReq->code is set to the error. Returns terrno if that body cannot be allocated,
 * otherwise the code of mndProcessDropStreamReq().
 */
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t ret = mndProcessDropStreamReq(pReq);

  bool pendingOrDone = (ret == 0) || (ret == TSDB_CODE_ACTION_IN_PROGRESS);
  if (pendingOrDone) {
    return ret;
  }

  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->info.rspLen = 1;
  pReq->info.noResp = false;
  pReq->code = ret;
  return ret;
}
2954

2955
/*
 * Lazily loads all stream tasks from the sdb into pExecInfo's buffers.
 *
 * The load happens once, until initTaskList is reset (see mndStreamResetInitTaskListLoadFlag). It is skipped when
 * pMnode is NULL. The caller in mndProcessCheckpointReport holds execInfo.lock around this call.
 */
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  bool needLoad = !pExecInfo->initTaskList && (pMnode != NULL);
  if (needLoad) {
    addAllStreamTasksIntoBuf(pMnode, pExecInfo);
    pExecInfo->initTaskList = true;
  }
}
2963

2964
void mndStreamResetInitTaskListLoadFlag() {
1,627✔
2965
  mInfo("reset task list buffer init flag for leader");
1,627!
2966
  execInfo.initTaskList = false;
1,627✔
2967
}
1,627✔
2968

2969
/*
 * Updates execInfo.role with the mnode's new role.
 *
 * execInfo.switchFromFollower is always cleared first. It is set again only on a follower -> leader transition,
 * so the next node-change check updates all vgroups. The first call, while the role is NODE_ROLE_UNINIT, just
 * records the role. Non-leader roles are treated as follower.
 */
void mndUpdateStreamExecInfoRole(SMnode *pMnode, int32_t role) {
  execInfo.switchFromFollower = false;

  if (execInfo.role == NODE_ROLE_UNINIT) {
    execInfo.role = role;
    if (role != NODE_ROLE_LEADER) {
      mInfo("init mnode is set to follower");
    } else {
      mInfo("init mnode is set to leader");
    }
    return;
  }

  if (role != NODE_ROLE_LEADER) {  // becoming (or staying) follower
    if (execInfo.role != NODE_ROLE_LEADER) {
      mInfo("mnode remain to be follower, do nothing");
      return;
    }

    execInfo.role = role;
    mInfo("mnode switch to be follower from leader");
    return;
  }

  // becoming (or staying) leader
  if (execInfo.role != NODE_ROLE_FOLLOWER) {
    mInfo("mnode remain to be leader, do nothing");
    return;
  }

  execInfo.role = role;
  execInfo.switchFromFollower = true;
  mInfo("mnode switch to be leader from follower");
}
1,945✔
2998

2999
/*
 * Walk every SDB_STREAM object in pMnode's sdb and hand each one to
 * saveTaskAndNodeInfoIntoBuf() so its tasks/nodes are recorded in pExecInfo.
 * Each fetched stream object is released back to the sdb right after it is processed.
 */
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pObj = NULL;

  // sdbFetch returns the next iterator (NULL once exhausted) and fills pObj.
  for (void *pCursor = sdbFetch(pSdb, SDB_STREAM, NULL, (void **)&pObj); pCursor != NULL;
       pCursor = sdbFetch(pSdb, SDB_STREAM, pCursor, (void **)&pObj)) {
    saveTaskAndNodeInfoIntoBuf(pObj, pExecInfo);
    sdbRelease(pSdb, pObj);
  }
}
3014

3015
/*
 * Build and prepare an "update checkpoint-info" transaction for pStream.
 *
 * Steps: create the trans, register it for the stream uid, attach the update-checkpoint
 * action, persist the stream with SDB_STATUS_READY, then prepare the trans.
 *
 * Ownership: this function calls sdbRelease() on pStream on every return path, so the
 * caller hands over its sdb reference. pChkptInfoList is currently not used.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the trans is prepared, or the failing step's
 * error code otherwise.
 */
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList) {
  STrans *pTrans = NULL;
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME,
                               "update checkpoint-info", &pTrans);
  // No trans to drop yet: only give back the stream reference.
  if (pTrans == NULL || code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid);
  if (code) {
    goto _OVER;
  }

  code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream);
  if (code) {
    goto _OVER;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    goto _OVER;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, tstrerror(code));
    goto _OVER;
  }

  // A prepared trans always reports "in progress" to the caller.
  code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
3058

3059
/*
 * Handle a drop-orphan-task request.
 *
 * The request body is deserialized into a list of orphan tasks. If the list is non-empty, a
 * "drop stream" transaction is created that drops every listed task and persists a DROPPED
 * entry for a placeholder stream object whose uid is the stream id of the first usable entry.
 * Trans conflict checking is also done against that stream id.
 *
 * Returns:
 *   - the deserialization error, if the message cannot be decoded;
 *   - 0 when the list is empty (nothing to do, no trans created);
 *   - TSDB_CODE_INVALID_MSG when no usable entry can be extracted from the list;
 *   - TSDB_CODE_SUCCESS / TSDB_CODE_ACTION_IN_PROGRESS when the trans was prepared;
 *   - otherwise the error code of the failing step.
 */
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  int32_t      code = 0;
  SOrphanTask *pTask = NULL;
  STrans      *pTrans = NULL;
  int32_t      numOfTasks = 0;
  bool         transPrepared = false;
  // Declared before the first goto so no jump bypasses its initialization.
  SStreamObj   dummyObj = {0};

  SMStreamDropOrphanMsg msg = {0};
  code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg);
  if (code) {
    return code;
  }

  numOfTasks = taosArrayGetSize(msg.pList);
  if (numOfTasks == 0) {
    mDebug("no orphan tasks to drop, no need to create trans");
    goto _err;
  }

  mDebug("create trans to drop %d orphan tasks", numOfTasks);

  // Pick the first entry that can be extracted; its stream id keys the trans below.
  for (int32_t i = 0; i < numOfTasks && pTask == NULL; ++i) {
    pTask = taosArrayGet(msg.pList, i);
  }

  if (pTask == NULL) {
    mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task");
    // Previously this path fell through with code == 0 and reported success.
    code = TSDB_CODE_INVALID_MSG;
    goto _err;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
  if (code != 0) {
    goto _err;
  }

  // Placeholder stream: only the uid is set, sourceDb/targetSTbName stay empty strings.
  dummyObj.uid = pTask->streamId;

  code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code != 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _err;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
  if (code != 0) {
    goto _err;
  }

  // drop all tasks
  code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList);
  if (code != 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _err;
  }

  // drop stream
  code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED);
  if (code != 0) {
    goto _err;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    goto _err;
  }
  transPrepared = true;

_err:
  tDestroyDropOrphanTaskMsg(&msg);
  mndTransDrop(pTrans);

  // Only report success when a trans was actually prepared (not for the empty-list path).
  if (transPrepared) {
    mDebug("create drop %d orphan tasks trans succ", numOfTasks);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc