• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3549

06 Dec 2024 09:44AM UTC coverage: 59.948% (+0.1%) from 59.846%
#3549

push

travis-ci

web-flow
Merge pull request #29057 from taosdata/docs/TD-33031-3.0

docs: description of user privileges

118833 of 254191 branches covered (46.75%)

Branch coverage included in aggregate %.

199893 of 277480 relevant lines covered (72.04%)

19006119.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.96
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "audit.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndScheduler.h"
21
#include "mndShow.h"
22
#include "mndStb.h"
23
#include "mndTrans.h"
24
#include "osMemory.h"
25
#include "parser.h"
26
#include "taoserror.h"
27
#include "tmisce.h"
28
#include "tname.h"
29

30
#define MND_STREAM_MAX_NUM 60
31

32
typedef struct {
33
  int8_t placeHolder;  // // to fix windows compile error, define place holder
34
} SMStreamNodeCheckMsg;
35

36
static int32_t  mndNodeCheckSentinel = 0;
37
SStreamExecInfo execInfo;
38

39
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
41
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
42
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
43
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
44

45
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
46
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
47

48
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq);
49
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
50
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
51
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
52
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
53
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
54
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
55
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
56
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
57
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
58
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
59
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList);
60
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
61
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
62
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
63
static void    doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
64
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq);
65
static void    saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
66

67
static void     addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
68
static void     removeExpiredNodeInfo(const SArray *pNodeSnapshot);
69
static int32_t  doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
70
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
71

72
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
73
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
74
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
75
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
76
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
77

78
/*
 * Register the stream module with the mnode:
 *  - install message handlers for stream requests, responses and timers,
 *  - install the SHOW STREAMS / SHOW STREAM TASKS retrieve handlers,
 *  - initialise the global exec info and register the SDB_STREAM and
 *    SDB_STREAM_SEQ tables.
 * Returns 0 on success, otherwise the first failing step's error code.
 */
int32_t mndInitStream(SMnode *pMnode) {
  SSdbTable streamTable = {
      .sdbType = SDB_STREAM,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
  };

  SSdbTable seqTable = {
      .sdbType = SDB_STREAM_SEQ,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamSeqActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamSeqActionDecode,
      .insertFp = (SdbInsertFp)mndStreamSeqActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamSeqActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamSeqActionDelete,
  };

  // create / drop requests and the node-check timer
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck);

  // task-level responses, all routed to the transaction framework
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CONSEN_CHKPT_RSP, mndTransProcessRsp);

  // messages sent inside the mnode (TODO: these message types deserve better names)
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE, mndProcessCreateStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp);

  // checkpoint, orphan-task, reset, heartbeat, node-change and consensus handling
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_DROP_ORPHANTASKS, mndProcessDropOrphanTaskReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_TASK_RESET, mndProcessResetStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr);

  // pause / resume
  mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq);

  // SHOW STREAMS / SHOW STREAM TASKS
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);

  // each step runs only if every previous one succeeded
  int32_t code = mndInitExecInfo();
  if (code == 0) {
    code = sdbSetTable(pMnode->pSdb, streamTable);
  }
  if (code == 0) {
    code = sdbSetTable(pMnode->pSdb, seqTable);
  }
  return code;
}
152

153
/*
 * Release every container held by the global execInfo and destroy its mutex.
 * pMnode is unused; it is kept for interface symmetry with mndInitStream().
 */
void mndCleanupStream(SMnode *pMnode) {
  SStreamExecInfo *pInfo = &execInfo;

  // arrays
  taosArrayDestroy(pInfo->pTaskList);
  taosArrayDestroy(pInfo->pNodeList);
  taosArrayDestroy(pInfo->pKilledChkptTrans);

  // hash tables
  taosHashCleanup(pInfo->pTaskMap);
  taosHashCleanup(pInfo->transMgmt.pDBTrans);
  taosHashCleanup(pInfo->pTransferStateStreams);
  taosHashCleanup(pInfo->pChkptStreams);
  taosHashCleanup(pInfo->pStreamConsensus);

  (void)taosThreadMutexDestroy(&pInfo->lock);
  mDebug("mnd stream exec info cleanup");
}
1,814✔
165

166
/*
 * Decode an SDB_STREAM raw record into a freshly allocated SSdbRow holding an SStreamObj.
 *
 * Raw layout read here: int32 payload length `tlen`, followed by `tlen` bytes decoded
 * by tDecodeSStreamObj() using the record's soft version.
 *
 * Returns the row on success (terrno = 0), or NULL with terrno set to the failure code.
 */
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSdbRow    *pRow = NULL;
  SStreamObj *pStream = NULL;
  void       *buf = NULL;
  int8_t      sver = 0;
  int32_t     tlen = 0;
  int32_t     dataPos = 0;

  code = sdbGetRawSoftVer(pRaw, &sver);
  TSDB_CHECK_CODE(code, lino, _over);

  if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
    // must be non-zero: the success branch below dereferences pStream, which is still NULL here
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    lino = __LINE__;
    goto _over;
  }

  pRow = sdbAllocRow(sizeof(SStreamObj));
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);

  pStream = sdbGetRowObj(pRow);
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);

  // SDB_GET_* may jump to _over without assigning `code`; pre-load a failure code so a short
  // record can never be reported as a successful decode.
  code = TSDB_CODE_SDB_INVALID_DATA_LEN;
  lino = __LINE__;
  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);

  // a negative length would turn into a huge allocation size below
  if (tlen < 0) {
    code = TSDB_CODE_SDB_INVALID_DATA_LEN;
    lino = __LINE__;
    goto _over;
  }

  buf = taosMemoryMalloc(tlen + 1);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  code = TSDB_CODE_SDB_INVALID_DATA_LEN;
  lino = __LINE__;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);

  SDecoder decoder;
  tDecoderInit(&decoder, buf, tlen + 1);
  code = tDecodeSStreamObj(&decoder, pStream, sver);
  tDecoderClear(&decoder);

  if (code < 0) {
    lino = __LINE__;
    tFreeStreamObj(pStream);
  }

_over:
  taosMemoryFreeClear(buf);

  if (code != TSDB_CODE_SUCCESS) {
    const char *p = (pStream == NULL) ? "null" : pStream->name;
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
    taosMemoryFreeClear(pRow);

    terrno = code;
    return NULL;
  }

  mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
         pStream->checkpointId);

  terrno = 0;
  return pRow;
}
224

225
/* SDB insert callback for stream rows: only traces the event; always succeeds. */
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
  (void)pSdb;
  mTrace("stream:%s, perform insert action", pStream->name);
  return 0;
}
229

230
/* SDB delete callback: frees the stream object's owned resources under its write latch. */
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
  (void)pSdb;
  mTrace("stream:%s, perform delete action", pStream->name);

  taosWLockLatch(&pStream->lock);
  tFreeStreamObj(pStream);  // the latch itself lives in pStream and is released right after
  taosWUnLockLatch(&pStream->lock);

  return 0;
}
237

238
/*
 * SDB update callback: copy the mutable fields of pNewStream into pOldStream.
 * The version is swapped atomically first; the other fields are copied under the write latch.
 */
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
  (void)pSdb;
  mTrace("stream:%s, perform update action", pOldStream->name);

  SStreamObj       *pDst = pOldStream;
  const SStreamObj *pSrc = pNewStream;

  (void)atomic_exchange_32(&pDst->version, pSrc->version);

  taosWLockLatch(&pDst->lock);
  pDst->status = pSrc->status;
  pDst->updateTime = pSrc->updateTime;
  pDst->checkpointId = pSrc->checkpointId;
  pDst->checkpointFreq = pSrc->checkpointFreq;
  taosWUnLockLatch(&pDst->lock);

  return 0;
}
252

253
/*
 * Look up a stream by name and take a reference on it.
 *
 * On success *pStream holds the acquired object (release it with mndReleaseStream()) and 0 is
 * returned. When nothing is acquired, *pStream is NULL and the return value is:
 *  - TSDB_CODE_MND_STREAM_NOT_EXIST if the stream does not exist, or
 *  - the error reported by sdbAcquire() for any other reason (e.g. the object is in a
 *    transient state). Previously 0 was returned in that case, which callers that test only
 *    the code (such as the create path) mistook for success.
 */
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
  *pStream = sdbAcquire(pMnode->pSdb, SDB_STREAM, streamName);
  if (*pStream != NULL) {
    return 0;
  }

  int32_t err = terrno;
  if (err == TSDB_CODE_SDB_OBJ_NOT_THERE || err == 0) {
    return TSDB_CODE_MND_STREAM_NOT_EXIST;
  }
  return err;
}
262

263
/* Drop a reference previously taken with mndAcquireStream(). */
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
  sdbRelease(pMnode->pSdb, pStream);
}
15,092✔
267

268
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
269
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
270
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
271
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
272
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
273

274
/*
 * Validate that a create-stream request carries every mandatory field:
 * stream name, SQL text, source database and target super table name.
 */
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
  bool complete = pCreate->name[0] != 0 && pCreate->sql != NULL && pCreate->sql[0] != 0 &&
                  pCreate->sourceDB[0] != 0 && pCreate->targetStbFullName[0] != 0;

  return complete ? TSDB_CODE_SUCCESS : TSDB_CODE_MND_INVALID_STREAM_OPTION;
}
281

282
/*
 * Build pWrapper from an array of SField: one schema column per field, with column ids
 * numbered from 1. Fields of type NULL become zero-length VARCHAR columns.
 * On failure pWrapper->pSchema may remain allocated; the caller owns it either way.
 */
static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) {
  pWrapper->nCols = taosArrayGetSize(pFields);
  pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema));
  if (pWrapper->pSchema == NULL) {
    return terrno;
  }

  for (int32_t col = 0; col < pWrapper->nCols; ++col) {
    const SField *pSrc = (const SField *)taosArrayGet(pFields, col);
    if (pSrc == NULL) {
      return terrno;
    }

    SSchema *pDst = &pWrapper->pSchema[col];
    if (pSrc->type != TSDB_DATA_TYPE_NULL) {
      pDst->type = pSrc->type;
      pDst->bytes = pSrc->bytes;
    } else {
      pDst->type = TSDB_DATA_TYPE_VARCHAR;
      pDst->bytes = VARSTR_HEADER_SIZE;
    }
    pDst->colId = col + 1;
    strcpy(pDst->name, pSrc->name);
    pDst->flags = pSrc->flags;
  }

  return TSDB_CODE_SUCCESS;
}
311

312
/*
 * Report whether any column after the first one carries the COL_IS_KEY flag.
 * Column 0 is never inspected, so a schema with fewer than two columns yields false.
 */
static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) {
  if (pWrapper->nCols < 2) {
    return false;
  }

  int32_t col = 1;
  while (col < pWrapper->nCols && (pWrapper->pSchema[col].flags & COL_IS_KEY) == 0) {
    ++col;
  }
  return col < pWrapper->nCols;
}
323

324
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
1,602✔
325
  SNode      *pAst = NULL;
1,602✔
326
  SQueryPlan *pPlan = NULL;
1,602✔
327
  int32_t     code = 0;
1,602✔
328

329
  mInfo("stream:%s to create", pCreate->name);
1,602!
330
  memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
1,602✔
331
  pObj->createTime = taosGetTimestampMs();
1,602✔
332
  pObj->updateTime = pObj->createTime;
1,602✔
333
  pObj->version = 1;
1,602✔
334

335
  if (pCreate->smaId > 0) {
1,602✔
336
    pObj->subTableWithoutMd5 = 1;
259✔
337
  }
338

339
  pObj->smaId = pCreate->smaId;
1,602✔
340
  pObj->indexForMultiAggBalance = -1;
1,602✔
341

342
  pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
1,602✔
343

344
  char p[TSDB_STREAM_FNAME_LEN + 32] = {0};
1,602✔
345
  snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory");
1,602✔
346

347
  pObj->hTaskUid = mndGenerateUid(pObj->name, strlen(pObj->name));
1,602✔
348
  pObj->status = 0;
1,602✔
349

350
  pObj->conf.igExpired = pCreate->igExpired;
1,602✔
351
  pObj->conf.trigger = pCreate->triggerType;
1,602✔
352
  pObj->conf.triggerParam = pCreate->maxDelay;
1,602✔
353
  pObj->conf.watermark = pCreate->watermark;
1,602✔
354
  pObj->conf.fillHistory = pCreate->fillHistory;
1,602✔
355
  pObj->deleteMark = pCreate->deleteMark;
1,602✔
356
  pObj->igCheckUpdate = pCreate->igUpdate;
1,602✔
357

358
  memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
1,602✔
359
  SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
1,602✔
360
  if (pSourceDb == NULL) {
1,602!
361
    code = terrno;
×
362
    mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, tstrerror(code));
×
363
    goto FAIL;
×
364
  }
365

366
  pObj->sourceDbUid = pSourceDb->uid;
1,602✔
367
  mndReleaseDb(pMnode, pSourceDb);
1,602✔
368

369
  memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
1,602✔
370

371
  SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
1,602✔
372
  if (pTargetDb == NULL) {
1,602!
373
    code = terrno;
×
374
    mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, tstrerror(code));
×
375
    goto FAIL;
×
376
  }
377

378
  tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
1,602✔
379

380
  if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
1,602✔
381
    pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
1,458✔
382
  } else {
383
    pObj->targetStbUid = pCreate->targetStbUid;
144✔
384
  }
385
  pObj->targetDbUid = pTargetDb->uid;
1,602✔
386
  mndReleaseDb(pMnode, pTargetDb);
1,602✔
387

388
  pObj->sql = pCreate->sql;
1,602✔
389
  pObj->ast = pCreate->ast;
1,602✔
390

391
  pCreate->sql = NULL;
1,602✔
392
  pCreate->ast = NULL;
1,602✔
393

394
  // deserialize ast
395
  if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) {
1,602!
396
    goto FAIL;
×
397
  }
398

399
  // create output schema
400
  if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) {
1,602!
401
    goto FAIL;
×
402
  }
403

404
  int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
1,602✔
405
  if (numOfNULL > 0) {
1,602✔
406
    pObj->outputSchema.nCols += numOfNULL;
26✔
407
    SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
26✔
408
    if (!pFullSchema) {
26!
409
      code = terrno;
×
410
      goto FAIL;
×
411
    }
412

413
    int32_t nullIndex = 0;
26✔
414
    int32_t dataIndex = 0;
26✔
415
    for (int32_t i = 0; i < pObj->outputSchema.nCols; i++) {
332✔
416
      if (nullIndex >= numOfNULL) {
306!
417
        pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
×
418
        pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
×
419
        pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
×
420
        strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name);
×
421
        pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
×
422
        dataIndex++;
×
423
      } else {
424
        SColLocation *pos = NULL;
306✔
425
        if (nullIndex < taosArrayGetSize(pCreate->fillNullCols)) {
306!
426
          pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
306✔
427
        }
428

429
        if (pos == NULL) {
306!
430
          mError("invalid null column index, %d", nullIndex);
×
431
          continue;
×
432
        }
433

434
        if (i < pos->slotId) {
306✔
435
          pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
79✔
436
          pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
79✔
437
          pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
79✔
438
          strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name);
79✔
439
          pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
79✔
440
          dataIndex++;
79✔
441
        } else {
442
          pFullSchema[i].bytes = 0;
227✔
443
          pFullSchema[i].colId = pos->colId;
227✔
444
          pFullSchema[i].flags = COL_SET_NULL;
227✔
445
          memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
227✔
446
          pFullSchema[i].type = pos->type;
227✔
447
          nullIndex++;
227✔
448
        }
449
      }
450
    }
451

452
    taosMemoryFree(pObj->outputSchema.pSchema);
26✔
453
    pObj->outputSchema.pSchema = pFullSchema;
26✔
454
  }
455

456
  SPlanContext cxt = {
1,602✔
457
      .pAstRoot = pAst,
458
      .topicQuery = false,
459
      .streamQuery = true,
460
      .triggerType = (pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY)? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
1,602✔
461
      .watermark = pObj->conf.watermark,
1,602✔
462
      .igExpired = pObj->conf.igExpired,
1,602✔
463
      .deleteMark = pObj->deleteMark,
1,602✔
464
      .igCheckUpdate = pObj->igCheckUpdate,
1,602✔
465
      .destHasPrimaryKey = hasDestPrimaryKey(&pObj->outputSchema),
1,602✔
466
  };
467

468
  // using ast and param to build physical plan
469
  if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) {
1,602!
470
    goto FAIL;
×
471
  }
472

473
  // save physcial plan
474
  if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) {
1,602!
475
    goto FAIL;
×
476
  }
477

478
  pObj->tagSchema.nCols = pCreate->numOfTags;
1,602✔
479
  if (pCreate->numOfTags) {
1,602✔
480
    pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
279✔
481
    if (pObj->tagSchema.pSchema == NULL) {
279!
482
      code = terrno;
×
483
      goto FAIL;
×
484
    }
485
  }
486

487
  /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
488
  for (int32_t i = 0; i < pCreate->numOfTags; i++) {
3,187✔
489
    SField *pField = taosArrayGet(pCreate->pTags, i);
1,585✔
490
    if (pField == NULL) {
1,585!
491
      continue;
×
492
    }
493

494
    pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
1,585✔
495
    pObj->tagSchema.pSchema[i].bytes = pField->bytes;
1,585✔
496
    pObj->tagSchema.pSchema[i].flags = pField->flags;
1,585✔
497
    pObj->tagSchema.pSchema[i].type = pField->type;
1,585✔
498
    memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN);
1,585✔
499
  }
500

501
FAIL:
1,602✔
502
  if (pAst != NULL) nodesDestroyNode(pAst);
1,602!
503
  if (pPlan != NULL) qDestroyQueryPlan(pPlan);
1,602!
504
  return code;
1,602✔
505
}
506

507
/*
 * Serialize pTask into a TDMT_STREAM_TASK_DEPLOY message addressed to the task's node and
 * append it to pTrans as a redo action.
 *
 * The task is encoded twice: a first pass with a NULL buffer to measure the encoded size, then
 * a second pass into a buffer of that size that follows an SMsgHead. The message buffer is
 * handed to the transaction on success and freed here on failure.
 */
int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
  SEncoder encoder;
  tEncoderInit(&encoder, NULL, 0);

  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  // size pass: tEncodeStreamTask reports failures as non-zero codes (same check as the second
  // pass), so any non-zero result means encoder.pos cannot be trusted.
  int32_t code = tEncodeStreamTask(&encoder, pTask);
  if (code != 0) {
    mError("failed to calc encode size of stream task, code:%s", tstrerror(code));
    tEncoderClear(&encoder);
    return code;
  }

  int32_t size = encoder.pos;
  int32_t tlen = sizeof(SMsgHead) + size;
  tEncoderClear(&encoder);

  void *buf = taosMemoryCalloc(1, tlen);
  if (buf == NULL) {
    return terrno;
  }

  ((SMsgHead *)buf)->vgId = htonl(pTask->info.nodeId);

  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, size);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  if (code != 0) {
    mError("failed to encode stream task, code:%s", tstrerror(code));
    taosMemoryFree(buf);
    return code;
  }

  code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code) {
    taosMemoryFree(buf);
  }

  return code;
}
550

551
/*
 * Add a deploy action to pTrans for every task of pStream. When fill-history is enabled, the
 * fill-history tasks stored level by level in pStream->pHTasksList are added as well.
 * Returns 0 on success or the first error encountered. A missing level or task entry is
 * reported as TSDB_CODE_INVALID_PARA instead of being dereferenced.
 */
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pIter = NULL;
  int32_t          code = createStreamTaskIter(pStream, &pIter);
  if (code) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return code;
  }

  while (streamTaskIterNextTask(pIter)) {
    SStreamTask *pTask = NULL;
    code = streamTaskIterGetCurrent(pIter, &pTask);
    if (code == 0) {
      code = mndPersistTaskDeployReq(pTrans, pTask);
    }

    if (code) {
      destroyStreamTaskIter(pIter);
      return code;
    }
  }

  destroyStreamTaskIter(pIter);

  // persistent stream task for already stored ts data
  if (!pStream->conf.fillHistory) {
    return code;
  }

  int32_t numOfLevels = taosArrayGetSize(pStream->pHTasksList);
  for (int32_t i = 0; i < numOfLevels; i++) {
    SArray *pLevel = taosArrayGetP(pStream->pHTasksList, i);
    if (pLevel == NULL) {
      mError("stream:%s, invalid fill-history task level:%d", pStream->name, i);
      return TSDB_CODE_INVALID_PARA;
    }

    int32_t numOfTasks = taosArrayGetSize(pLevel);
    for (int32_t j = 0; j < numOfTasks; j++) {
      SStreamTask *pTask = taosArrayGetP(pLevel, j);
      if (pTask == NULL) {
        mError("stream:%s, invalid fill-history task at level:%d index:%d", pStream->name, i, j);
        return TSDB_CODE_INVALID_PARA;
      }

      code = mndPersistTaskDeployReq(pTrans, pTask);
      if (code) {
        return code;
      }
    }
  }

  return code;
}
596

597
/*
 * Add the deploy actions for all tasks of pStream to pTrans and, if that succeeded,
 * the commit log that stores the stream object in READY status.
 */
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
  int32_t code = mndPersistStreamTasks(pTrans, pStream);
  return (code < 0) ? code : mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
}
605

606
/*
 * Add to pTrans the creation of the stream's destination super table.
 *
 * The columns mirror pStream->outputSchema, each using the default compression for its type.
 * The tags are pStream->tagSchema, or a single UBIGINT "group_id" tag when the stream defines
 * none. The table uid is pStream->targetStbUid.
 *
 * Every failure path now returns a non-zero code. Previously failures of mndGetNumOfStbs,
 * mndBuildStbFromReq and mndAddStbToTrans jumped to cleanup with code == 0 and were reported
 * as success. `user` is currently unused.
 */
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
  SStbObj       *pStb = NULL;
  SDbObj        *pDb = NULL;
  SStbObj        stbObj = {0};
  SMCreateStbReq createReq = {0};
  int32_t        numOfStbs = -1;
  int32_t        code = 0;
  int32_t        lino = 0;

  tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
  createReq.numOfColumns = pStream->outputSchema.nCols;
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);

  // build fields
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    pField->flags = pStream->outputSchema.pSchema[i].flags;
    pField->type = pStream->outputSchema.pSchema[i].type;
    pField->bytes = pStream->outputSchema.pSchema[i].bytes;
    pField->compress = createDefaultColCmprByType(pField->type);
  }

  if (pStream->tagSchema.nCols == 0) {
    // no user-defined tags: a single group_id tag
    createReq.numOfTags = 1;
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    SField *pField = taosArrayGet(createReq.pTags, 0);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, "group_id", TSDB_COL_NAME_LEN);
    pField->type = TSDB_DATA_TYPE_UBIGINT;
    pField->flags = 0;
    pField->bytes = 8;
  } else {
    createReq.numOfTags = pStream->tagSchema.nCols;
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    for (int32_t i = 0; i < createReq.numOfTags; i++) {
      SField *pField = taosArrayGet(createReq.pTags, i);
      if (pField == NULL) {
        continue;
      }

      pField->bytes = pStream->tagSchema.pSchema[i].bytes;
      pField->flags = pStream->tagSchema.pSchema[i].flags;
      pField->type = pStream->tagSchema.pSchema[i].type;
      tstrncpy(pField->name, pStream->tagSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    }
  }

  code = mndCheckCreateStbReq(&createReq);
  if (code != 0) {
    lino = __LINE__;
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
    lino = __LINE__;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    lino = __LINE__;
    goto _OVER;
  }

  code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
  if (code != 0) {
    lino = __LINE__;
    goto _OVER;
  }

  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    lino = __LINE__;
    goto _OVER;
  }

  code = mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb);
  if (code != 0) {
    lino = __LINE__;
    goto _OVER;
  }

  stbObj.uid = pStream->targetStbUid;

  code = mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj);
  if (code != 0) {
    lino = __LINE__;
    goto _OVER;
  }

  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols);

_OVER:
  // single cleanup path; stbObj starts zeroed, so freeing it is safe on early failures too
  tFreeSMCreateStbReq(&createReq);
  mndFreeStb(&stbObj);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);

  if (code != 0) {
    mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->targetSTbName, lino,
           tstrerror(code));
  }
  return code;
}
717

718
// 1. stream number check
719
// 2. target stable can not be target table of other existed streams.
720
/*
 * Pre-creation checks for pStreamObj against the streams already in the SDB:
 *  1. at most MND_STREAM_MAX_NUM streams may read from the same source database;
 *  2. the target super table must not already be the target of another stream.
 *
 * Each fetched stream is released only after its fields have been read. The limit check uses
 * `>=`, so the count can no longer reach MND_STREAM_MAX_NUM + 1, matching the
 * "no more than %d" message.
 */
static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
  SSdb       *pSdb = pMnode->pSdb;
  int32_t     numOfStream = 0;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
      ++numOfStream;
    }

    if (numOfStream >= MND_STREAM_MAX_NUM) {
      mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
             pStreamObj->name);
      sdbRelease(pSdb, pStream);
      sdbCancelFetch(pSdb, pIter);
      return TSDB_CODE_MND_TOO_MANY_STREAMS;
    }

    if (pStream->targetStbUid == pStreamObj->targetStbUid) {
      mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
             pStreamObj->name);
      sdbRelease(pSdb, pStream);
      sdbCancelFetch(pSdb, pIter);
      return TSDB_CODE_MND_INVALID_TARGET_TABLE;
    }

    sdbRelease(pSdb, pStream);
  }

  return TSDB_CODE_SUCCESS;
}
749

750
// Handle a CREATE STREAM request.
// Steps:
// 1. Deserialize and validate the request.
// 2. Reject it if the stream already exists (return 0 instead when igExists is set).
// 3. Check the stream grant and the snode requirement of the source db.
// 4. Build the stream object and validate it with doStreamCheck().
// 5. Create one transaction that optionally creates the target stable, schedules the stream tasks and persists the
//    stream object.
// Read/write privileges on the source/target dbs are checked before the trans is prepared. The new tasks are
// registered in execInfo first, and an audit record is written at the end.
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the trans has been prepared, 0 when the stream exists and igExists is
// set, or an error code.
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SStreamObj  streamObj = {0};
  char       *sql = NULL;
  int32_t     sqlLen = 0;
  const char *pMsg = "create stream tasks on dnodes";
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STrans     *pTrans = NULL;

  SCMCreateStreamReq createReq = {0};
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  goto _OVER;
#endif

  mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
  if ((code = mndCheckCreateStreamReq(&createReq)) != 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = mndAcquireStream(pMnode, createReq.name, &pStream);
  if (pStream != NULL && code == 0) {
    if (createReq.igExists) {
      mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
      mndReleaseStream(pMnode, pStream);
      tFreeSCMCreateStreamReq(&createReq);
      return code;
    } else {
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
      goto _OVER;
    }
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    goto _OVER;
  }

  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
    goto _OVER;
  }

  if (createReq.sql != NULL) {
    sql = taosStrdup(createReq.sql);
    TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
    // the audit record below only includes the sql text when sqlLen > 0, so the length must be recorded here
    sqlLen = (int32_t)strlen(sql);
  }

  SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB);
  if (pSourceDb == NULL) {
    code = terrno;
    mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB,
          tstrerror(code));
    goto _OVER;
  }

  code = mndCheckForSnode(pMnode, pSourceDb);
  mndReleaseDb(pMnode, pSourceDb);
  if (code != 0) {
    goto _OVER;
  }

  // build stream obj from request
  if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = doStreamCheck(pMnode, &streamObj);
  TSDB_CHECK_CODE(code, lino, _OVER);

  code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans);
  if (pTrans == NULL || code) {
    goto _OVER;
  }

  // create stb for stream
  if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) {
    if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) {
      mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code));
      mndTransDrop(pTrans);
      goto _OVER;
    }
  } else {
    mDebug("stream:%s no need create stable", createReq.name);
  }

  // schedule stream task for stream obj
  code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  // add stream to trans
  code = mndPersistStream(pTrans, &streamObj);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb)) != 0) {
    mndTransDrop(pTrans);
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb)) != 0) {
    mndTransDrop(pTrans);
    goto _OVER;
  }

  // add into buffer firstly
  // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
  streamMutexLock(&execInfo.lock);
  mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
  saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  // execute creation
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  mndTransDrop(pTrans);

  SName dbname = {0};
  code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code) {
    mError("invalid source dbname:%s in create stream, code:%s", createReq.sourceDB, tstrerror(code));
    goto _OVER;
  }

  SName name = {0};
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE);
  if (code) {
    mError("invalid stream name:%s in create strem, code:%s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  // reuse this function for stream
  if (sql != NULL && sqlLen > 0) {
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen);
  } else {
    char detail[1000] = {0};
    snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
  }

_OVER:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to create at line:%d since %s", createReq.name, lino, tstrerror(code));
  } else {
    mDebug("stream:%s create stream completed", createReq.name);
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  mndReleaseStream(pMnode, pStream);
  tFreeSCMCreateStreamReq(&createReq);
  tFreeStreamObj(&streamObj);

  if (sql != NULL) {
    taosMemoryFreeClear(sql);
  }

  return code;
}
923

924
// Handle a RESTART STREAM request. The request payload is decoded as an SMPauseStreamReq (stream name + igNotExists).
// The function checks:
// - write privilege on the stream's target db;
// - conflicts with other stream transactions;
// - whether a node update has been detected.
// It then builds a restart transaction, filled by mndStreamSetRestartAction(), and prepares it.
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the trans has been prepared, 0 when the stream does not exist and
// igNotExists is set, or an error code.
static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SStreamObj      *pStream = NULL;
  int32_t          code = 0;
  SMPauseStreamReq pauseReq = {0};

  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (pauseReq.igNotExists) {
      mInfo("stream:%s, not exist, not restart stream", pauseReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to restart stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to restart stream", pauseReq.name, pStream->uid);
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESTART_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  bool updated = mndStreamNodeIsUpdated(pMnode);
  if (updated) {
    mError("tasks are not ready for restart, node update detected");
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  STrans *pTrans = NULL;
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s failed to restart stream since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESTART_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // add the restart actions of the stream tasks into the trans
  code = mndStreamSetRestartAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s, failed to restart task since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare restart stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1002

1003
// Generate a new checkpoint id. The result is one greater than the largest id found in either of two places:
// - the checkpointId of the stream objects stored in the sdb;
// - the latest checkpoint id reported by any task in execInfo. Tasks whose last checkpoint failed are ignored.
// When `lock` is true, execInfo.lock is acquired while the task map is scanned.
// NOTE(review): with `lock == false` the caller is presumably already holding execInfo.lock — confirm at call sites.
int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  SSdb       *pSdb = pMnode->pSdb;
  int64_t     maxChkptId = 0;

  // 1. max checkpoint id recorded in the mnode stream objects
  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    maxChkptId = TMAX(maxChkptId, pStream->checkpointId);
    mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
           pStream->checkpointId);
    sdbRelease(pSdb, pStream);
  }

  // 2. max checkpoint id reported by the tasks kept in execInfo
  int64_t maxCheckpointId = -1;
  if (lock) {
    streamMutexLock(&execInfo.lock);
  }

  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
    if (p == NULL) {  // check the key before using it for the hash lookup
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
    if (pEntry == NULL || pEntry->checkpointInfo.failed) {
      continue;
    }

    if (maxCheckpointId < pEntry->checkpointInfo.latestId) {
      maxCheckpointId = pEntry->checkpointInfo.latestId;
    }
  }

  if (lock) {
    streamMutexUnlock(&execInfo.lock);
  }

  if (maxCheckpointId > maxChkptId) {
    mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId,
           maxCheckpointId);
    maxChkptId = maxCheckpointId;
  }

  mDebug("generate new checkpointId:%" PRId64, maxChkptId + 1);
  return maxChkptId + 1;
}
1055

1056
// Build and prepare a checkpoint transaction for one stream.
// When mndTrigger == 1 and less than tsStreamCheckpointInterval seconds have passed since pStream->checkpointFreq,
// nothing is done and 0 is returned.
// Otherwise:
// - a checkpoint action is added for every task of each source-level task list;
// - the stream object's checkpointId, checkpointFreq and version are updated under pStream->lock;
// - the updated stream object is persisted with SDB_STATUS_READY in the same trans.
// `lock` is forwarded to mndStreamTransConflictCheck().
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the trans has been prepared, otherwise an error code.
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
                                               int8_t mndTrigger, bool lock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t ts = taosGetTimestampMs();
  STrans *pTrans = NULL;

  if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
    return code;
  }

  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
  if (code) {
    mWarn("checkpoint conflict with other trans in %s, code:%s ignore the checkpoint for stream:%s %" PRIx64,
          pStream->sourceDb, tstrerror(code), pStream->name, pStream->uid);
    goto _ERR;
  }

  code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME,
                       "gen checkpoint for stream", &pTrans);
  if (code) {
    mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
           tstrerror(code));
    goto _ERR;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
  if (code) {
    mError("failed to register checkpoint trans for stream:%s, checkpointId:%" PRId64, pStream->name, checkpointId);
    goto _ERR;
  }

  mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);

  taosWLockLatch(&pStream->lock);
  pStream->currentTick = 1;

  // 1. redo action: broadcast checkpoint source msg for all source vg
  int32_t totalLevel = taosArrayGetSize(pStream->tasks);
  for (int32_t i = 0; i < totalLevel; i++) {
    SArray      *pLevel = taosArrayGetP(pStream->tasks, i);
    SStreamTask *p = (pLevel != NULL) ? taosArrayGetP(pLevel, 0) : NULL;

    // the level of a task list is taken from its first task; skip missing/empty lists instead of dereferencing NULL
    if (p == NULL || p->info.taskLevel != TASK_LEVEL__SOURCE) {
      continue;
    }

    int32_t sz = taosArrayGetSize(pLevel);
    for (int32_t j = 0; j < sz; j++) {
      SStreamTask *pTask = taosArrayGetP(pLevel, j);
      code = mndStreamSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);

      if (code != TSDB_CODE_SUCCESS) {
        taosWUnLockLatch(&pStream->lock);
        goto _ERR;
      }
    }
  }

  // 2. reset tick
  pStream->checkpointId = checkpointId;
  pStream->checkpointFreq = taosGetTimestampMs();
  pStream->currentTick = 0;

  // 3. commit log: stream checkpoint info
  pStream->version = pStream->version + 1;
  taosWUnLockLatch(&pStream->lock);

  if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY)) != TSDB_CODE_SUCCESS) {
    goto _ERR;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to prepare checkpoint trans since %s", tstrerror(code));
  } else {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_ERR:
  mndTransDrop(pTrans);
  return code;
}
1137

1138
// Return how many nodes are recorded in execInfo.pNodeList.
// An empty list is first rebuilt from the streams stored in the mnode. If that rebuild fails, its error code is
// returned instead of a count.
int32_t extractStreamNodeList(SMnode *pMnode) {
  bool emptyList = (taosArrayGetSize(execInfo.pNodeList) == 0);

  if (emptyList) {
    int32_t refreshCode = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
    if (refreshCode != 0) {
      mError("Failed to extract node list from stream, code:%s", tstrerror(refreshCode));
      return refreshCode;
    }
  }

  return taosArrayGetSize(execInfo.pNodeList);
}
1149

1150
// Check whether the stream tasks are in a state that allows issuing a new checkpoint. Return values:
// - TSDB_CODE_STREAM_TASK_IVLD_STATUS if a node update has been detected;
// - TSDB_CODE_FAILED if no vgroups are recorded while the task list is not empty;
// - an allocation error code if the scratch list cannot be created;
// - -1 if any task is not READY or still has a related fill-history task;
// - 0 otherwise.
// execInfo.lock is taken internally while the task map is scanned.
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
  bool ready = true;
  if (mndStreamNodeIsUpdated(pMnode)) {
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  streamMutexLock(&execInfo.lock);
  if (taosArrayGetSize(execInfo.pNodeList) == 0) {
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
    if (taosArrayGetSize(execInfo.pTaskList) != 0) {
      streamMutexUnlock(&execInfo.lock);
      mError("stream task node change checking done, no vgroups exist, but task list is not empty");
      return TSDB_CODE_FAILED;
    }
  }

  SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId));
  if (pInvalidList == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    streamMutexUnlock(&execInfo.lock);
    return code;
  }

  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
    if (p == NULL) {
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
    if (pEntry == NULL) {
      continue;
    }

    if (pEntry->status == TASK_STATUS__STOP) {
      // NOTE(review): pInvalidList starts empty and is only appended to from inside this loop over itself, so no
      // id is ever added and removeTasksInBuf() below always receives an empty list. Kept as-is until the intended
      // semantics (collect stopped tasks?) are confirmed.
      for (int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) {
        STaskId *pId = taosArrayGet(pInvalidList, j);
        if (pId == NULL) {
          continue;
        }

        if (pEntry->id.streamId == pId->streamId) {
          void *px = taosArrayPush(pInvalidList, &pEntry->id);
          if (px == NULL) {
            mError("failed to put stream into invalid list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
          }
          break;
        }
      }
    }

    if (pEntry->status != TASK_STATUS__READY) {
      mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId,
             (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
      ready = false;
      break;
    }

    if (pEntry->hTaskId != 0) {
      mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
             " exists, checkpoint not issued",
             pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
             pEntry->hTaskId);
      ready = false;
      break;
    }
  }

  removeTasksInBuf(pInvalidList, &execInfo);
  taosArrayDestroy(pInvalidList);

  streamMutexUnlock(&execInfo.lock);
  return ready ? 0 : -1;
}
1219

1220
// Return the most recent startTime among the READY tasks that belong to stream `streamId`, or -1 if none of them is
// READY. The task ids come from pTaskList and the entries are looked up in execInfo.pTaskMap. The visible caller
// (mndProcessStreamCheckpoint) holds execInfo.lock around this call.
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
  int64_t ts = -1;
  int32_t taskId = -1;

  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
    STaskId *p = taosArrayGet(pTaskList, i);
    if (p == NULL) {  // check the key before using it for the hash lookup
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
    if (pEntry == NULL || pEntry->id.streamId != streamId) {
      continue;
    }

    if (pEntry->status == TASK_STATUS__READY && ts < pEntry->startTime) {
      ts = pEntry->startTime;
      taskId = pEntry->id.taskId;
    }
  }

  mDebug("stream:0x%" PRIx64 " last ready ts:%" PRId64 " s-task:0x%x", streamId, ts, taskId);
  return ts;
}
1240

1241
// One candidate stream for a new checkpoint, with the time it has waited since its last checkpoint.
typedef struct {
  int64_t streamId;  // uid of the stream
  int64_t duration;  // now - checkpointFreq, in milliseconds
} SCheckpointInterval;

// Sort comparator for SCheckpointInterval entries: orders by duration, largest first, so the stream that has
// waited longest is handled first. Equal durations compare as 0.
static int32_t streamWaitComparFn(const void *p1, const void *p2) {
  int64_t lhs = ((const SCheckpointInterval *)p1)->duration;
  int64_t rhs = ((const SCheckpointInterval *)p2)->duration;

  if (lhs > rhs) {
    return -1;
  }
  return (lhs < rhs) ? 1 : 0;
}
1255

1256
// Start new checkpoint transactions for streams that are due. Steps:
// 1. Select every stream whose last checkpoint (pStream->checkpointFreq) and whose last task-ready time are both
//    older than tsStreamCheckpointInterval seconds.
// 2. Sort the selected streams so the longest-waiting one comes first.
// 3. Start checkpoint transactions in that order until tsMaxConcurrentCheckpoint transactions are active.
// All transactions started in one call share a single checkpoint id generated by mndStreamGenChkptId().
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  void       *pIter = NULL;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;
  int32_t     numOfCheckpointTrans = 0;

  if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));
  if (pList == NULL) {
    return terrno;
  }

  int64_t now = taosGetTimestampMs();

  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    int64_t duration = now - pStream->checkpointFreq;
    if (duration < tsStreamCheckpointInterval * 1000) {
      sdbRelease(pSdb, pStream);
      continue;
    }

    // skip streams whose tasks became ready again within the last checkpoint interval
    streamMutexLock(&execInfo.lock);
    int64_t startTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
    streamMutexUnlock(&execInfo.lock);

    if (startTs != -1 && (now - startTs) < tsStreamCheckpointInterval * 1000) {
      sdbRelease(pSdb, pStream);
      continue;
    }

    SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
    void* p = taosArrayPush(pList, &in);
    if (p) {
      int32_t currentSize = taosArrayGetSize(pList);
      mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chpt interval threshold: %ds(%" PRId64
             "s), concurrently launch threshold:%d",
             pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
             tsMaxConcurrentCheckpoint);
    } else {
      mError("failed to record the checkpoint interval info, stream:0x%" PRIx64, pStream->uid);
    }
    sdbRelease(pSdb, pStream);
  }

  int32_t size = taosArrayGetSize(pList);
  if (size == 0) {
    taosArrayDestroy(pList);
    return code;
  }

  taosArraySort(pList, streamWaitComparFn);
  code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans);
  if (code) {
    mError("failed to clear finish trans, code:%s", tstrerror(code));
    taosArrayDestroy(pList);
    return code;
  }

  int32_t numOfQual = taosArrayGetSize(pList);
  // `>=` rather than `>`: with numOfCheckpointTrans == tsMaxConcurrentCheckpoint the capacity is 0, and the loop
  // below would still launch one extra trans before its `started >= capacity` check.
  if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) {
    mDebug(
        "%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new "
        "checkpoint trans are not allowed, wait for 30s",
        numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint);
    taosArrayDestroy(pList);
    return code;
  }

  int32_t capacity = tsMaxConcurrentCheckpoint - numOfCheckpointTrans;
  mDebug(
      "%d stream(s) checkpoint interval longer than %ds, %d ongoing checkpoint trans, %d new checkpoint trans allowed, "
      "concurrent trans threshold:%d",
      numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, capacity, tsMaxConcurrentCheckpoint);

  int32_t started = 0;
  int64_t checkpointId = mndStreamGenChkptId(pMnode, true);

  for (int32_t i = 0; i < numOfQual; ++i) {
    SCheckpointInterval *pCheckpointInfo = taosArrayGet(pList, i);
    if (pCheckpointInfo == NULL) {
      continue;
    }

    SStreamObj *p = NULL;
    code = mndGetStreamObj(pMnode, pCheckpointInfo->streamId, &p);
    if (p != NULL && code == 0) {
      code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
      sdbRelease(pSdb, p);

      if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
        started += 1;

        if (started >= capacity) {
          mDebug("already start %d new checkpoint trans, current active checkpoint trans:%d", started,
                 (started + numOfCheckpointTrans));
          break;
        }
      } else {
        mError("failed to start checkpoint trans, code:%s", tstrerror(code));
      }
    }
  }

  taosArrayDestroy(pList);
  return code;
}
1367

1368
// Handle a DROP STREAM request.
// - If the stream belongs to an sma (smaId != 0) that still exists, the request is rejected with
//   TSDB_CODE_TSMA_MUST_BE_DROPPED.
// - Otherwise write privilege on the target db and trans conflicts are checked.
// - A drop trans is then built and prepared. It contains the drop actions of the stream tasks and the stream object
//   persisted as SDB_STATUS_DROPPED.
// - After preparation, any active trans related to the stream is killed, the stream tasks are removed from execInfo
//   and an audit record is written.
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the drop trans has been prepared, 0 if the stream does not exist and
// igNotExists is set, or an error code.
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  SStreamObj     *pStream = NULL;
  STrans         *pTrans = NULL;
  int32_t         code = 0;
  int32_t         nameCode = 0;
  int32_t         transId = 0;
  SName           name = {0};
  SMDropStreamReq dropReq = {0};

  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
    mError("invalid drop stream msg recv, discarded");
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  mDebug("recv drop stream:%s msg", dropReq.name);

  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (dropReq.igNotExists) {
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      tFreeMDropStreamReq(&dropReq);
      return 0;
    } else {
      mError("stream:%s not exist failed to drop it", dropReq.name);
      tFreeMDropStreamReq(&dropReq);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  if (pStream->smaId != 0) {
    mDebug("stream:%s, uid:0x%" PRIx64 " try to drop sma related stream", dropReq.name, pStream->uid);

    void    *pIter = NULL;
    SSmaObj *pSma = NULL;
    while ((pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma)) != NULL) {
      bool isRelatedSma = (pSma != NULL && pSma->uid == pStream->smaId);
      if (pSma != NULL) {
        sdbRelease(pMnode->pSdb, pSma);
      }

      if (isRelatedSma) {
        sdbCancelFetch(pMnode->pSdb, pIter);
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;
        // logged while pStream is still referenced; it is released in _OVER
        mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma",
               dropReq.name, pStream->uid, tstrerror(code));
        goto _OVER;
      }
    }
  }

  // propagate the actual privilege error instead of a bare -1
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    goto _OVER;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
  if (code) {
    goto _OVER;
  }

  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code));
    goto _OVER;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
  if (code) {
    mError("failed to register drop stream trans, code:%s", tstrerror(code));
    goto _OVER;
  }

  // drop all tasks
  code = mndStreamSetDropAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, tstrerror(code));
    goto _OVER;
  }

  // drop stream
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
  if (code) {
    goto _OVER;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    goto _OVER;
  }

  // kill the related checkpoint trans
  transId = mndStreamGetRelTrans(pMnode, pStream->uid);
  if (transId != 0) {
    mDebug("drop active transId:%d due to stream:%s uid:0x%" PRIx64 " dropped", transId, pStream->name, pStream->uid);
    mndKillTransImpl(pMnode, transId, pStream->sourceDb);
  }

  mDebug("stream:%s uid:0x%" PRIx64 " transId:%d start to drop related task when dropping stream", dropReq.name,
         pStream->uid, transId);

  removeStreamTasksInBuf(pStream, &execInfo);

  // The drop trans is already prepared here: a failure to parse the name for the audit record must not be reported
  // to the client as a failed drop.
  nameCode = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (nameCode != 0) {
    mWarn("stream:%s failed to parse name for audit record, code:%s", dropReq.name, tstrerror(nameCode));
  }
  auditRecord(pReq, pMnode->clusterId, "dropStream", "", name.dbname, dropReq.sql, dropReq.sqlLen);

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  tFreeMDropStreamReq(&dropReq);

  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
    return code;
  }
  TAOS_RETURN(code);
}
1510

1511
// Add a drop log (SDB_STATUS_DROPPED) to pTrans for every stream whose source or target db is pDb.
// NOTE(review): presumably called while dropping db pDb — confirm at call sites.
// If such a stream has different source and target dbs, the function fails with
// TSDB_CODE_MND_STREAM_MUST_BE_DELETED and nothing more is added. For every stream that is dropped, any active
// trans related to it is killed and its tasks are removed from execInfo.
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
      if (pStream->sourceDbUid != pStream->targetDbUid) {
        // log while pStream is still referenced; it must not be dereferenced after sdbRelease()
        mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
               pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(TSDB_CODE_MND_STREAM_MUST_BE_DELETED);
      }

      // kill the related checkpoint trans
      int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
      if (transId != 0) {
        mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
        mndKillTransImpl(pMnode, transId, pStream->sourceDb);
      }

      // drop the stream obj in execInfo
      removeStreamTasksInBuf(pStream, &execInfo);

      code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
      if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        return code;
      }
    }

    sdbRelease(pSdb, pStream);
  }

  return 0;
}
1553

1554
/*
 * SHOW STREAMS retrieval: write at most `rows` stream rows into `pBlock`. The
 * sdb iteration is resumed from (and saved back into) pShow->pIter. A stream
 * whose row cannot be written by setStreamAttrInResBlock() is not counted.
 *
 * Returns the number of rows written by this call and adds it to
 * pShow->numOfRows.
 */
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t filled = 0;

  for (;;) {
    if (filled >= rows) {
      break;
    }

    SStreamObj *pObj = NULL;
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) {
      break;  // no more streams
    }

    if (setStreamAttrInResBlock(pObj, pBlock, filled) == 0) {
      filled += 1;
    }
    sdbRelease(pSdb, pObj);
  }

  pShow->numOfRows += filled;
  return filled;
}
1575

1576
/* Abort an unfinished SHOW STREAMS iteration by releasing its sdb stream iterator `pIter`. */
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1580

1581
/*
 * SHOW STREAM TASKS retrieval: write one row per stream task into `pBlock`.
 * The sdb iteration is resumed from (and saved back into) pShow->pIter.
 *
 * The exec-info cache is initialized first under execInfo.lock. Each stream is
 * read under its read latch. When the stream's tasks would exceed
 * `rowsCapacity`, the block is grown so that all tasks of that stream are
 * written in the same batch. Timestamps use the source database's precision,
 * or milliseconds if the database cannot be acquired.
 *
 * Returns the number of rows written and adds it to pShow->numOfRows.
 */
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  int32_t     numOfRows = 0;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) {
      break;
    }

    // lock
    taosRLockLatch(&pStream->lock);

    int32_t count = mndGetNumOfStreamTasks(pStream);
    if (numOfRows + count > rowsCapacity) {
      code = blockDataEnsureCapacity(pBlock, numOfRows + count);
      if (code) {
        mError("failed to prepare the result block buffer, quit return value");
        taosRUnLockLatch(&pStream->lock);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    int32_t precision = TSDB_TIME_PRECISION_MILLI;
    SDbObj *pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
    if (pSourceDb != NULL) {
      precision = pSourceDb->cfg.precision;
      mndReleaseDb(pMnode, pSourceDb);
    }

    // add row for each task
    SStreamTaskIter *pIter = NULL;
    code = createStreamTaskIter(pStream, &pIter);
    if (code) {
      // log while pStream is still held: it must not be touched after sdbRelease()
      mError("failed to create task iter for stream:%s", pStream->name);
      taosRUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pIter, &pTask);
      if (code) {
        // do not destroy pIter here: it is destroyed exactly once after this loop
        break;
      }

      code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows, precision);
      if (code == TSDB_CODE_SUCCESS) {
        numOfRows++;
      }
    }

    pBlock->info.rows = numOfRows;

    destroyStreamTaskIter(pIter);
    taosRUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1654

1655
/*
 * Abort an unfinished SHOW STREAM TASKS iteration. That show walks the
 * SDB_STREAM table, so the stream iterator is the one released.
 */
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
  SSdb *const sdb = pMnode->pSdb;
  sdbCancelFetchByType(sdb, pIter, SDB_STREAM);
}
×
1659

1660
/*
 * Handle a PAUSE STREAM request.
 *
 * Returns 0 in two cases: the stream does not exist and igNotExists is set, or
 * the stream is already paused.
 *
 * Otherwise the request is rejected when any of these checks fails:
 *  - write privilege on the target database;
 *  - conflicts with other stream transactions;
 *  - a pending node update;
 *  - task status: every task reported in execInfo must be out of the
 *    UNINIT/CK states, and at least one task must have reported.
 *
 * Then a pause transaction is built, the stream status is set to PAUSE, and
 * the transaction is prepared.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was prepared,
 * otherwise an error code.
 */
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  SMPauseStreamReq pauseReq = {0};
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (pauseReq.igNotExists) {
      mInfo("stream:%s, not exist, not pause stream", pauseReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to pause stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid);

  if (pStream->status == STREAM_STATUS__PAUSE) {
    sdbRelease(pMnode->pSdb, pStream);
    return 0;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(code);
  }

  bool updated = mndStreamNodeIsUpdated(pMnode);
  if (updated) {
    mError("tasks are not ready for pause, node update detected");
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  {  // check for tasks, if tasks are not ready, not allowed to pause
    bool found = false;
    bool readyToPause = true;
    streamMutexLock(&execInfo.lock);

    for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
      STaskId *p = taosArrayGet(execInfo.pTaskList, i);
      if (p == NULL) {
        continue;
      }

      STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
      if (pEntry == NULL) {
        continue;
      }

      if (pEntry->id.streamId != pStream->uid) {
        continue;
      }

      if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) {
        mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name,
               pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status));
        readyToPause = false;
      }

      found = true;
    }

    streamMutexUnlock(&execInfo.lock);
    if (!found) {
      mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
    }

    if (!readyToPause) {
      mError("stream:%s task not ready for pause yet", pauseReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
    }
  }

  STrans *pTrans = NULL;
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // if nodeUpdate happened, not send pause trans
  code = mndStreamSetPauseAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s, failed to pause task since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // pause stream. The status is changed on the acquired object itself, so it is
  // restored if writing the trans log fails and the trans is dropped.
  taosWLockLatch(&pStream->lock);
  int32_t prevStatus = pStream->status;
  pStream->status = STREAM_STATUS__PAUSE;
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    pStream->status = prevStatus;
    taosWUnLockLatch(&pStream->lock);
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  taosWUnLockLatch(&pStream->lock);

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1800

1801
/*
 * Handle a RESUME STREAM request.
 *
 * Fails if the stream grant has expired. Returns 0 in two cases: the stream
 * does not exist and igNotExists is set, or the stream is not paused.
 *
 * Otherwise the request is checked for write privilege on the target database
 * and for conflicting stream transactions. Then a resume transaction is built
 * (honouring igUntreated), the stream status is set to NORMAL, and the
 * transaction is prepared.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was prepared,
 * otherwise the error code of the failing step.
 */
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResumeStreamReq resumeReq = {0};
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resumeReq.igNotExists) {
      mInfo("stream:%s not exist, not resume stream", resumeReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      return 0;
    } else {
      mError("stream:%s not exist, failed to resume stream", resumeReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  if (pStream->status != STREAM_STATUS__PAUSE) {
    sdbRelease(pMnode->pSdb, pStream);
    return 0;
  }

  mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);
  // propagate the real privilege error instead of a bare -1
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  STrans *pTrans = NULL;
  code =
      doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // set the resume action
  code = mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated);
  if (code) {
    mError("stream:%s, failed to resume task since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // resume stream. The status is changed on the acquired object itself, so it is
  // restored if writing the trans log fails and the trans is dropped.
  taosWLockLatch(&pStream->lock);
  int32_t prevStatus = pStream->status;
  pStream->status = STREAM_STATUS__NORMAL;
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    pStream->status = prevStatus;
    taosWUnLockLatch(&pStream->lock);

    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;  // report the persist failure, not the stale success code
  }

  taosWUnLockLatch(&pStream->lock);
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare resume stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1895

1896
/*
 * Build and prepare a single nodeUpdate transaction that updates the task
 * epsets of every stream affected by `pChangeInfo`.
 *
 * First pass: check every stream for conflicting transactions. On the first
 * conflict, return that error without building anything.
 *
 * Second pass: create one transaction, registered under the uid of the first
 * stream fetched. For each stream, add the epset-update action and the trans
 * log record. When `includeAllNodes` is false, streams whose source and target
 * databases are both absent from pChangeInfo->pDBMap are skipped.
 *
 * Returns 0 when no stream exists, the result of mndTransPrepare() on success,
 * or an error code.
 */
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  STrans     *pTrans = NULL;
  int32_t     code = 0;

  // first pass: the nodeUpdate trans must not conflict with any existing stream trans
  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
    sdbRelease(pSdb, pStream);

    if (code) {
      mError("nodeUpdate conflict with other trans, current nodeUpdate ignored, code:%s", tstrerror(code));
      sdbCancelFetch(pSdb, pIter);
      return code;
    }
  }

  // second pass: restart the iteration (the first pass ran to completion)
  pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    // here create only one trans, registered under the first stream fetched
    if (pTrans == NULL) {
      code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME,
                           "update task epsets", &pTrans);
      if (pTrans == NULL || code) {
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        return terrno = code;
      }

      code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
      if (code) {
        mError("failed to register trans, transId:%d, and continue", pTrans->id);
      }
    }

    if (!includeAllNodes) {
      void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
      void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
      if (p1 == NULL && p2 == NULL) {
        mDebug("stream:0x%" PRIx64 " %s not involved nodeUpdate, ignore", pStream->uid, pStream->name);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
           pStream->name, pTrans->id);

    code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);

    // todo: not continue, drop all and retry again
    if (code != TSDB_CODE_SUCCESS) {
      mError("stream:0x%" PRIx64 " build nodeUpdate trans failed, ignore and continue, code:%s", pStream->uid,
             tstrerror(code));
      sdbRelease(pSdb, pStream);
      continue;
    }

    code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
    sdbRelease(pSdb, pStream);

    if (code != TSDB_CODE_SUCCESS) {
      sdbCancelFetch(pSdb, pIter);
      mndTransDrop(pTrans);  // pTrans was created above and is owned here
      return code;
    }
  }

  // no need to build the trans to handle the vgroup update
  if (pTrans == NULL) {
    return 0;
  }

  // every stream fetched above has already been released inside the loops, so
  // only the transaction remains to be cleaned up here
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
    mndTransDrop(pTrans);
    return code;
  }

  mndTransDrop(pTrans);
  return code;
}
1990

1991
/*
 * Rebuild `pNodeList` from the nodes that host tasks of the existing streams.
 *
 * Every task of every stream contributes one SNodeEntry keyed by its nodeId.
 * Each entry gets hbTimestamp = -1, lastHbMsgId = -1 and the task's epset.
 * Duplicate nodeIds are collapsed through a temporary hash map. The list is
 * cleared and refilled from that map.
 *
 * Returns 0 on success. Otherwise returns an error code left by task iteration,
 * or the first array push failure.
 */
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int32_t     code = 0;

  mDebug("start to refresh node list by existed streams");

  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    return terrno;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    taosWLockLatch(&pStream->lock);

    SStreamTaskIter *pTaskIter = NULL;
    code = createStreamTaskIter(pStream, &pTaskIter);
    if (code) {
      // log while pStream is still held: it must not be touched after sdbRelease()
      mError("failed to create task iter for stream:%s", pStream->name);
      taosWUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pTaskIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pTaskIter, &pTask);
      if (code) {
        break;
      }

      SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
      epsetAssign(&entry.epset, &pTask->info.epSet);
      int32_t ret = taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
      if (ret != 0 && ret != TSDB_CODE_DUP_KEY) {
        // report the error returned by taosHashPut, not the unrelated `code`
        mError("failed to put entry into hash map, nodeId:%d, code:%s", entry.nodeId, tstrerror(ret));
      }
    }

    destroyStreamTaskIter(pTaskIter);
    taosWUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  taosArrayClear(pNodeList);

  // convert to list
  pIter = NULL;
  while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
    SNodeEntry *pEntry = (SNodeEntry *)pIter;

    void *p = taosArrayPush(pNodeList, pEntry);
    if (p == NULL) {
      mError("failed to put entry into node list, nodeId:%d, code: out of memory", pEntry->nodeId);
      if (code == 0) {
        code = terrno;
      }
      continue;
    }

    char    buf[256] = {0};
    int32_t ret = epsetToStr(&pEntry->epset, buf, tListLen(buf));  // ignore this error since it is only for log file
    if (ret != 0) {  // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(ret));
    }

    mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
  }

  taosHashCleanup(pHash);

  mDebug("numOfNodes:%d for stream after extract nodeInfo from stream", (int32_t)taosArrayGetSize(pNodeList));
  return code;
}
2072

2073
/*
 * Insert the database name of every vgroup in `pSdb` into `pDBMap`. Keys are
 * the db names; values are empty. Insert failures are silently skipped.
 */
static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
  void   *pIter = NULL;
  int32_t code = 0;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    code = taosHashPut(pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
    if (code == 0) {
      int32_t size = taosHashGetSize(pDBMap);
      mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size);
    }

    // release only after the last access to pVgroup->dbName
    sdbRelease(pSdb, pVgroup);
  }
}
×
2092

2093
// this function runs by only one thread, so it is not multi-thread safe
2094
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
1,276✔
2095
  int32_t code = 0;
1,276✔
2096
  bool    allReady = true;
1,276✔
2097
  SArray *pNodeSnapshot = NULL;
1,276✔
2098
  SMnode *pMnode = pMsg->info.node;
1,276✔
2099
  int64_t ts = taosGetTimestampSec();
1,276✔
2100
  bool    updateAllVgroups = false;
1,276✔
2101

2102
  int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
1,276✔
2103
  if (old != 0) {
1,276!
2104
    mDebug("still in checking node change");
×
2105
    return 0;
×
2106
  }
2107

2108
  mDebug("start to do node changing check");
1,276✔
2109

2110
  streamMutexLock(&execInfo.lock);
1,276✔
2111
  int32_t numOfNodes = extractStreamNodeList(pMnode);
1,276✔
2112
  streamMutexUnlock(&execInfo.lock);
1,276✔
2113

2114
  if (numOfNodes == 0) {
1,276!
2115
    mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing");
×
2116
    execInfo.ts = ts;
×
2117
    atomic_store_32(&mndNodeCheckSentinel, 0);
×
2118
    return 0;
×
2119
  }
2120

2121
  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
1,276✔
2122
  if (code) {
1,276!
2123
    mError("failed to take the vgroup snapshot, ignore it and continue");
×
2124
  }
2125

2126
  if (!allReady) {
1,276✔
2127
    taosArrayDestroy(pNodeSnapshot);
34✔
2128
    atomic_store_32(&mndNodeCheckSentinel, 0);
34✔
2129
    mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
34!
2130
    return 0;
34✔
2131
  }
2132

2133
  streamMutexLock(&execInfo.lock);
1,242✔
2134

2135
  code = removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
1,242✔
2136
  if (code) {
1,242!
2137
    goto _end;
×
2138
  }
2139

2140
  SVgroupChangeInfo changeInfo = {0};
1,242✔
2141
  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo);
1,242✔
2142
  if (code) {
1,242!
2143
    goto _end;
×
2144
  }
2145

2146
  {
2147
    if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) {
1,242!
2148
      mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
×
2149
      updateAllVgroups = true;
×
2150
      execInfo.switchFromFollower = false;  // reset the flag
×
2151
      addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb);
×
2152
    }
2153
  }
2154

2155
  if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) {
1,242!
2156
    // kill current active checkpoint transaction, since the transaction is vnode wide.
2157
    killAllCheckpointTrans(pMnode, &changeInfo);
10✔
2158
    code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups);
10✔
2159

2160
    // keep the new vnode snapshot if success
2161
    if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
10!
2162
      code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
10✔
2163
      if (code) {
10!
2164
        mError("failed to extract node list from stream, code:%s", tstrerror(code));
×
2165
        goto _end;
×
2166
      }
2167

2168
      execInfo.ts = ts;
10✔
2169
      mDebug("create trans successfully, update cached node list, numOfNodes:%d",
10✔
2170
             (int)taosArrayGetSize(execInfo.pNodeList));
2171
    } else {
2172
      mError("unexpected code during create nodeUpdate trans, code:%s", tstrerror(code));
×
2173
    }
2174
  } else {
2175
    mDebug("no update found in nodeList");
1,232✔
2176
  }
2177

2178
  mndDestroyVgroupChangeInfo(&changeInfo);
1,242✔
2179

2180
  _end:
1,242✔
2181
  streamMutexUnlock(&execInfo.lock);
1,242✔
2182
  taosArrayDestroy(pNodeSnapshot);
1,242✔
2183

2184
  mDebug("end to do stream task node change checking");
1,242✔
2185
  atomic_store_32(&mndNodeCheckSentinel, 0);
1,242✔
2186
  return 0;
1,242✔
2187
}
2188

2189
/*
 * Trigger the stream node-change check. When at least one stream exists,
 * enqueue a TDMT_MND_STREAM_NODECHANGE_CHECK message (with an
 * SMStreamNodeCheckMsg payload) on the mnode WRITE_QUEUE. Presumably it is
 * consumed by mndProcessNodeCheckReq; verify against the handler registration.
 *
 * Returns 0 when there is no stream, terrno if the message allocation fails,
 * or the result of tmsgPutToQueue().
 */
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;

  // nothing to check without any stream
  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) <= 0) {
    return 0;
  }

  int32_t               contLen = sizeof(SMStreamNodeCheckMsg);
  SMStreamNodeCheckMsg *pCont = rpcMallocCont(contLen);
  if (pCont == NULL) {
    return terrno;
  }

  SRpcMsg msg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pCont, .contLen = contLen};
  return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &msg);
}
2205

2206
/*
 * Register the tasks of `pStream` in the exec buffer `pExecNode`.
 *
 * Each task not yet present in pExecNode->pTaskMap is handled as follows:
 *  - it gets a status entry (from streamTaskStatusInit) in pTaskMap;
 *  - its id is appended to pTaskList;
 *  - the node hosting it is appended to pNodeList if that nodeId is not
 *    listed yet (hbTimestamp = -1, lastHbMsgId = -1, task epset).
 *
 * Tasks already in pTaskMap are skipped, including the node check. Failures
 * are logged and do not stop the iteration, except a failure to fetch the
 * current task, which ends it.
 */
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
  SStreamTaskIter *pIter = NULL;
  int32_t          code = createStreamTaskIter(pStream, &pIter);
  if (code) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return;
  }

  while (streamTaskIterNextTask(pIter)) {
    SStreamTask *pTask = NULL;
    code = streamTaskIterGetCurrent(pIter, &pTask);
    if (code) {
      break;
    }

    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    void   *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      STaskStatusEntry entry = {0};
      streamTaskStatusInit(&entry, pTask);

      code = taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
      if (code == 0) {
        void   *px = taosArrayPush(pExecNode->pTaskList, &id);
        int32_t num = (int32_t)taosArrayGetSize(pExecNode->pTaskList);
        if (px) {
          mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
        } else {
          mError("s-task:0x%x failed to add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
        }
      } else {
        mError("s-task:0x%x failed to add into task map, since out of memory", (int32_t)entry.id.taskId);
      }

      // add the new vgroups if not added yet
      bool exist = false;
      for (int32_t j = 0; j < taosArrayGetSize(pExecNode->pNodeList); ++j) {
        SNodeEntry *pEntry = taosArrayGet(pExecNode->pNodeList, j);
        if ((pEntry != NULL) && (pEntry->nodeId == pTask->info.nodeId)) {
          exist = true;
          break;
        }
      }

      if (!exist) {
        SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
        epsetAssign(&nodeEntry.epset, &pTask->info.epSet);

        void *px = taosArrayPush(pExecNode->pNodeList, &nodeEntry);
        if (px) {
          mInfo("vgId:%d added into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList));
        } else {
          // terminated explicitly instead of relying on the macro expanding to a braced block
          mError("vgId:%d failed to add into nodeList, total:%d", nodeEntry.nodeId,
                 (int)taosArrayGetSize(pExecNode->pNodeList));
        }
      }
    }
  }

  destroyStreamTaskIter(pIter);
}
2266

2267
/*
 * Append `taskId` to `pList`, the ids of the tasks of stream `uid` that have
 * requested a checkpoint. A task id already in the list is ignored.
 * `numOfTotal` is the stream's total number of tasks and is used only to log
 * how many requests are still missing.
 */
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
  int32_t num = taosArrayGetSize(pList);
  for (int32_t i = 0; i < num; ++i) {
    int32_t *pId = taosArrayGet(pList, i);
    if (pId == NULL) {
      continue;
    }

    if (taskId == *pId) {
      return;  // duplicated request
    }
  }

  void *p = taosArrayPush(pList, &taskId);

  // counted after the push, so a successful add reports the requests received
  // so far including this one (and remain:0 once all tasks have asked)
  int32_t numOfTasks = taosArrayGetSize(pList);
  if (p) {
    mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, numOfTasks, numOfTotal - numOfTasks);
  } else {
    mError("stream:0x%" PRIx64 " receive %d reqs for checkpoint, failed to added into task list, since out of memory",
           uid, numOfTasks);
  }
}
2289

2290
/*
 * Handle a task's request to start a checkpoint.
 *
 * The requesting task id is recorded, per stream, in
 * execInfo.pTransferStateStreams. When the number of distinct requesting tasks
 * equals the stream's task count, a checkpoint transaction is created with a
 * newly generated checkpoint id, and the stream's entry is removed from that map.
 *
 * If the stream is not in the sdb yet but the task is already in the exec
 * buffer, the request is recorded but cannot complete. The reason is that
 * numOfTasks is 0 for such a stream.
 *
 * After processing, a response carrying the request's vgId is sent here and
 * automatic response is disabled.
 *
 * Returns:
 *  - TSDB_CODE_INVALID_MSG if decoding fails;
 *  - TSDB_CODE_MND_STREAM_NOT_EXIST if the stream is found neither in the sdb
 *    nor in the exec buffer;
 *  - terrno if the response cannot be allocated;
 *  - 0 otherwise.
 */
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
  SMnode                  *pMnode = pReq->info.node;
  SStreamTaskCheckpointReq req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskCheckpointReq(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint req msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);

  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf",
          req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the create stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
  if (pReqTaskList == NULL) {
    SArray *pList = taosArrayInit(4, sizeof(int32_t));
    if (pList == NULL) {
      mError("stream:0x%" PRIx64 " failed to create checkpoint req task list, code:%s", req.streamId,
             tstrerror(terrno));
    } else {
      doAddTaskId(pList, req.taskId, req.streamId, numOfTasks);
      code = taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
      if (code) {
        mError("failed to put into transfer state stream map, code: out of memory");
        taosArrayDestroy(pList);  // not owned by the map, avoid leaking it
      } else {
        pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
      }
    }
  } else {
    doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks);
  }

  // pReqTaskList is NULL only if the list could not be created/registered above
  if (pReqTaskList != NULL && *pReqTaskList != NULL) {
    int32_t total = taosArrayGetSize(*pReqTaskList);
    if (total == numOfTasks) {  // all tasks has send the reqs
      int64_t checkpointId = mndStreamGenChkptId(pMnode, false);
      mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);

      if (pStream != NULL) {  // TODO:handle error
        code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
        if (code) {
          mError("failed to create checkpoint trans, code:%s", tstrerror(code));
        }
      } else {
        // todo: wait for the create stream trans completed, and launch the checkpoint trans
        // SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
        // sleep(500ms)
      }

      // remove this entry
      (void)taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));

      int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
      mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
    }
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  {
    SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)};
    rsp.pCont = rpcMallocCont(rsp.contLen);
    if (rsp.pCont == NULL) {
      return terrno;
    }

    SMsgHead *pHead = rsp.pCont;
    pHead->vgId = htonl(req.nodeId);

    tmsgSendRsp(&rsp);
    pReq->info.handle = NULL;  // disable auto rsp
  }

  return 0;
}
2389

2390
// valid the info according to the HbMsg
2391
static bool validateChkptReport(const SCheckpointReport *pReport, int64_t reportChkptId) {
6,368✔
2392
  STaskId           id = {.streamId = pReport->streamId, .taskId = pReport->taskId};
6,368✔
2393
  STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
6,368✔
2394
  if (pTaskEntry == NULL) {
6,368✔
2395
    mError("invalid checkpoint-report msg from task:0x%x, discard", pReport->taskId);
28!
2396
    return false;
28✔
2397
  }
2398

2399
  if (pTaskEntry->checkpointInfo.latestId >= pReport->checkpointId) {
6,340!
2400
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " saved checkpointId:%" PRId64 " discard",
×
2401
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2402
    return false;
×
2403
  }
2404

2405
  // now the task in checkpoint procedure
2406
  if ((pTaskEntry->checkpointInfo.activeId != 0) && (pTaskEntry->checkpointInfo.activeId > pReport->checkpointId)) {
6,340!
2407
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " active checkpointId:%" PRId64
×
2408
           " discard",
2409
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2410
    return false;
×
2411
  }
2412

2413
  if (reportChkptId >= pReport->checkpointId) {
6,340!
2414
    mError("s-task:0x%x expired checkpoint-report msg, checkpointId:%" PRId64 " already update checkpointId:%" PRId64
×
2415
           " discard",
2416
           pReport->taskId, pReport->checkpointId, reportChkptId);
2417
    return false;
×
2418
  }
2419

2420
  return true;
6,340✔
2421
}
2422

2423
static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SCheckpointReport *pReport) {
6,368✔
2424
  bool valid = validateChkptReport(pReport, reportChkptId);
6,368✔
2425
  if (!valid) {
6,368✔
2426
    return;
28✔
2427
  }
2428

2429
  for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
21,619✔
2430
    STaskChkptInfo *p = taosArrayGet(pList, i);
15,279✔
2431
    if (p == NULL) {
15,279!
2432
      continue;
×
2433
    }
2434

2435
    if (p->taskId == pReport->taskId) {
15,279!
2436
      if (p->checkpointId > pReport->checkpointId) {
×
2437
        mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
×
2438
               pReport->taskId, p->checkpointId, pReport->checkpointId);
2439
      } else if (p->checkpointId < pReport->checkpointId) {  // expired checkpoint-report msg, update it
×
2440
        mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64,
×
2441
               pReport->taskId, p->checkpointId, pReport->checkpointId);
2442

2443
        // update the checkpoint report info
2444
        p->checkpointId = pReport->checkpointId;
×
2445
        p->ts = pReport->checkpointTs;
×
2446
        p->version = pReport->checkpointVer;
×
2447
        p->transId = pReport->transId;
×
2448
        p->dropHTask = pReport->dropHTask;
×
2449
      } else {
2450
        mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
×
2451
      }
2452
      return;
×
2453
    }
2454
  }
2455

2456
  STaskChkptInfo info = {
6,340✔
2457
      .streamId = pReport->streamId,
6,340✔
2458
      .taskId = pReport->taskId,
6,340✔
2459
      .transId = pReport->transId,
6,340✔
2460
      .dropHTask = pReport->dropHTask,
6,340✔
2461
      .version = pReport->checkpointVer,
6,340✔
2462
      .ts = pReport->checkpointTs,
6,340✔
2463
      .checkpointId = pReport->checkpointId,
6,340✔
2464
      .nodeId = pReport->nodeId,
6,340✔
2465
  };
2466

2467
  void *p = taosArrayPush(pList, &info);
6,340✔
2468
  if (p == NULL) {
6,340!
2469
    mError("failed to put into task list, taskId:0x%x", pReport->taskId);
×
2470
  } else {
2471
    int32_t size = taosArrayGetSize(pList);
6,340✔
2472
    mDebug("stream:0x%"PRIx64" %d tasks has send checkpoint-report", pReport->streamId, size);
6,340✔
2473
  }
2474
}
2475

2476
int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
6,368✔
2477
  SMnode           *pMnode = pReq->info.node;
6,368✔
2478
  SCheckpointReport req = {0};
6,368✔
2479

2480
  SDecoder decoder = {0};
6,368✔
2481
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
6,368✔
2482

2483
  if (tDecodeStreamTaskChkptReport(&decoder, &req)) {
6,368!
2484
    tDecoderClear(&decoder);
×
2485
    mError("invalid task checkpoint-report msg received");
×
2486
    return TSDB_CODE_INVALID_MSG;
×
2487
  }
2488
  tDecoderClear(&decoder);
6,368✔
2489

2490
  streamMutexLock(&execInfo.lock);
6,368✔
2491
  mndInitStreamExecInfo(pMnode, &execInfo);
6,368✔
2492
  streamMutexUnlock(&execInfo.lock);
6,368✔
2493

2494
  mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64
6,368✔
2495
         " checkpointVer:%" PRId64 " transId:%d",
2496
         req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);
2497

2498
  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
2499
  streamMutexLock(&execInfo.lock);
6,368✔
2500

2501
  SStreamObj *pStream = NULL;
6,368✔
2502
  int32_t code = mndGetStreamObj(pMnode, req.streamId, &pStream);
6,368✔
2503
  if (pStream == NULL || code != 0) {
6,368!
2504
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);
×
2505

2506
    // not in meta-store yet, try to acquire the task in exec buffer
2507
    // the checkpoint req arrives too soon before the completion of the create stream trans.
2508
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
×
2509
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
×
2510
    if (p == NULL) {
×
2511
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
×
2512
      streamMutexUnlock(&execInfo.lock);
×
2513
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
×
2514
    } else {
2515
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
×
2516
             req.streamId, req.taskId);
2517
    }
2518
  }
2519

2520
  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
6,368!
2521

2522
  SChkptReportInfo *pInfo = (SChkptReportInfo*)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
6,368✔
2523
  if (pInfo == NULL) {
6,368✔
2524
    SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId};
791✔
2525
    if (info.pTaskList != NULL) {
791!
2526
      doAddReportStreamTask(info.pTaskList, info.reportChkpt, &req);
791✔
2527
      code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &info, sizeof(info));
791✔
2528
      if (code) {
791!
2529
        mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId);
×
2530
      }
2531

2532
      pInfo = (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
791✔
2533
    }
2534
  } else {
2535
    doAddReportStreamTask(pInfo->pTaskList, pInfo->reportChkpt, &req);
5,577✔
2536
  }
2537

2538
  int32_t total = taosArrayGetSize(pInfo->pTaskList);
6,368✔
2539
  if (total == numOfTasks) {  // all tasks has send the reqs
6,368✔
2540
    mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
1,294!
2541
          " will be issued soon",
2542
          req.streamId, pStream->name, total, req.checkpointId);
2543
  }
2544

2545
  if (pStream != NULL) {
6,368!
2546
    mndReleaseStream(pMnode, pStream);
6,368✔
2547
  }
2548

2549
  streamMutexUnlock(&execInfo.lock);
6,368✔
2550

2551
  doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS);
6,368✔
2552
  return code;
6,368✔
2553
}
2554

2555
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pExistedTasks, bool *pAllSame) {
110✔
2556
  int32_t num = 0;
110✔
2557
  int64_t chkId = INT64_MAX;
110✔
2558
  *pExistedTasks = 0;
110✔
2559
  *pAllSame = true;
110✔
2560

2561
  for(int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
1,614✔
2562
    STaskId* p = taosArrayGet(execInfo.pTaskList, i);
1,504✔
2563
    if (p == NULL) {
1,504!
2564
      continue;
×
2565
    }
2566

2567
    if (p->streamId != streamId) {
1,504✔
2568
      continue;
868✔
2569
    }
2570

2571
    num += 1;
636✔
2572
    STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
636✔
2573
    if (chkId > pe->checkpointInfo.latestId) {
636✔
2574
      if (chkId != INT64_MAX) {
114✔
2575
        *pAllSame = false;
4✔
2576
      }
2577
      chkId = pe->checkpointInfo.latestId;
114✔
2578
    }
2579
  }
2580

2581
  *pExistedTasks = num;
110✔
2582
  if (num < numOfTasks) { // not all task send info to mnode through hbMsg, no valid checkpoint Id
110!
2583
    return -1;
×
2584
  }
2585

2586
  return chkId;
110✔
2587
}
2588

2589
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) {
6,368✔
2590
  SRpcMsg rsp = {.code = code, .info = *pInfo, .contLen = msgSize};
6,368✔
2591
  rsp.pCont = rpcMallocCont(rsp.contLen);
6,368✔
2592
  if (rsp.pCont != NULL) {
6,368!
2593
    SMsgHead *pHead = rsp.pCont;
6,368✔
2594
    pHead->vgId = htonl(vgId);
6,368✔
2595

2596
    tmsgSendRsp(&rsp);
6,368✔
2597
    pInfo->handle = NULL;  // disable auto rsp
6,368✔
2598
  }
2599
}
6,368✔
2600

2601
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
11,387✔
2602
  SMnode *pMnode = pMsg->info.node;
11,387✔
2603
  int64_t now = taosGetTimestampMs();
11,387✔
2604
  SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
11,387✔
2605
  if (pStreamList == NULL) {
11,387!
2606
    return terrno;
×
2607
  }
2608

2609
  mDebug("start to process consensus-checkpointId in tmr");
11,387✔
2610

2611
  bool    allReady = true;
11,387✔
2612
  SArray *pNodeSnapshot = NULL;
11,387✔
2613

2614
  int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
11,387✔
2615
  taosArrayDestroy(pNodeSnapshot);
11,387✔
2616
  if (code) {
11,387!
2617
    mError("failed to get the vgroup snapshot, ignore it and continue");
×
2618
  }
2619

2620
  if (!allReady) {
11,387✔
2621
    mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
1,106!
2622
    taosArrayDestroy(pStreamList);
1,106✔
2623
    return 0;
1,106✔
2624
  }
2625

2626
  streamMutexLock(&execInfo.lock);
10,281✔
2627

2628
  void *pIter = NULL;
10,281✔
2629
  while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
10,305✔
2630
    SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;
24✔
2631

2632
    int64_t streamId = -1;
24✔
2633
    int32_t num = taosArrayGetSize(pInfo->pTaskList);
24✔
2634
    SArray *pList = taosArrayInit(4, sizeof(int32_t));
24✔
2635
    if (pList == NULL) {
24!
2636
      continue;
×
2637
    }
2638

2639
    SStreamObj *pStream = NULL;
24✔
2640
    code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
24✔
2641
    if (pStream == NULL || code != 0) {  // stream has been dropped already
24!
2642
      mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
×
2643
      taosArrayDestroy(pList);
×
2644
      continue;
×
2645
    }
2646

2647
    for (int32_t j = 0; j < num; ++j) {
134✔
2648
      SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
110✔
2649
      if (pe == NULL) {
110!
2650
        continue;
×
2651
      }
2652

2653
      streamId = pe->req.streamId;
110✔
2654

2655
      int32_t existed = 0;
110✔
2656
      bool    allSame = true;
110✔
2657
      int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame);
110✔
2658
      if (chkId == -1) {
110!
2659
        mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed,
×
2660
               pInfo->numOfTasks, pe->req.taskId);
2661
        break;
×
2662
      }
2663

2664
      if (((now - pe->ts) >= 10 * 1000) || allSame) {
110✔
2665
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
106✔
2666
               pe->req.startTs, (now - pe->ts) / 1000.0);
2667
        if (chkId > pe->req.checkpointId) {
106!
2668
          streamMutexUnlock(&execInfo.lock);
×
2669
          taosArrayDestroy(pStreamList);
×
2670
          mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
×
2671
                 pe->req.checkpointId, chkId);
2672
          return TSDB_CODE_FAILED;
×
2673
        }
2674
        code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
106✔
2675
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
106!
2676
          mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
4!
2677
        }
2678

2679
        void* p = taosArrayPush(pList, &pe->req.taskId);
106✔
2680
        if (p == NULL) {
106!
2681
          mError("failed to put into task list, taskId:0x%x", pe->req.taskId);
×
2682
        }
2683
        streamId = pe->req.streamId;
106✔
2684
      } else {
2685
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
4!
2686
               pe->req.startTs, (now - pe->ts) / 1000.0);
2687
      }
2688
    }
2689

2690
    mndReleaseStream(pMnode, pStream);
24✔
2691

2692
    if (taosArrayGetSize(pList) > 0) {
24✔
2693
      for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
129✔
2694
        int32_t *taskId = taosArrayGet(pList, i);
106✔
2695
        if (taskId == NULL) {
106!
2696
          continue;
×
2697
        }
2698

2699
        for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) {
106!
2700
          SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, k);
106✔
2701
          if ((pe != NULL) && (pe->req.taskId == *taskId)) {
106!
2702
            taosArrayRemove(pInfo->pTaskList, k);
106✔
2703
            break;
106✔
2704
          }
2705
        }
2706
      }
2707
    }
2708

2709
    taosArrayDestroy(pList);
24✔
2710

2711
    if (taosArrayGetSize(pInfo->pTaskList) == 0) {
24✔
2712
      mndClearConsensusRspEntry(pInfo);
23✔
2713
      if (streamId == -1) {
23!
2714
        streamMutexUnlock(&execInfo.lock);
×
2715
        taosArrayDestroy(pStreamList);
×
2716
        mError("streamId is -1, streamId:%" PRIx64, pInfo->streamId);
×
2717
        return TSDB_CODE_FAILED;
×
2718
      }
2719
      void* p = taosArrayPush(pStreamList, &streamId);
23✔
2720
      if (p == NULL) {
23!
2721
        mError("failed to put into stream list, stream:0x%" PRIx64, streamId);
×
2722
      }
2723
    }
2724
  }
2725

2726
  for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
10,304✔
2727
    int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i);
23✔
2728
    if (pStreamId == NULL) {
23!
2729
      continue;
×
2730
    }
2731

2732
    code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId);
23✔
2733
  }
2734

2735
  streamMutexUnlock(&execInfo.lock);
10,281✔
2736

2737
  taosArrayDestroy(pStreamList);
10,281✔
2738
  mDebug("end to process consensus-checkpointId in tmr");
10,281✔
2739
  return code;
10,281✔
2740
}
2741

2742
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
259✔
2743
  int32_t code = mndProcessCreateStreamReq(pReq);
259✔
2744
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
259!
2745
    pReq->info.rsp = rpcMallocCont(1);
×
2746
    if (pReq->info.rsp == NULL) {
×
2747
      return terrno;
×
2748
    }
2749

2750
    pReq->info.rspLen = 1;
×
2751
    pReq->info.noResp = false;
×
2752
    pReq->code = code;
×
2753
  }
2754
  return code;
259✔
2755
}
2756

2757
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
224✔
2758
  int32_t code = mndProcessDropStreamReq(pReq);
224✔
2759
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
224!
2760
    pReq->info.rsp = rpcMallocCont(1);
20✔
2761
    if (pReq->info.rsp == NULL) {
20!
2762
      return terrno;
×
2763
    }
2764

2765
    pReq->info.rspLen = 1;
20✔
2766
    pReq->info.noResp = false;
20✔
2767
    pReq->code = code;
20✔
2768
  }
2769
  return code;
224✔
2770
}
2771

2772
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
50,952✔
2773
  if (pExecInfo->initTaskList || pMnode == NULL) {
50,952!
2774
    return;
50,826✔
2775
  }
2776

2777
  addAllStreamTasksIntoBuf(pMnode, pExecInfo);
126✔
2778
  pExecInfo->initTaskList = true;
126✔
2779
}
2780

2781
void mndStreamResetInitTaskListLoadFlag() {
1,594✔
2782
  mInfo("reset task list buffer init flag for leader");
1,594!
2783
  execInfo.initTaskList = false;
1,594✔
2784
}
1,594✔
2785

2786
void mndUpdateStreamExecInfoRole(SMnode* pMnode, int32_t role) {
1,896✔
2787
  execInfo.switchFromFollower = false;
1,896✔
2788

2789
  if (execInfo.role == NODE_ROLE_UNINIT) {
1,896✔
2790
    execInfo.role = role;
1,722✔
2791
    if (role == NODE_ROLE_LEADER) {
1,722✔
2792
      mInfo("init mnode is set to leader");
1,541!
2793
    } else {
2794
      mInfo("init mnode is set to follower");
181!
2795
    }
2796
  } else {
2797
    if (role == NODE_ROLE_LEADER) {
174✔
2798
      if (execInfo.role == NODE_ROLE_FOLLOWER) {
53!
2799
        execInfo.role = role;
53✔
2800
        execInfo.switchFromFollower = true;
53✔
2801
        mInfo("mnode switch to be leader from follower");
53!
2802
      } else {
2803
        mInfo("mnode remain to be leader, do nothing");
×
2804
      }
2805
    } else {  // follower's
2806
      if (execInfo.role == NODE_ROLE_LEADER) {
121✔
2807
        execInfo.role = role;
3✔
2808
        mInfo("mnode switch to be follower from leader");
3!
2809
      } else {
2810
        mInfo("mnode remain to be follower, do nothing");
118!
2811
      }
2812
    }
2813
  }
2814
}
1,896✔
2815

2816
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
126✔
2817
  SSdb       *pSdb = pMnode->pSdb;
126✔
2818
  SStreamObj *pStream = NULL;
126✔
2819
  void       *pIter = NULL;
126✔
2820

2821
  while (1) {
2822
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
326✔
2823
    if (pIter == NULL) {
326✔
2824
      break;
126✔
2825
    }
2826

2827
    saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo);
200✔
2828
    sdbRelease(pSdb, pStream);
200✔
2829
  }
2830
}
126✔
2831

2832
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList) {
1,077✔
2833
  STrans *pTrans = NULL;
1,077✔
2834
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME,
1,077✔
2835
                                 "update checkpoint-info", &pTrans);
2836
  if (pTrans == NULL || code) {
1,077!
2837
    sdbRelease(pMnode->pSdb, pStream);
×
2838
    return code;
×
2839
  }
2840

2841
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid);
1,077✔
2842
  if (code){
1,077!
2843
    sdbRelease(pMnode->pSdb, pStream);
×
2844
    mndTransDrop(pTrans);
×
2845
    return code;
×
2846
  }
2847

2848
  code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream);
1,077✔
2849
  if (code) {
1,077!
2850
    sdbRelease(pMnode->pSdb, pStream);
×
2851
    mndTransDrop(pTrans);
×
2852
    return code;
×
2853
  }
2854

2855
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
1,077✔
2856
  if (code) {
1,077!
2857
    sdbRelease(pMnode->pSdb, pStream);
×
2858
    mndTransDrop(pTrans);
×
2859
    return code;
×
2860
  }
2861

2862
  code = mndTransPrepare(pMnode, pTrans);
1,077✔
2863
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1,077!
2864
    mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, tstrerror(code));
×
2865
    sdbRelease(pMnode->pSdb, pStream);
×
2866
    mndTransDrop(pTrans);
×
2867
    return code;
×
2868
  }
2869

2870
  sdbRelease(pMnode->pSdb, pStream);
1,077✔
2871
  mndTransDrop(pTrans);
1,077✔
2872

2873
  return TSDB_CODE_ACTION_IN_PROGRESS;
1,077✔
2874
}
2875

2876
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
2✔
2877
  SMnode      *pMnode = pReq->info.node;
2✔
2878
  int32_t      code = 0;
2✔
2879
  SOrphanTask *pTask = NULL;
2✔
2880
  int32_t      i = 0;
2✔
2881
  STrans      *pTrans = NULL;
2✔
2882
  int32_t      numOfTasks = 0;
2✔
2883

2884
  SMStreamDropOrphanMsg msg = {0};
2✔
2885
  code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg);
2✔
2886
  if (code) {
2!
2887
    return code;
×
2888
  }
2889

2890
  numOfTasks = taosArrayGetSize(msg.pList);
2✔
2891
  if (numOfTasks == 0) {
2!
2892
    mDebug("no orphan tasks to drop, no need to create trans");
×
2893
    goto _err;
×
2894
  }
2895

2896
  mDebug("create trans to drop %d orphan tasks", numOfTasks);
2!
2897

2898
  i = 0;
2✔
2899
  while (i < numOfTasks && ((pTask = taosArrayGet(msg.pList, i)) == NULL)) {
2!
2900
    i += 1;
×
2901
  }
2902

2903
  if (pTask == NULL) {
2!
2904
    mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task");
×
2905
    goto _err;
×
2906
  }
2907

2908
  // check if it is conflict with other trans in both sourceDb and targetDb.
2909
  code = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
2✔
2910
  if (code) {
2!
2911
    goto _err;
×
2912
  }
2913

2914
  SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};
2✔
2915

2916
  code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
2✔
2917
  if (pTrans == NULL || code != 0) {
2!
2918
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
×
2919
    goto _err;
×
2920
  }
2921

2922
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
2✔
2923
  if (code) {
2!
2924
    goto _err;
×
2925
  }
2926

2927
  // drop all tasks
2928
  if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) {
2!
2929
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
×
2930
    goto _err;
×
2931
  }
2932

2933
  // drop stream
2934
  if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) {
2!
2935
    goto _err;
×
2936
  }
2937

2938
  code = mndTransPrepare(pMnode, pTrans);
2✔
2939
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
2!
2940
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
×
2941
    goto _err;
×
2942
  }
2943

2944
_err:
2✔
2945
  tDestroyDropOrphanTaskMsg(&msg);
2✔
2946
  mndTransDrop(pTrans);
2✔
2947

2948
  if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
2!
2949
    mDebug("create drop %d orphan tasks trans succ", numOfTasks);
2!
2950
  }
2951
  return code;
2✔
2952
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc