• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3558

17 Dec 2024 06:05AM UTC coverage: 59.778% (+1.6%) from 58.204%
#3558

push

travis-ci

web-flow
Merge pull request #29179 from taosdata/merge/mainto3.0

merge: form main to 3.0 branch

132787 of 287595 branches covered (46.17%)

Branch coverage included in aggregate %.

104 of 191 new or added lines in 5 files covered. (54.45%)

6085 existing lines in 168 files now uncovered.

209348 of 284746 relevant lines covered (73.52%)

8164844.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.96
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "audit.h"
17
#include "mndDb.h"
18
#include "mndPrivilege.h"
19
#include "mndScheduler.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndStream.h"
23
#include "mndTrans.h"
24
#include "osMemory.h"
25
#include "parser.h"
26
#include "taoserror.h"
27
#include "tmisce.h"
28
#include "tname.h"
29

30
#define MND_STREAM_MAX_NUM 60
31

32
typedef struct {
33
  int8_t placeHolder;  // // to fix windows compile error, define place holder
34
} SMStreamNodeCheckMsg;
35

36
static int32_t  mndNodeCheckSentinel = 0;
37
SStreamExecInfo execInfo;
38

39
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
41
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
42
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
43
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
44

45
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
46
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
47

48
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq);
49
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
50
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
51
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
52
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
53
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
54
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
55
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
56
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
57
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
58
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
59
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList);
60
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
61
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
62
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
63
static void    doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
64
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq);
65
static void    saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
66

67
static void     addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
68
static void     removeExpiredNodeInfo(const SArray *pNodeSnapshot);
69
static int32_t  doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
70
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
71

72
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
73
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
74
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
75
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
76
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
77

78
/*
 * Register the stream module with the mnode.
 *
 * It does four things:
 * - installs the message handlers for stream requests and responses;
 * - installs the show handlers for the streams and stream-tasks system tables;
 * - initializes the shared execution info via mndInitExecInfo();
 * - registers the SDB_STREAM and SDB_STREAM_SEQ sdb tables.
 *
 * Returns 0 on success, otherwise the first non-zero code returned by
 * mndInitExecInfo() or sdbSetTable().
 */
int32_t mndInitStream(SMnode *pMnode) {
  // requests from clients and the node-check timer
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck);
  mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq);

  // for msgs inside mnode
  // TODO change the name
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE, mndProcessCreateStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode);

  // checkpoint, heartbeat, consensus and maintenance messages
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_DROP_ORPHANTASKS, mndProcessDropOrphanTaskReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_TASK_RESET, mndProcessResetStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr);

  // responses that are all routed to the transaction framework
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CONSEN_CHKPT_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);

  // show handlers
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);

  int32_t code = mndInitExecInfo();
  if (code != 0) {
    return code;
  }

  SSdbTable streamTable = {
      .sdbType = SDB_STREAM,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
  };
  code = sdbSetTable(pMnode->pSdb, streamTable);
  if (code != 0) {
    return code;
  }

  SSdbTable seqTable = {
      .sdbType = SDB_STREAM_SEQ,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamSeqActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamSeqActionDecode,
      .insertFp = (SdbInsertFp)mndStreamSeqActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamSeqActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamSeqActionDelete,
  };
  return sdbSetTable(pMnode->pSdb, seqTable);
}
152

153
/*
 * Release the containers and the mutex held by the global execInfo.
 * execInfo is initialized through mndInitExecInfo() in mndInitStream().
 * pMnode is not used.
 */
void mndCleanupStream(SMnode *pMnode) {
  // array members
  taosArrayDestroy(execInfo.pTaskList);
  taosArrayDestroy(execInfo.pNodeList);
  taosArrayDestroy(execInfo.pKilledChkptTrans);

  // hash-table members
  taosHashCleanup(execInfo.pTaskMap);
  taosHashCleanup(execInfo.transMgmt.pDBTrans);
  taosHashCleanup(execInfo.pTransferStateStreams);
  taosHashCleanup(execInfo.pChkptStreams);
  taosHashCleanup(execInfo.pStreamConsensus);

  // the lock goes last, after everything it guarded is gone
  (void)taosThreadMutexDestroy(&execInfo.lock);

  mDebug("mnd stream exec info cleanup");
}
165

166
/*
 * sdb decode callback for SDB_STREAM.
 *
 * The raw payload is an int32 length followed by that many bytes, decoded into an
 * SStreamObj with tDecodeSStreamObj() for the raw record's soft version.
 *
 * Returns the newly allocated row on success and sets terrno to 0.
 * On failure it frees the row, sets terrno to the failure code and returns NULL.
 */
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSdbRow    *pRow = NULL;
  SStreamObj *pStream = NULL;
  void       *buf = NULL;
  int8_t      sver = 0;
  int32_t     tlen = 0;
  int32_t     dataPos = 0;

  code = sdbGetRawSoftVer(pRaw, &sver);
  TSDB_CHECK_CODE(code, lino, _over);

  if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
    // An explicit error code is required here. Jumping to _over with code == 0
    // would take the success branch while pRow/pStream are still NULL.
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    lino = __LINE__;
    goto _over;
  }

  pRow = sdbAllocRow(sizeof(SStreamObj));
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);

  pStream = sdbGetRowObj(pRow);
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);

  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);

  buf = taosMemoryMalloc(tlen + 1);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);

  SDecoder decoder;
  tDecoderInit(&decoder, buf, tlen + 1);
  code = tDecodeSStreamObj(&decoder, pStream, sver);
  tDecoderClear(&decoder);

  if (code != TSDB_CODE_SUCCESS) {
    // Release the object's contents on any non-zero code (the old `< 0` test leaked
    // them for other codes). The row itself is freed in the error branch below.
    tFreeStreamObj(pStream);
    lino = __LINE__;
  }

_over:
  taosMemoryFreeClear(buf);

  if (code != TSDB_CODE_SUCCESS) {
    char *p = (pStream == NULL) ? "null" : pStream->name;
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
    taosMemoryFreeClear(pRow);

    terrno = code;
    return NULL;
  } else {
    mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
           pStream->checkpointId);

    terrno = 0;
    return pRow;
  }
}
224

225
/* sdb insert callback for SDB_STREAM: it only emits a trace line and always succeeds. */
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
  mTrace("stream:%s, perform insert action", pStream->name);
  return TSDB_CODE_SUCCESS;
}
229

230
/*
 * sdb delete callback for SDB_STREAM.
 * Frees the object's contents with tFreeStreamObj() while holding the object's
 * write latch. Always returns 0.
 */
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
  mTrace("stream:%s, perform delete action", pStream->name);

  taosWLockLatch(&pStream->lock);
  {
    tFreeStreamObj(pStream);
  }
  taosWUnLockLatch(&pStream->lock);

  return TSDB_CODE_SUCCESS;
}
237

238
/*
 * sdb update callback for SDB_STREAM.
 *
 * Only a few fields are carried over into the existing object:
 * - version, swapped atomically outside the latch;
 * - status, updateTime, checkpointId and checkpointFreq, copied under the write latch.
 *
 * All other fields of pOldStream are left as they are. Always returns 0.
 */
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
  mTrace("stream:%s, perform update action", pOldStream->name);

  (void)atomic_exchange_32(&pOldStream->version, pNewStream->version);

  taosWLockLatch(&pOldStream->lock);
  pOldStream->checkpointFreq = pNewStream->checkpointFreq;
  pOldStream->checkpointId = pNewStream->checkpointId;
  pOldStream->updateTime = pNewStream->updateTime;
  pOldStream->status = pNewStream->status;
  taosWUnLockLatch(&pOldStream->lock);

  return TSDB_CODE_SUCCESS;
}
252

253
/*
 * Look up a stream by name in SDB_STREAM and take a reference on it.
 *
 * On success *pStream holds the acquired object and 0 is returned; the caller must
 * release it with mndReleaseStream().
 *
 * On failure *pStream is NULL and a non-zero code is returned:
 * - TSDB_CODE_MND_STREAM_NOT_EXIST when sdb reports the object as missing;
 * - otherwise the error sdbAcquire() left in terrno.
 * Previously those other failures returned 0 with a NULL *pStream, which let callers
 * that only test the code dereference NULL.
 */
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
  SSdb *pSdb = pMnode->pSdb;

  (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
  if ((*pStream) != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = terrno;
  if (code == TSDB_CODE_SDB_OBJ_NOT_THERE || code == TSDB_CODE_SUCCESS) {
    // also covers a NULL result that left no error behind, so failure is never reported as 0
    return TSDB_CODE_MND_STREAM_NOT_EXIST;
  }
  return code;
}
262

263
/* Drop a reference previously taken with mndAcquireStream(). */
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) { sdbRelease(pMnode->pSdb, pStream); }
267

268
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
269
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
270
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
271
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
272
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
273

274
/*
 * Reject a create-stream request that lacks any mandatory item. The mandatory items
 * are the stream name, a non-empty SQL text, the source database and the target
 * stable's full name.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_MND_INVALID_STREAM_OPTION.
 */
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
  if (pCreate->name[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  if (pCreate->sql == NULL || pCreate->sql[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  if (pCreate->sourceDB[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  if (pCreate->targetStbFullName[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  return TSDB_CODE_SUCCESS;
}
281

282
/*
 * Build pWrapper (nCols and a freshly allocated pSchema) from an array of SField.
 *
 * Column ids are assigned 1..n in array order. A field of TSDB_DATA_TYPE_NULL is
 * stored as a VARCHAR whose size is VARSTR_HEADER_SIZE.
 *
 * Returns 0 on success, or terrno if the allocation or an element lookup fails.
 */
static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) {
  pWrapper->nCols = taosArrayGetSize(pFields);
  pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema));
  if (pWrapper->pSchema == NULL) {
    return terrno;
  }

  for (int32_t col = 0; col < pWrapper->nCols; ++col) {
    SField *pSrc = (SField *)taosArrayGet(pFields, col);
    if (pSrc == NULL) {
      return terrno;
    }

    SSchema *pDst = &pWrapper->pSchema[col];
    if (pSrc->type != TSDB_DATA_TYPE_NULL) {
      pDst->type = pSrc->type;
      pDst->bytes = pSrc->bytes;
    } else {
      pDst->type = TSDB_DATA_TYPE_VARCHAR;
      pDst->bytes = VARSTR_HEADER_SIZE;
    }

    pDst->colId = col + 1;
    tstrncpy(pDst->name, pSrc->name, sizeof(pDst->name));
    pDst->flags = pSrc->flags;
  }

  return TSDB_CODE_SUCCESS;
}
311

312
/*
 * Report whether any column other than the first carries the COL_IS_KEY flag.
 * A schema with fewer than two columns never has one.
 */
static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) {
  bool found = false;
  for (int32_t col = 1; !found && col < pWrapper->nCols; ++col) {
    found = (pWrapper->pSchema[col].flags & COL_IS_KEY) != 0;
  }
  return found;
}
323

324
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
1,575✔
325
  SNode      *pAst = NULL;
1,575✔
326
  SQueryPlan *pPlan = NULL;
1,575✔
327
  int32_t     code = 0;
1,575✔
328

329
  mInfo("stream:%s to create", pCreate->name);
1,575!
330
  memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
1,575✔
331
  pObj->createTime = taosGetTimestampMs();
1,575✔
332
  pObj->updateTime = pObj->createTime;
1,575✔
333
  pObj->version = 1;
1,575✔
334

335
  if (pCreate->smaId > 0) {
1,575✔
336
    pObj->subTableWithoutMd5 = 1;
235✔
337
  }
338

339
  pObj->smaId = pCreate->smaId;
1,575✔
340
  pObj->indexForMultiAggBalance = -1;
1,575✔
341

342
  pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
1,575✔
343

344
  char p[TSDB_STREAM_FNAME_LEN + 32] = {0};
1,575✔
345
  snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory");
1,575✔
346

347
  pObj->hTaskUid = mndGenerateUid(pObj->name, strlen(pObj->name));
1,575✔
348
  pObj->status = 0;
1,575✔
349

350
  pObj->conf.igExpired = pCreate->igExpired;
1,575✔
351
  pObj->conf.trigger = pCreate->triggerType;
1,575✔
352
  pObj->conf.triggerParam = pCreate->maxDelay;
1,575✔
353
  pObj->conf.watermark = pCreate->watermark;
1,575✔
354
  pObj->conf.fillHistory = pCreate->fillHistory;
1,575✔
355
  pObj->deleteMark = pCreate->deleteMark;
1,575✔
356
  pObj->igCheckUpdate = pCreate->igUpdate;
1,575✔
357

358
  memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
1,575✔
359
  SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
1,575✔
360
  if (pSourceDb == NULL) {
1,575!
361
    code = terrno;
×
362
    mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb,
×
363
          tstrerror(code));
UNCOV
364
    goto FAIL;
×
365
  }
366

367
  pObj->sourceDbUid = pSourceDb->uid;
1,575✔
368
  mndReleaseDb(pMnode, pSourceDb);
1,575✔
369

370
  memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
1,575✔
371

372
  SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
1,575✔
373
  if (pTargetDb == NULL) {
1,575!
374
    code = terrno;
×
375
    mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb,
×
376
           tstrerror(code));
UNCOV
377
    goto FAIL;
×
378
  }
379

380
  tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
1,575✔
381

382
  if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
1,575✔
383
    pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
1,431✔
384
  } else {
385
    pObj->targetStbUid = pCreate->targetStbUid;
144✔
386
  }
387
  pObj->targetDbUid = pTargetDb->uid;
1,575✔
388
  mndReleaseDb(pMnode, pTargetDb);
1,575✔
389

390
  pObj->sql = pCreate->sql;
1,575✔
391
  pObj->ast = pCreate->ast;
1,575✔
392

393
  pCreate->sql = NULL;
1,575✔
394
  pCreate->ast = NULL;
1,575✔
395

396
  // deserialize ast
397
  if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) {
1,575!
UNCOV
398
    goto FAIL;
×
399
  }
400

401
  // create output schema
402
  if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) {
1,575!
UNCOV
403
    goto FAIL;
×
404
  }
405

406
  int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
1,575✔
407
  if (numOfNULL > 0) {
1,575✔
408
    pObj->outputSchema.nCols += numOfNULL;
26✔
409
    SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
26!
410
    if (!pFullSchema) {
26!
UNCOV
411
      code = terrno;
×
UNCOV
412
      goto FAIL;
×
413
    }
414

415
    int32_t nullIndex = 0;
26✔
416
    int32_t dataIndex = 0;
26✔
417
    for (int32_t i = 0; i < pObj->outputSchema.nCols; i++) {
332✔
418
      if (nullIndex >= numOfNULL) {
306!
419
        pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
×
420
        pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
×
421
        pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
×
422
        tstrncpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name, sizeof(pFullSchema[i].name));
×
UNCOV
423
        pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
×
UNCOV
424
        dataIndex++;
×
425
      } else {
426
        SColLocation *pos = NULL;
306✔
427
        if (nullIndex < taosArrayGetSize(pCreate->fillNullCols)) {
306!
428
          pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
306✔
429
        }
430

431
        if (pos == NULL) {
306!
UNCOV
432
          mError("invalid null column index, %d", nullIndex);
×
UNCOV
433
          continue;
×
434
        }
435

436
        if (i < pos->slotId) {
306✔
437
          pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
79✔
438
          pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
79✔
439
          pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
79✔
440
          tstrncpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name, sizeof(pFullSchema[i].name));
79✔
441
          pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
79✔
442
          dataIndex++;
79✔
443
        } else {
444
          pFullSchema[i].bytes = 0;
227✔
445
          pFullSchema[i].colId = pos->colId;
227✔
446
          pFullSchema[i].flags = COL_SET_NULL;
227✔
447
          memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
227✔
448
          pFullSchema[i].type = pos->type;
227✔
449
          nullIndex++;
227✔
450
        }
451
      }
452
    }
453

454
    taosMemoryFree(pObj->outputSchema.pSchema);
26!
455
    pObj->outputSchema.pSchema = pFullSchema;
26✔
456
  }
457

458
  SPlanContext cxt = {
1,575✔
459
      .pAstRoot = pAst,
460
      .topicQuery = false,
461
      .streamQuery = true,
462
      .triggerType =
463
          (pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
1,575✔
464
      .watermark = pObj->conf.watermark,
1,575✔
465
      .igExpired = pObj->conf.igExpired,
1,575✔
466
      .deleteMark = pObj->deleteMark,
1,575✔
467
      .igCheckUpdate = pObj->igCheckUpdate,
1,575✔
468
      .destHasPrimaryKey = hasDestPrimaryKey(&pObj->outputSchema),
1,575✔
469
  };
470

471
  // using ast and param to build physical plan
472
  if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) {
1,575!
UNCOV
473
    goto FAIL;
×
474
  }
475

476
  // save physcial plan
477
  if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) {
1,575!
UNCOV
478
    goto FAIL;
×
479
  }
480

481
  pObj->tagSchema.nCols = pCreate->numOfTags;
1,575✔
482
  if (pCreate->numOfTags) {
1,575✔
483
    pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
255!
484
    if (pObj->tagSchema.pSchema == NULL) {
255!
UNCOV
485
      code = terrno;
×
UNCOV
486
      goto FAIL;
×
487
    }
488
  }
489

490
  /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
491
  for (int32_t i = 0; i < pCreate->numOfTags; i++) {
3,014✔
492
    SField *pField = taosArrayGet(pCreate->pTags, i);
1,439✔
493
    if (pField == NULL) {
1,439!
UNCOV
494
      continue;
×
495
    }
496

497
    pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
1,439✔
498
    pObj->tagSchema.pSchema[i].bytes = pField->bytes;
1,439✔
499
    pObj->tagSchema.pSchema[i].flags = pField->flags;
1,439✔
500
    pObj->tagSchema.pSchema[i].type = pField->type;
1,439✔
501
    memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN);
1,439✔
502
  }
503

504
FAIL:
1,575✔
505
  if (pAst != NULL) nodesDestroyNode(pAst);
1,575!
506
  if (pPlan != NULL) qDestroyQueryPlan(pPlan);
1,575!
507
  return code;
1,575✔
508
}
509

510
/*
 * Serialize one stream task and append a TDMT_STREAM_TASK_DEPLOY action, addressed
 * to the task's epSet, to pTrans.
 *
 * The message is an SMsgHead (vgId = task nodeId, via htonl) followed by the encoded
 * task. Tasks older than SSTREAM_TASK_SUBTABLE_CHANGED_VER are bumped to
 * SSTREAM_TASK_VER before encoding.
 *
 * The buffer is freed here only when encoding or setTransAction() fails; on success
 * it is handed to the transaction (presumably owned by it afterwards — confirm in
 * setTransAction).
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG when the size pass fails, or the
 * failing code otherwise.
 */
int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
  SEncoder encoder;
  tEncoderInit(&encoder, NULL, 0);

  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  // Pass 1: encode without a buffer only to learn the payload length.
  // Treat any non-zero result as failure, matching the check on pass 2 below.
  int32_t code = tEncodeStreamTask(&encoder, pTask);
  if (code != 0) {
    mError("failed to calc encode size of stream task, code:%s", tstrerror(code));
    tEncoderClear(&encoder);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t size = encoder.pos;
  int32_t tlen = sizeof(SMsgHead) + size;
  tEncoderClear(&encoder);

  void *buf = taosMemoryCalloc(1, tlen);
  if (buf == NULL) {
    return terrno;
  }

  ((SMsgHead *)buf)->vgId = htonl(pTask->info.nodeId);

  // Pass 2: encode into the space after the message header.
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, size);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  if (code != 0) {
    mError("failed to encode stream task, code:%s", tstrerror(code));
    taosMemoryFree(buf);
    return code;
  }

  code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code) {
    taosMemoryFree(buf);
  }

  return code;
}
554

555
/*
 * Append a deploy action to pTrans for every task of the stream. When fillHistory is
 * set, the fill-history tasks in pStream->pHTasksList (an array of per-level task
 * arrays) are added as well.
 *
 * Stops at the first failure and returns its code; returns 0 when all tasks are added.
 */
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pIter = NULL;

  int32_t code = createStreamTaskIter(pStream, &pIter);
  if (code) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return code;
  }

  while (streamTaskIterNextTask(pIter)) {
    SStreamTask *pCurrent = NULL;

    code = streamTaskIterGetCurrent(pIter, &pCurrent);
    if (code == 0) {
      code = mndPersistTaskDeployReq(pTrans, pCurrent);
    }
    if (code != 0) {
      break;
    }
  }

  destroyStreamTaskIter(pIter);
  if (code != 0) {
    return code;
  }

  // persistent stream task for already stored ts data
  if (pStream->conf.fillHistory) {
    int32_t numOfLevels = taosArrayGetSize(pStream->pHTasksList);

    for (int32_t lvl = 0; code == 0 && lvl < numOfLevels; lvl++) {
      SArray *pLevel = taosArrayGetP(pStream->pHTasksList, lvl);
      int32_t numOfTasks = taosArrayGetSize(pLevel);

      for (int32_t k = 0; code == 0 && k < numOfTasks; k++) {
        code = mndPersistTaskDeployReq(pTrans, taosArrayGetP(pLevel, k));
      }
    }
  }

  return code;
}
600

601
/*
 * Add everything needed to persist a stream to pTrans: first the task deploy actions,
 * then the stream's own commit log with status SDB_STATUS_READY.
 * A negative code from the task step aborts before the log is written.
 */
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
  int32_t code = mndPersistStreamTasks(pTrans, pStream);
  return (code < 0) ? code : mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
}
609

610
/*
 * Add the creation of the stream's destination super table (pStream->targetSTbName)
 * to pTrans.
 *
 * Columns come from pStream->outputSchema, each with its type's default compression.
 * Tags come from pStream->tagSchema. When the stream has no tags, a single UBIGINT
 * tag named "group_id" is used.
 *
 * The call fails when:
 * - the request fails validation;
 * - the stable already exists;
 * - its database cannot be found;
 * - the database allows only one stable and already has one.
 *
 * Returns 0 on success, otherwise a non-zero error code. Previously several failure
 * paths jumped to cleanup with code == 0 and reported success. `user` is unused.
 */
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
  SStbObj *pStb = NULL;
  SDbObj  *pDb = NULL;
  int32_t  code = 0;
  int32_t  lino = 0;

  SMCreateStbReq createReq = {0};
  tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
  createReq.numOfColumns = pStream->outputSchema.nCols;
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);

  // build fields
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    pField->flags = pStream->outputSchema.pSchema[i].flags;
    pField->type = pStream->outputSchema.pSchema[i].type;
    pField->bytes = pStream->outputSchema.pSchema[i].bytes;
    pField->compress = createDefaultColCmprByType(pField->type);
  }

  if (pStream->tagSchema.nCols == 0) {
    // the stream defines no tags: use a single "group_id" tag
    createReq.numOfTags = 1;
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    SField *pField = taosArrayGet(createReq.pTags, 0);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, "group_id", sizeof(pField->name));
    pField->type = TSDB_DATA_TYPE_UBIGINT;
    pField->flags = 0;
    pField->bytes = 8;
  } else {
    createReq.numOfTags = pStream->tagSchema.nCols;
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    for (int32_t i = 0; i < createReq.numOfTags; i++) {
      SField *pField = taosArrayGet(createReq.pTags, i);
      if (pField == NULL) {
        continue;
      }

      pField->bytes = pStream->tagSchema.pSchema[i].bytes;
      pField->flags = pStream->tagSchema.pSchema[i].flags;
      pField->type = pStream->tagSchema.pSchema[i].type;
      tstrncpy(pField->name, pStream->tagSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    }
  }

  code = mndCheckCreateStbReq(&createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
    lino = __LINE__;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    lino = __LINE__;
    goto _OVER;
  }

  int32_t numOfStbs = -1;
  code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
  TSDB_CHECK_CODE(code, lino, _OVER);

  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    lino = __LINE__;
    goto _OVER;
  }

  SStbObj stbObj = {0};
  code = mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb);
  TSDB_CHECK_CODE(code, lino, _OVER);

  // the stable must carry the uid already recorded in the stream object
  stbObj.uid = pStream->targetStbUid;

  code = mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj);
  mndFreeStb(&stbObj);  // freed on both outcomes, as before
  TSDB_CHECK_CODE(code, lino, _OVER);

_OVER:
  tFreeSMCreateStbReq(&createReq);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);

  if (code != TSDB_CODE_SUCCESS) {
    mError("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->targetSTbName, lino,
           tstrerror(code));
  } else {
    mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName,
           pStream->outputSchema.nCols);
  }
  return code;
}
721

722
// 1. stream number check
723
// 2. target stable can not be target table of other existed streams.
724
static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
1,575✔
725
  int32_t     numOfStream = 0;
1,575✔
726
  SStreamObj *pStream = NULL;
1,575✔
727
  void       *pIter = NULL;
1,575✔
728

729
  while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
3,393✔
730
    if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
1,819✔
731
      ++numOfStream;
1,341✔
732
    }
733

734
    sdbRelease(pMnode->pSdb, pStream);
1,819✔
735

736
    if (numOfStream > MND_STREAM_MAX_NUM) {
1,819!
UNCOV
737
      mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
×
738
             pStreamObj->name);
UNCOV
739
      sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
740
      return TSDB_CODE_MND_TOO_MANY_STREAMS;
×
741
    }
742

743
    if (pStream->targetStbUid == pStreamObj->targetStbUid) {
1,819✔
744
      mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
1!
745
             pStreamObj->name);
746
      sdbCancelFetch(pMnode->pSdb, pIter);
1✔
747
      return TSDB_CODE_MND_INVALID_TARGET_TABLE;
1✔
748
    }
749
  }
750

751
  return TSDB_CODE_SUCCESS;
1,574✔
752
}
753

754
// Handle a CREATE STREAM request.
//
// Flow: deserialize and validate the request, handle an already existing stream (honouring igExists), check
// the stream grant and the snode availability for the source db, build the stream object, run
// doStreamCheck(), then build one transaction that optionally creates the destination super table,
// schedules the stream tasks and persists the stream object. The tasks are registered in execInfo before the
// transaction is prepared, so that heartbeats from vnodes already find them. Finally an audit record is
// written.
//
// Returns:
// - TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared.
// - 0 when the stream already exists and igExists is set.
// - An error code otherwise.
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SStreamObj  streamObj = {0};
  char       *sql = NULL;
  int32_t     sqlLen = 0;
  const char *pMsg = "create stream tasks on dnodes";
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STrans     *pTrans = NULL;

  SCMCreateStreamReq createReq = {0};
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  goto _OVER;
#endif

  mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
  if ((code = mndCheckCreateStreamReq(&createReq)) != 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = mndAcquireStream(pMnode, createReq.name, &pStream);
  if (pStream != NULL && code == 0) {
    if (createReq.igExists) {
      mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
      mndReleaseStream(pMnode, pStream);
      tFreeSCMCreateStreamReq(&createReq);
      return code;
    } else {
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
      goto _OVER;
    }
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    goto _OVER;
  }

  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
    goto _OVER;
  }

  if (createReq.sql != NULL) {
    sql = taosStrdup(createReq.sql);
    TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
    // Keep the statement length so that the audit record below stores the original SQL. Without it the
    // SQL branch of the audit code can never be taken.
    sqlLen = (int32_t)strlen(sql);
  }

  SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB);
  if (pSourceDb == NULL) {
    code = terrno;
    mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB,
          tstrerror(code));
    goto _OVER;
  }

  code = mndCheckForSnode(pMnode, pSourceDb);
  mndReleaseDb(pMnode, pSourceDb);
  if (code != 0) {
    goto _OVER;
  }

  // build stream obj from request
  if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = doStreamCheck(pMnode, &streamObj);
  TSDB_CHECK_CODE(code, lino, _OVER);

  code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans);
  if (pTrans == NULL || code) {
    goto _OVER;
  }

  // create stb for stream
  if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) {
    if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) {
      mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code));
      mndTransDrop(pTrans);
      goto _OVER;
    }
  } else {
    mDebug("stream:%s no need create stable", createReq.name);
  }

  // schedule stream task for stream obj
  code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  // add stream to trans
  code = mndPersistStream(pTrans, &streamObj);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb)) != 0) {
    mndTransDrop(pTrans);
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb)) != 0) {
    mndTransDrop(pTrans);
    goto _OVER;
  }

  // add into buffer firstly
  // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
  streamMutexLock(&execInfo.lock);
  mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
  saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  // execute creation
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  mndTransDrop(pTrans);

  // The transaction has been prepared at this point. A failure to derive the names for the audit record is
  // therefore only logged. Reporting it as the request result would tell the client that the creation failed
  // while the prepared transaction still runs.
  SName   dbname = {0};
  SName   name = {0};
  int32_t auditCode = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (auditCode != 0) {
    mError("invalid source dbname:%s in create stream, code:%s", createReq.sourceDB, tstrerror(auditCode));
  } else if ((auditCode = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE)) != 0) {
    mError("invalid stream name:%s in create strem, code:%s", createReq.name, tstrerror(auditCode));
  } else if (sql != NULL && sqlLen > 0) {
    // reuse this function for stream
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen);
  } else {
    char detail[1000] = {0};
    snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
  }

_OVER:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to create at line:%d since %s", createReq.name, lino, tstrerror(code));
  } else {
    mDebug("stream:%s create stream completed", createReq.name);
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  mndReleaseStream(pMnode, pStream);
  tFreeSCMCreateStreamReq(&createReq);
  tFreeStreamObj(&streamObj);
  taosMemoryFreeClear(sql);

  return code;
}
927

928
// Handle a request to restart all tasks of a stream. The request payload is decoded as an SMPauseStreamReq.
//
// The stream must exist, unless igNotExists is set, in which case 0 is returned. Other preconditions:
// - The user holds write privilege on the target db.
// - No conflicting stream transaction is running.
// - No node update is pending.
// A "restart" transaction is then built, registered and prepared.
// Returns TSDB_CODE_ACTION_IN_PROGRESS on success, or an error code.
static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SStreamObj      *pObj = NULL;
  STrans          *pTrans = NULL;
  SMPauseStreamReq req = {0};
  int32_t          code = 0;

  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &req) < 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  code = mndAcquireStream(pMnode, req.name, &pObj);
  if (pObj == NULL || code != 0) {
    if (!req.igNotExists) {
      mError("stream:%s not exist, failed to restart stream", req.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
    mInfo("stream:%s, not exist, not restart stream", req.name);
    return 0;
  }

  mInfo("stream:%s,%" PRId64 " start to restart stream", req.name, pObj->uid);

  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pObj->targetDb);
  if (code != 0) {
    goto _release;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pObj->uid, MND_STREAM_RESTART_NAME, true);
  if (code != 0) {
    goto _release;
  }

  if (mndStreamNodeIsUpdated(pMnode)) {
    mError("tasks are not ready for restart, node update detected");
    sdbRelease(pMnode->pSdb, pObj);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  code = doCreateTrans(pMnode, pObj, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream",
                       &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s failed to pause stream since %s", req.name, tstrerror(code));
    goto _release;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESTART_NAME, pObj->uid);
  if (code) {
    goto _release_and_drop;
  }

  // if nodeUpdate happened, not send pause trans
  code = mndStreamSetRestartAction(pMnode, pTrans, pObj);
  if (code) {
    mError("stream:%s, failed to restart task since %s", req.name, tstrerror(code));
    goto _release_and_drop;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare restart stream trans since %s", pTrans->id, tstrerror(code));
    goto _release_and_drop;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_release_and_drop:
  sdbRelease(pMnode->pSdb, pObj);
  mndTransDrop(pTrans);
  return code;

_release:
  sdbRelease(pMnode->pSdb, pObj);
  return code;
}
1007

1008
// Generate the next checkpoint id. The result is one more than the largest id found in two places:
// - the checkpointId of every stream object in the sdb;
// - the latestId of every non-failed task status entry in execInfo (reported from vnodes).
//
// When lock is true, execInfo.lock is taken around the task map scan. When it is false the caller is
// presumably already holding it (NOTE(review): confirm with the callers outside this view).
int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int64_t     maxChkptId = 0;

  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    maxChkptId = TMAX(maxChkptId, pStream->checkpointId);
    mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
           pStream->checkpointId);
    sdbRelease(pSdb, pStream);
  }

  // check the max checkpoint id from all vnodes.
  int64_t maxCheckpointId = -1;
  if (lock) {
    streamMutexLock(&execInfo.lock);
  }

  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
    if (p == NULL) {  // never hash a NULL key
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
    if (pEntry == NULL || pEntry->checkpointInfo.failed) {
      continue;
    }

    if (maxCheckpointId < pEntry->checkpointInfo.latestId) {
      maxCheckpointId = pEntry->checkpointInfo.latestId;
    }
  }

  if (lock) {
    streamMutexUnlock(&execInfo.lock);
  }

  if (maxCheckpointId > maxChkptId) {
    mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId,
           maxCheckpointId);
    maxChkptId = maxCheckpointId;
  }

  mDebug("generate new checkpointId:%" PRId64, maxChkptId + 1);
  return maxChkptId + 1;
}
1060

1061
// Build and prepare a checkpoint transaction for one stream.
//
// When mndTrigger == 1 and the stream's last checkpoint (checkpointFreq) is younger than
// tsStreamCheckpointInterval seconds, nothing is done and TSDB_CODE_SUCCESS is returned. Otherwise:
// - Check for conflicting transactions (lock is forwarded to mndStreamTransConflictCheck).
// - Create and register the transaction.
// - Add one checkpoint action per source-level task.
// - Update the stream's checkpointId, checkpointFreq and version under the stream write latch.
// - Persist the stream and prepare the transaction.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction has been prepared, or an error code.
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
                                               int8_t mndTrigger, bool lock) {
  STrans *pTrans = NULL;
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t now = taosGetTimestampMs();

  bool tooEarly = (mndTrigger == 1) && (now - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000);
  if (tooEarly) {
    return code;
  }

  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
  if (code != 0) {
    mWarn("checkpoint conflict with other trans in %s, code:%s ignore the checkpoint for stream:%s %" PRIx64,
          pStream->sourceDb, tstrerror(code), pStream->name, pStream->uid);
    goto _ERR;
  }

  code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME,
                       "gen checkpoint for stream", &pTrans);
  if (code != 0) {
    mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
           tstrerror(code));
    goto _ERR;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
  if (code != 0) {
    mError("failed to register checkpoint trans for stream:%s, checkpointId:%" PRId64, pStream->name, checkpointId);
    goto _ERR;
  }

  mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);

  taosWLockLatch(&pStream->lock);
  pStream->currentTick = 1;

  // 1. redo action: broadcast checkpoint source msg for all source vg
  int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
  for (int32_t lv = 0; lv < numOfLevels; ++lv) {
    SArray      *pLevel = taosArrayGetP(pStream->tasks, lv);
    SStreamTask *pFirst = taosArrayGetP(pLevel, 0);
    if (pFirst->info.taskLevel != TASK_LEVEL__SOURCE) {
      continue;
    }

    int32_t numOfTasks = taosArrayGetSize(pLevel);
    for (int32_t k = 0; k < numOfTasks; ++k) {
      SStreamTask *pTask = taosArrayGetP(pLevel, k);
      code = mndStreamSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);
      if (code != TSDB_CODE_SUCCESS) {
        taosWUnLockLatch(&pStream->lock);
        goto _ERR;
      }
    }
  }

  // 2. reset tick
  pStream->checkpointId = checkpointId;
  pStream->checkpointFreq = taosGetTimestampMs();
  pStream->currentTick = 0;

  // 3. commit log: stream checkpoint info
  pStream->version += 1;
  taosWUnLockLatch(&pStream->lock);

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS) {
    goto _ERR;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  } else {
    mError("failed to prepare checkpoint trans since %s", tstrerror(code));
  }

_ERR:
  mndTransDrop(pTrans);
  return code;
}
1142

1143
// Return the number of entries in execInfo.pNodeList.
// An empty list is first rebuilt from the existing streams; if that rebuild fails, its error code is
// returned instead of a count.
int32_t extractStreamNodeList(SMnode *pMnode) {
  if (taosArrayGetSize(execInfo.pNodeList) != 0) {
    return taosArrayGetSize(execInfo.pNodeList);
  }

  int32_t code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
  if (code != 0) {
    mError("Failed to extract node list from stream, code:%s", tstrerror(code));
    return code;
  }

  return taosArrayGetSize(execInfo.pNodeList);
}
1154

1155
// Check whether a new round of checkpoints may be started.
//
// Returns:
// - TSDB_CODE_STREAM_TASK_IVLD_STATUS when a node update has been detected.
// - TSDB_CODE_FAILED when no vgroup is known but tasks are still registered.
// - An allocation error code when the scratch list cannot be created.
// - -1 when some registered task is not in ready status or still has a related fill-history task.
// - 0 otherwise.
// execInfo.lock is held while the task list and task map are inspected.
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
  bool ready = true;
  if (mndStreamNodeIsUpdated(pMnode)) {
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  streamMutexLock(&execInfo.lock);
  if (taosArrayGetSize(execInfo.pNodeList) == 0) {
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
    if (taosArrayGetSize(execInfo.pTaskList) != 0) {
      streamMutexUnlock(&execInfo.lock);
      mError("stream task node change checking done, no vgroups exist, but task list is not empty");
      return TSDB_CODE_FAILED;
    }
  }

  SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId));
  if (pInvalidList == NULL) {
    int32_t code = terrno;
    streamMutexUnlock(&execInfo.lock);
    mError("failed to init invalid task list, code:%s", tstrerror(code));
    return code;
  }

  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
    if (p == NULL) {
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
    if (pEntry == NULL) {
      continue;
    }

    // NOTE(review): pInvalidList starts empty and only this loop appends to it. Because it only appends when an
    // entry of the same stream is already present, nothing is ever added, and removeTasksInBuf() below always
    // receives an empty list. A stopped task also breaks out of the outer loop right below (status != READY).
    // Behavior is kept as is; confirm the intended semantics before changing it.
    if (pEntry->status == TASK_STATUS__STOP) {
      for (int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) {
        STaskId *pId = taosArrayGet(pInvalidList, j);
        if (pId == NULL) {
          continue;
        }

        if (pEntry->id.streamId == pId->streamId) {
          void *px = taosArrayPush(pInvalidList, &pEntry->id);
          if (px == NULL) {
            mError("failed to put stream into invalid list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
          }
          break;
        }
      }
    }

    if (pEntry->status != TASK_STATUS__READY) {
      mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId,
             (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
      ready = false;
      break;
    }

    // a related fill-history task is still present for this task
    if (pEntry->hTaskId != 0) {
      mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
             " exists, checkpoint not issued",
             pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
             pEntry->hTaskId);
      ready = false;
      break;
    }
  }

  removeTasksInBuf(pInvalidList, &execInfo);
  taosArrayDestroy(pInvalidList);

  streamMutexUnlock(&execInfo.lock);
  return ready ? 0 : -1;
}
1224

1225
// Return the latest startTime among the tasks of stream streamId that are in ready status, or -1 if none.
// pTaskList holds STaskId keys that are looked up in execInfo.pTaskMap; no lock is taken here. The caller
// in this file (mndProcessStreamCheckpoint) holds execInfo.lock around the call.
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
  int64_t ts = -1;
  int32_t taskId = -1;

  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
    STaskId *p = taosArrayGet(pTaskList, i);
    if (p == NULL) {  // never hash a NULL key
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
    if (pEntry == NULL || pEntry->id.streamId != streamId) {
      continue;
    }

    if (pEntry->status == TASK_STATUS__READY && ts < pEntry->startTime) {
      ts = pEntry->startTime;
      taskId = pEntry->id.taskId;
    }
  }

  mDebug("stream:0x%" PRIx64 " last ready ts:%" PRId64 " s-task:0x%x", streamId, ts, taskId);
  return ts;
}
1245

1246
// A stream that is due for a checkpoint, together with how long ago its last checkpoint happened.
typedef struct {
  int64_t streamId;  // uid of the stream
  int64_t duration;  // elapsed time since the last checkpoint (same unit as the caller's timestamps)
} SCheckpointInterval;

// Sort comparator that orders SCheckpointInterval records by duration, longest first.
// Records with equal duration compare as equal.
static int32_t streamWaitComparFn(const void *p1, const void *p2) {
  int64_t lhs = ((const SCheckpointInterval *)p1)->duration;
  int64_t rhs = ((const SCheckpointInterval *)p2)->duration;

  if (lhs > rhs) {
    return -1;
  }
  if (lhs < rhs) {
    return 1;
  }
  return 0;
}
1260

1261
// Periodic handler that launches new checkpoint transactions.
//
// A stream qualifies when both hold:
// - its last checkpoint (checkpointFreq) is older than tsStreamCheckpointInterval seconds;
// - none of its ready tasks has started within that interval.
// Qualifying streams are sorted by how long they have waited, longest first. New transactions are then
// started until the number of ongoing checkpoint transactions reaches tsMaxConcurrentCheckpoint.
//
// Returns TSDB_CODE_STREAM_TASK_IVLD_STATUS when tasks/nodes are not ready, 0 when nothing qualifies or no
// capacity is left, otherwise the code of the last attempted stream.
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  void       *pIter = NULL;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;
  int32_t     numOfCheckpointTrans = 0;

  if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));
  if (pList == NULL) {
    return terrno;
  }

  int64_t now = taosGetTimestampMs();

  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    int64_t duration = now - pStream->checkpointFreq;
    if (duration < tsStreamCheckpointInterval * 1000) {
      sdbRelease(pSdb, pStream);
      continue;
    }

    // skip streams whose tasks became ready only recently
    streamMutexLock(&execInfo.lock);
    int64_t startTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
    if (startTs != -1 && (now - startTs) < tsStreamCheckpointInterval * 1000) {
      streamMutexUnlock(&execInfo.lock);
      sdbRelease(pSdb, pStream);
      continue;
    }
    streamMutexUnlock(&execInfo.lock);

    SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
    void               *p = taosArrayPush(pList, &in);
    if (p) {
      int32_t currentSize = taosArrayGetSize(pList);
      mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chpt interval threshold: %ds(%" PRId64
             "s), concurrently launch threshold:%d",
             pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
             tsMaxConcurrentCheckpoint);
    } else {
      mError("failed to record the checkpoint interval info, stream:0x%" PRIx64, pStream->uid);
    }
    sdbRelease(pSdb, pStream);
  }

  int32_t size = taosArrayGetSize(pList);
  if (size == 0) {
    taosArrayDestroy(pList);
    return code;
  }

  taosArraySort(pList, streamWaitComparFn);
  code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans);
  if (code) {
    mError("failed to clear finish trans, code:%s", tstrerror(code));
    taosArrayDestroy(pList);
    return code;
  }

  int32_t numOfQual = taosArrayGetSize(pList);
  // Use >= rather than >. When the ongoing count already equals the limit, capacity below would be 0, yet
  // the first successful start in the loop would still be allowed before the break, exceeding the limit.
  if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) {
    mDebug(
        "%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new "
        "checkpoint trans are not allowed, wait for 30s",
        numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint);
    taosArrayDestroy(pList);
    return code;
  }

  int32_t capacity = tsMaxConcurrentCheckpoint - numOfCheckpointTrans;
  mDebug(
      "%d stream(s) checkpoint interval longer than %ds, %d ongoing checkpoint trans, %d new checkpoint trans allowed, "
      "concurrent trans threshold:%d",
      numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, capacity, tsMaxConcurrentCheckpoint);

  int32_t started = 0;
  int64_t checkpointId = mndStreamGenChkptId(pMnode, true);

  for (int32_t i = 0; i < numOfQual; ++i) {
    SCheckpointInterval *pCheckpointInfo = taosArrayGet(pList, i);
    if (pCheckpointInfo == NULL) {
      continue;
    }

    SStreamObj *p = NULL;
    code = mndGetStreamObj(pMnode, pCheckpointInfo->streamId, &p);
    if (p != NULL && code == 0) {
      code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
      sdbRelease(pSdb, p);

      if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
        started += 1;

        if (started >= capacity) {
          mDebug("already start %d new checkpoint trans, current active checkpoint trans:%d", started,
                 (started + numOfCheckpointTrans));
          break;
        }
      } else {
        mError("failed to start checkpoint trans, code:%s", tstrerror(code));
      }
    }
  }

  taosArrayDestroy(pList);
  return code;
}
1372

1373
// Handle a DROP STREAM request.
//
// Flow:
// - Reject streams that belong to an existing SMA/TSMA.
// - Check write privilege on the target db and for conflicting stream transactions.
// - Build a transaction that drops all tasks and the stream object, then prepare it.
// - Kill a related transaction if mndStreamGetRelTrans reports one.
// - Remove the stream's tasks from execInfo and write an audit record.
//
// Returns:
// - TSDB_CODE_ACTION_IN_PROGRESS once the drop transaction has been prepared.
// - 0 when the stream does not exist and igNotExists is set.
// - An error code otherwise.
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  STrans     *pTrans = NULL;
  int32_t     code = 0;

  SMDropStreamReq dropReq = {0};
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
    mError("invalid drop stream msg recv, discarded");
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  mDebug("recv drop stream:%s msg", dropReq.name);

  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (dropReq.igNotExists) {
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      tFreeMDropStreamReq(&dropReq);
      return 0;
    } else {
      mError("stream:%s not exist failed to drop it", dropReq.name);
      tFreeMDropStreamReq(&dropReq);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  if (pStream->smaId != 0) {
    mDebug("stream:%s, uid:0x%" PRIx64 " try to drop sma related stream", dropReq.name, pStream->uid);

    void    *pIter = NULL;
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    while (pIter) {
      if (pSma && pSma->uid == pStream->smaId) {
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;
        // Log while pStream is still referenced and dropReq is not freed yet; report the actual code.
        mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma",
               dropReq.name, pStream->uid, tstrerror(code));
        sdbRelease(pMnode->pSdb, pSma);
        sdbCancelFetch(pMnode->pSdb, pIter);
        goto _OVER;
      }

      if (pSma) {
        sdbRelease(pMnode->pSdb, pSma);
      }

      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    }
  }

  // propagate the real privilege error instead of a bare -1
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    goto _OVER;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
  if (code) {
    goto _OVER;
  }

  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code));
    goto _OVER;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
  if (code) {
    mError("failed to register drop stream trans, code:%s", tstrerror(code));
    goto _OVER;
  }

  // drop all tasks
  code = mndStreamSetDropAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, tstrerror(code));
    goto _OVER;
  }

  // drop stream
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
  if (code) {
    goto _OVER;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    goto _OVER;
  }

  // kill the related checkpoint trans
  int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
  if (transId != 0) {
    mDebug("drop active transId:%d due to stream:%s uid:0x%" PRIx64 " dropped", transId, pStream->name, pStream->uid);
    mndKillTransImpl(pMnode, transId, pStream->sourceDb);
  }

  mDebug("stream:%s uid:0x%" PRIx64 " transId:%d start to drop related task when dropping stream", dropReq.name,
         pStream->uid, transId);

  removeStreamTasksInBuf(pStream, &execInfo);

  // The drop transaction has already been prepared. A stream name that cannot be split for the audit record
  // must not turn the request into a failure, so the error is only logged and name.dbname stays empty.
  SName   name = {0};
  int32_t nameCode = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (nameCode != 0) {
    mWarn("stream:%s failed to parse name for audit record, code:%s", dropReq.name, tstrerror(nameCode));
  }
  auditRecord(pReq, pMnode->clusterId, "dropStream", "", name.dbname, dropReq.sql, dropReq.sqlLen);

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  tFreeMDropStreamReq(&dropReq);
  return TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);  // pTrans is NULL on the paths before the transaction has been created
  tFreeMDropStreamReq(&dropReq);
  TAOS_RETURN(code);
}
1515

1516
// Add drop records to pTrans for every stream that reads from or writes to database pDb.
//
// A stream whose source and target databases differ cannot be dropped together with only one of them. In
// that case the scan stops with TSDB_CODE_MND_STREAM_MUST_BE_DELETED. For each dropped stream:
// - a related transaction reported by mndStreamGetRelTrans is killed;
// - its tasks are removed from execInfo.
// Returns 0 on success, or the first persist error.
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
      if (pStream->sourceDbUid != pStream->targetDbUid) {
        // log before releasing: the message reads fields of pStream
        mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
               pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(TSDB_CODE_MND_STREAM_MUST_BE_DELETED);
      } else {
        // kill the related checkpoint trans
        int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
        if (transId != 0) {
          mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
          mndKillTransImpl(pMnode, transId, pStream->sourceDb);
        }

        // drop the stream obj in execInfo
        removeStreamTasksInBuf(pStream, &execInfo);

        code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          sdbRelease(pSdb, pStream);
          sdbCancelFetch(pSdb, pIter);
          return code;
        }
      }
    }

    sdbRelease(pSdb, pStream);
  }

  return 0;
}
1558

1559
// Fill pBlock with at most `rows` rows, one per stream, continuing the scan position kept in pShow->pIter.
// A stream whose attributes cannot be written into the block is skipped.
// Returns the number of rows filled; the same amount is added to pShow->numOfRows.
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pObj = NULL;
  int32_t     filled = 0;

  while (filled < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) {
      break;  // no more streams
    }

    int32_t ret = setStreamAttrInResBlock(pObj, pBlock, filled);
    filled += (ret == 0) ? 1 : 0;

    sdbRelease(pSdb, pObj);
  }

  pShow->numOfRows += filled;
  return filled;
}
1580

UNCOV
1581
// Cancel the unfinished SDB_STREAM fetch identified by pIter.
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1585

1586
// Fill pBlock with one row per stream task, continuing the stream scan position kept in pShow->pIter.
// All tasks of one stream are written together: if they do not fit into rowsCapacity, the block capacity is
// enlarged first. Timestamps use the precision of the stream's source db (milliseconds if the db is not found).
// Returns the number of rows filled; the same amount is added to pShow->numOfRows.
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  int32_t     numOfRows = 0;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) {
      break;
    }

    // lock
    taosRLockLatch(&pStream->lock);

    int32_t count = mndGetNumOfStreamTasks(pStream);
    if (numOfRows + count > rowsCapacity) {
      code = blockDataEnsureCapacity(pBlock, numOfRows + count);
      if (code) {
        mError("failed to prepare the result block buffer, quit return value");
        taosRUnLockLatch(&pStream->lock);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    int32_t precision = TSDB_TIME_PRECISION_MILLI;
    SDbObj *pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
    if (pSourceDb != NULL) {
      precision = pSourceDb->cfg.precision;
      mndReleaseDb(pMnode, pSourceDb);
    }

    // add row for each task
    SStreamTaskIter *pIter = NULL;
    code = createStreamTaskIter(pStream, &pIter);
    if (code) {
      // log before releasing the stream object, the message reads pStream->name
      mError("failed to create task iter for stream:%s", pStream->name);
      taosRUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pIter, &pTask);
      if (code) {
        // do not destroy pIter here: it is destroyed exactly once after this loop
        break;
      }

      code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows, precision);
      if (code == TSDB_CODE_SUCCESS) {
        numOfRows++;
      }
    }

    pBlock->info.rows = numOfRows;

    destroyStreamTaskIter(pIter);
    taosRUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1659

UNCOV
1660
// Cancel the unfinished SDB_STREAM fetch identified by pIter (stream task show).
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1664

1665
// Handle a pause-stream request: validate the stream and its tasks, then build and prepare a trans that sends the
// pause action to the tasks and persists the stream with STREAM_STATUS__PAUSE.
//
// Returns 0 when the stream does not exist and igNotExists is set, or when it is already paused;
// TSDB_CODE_ACTION_IN_PROGRESS when the trans was prepared; an error code otherwise.
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  SMPauseStreamReq pauseReq = {0};
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (pauseReq.igNotExists) {
      mInfo("stream:%s, not exist, not pause stream", pauseReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to pause stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid);

  if (pStream->status == STREAM_STATUS__PAUSE) {
    sdbRelease(pMnode->pSdb, pStream);
    return 0;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(code);
  }

  bool updated = mndStreamNodeIsUpdated(pMnode);
  if (updated) {
    mError("tasks are not ready for pause, node update detected");
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  {  // check for tasks, if tasks are not ready, not allowed to pause
    bool found = false;
    bool readyToPause = true;
    streamMutexLock(&execInfo.lock);

    for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
      STaskId *p = taosArrayGet(execInfo.pTaskList, i);
      if (p == NULL) {
        continue;
      }

      STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
      if (pEntry == NULL) {
        continue;
      }

      if (pEntry->id.streamId != pStream->uid) {
        continue;
      }

      if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) {
        mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name,
               pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status));
        readyToPause = false;
      }

      found = true;
    }

    streamMutexUnlock(&execInfo.lock);
    if (!found) {
      mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
    }

    if (!readyToPause) {
      mError("stream:%s task not ready for pause yet", pauseReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
    }
  }

  STrans *pTrans = NULL;
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // if nodeUpdate happened, not send pause trans
  code = mndStreamSetPauseAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s, failed to pause task since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // pause stream: the commit log is built from the stream object, so the status is set on it before persisting
  taosWLockLatch(&pStream->lock);
  int8_t prevStatus = pStream->status;
  pStream->status = STREAM_STATUS__PAUSE;
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    // the trans is dropped below, so the stream must not stay marked as paused
    pStream->status = prevStatus;
    taosWUnLockLatch(&pStream->lock);
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  taosWUnLockLatch(&pStream->lock);

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1805

1806
// Handle a resume-stream request: build and prepare a trans that sends the resume action to the tasks and
// persists the stream with STREAM_STATUS__NORMAL.
//
// Returns 0 when the stream does not exist and igNotExists is set, or when it is not paused;
// TSDB_CODE_ACTION_IN_PROGRESS when the trans was prepared; an error code otherwise.
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResumeStreamReq resumeReq = {0};
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resumeReq.igNotExists) {
      mInfo("stream:%s not exist, not resume stream", resumeReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      return 0;
    } else {
      mError("stream:%s not exist, failed to resume stream", resumeReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  if (pStream->status != STREAM_STATUS__PAUSE) {
    sdbRelease(pMnode->pSdb, pStream);
    return 0;
  }

  mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    // report the real privilege error instead of a bare -1
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  STrans *pTrans = NULL;
  code =
      doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // set the resume action
  code = mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated);
  if (code) {
    mError("stream:%s, failed to resume task since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // resume stream: the commit log is built from the stream object, so the status is set on it before persisting
  taosWLockLatch(&pStream->lock);
  int8_t prevStatus = pStream->status;
  pStream->status = STREAM_STATUS__NORMAL;
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    // the trans is dropped below, so roll back the in-memory status and report the failure
    pStream->status = prevStatus;
    taosWUnLockLatch(&pStream->lock);

    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  taosWUnLockLatch(&pStream->lock);
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare resume stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1900

1901
// Build and prepare one nodeUpdate trans that updates the task epsets of every stream involved in pChangeInfo
// (or of all streams when includeAllNodes is true).
//
// Before building, every stream is checked for a conflicting trans; any conflict aborts the whole update.
// Streams whose epset action cannot be built are logged and skipped.
// Returns 0 if no stream exists, otherwise the result of mndTransPrepare or the first fatal error.
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  STrans     *pTrans = NULL;
  int32_t     code = 0;

  // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool
  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
    sdbRelease(pSdb, pStream);

    if (code) {
      mError("nodeUpdate conflict with other trans, current nodeUpdate ignored, code:%s", tstrerror(code));
      sdbCancelFetch(pSdb, pIter);
      return code;
    }
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    // here create only one trans
    if (pTrans == NULL) {
      code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME,
                           "update task epsets", &pTrans);
      if (pTrans == NULL || code) {
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        return terrno = code;
      }
    }

    if (!includeAllNodes) {
      void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
      void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
      if (p1 == NULL && p2 == NULL) {
        mDebug("stream:0x%" PRIx64 " %s not involved nodeUpdate, ignore", pStream->uid, pStream->name);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
           pStream->name, pTrans->id);

    // NOTE: for each stream, we register one trans entry for task update
    code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
    if (code) {
      mError("failed to register trans, transId:%d, and continue", pTrans->id);
    }

    code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);

    // todo: not continue, drop all and retry again
    if (code != TSDB_CODE_SUCCESS) {
      mError("stream:0x%" PRIx64 " build nodeUpdate trans failed, ignore and continue, code:%s", pStream->uid,
             tstrerror(code));
      sdbRelease(pSdb, pStream);
      continue;
    }

    code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
    sdbRelease(pSdb, pStream);

    if (code != TSDB_CODE_SUCCESS) {
      sdbCancelFetch(pSdb, pIter);
      mndTransDrop(pTrans);  // the trans will never be prepared, release it here instead of leaking it
      return code;
    }
  }

  // no need to build the trans to handle the vgroup update
  if (pTrans == NULL) {
    return 0;
  }

  // NOTE: every stream fetched above has already been released inside the loops, so no stream is released here
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
    mndTransDrop(pTrans);
    return code;
  }

  mndTransDrop(pTrans);
  return code;
}
1997

1998
// Rebuild pNodeList from the tasks of all existing streams: one SNodeEntry per distinct task nodeId, carrying the
// task epset, with hbTimestamp and lastHbMsgId reset to -1. The previous content of pNodeList is cleared.
// Failures on a single stream or entry are logged and skipped.
// Returns 0 on success; a non-zero code means part of the node info may be missing from pNodeList.
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int32_t     code = 0;

  mDebug("start to refresh node list by existed streams");

  // keyed by nodeId, so each node is kept only once
  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    return terrno;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    taosWLockLatch(&pStream->lock);

    SStreamTaskIter *pTaskIter = NULL;
    code = createStreamTaskIter(pStream, &pTaskIter);
    if (code) {
      // log before releasing the stream object, the message reads pStream->name
      mError("failed to create task iter for stream:%s", pStream->name);
      taosWUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pTaskIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pTaskIter, &pTask);
      if (code) {
        break;
      }

      SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
      epsetAssign(&entry.epset, &pTask->info.epSet);
      int32_t ret = taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
      if (ret != 0 && ret != TSDB_CODE_DUP_KEY) {
        mError("failed to put entry into hash map, nodeId:%d, code:%s", entry.nodeId, tstrerror(ret));
      }
    }

    destroyStreamTaskIter(pTaskIter);
    taosWUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  taosArrayClear(pNodeList);

  // convert to list
  pIter = NULL;
  while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
    SNodeEntry *pEntry = (SNodeEntry *)pIter;

    void *p = taosArrayPush(pNodeList, pEntry);
    if (p == NULL) {
      mError("failed to put entry into node list, nodeId:%d, code: out of memory", pEntry->nodeId);
      if (code == 0) {
        code = terrno;
      }
      continue;
    }

    char    buf[256] = {0};
    int32_t ret = epsetToStr(&pEntry->epset, buf, tListLen(buf));  // ignore this error since it is only for log file
    if (ret != 0) {                                                // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(ret));
    }

    mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
  }

  taosHashCleanup(pHash);

  mDebug("numOfNodes:%d for stream after extract nodeInfo from stream", (int32_t)taosArrayGetSize(pNodeList));
  return code;
}
2079

2080
// Put the db name of every vgroup into pDBMap as a key with an empty value, so that every db is treated as
// involved in the following nodeUpdate. Insert failures are not reported; only successful inserts are logged.
static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
  void *pIter = NULL;

  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    int32_t code = taosHashPut(pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
    if (code == 0) {
      int32_t size = taosHashGetSize(pDBMap);
      mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size);
    }

    // release only after the last access to pVgroup->dbName
    sdbRelease(pSdb, pVgroup);
  }
}
×
2099

2100
// this function runs by only one thread, so it is not multi-thread safe
2101
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
555✔
2102
  int32_t code = 0;
555✔
2103
  bool    allReady = true;
555✔
2104
  SArray *pNodeSnapshot = NULL;
555✔
2105
  SMnode *pMnode = pMsg->info.node;
555✔
2106
  int64_t ts = taosGetTimestampSec();
555✔
2107
  bool    updateAllVgroups = false;
555✔
2108

2109
  int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
555✔
2110
  if (old != 0) {
555!
UNCOV
2111
    mDebug("still in checking node change");
×
UNCOV
2112
    return 0;
×
2113
  }
2114

2115
  mDebug("start to do node changing check");
555✔
2116

2117
  streamMutexLock(&execInfo.lock);
555✔
2118
  int32_t numOfNodes = extractStreamNodeList(pMnode);
555✔
2119
  streamMutexUnlock(&execInfo.lock);
555✔
2120

2121
  if (numOfNodes == 0) {
555!
UNCOV
2122
    mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing");
×
UNCOV
2123
    execInfo.ts = ts;
×
2124
    atomic_store_32(&mndNodeCheckSentinel, 0);
×
UNCOV
2125
    return 0;
×
2126
  }
2127

2128
  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
555✔
2129
  if (code) {
555!
UNCOV
2130
    mError("failed to take the vgroup snapshot, ignore it and continue");
×
2131
  }
2132

2133
  if (!allReady) {
555✔
2134
    taosArrayDestroy(pNodeSnapshot);
35✔
2135
    atomic_store_32(&mndNodeCheckSentinel, 0);
35✔
2136
    mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
35!
2137
    return 0;
35✔
2138
  }
2139

2140
  streamMutexLock(&execInfo.lock);
520✔
2141

2142
  code = removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
520✔
2143
  if (code) {
520!
2144
    goto _end;
×
2145
  }
2146

2147
  SVgroupChangeInfo changeInfo = {0};
520✔
2148
  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo);
520✔
2149
  if (code) {
520!
2150
    goto _end;
×
2151
  }
2152

2153
  {
2154
    if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) {
520!
UNCOV
2155
      mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
×
UNCOV
2156
      updateAllVgroups = true;
×
UNCOV
2157
      execInfo.switchFromFollower = false;  // reset the flag
×
UNCOV
2158
      addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb);
×
2159
    }
2160
  }
2161

2162
  if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) {
520!
2163
    // kill current active checkpoint transaction, since the transaction is vnode wide.
2164
    killAllCheckpointTrans(pMnode, &changeInfo);
9✔
2165
    code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups);
9✔
2166

2167
    // keep the new vnode snapshot if success
2168
    if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
9!
2169
      code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
9✔
2170
      if (code) {
9!
UNCOV
2171
        mError("failed to extract node list from stream, code:%s", tstrerror(code));
×
UNCOV
2172
        goto _end;
×
2173
      }
2174

2175
      execInfo.ts = ts;
9✔
2176
      mDebug("create trans successfully, update cached node list, numOfNodes:%d",
9✔
2177
             (int)taosArrayGetSize(execInfo.pNodeList));
2178
    } else {
UNCOV
2179
      mError("unexpected code during create nodeUpdate trans, code:%s", tstrerror(code));
×
2180
    }
2181
  } else {
2182
    mDebug("no update found in nodeList");
511✔
2183
  }
2184

2185
  mndDestroyVgroupChangeInfo(&changeInfo);
520✔
2186

2187
_end:
520✔
2188
  streamMutexUnlock(&execInfo.lock);
520✔
2189
  taosArrayDestroy(pNodeSnapshot);
520✔
2190

2191
  mDebug("end to do stream task node change checking");
520✔
2192
  atomic_store_32(&mndNodeCheckSentinel, 0);
520✔
2193
  return 0;
520✔
2194
}
2195

2196
// When at least one stream exists, put a TDMT_MND_STREAM_NODECHANGE_CHECK message into the mnode write queue so
// that the node change check is done there. Returns 0 when there is no stream, terrno when the message cannot be
// allocated, otherwise the result of tmsgPutToQueue.
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;

  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) <= 0) {
    return 0;  // nothing to check
  }

  int32_t               contLen = sizeof(SMStreamNodeCheckMsg);
  SMStreamNodeCheckMsg *pCont = rpcMallocCont(contLen);
  if (pCont == NULL) {
    return terrno;
  }

  SRpcMsg msg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pCont, .contLen = contLen};
  return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &msg);
}
2212

2213
// Register every task of pStream in pExecNode if it is not there yet: an initial status entry is put into pTaskMap
// and its id is appended to pTaskList. For each newly added task, the task's node is appended to pNodeList
// (hbTimestamp/lastHbMsgId = -1) when no entry with that nodeId exists. Failures are only logged.
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
  SStreamTaskIter *pIter = NULL;
  int32_t          code = createStreamTaskIter(pStream, &pIter);
  if (code) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return;
  }

  while (streamTaskIterNextTask(pIter)) {
    SStreamTask *pTask = NULL;
    code = streamTaskIterGetCurrent(pIter, &pTask);
    if (code) {
      break;
    }

    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    void   *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      STaskStatusEntry entry = {0};
      streamTaskStatusInit(&entry, pTask);

      code = taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
      if (code == 0) {
        void   *px = taosArrayPush(pExecNode->pTaskList, &id);
        int32_t num = (int32_t)taosArrayGetSize(pExecNode->pTaskList);
        if (px) {
          mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
        } else {
          mError("s-task:0x%x failed to add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
        }
      } else {
        mError("s-task:0x%x failed to add into task map, since out of memory", (int32_t)entry.id.taskId);
      }

      // add the new vgroups if not added yet
      bool exist = false;
      for (int32_t j = 0; j < taosArrayGetSize(pExecNode->pNodeList); ++j) {
        SNodeEntry *pEntry = taosArrayGet(pExecNode->pNodeList, j);
        if ((pEntry != NULL) && (pEntry->nodeId == pTask->info.nodeId)) {
          exist = true;
          break;
        }
      }

      if (!exist) {
        SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
        epsetAssign(&nodeEntry.epset, &pTask->info.epSet);

        void *px = taosArrayPush(pExecNode->pNodeList, &nodeEntry);
        if (px) {
          mInfo("vgId:%d added into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList));
        } else {
          // terminate the statement explicitly instead of relying on how the log macro expands
          mError("vgId:%d failed to add into nodeList, total:%d", nodeEntry.nodeId,
                 (int)taosArrayGetSize(pExecNode->pNodeList));
        }
      }
    }
  }

  destroyStreamTaskIter(pIter);
}
2274

2275
// Append taskId to pList (an array of int32_t task ids) unless it is already present, and log how many of the
// numOfTotal tasks of stream `uid` have sent the checkpoint req so far.
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
  int32_t num = taosArrayGetSize(pList);
  for (int32_t i = 0; i < num; ++i) {
    int32_t *pId = taosArrayGet(pList, i);
    if (pId != NULL && *pId == taskId) {
      return;  // duplicated req of the same task
    }
  }

  void *p = taosArrayPush(pList, &taskId);
  if (p) {
    int32_t numOfRecv = num + 1;  // include the req that was just recorded
    mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, numOfRecv, numOfTotal - numOfRecv);
  } else {
    mError("stream:0x%" PRIx64 " receive %d reqs for checkpoint, failed to added into task list, since out of memory",
           uid, num);
  }
}
2297

2298
// Handle a checkpoint req from a stream task. Requests are collected per stream in
// execInfo.pTransferStateStreams; once every task of the stream has sent one, a checkpoint trans is launched
// and the stream's entry is removed. A response is always sent back to the task.
//
// Returns TSDB_CODE_INVALID_MSG for an undecodable msg, TSDB_CODE_MND_STREAM_NOT_EXIST when the stream is found
// neither in the meta-store nor in the exec buffer, terrno when the response cannot be allocated, 0 otherwise.
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
  SMnode                  *pMnode = pReq->info.node;
  SStreamTaskCheckpointReq req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskCheckpointReq(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint req msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);

  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf",
          req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the create stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
  if (pReqTaskList == NULL) {
    SArray *pList = taosArrayInit(4, sizeof(int32_t));
    if (pList == NULL) {
      mError("stream:0x%" PRIx64 " failed to init the checkpoint req task list, code: out of memory", req.streamId);
    } else {
      doAddTaskId(pList, req.taskId, req.streamId, numOfTasks);
      code = taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
      if (code) {
        mError("failed to put into transfer state stream map, code: out of memory");
        taosArrayDestroy(pList);  // not owned by the map, release it here
      } else {
        pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
      }
    }
  } else {
    doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks);
  }

  // pReqTaskList is NULL only when the req could not be recorded, skip the completion check in that case
  if (pReqTaskList != NULL && *pReqTaskList != NULL) {
    int32_t total = taosArrayGetSize(*pReqTaskList);
    if (total == numOfTasks) {  // all tasks have sent the reqs
      int64_t checkpointId = mndStreamGenChkptId(pMnode, false);
      mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);

      if (pStream != NULL) {  // TODO:handle error
        code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
        if (code) {
          mError("failed to create checkpoint trans, code:%s", tstrerror(code));
        }
      } else {
        // todo: wait for the create stream trans completed, and launch the checkpoint trans
        // SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
        // sleep(500ms)
      }

      // remove this entry
      (void)taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));

      int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
      mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
    }
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  {
    SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)};
    rsp.pCont = rpcMallocCont(rsp.contLen);
    if (rsp.pCont == NULL) {
      return terrno;
    }

    SMsgHead *pHead = rsp.pCont;
    pHead->vgId = htonl(req.nodeId);

    tmsgSendRsp(&rsp);
    pReq->info.handle = NULL;  // disable auto rsp
  }

  return 0;
}
2397

2398
// valid the info according to the HbMsg
2399
static bool validateChkptReport(const SCheckpointReport *pReport, int64_t reportChkptId) {
4,129✔
2400
  STaskId           id = {.streamId = pReport->streamId, .taskId = pReport->taskId};
4,129✔
2401
  STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
4,129✔
2402
  if (pTaskEntry == NULL) {
4,129✔
2403
    mError("invalid checkpoint-report msg from task:0x%x, discard", pReport->taskId);
22!
2404
    return false;
22✔
2405
  }
2406

2407
  if (pTaskEntry->checkpointInfo.latestId >= pReport->checkpointId) {
4,107!
2408
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " saved checkpointId:%" PRId64 " discard",
×
2409
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
UNCOV
2410
    return false;
×
2411
  }
2412

2413
  // now the task in checkpoint procedure
2414
  if ((pTaskEntry->checkpointInfo.activeId != 0) && (pTaskEntry->checkpointInfo.activeId > pReport->checkpointId)) {
4,107!
2415
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " active checkpointId:%" PRId64
×
2416
           " discard",
2417
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2418
    return false;
×
2419
  }
2420

2421
  if (reportChkptId >= pReport->checkpointId) {
4,107!
UNCOV
2422
    mError("s-task:0x%x expired checkpoint-report msg, checkpointId:%" PRId64 " already update checkpointId:%" PRId64
×
2423
           " discard",
2424
           pReport->taskId, pReport->checkpointId, reportChkptId);
UNCOV
2425
    return false;
×
2426
  }
2427

2428
  return true;
4,107✔
2429
}
2430

2431
// Record a task's checkpoint-report in pList, the per-stream list of STaskChkptInfo entries.
//
// Reports rejected by validateChkptReport() are ignored. If pList already holds an entry for the same task:
//  - an equal checkpointId only produces a warning;
//  - a smaller incoming checkpointId is logged as invalid and dropped;
//  - a larger incoming checkpointId overwrites the stale entry in place.
// Otherwise a new entry is appended; a failed append is only logged.
static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SCheckpointReport *pReport) {
  if (!validateChkptReport(pReport, reportChkptId)) {
    return;
  }

  // the list is not modified inside the loop (we return right after touching an entry), so hoist its size
  int32_t numOfEntries = taosArrayGetSize(pList);
  for (int32_t idx = 0; idx < numOfEntries; ++idx) {
    STaskChkptInfo *pEntry = taosArrayGet(pList, idx);
    if (pEntry == NULL || pEntry->taskId != pReport->taskId) {
      continue;
    }

    if (pEntry->checkpointId == pReport->checkpointId) {
      mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
    } else if (pEntry->checkpointId > pReport->checkpointId) {
      mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
             pReport->taskId, pEntry->checkpointId, pReport->checkpointId);
    } else {
      mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64,
             pReport->taskId, pEntry->checkpointId, pReport->checkpointId);

      // overwrite the stale entry with the newer report
      pEntry->checkpointId = pReport->checkpointId;
      pEntry->ts = pReport->checkpointTs;
      pEntry->version = pReport->checkpointVer;
      pEntry->transId = pReport->transId;
      pEntry->dropHTask = pReport->dropHTask;
    }
    return;
  }

  STaskChkptInfo newEntry = {
      .streamId = pReport->streamId,
      .taskId = pReport->taskId,
      .transId = pReport->transId,
      .dropHTask = pReport->dropHTask,
      .version = pReport->checkpointVer,
      .ts = pReport->checkpointTs,
      .checkpointId = pReport->checkpointId,
      .nodeId = pReport->nodeId,
  };

  if (taosArrayPush(pList, &newEntry) == NULL) {
    mError("failed to put into task list, taskId:0x%x", pReport->taskId);
    return;
  }

  int32_t numOfReported = taosArrayGetSize(pList);
  mDebug("stream:0x%" PRIx64 " %d tasks has send checkpoint-report", pReport->streamId, numOfReported);
}
2483

2484
// Handle a checkpoint-report msg sent by a stream task.
//
// The msg is decoded, validated and recorded in execInfo.pChkptStreams (one SChkptReportInfo per stream, created on
// first report). When every task of a stream known to the meta-store has reported, an info log is emitted. This
// function does not issue the checkpoint meta-info update itself.
// A quick response is sent back to the reporting node, except when decoding fails or the stream is unknown both in
// the meta-store and in the exec buffer.
//
// Returns TSDB_CODE_INVALID_MSG on a decode failure, TSDB_CODE_MND_STREAM_NOT_EXIST for an unknown stream, otherwise
// 0 or the last internal error code (stream lookup, list allocation or hash insertion failure).
int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SCheckpointReport req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskChkptReport(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint-report msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64
         " checkpointVer:%" PRId64 " transId:%d",
         req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);

  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the create stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
      if (pStream != NULL) {
        mndReleaseStream(pMnode, pStream);
      }
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SChkptReportInfo *pInfo =
      (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
  if (pInfo == NULL) {
    SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId};
    if (info.pTaskList == NULL) {
      code = terrno;
      mError("stream:0x%" PRIx64 " failed to create checkpoint-report task list, code:%s", req.streamId,
             tstrerror(code));
    } else {
      doAddReportStreamTask(info.pTaskList, info.reportChkpt, &req);
      code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &info, sizeof(info));
      if (code) {
        mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId);
        // assumes the map did not keep the value on failure, so the list is still ours to free
        taosArrayDestroy(info.pTaskList);
      } else {
        pInfo = (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
      }
    }
  } else {
    doAddReportStreamTask(pInfo->pTaskList, pInfo->reportChkpt, &req);
  }

  // pInfo may still be NULL after an allocation/insertion failure, and pStream is NULL when the stream only exists in
  // the exec buffer (numOfTasks == 0); both must be checked before dereferencing.
  if (pInfo != NULL && pStream != NULL) {
    int32_t total = taosArrayGetSize(pInfo->pTaskList);
    if (total == numOfTasks) {  // all tasks has send the reqs
      mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
            " will be issued soon",
            req.streamId, pStream->name, total, req.checkpointId);
    }
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS);
  return code;
}
2563

2564
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t *pExistedTasks, bool *pAllSame) {
111✔
2565
  int32_t num = 0;
111✔
2566
  int64_t chkId = INT64_MAX;
111✔
2567
  *pExistedTasks = 0;
111✔
2568
  *pAllSame = true;
111✔
2569

2570
  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
1,620✔
2571
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
1,509✔
2572
    if (p == NULL) {
1,509!
UNCOV
2573
      continue;
×
2574
    }
2575

2576
    if (p->streamId != streamId) {
1,509✔
2577
      continue;
868✔
2578
    }
2579

2580
    num += 1;
641✔
2581
    STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
641✔
2582
    if (chkId > pe->checkpointInfo.latestId) {
641✔
2583
      if (chkId != INT64_MAX) {
115✔
2584
        *pAllSame = false;
4✔
2585
      }
2586
      chkId = pe->checkpointInfo.latestId;
115✔
2587
    }
2588
  }
2589

2590
  *pExistedTasks = num;
111✔
2591
  if (num < numOfTasks) {  // not all task send info to mnode through hbMsg, no valid checkpoint Id
111!
UNCOV
2592
    return -1;
×
2593
  }
2594

2595
  return chkId;
111✔
2596
}
2597

2598
// Send an immediate response of msgSize bytes carrying `code` for the request described by pInfo.
// The response head's vgId is set (network byte order) to vgId. On success pInfo->handle is cleared to disable the
// automatic response; if the response buffer cannot be allocated nothing is sent and pInfo is left untouched.
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) {
  SRpcMsg rsp = {.code = code, .info = *pInfo, .contLen = msgSize};

  rsp.pCont = rpcMallocCont(rsp.contLen);
  if (rsp.pCont == NULL) {
    return;
  }

  ((SMsgHead *)rsp.pCont)->vgId = htonl(vgId);
  tmsgSendRsp(&rsp);

  // disable auto rsp
  pInfo->handle = NULL;
}
4,129✔
2609

2610
// Timer-driven handler for pending consensus-checkpointId requests (execInfo.pStreamConsensus).
//
// Skipped entirely when not all vnodes are ready. Otherwise, for every stream with pending requests, the consensus
// checkpointId is computed from heartbeat info (getConsensusId). A set-consensus-checkpointId trans is created for
// each request that has waited at least 10s or when all tasks share the same checkpointId. Handled requests are then
// removed, and streams whose request list becomes empty are cleared from execInfo.pStreamConsensus.
//
// Returns TSDB_CODE_FAILED on an inconsistent state (consensus id newer than a request's id, or an empty list with
// no stream id); every resource acquired so far (stream ref, lists, hash iterator, lock) is released first.
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  int64_t now = taosGetTimestampMs();
  SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
  if (pStreamList == NULL) {
    return terrno;
  }

  mDebug("start to process consensus-checkpointId in tmr");

  bool    allReady = true;
  SArray *pNodeSnapshot = NULL;

  int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
  taosArrayDestroy(pNodeSnapshot);
  if (code) {
    mError("failed to get the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
    taosArrayDestroy(pStreamList);
    return 0;
  }

  streamMutexLock(&execInfo.lock);

  void *pIter = NULL;
  while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
    SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;

    int64_t streamId = -1;
    int32_t num = taosArrayGetSize(pInfo->pTaskList);
    SArray *pList = taosArrayInit(4, sizeof(int32_t));
    if (pList == NULL) {
      continue;
    }

    SStreamObj *pStream = NULL;
    code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
    if (pStream == NULL || code != 0) {  // stream has been dropped already
      mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
      taosArrayDestroy(pList);
      continue;
    }

    for (int32_t j = 0; j < num; ++j) {
      SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
      if (pe == NULL) {
        continue;
      }

      streamId = pe->req.streamId;

      int32_t existed = 0;
      bool    allSame = true;
      int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame);
      if (chkId == -1) {
        mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed,
               pInfo->numOfTasks, pe->req.taskId);
        break;
      }

      if (((now - pe->ts) >= 10 * 1000) || allSame) {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
        if (chkId > pe->req.checkpointId) {
          // log while pe is still protected by the lock, then release everything acquired in this round
          mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
                 pe->req.checkpointId, chkId);
          mndReleaseStream(pMnode, pStream);
          taosArrayDestroy(pList);
          taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
          streamMutexUnlock(&execInfo.lock);
          taosArrayDestroy(pStreamList);
          return TSDB_CODE_FAILED;
        }

        code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
        }

        void *p = taosArrayPush(pList, &pe->req.taskId);
        if (p == NULL) {
          mError("failed to put into task list, taskId:0x%x", pe->req.taskId);
        }
        streamId = pe->req.streamId;
      } else {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
      }
    }

    mndReleaseStream(pMnode, pStream);

    // remove the requests handled in this round from the pending list
    for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
      int32_t *taskId = taosArrayGet(pList, i);
      if (taskId == NULL) {
        continue;
      }

      for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) {
        SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, k);
        if ((pe != NULL) && (pe->req.taskId == *taskId)) {
          taosArrayRemove(pInfo->pTaskList, k);
          break;
        }
      }
    }

    taosArrayDestroy(pList);

    if (taosArrayGetSize(pInfo->pTaskList) == 0) {
      mndClearConsensusRspEntry(pInfo);
      if (streamId == -1) {
        mError("streamId is -1, streamId:%" PRIx64, pInfo->streamId);
        taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
        streamMutexUnlock(&execInfo.lock);
        taosArrayDestroy(pStreamList);
        return TSDB_CODE_FAILED;
      }

      void *p = taosArrayPush(pStreamList, &streamId);
      if (p == NULL) {
        mError("failed to put into stream list, stream:0x%" PRIx64, streamId);
      }
    }
  }

  // entries are cleared after the iteration is complete, not while iterating the same hash map
  for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
    int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i);
    if (pStreamId == NULL) {
      continue;
    }

    code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId);
  }

  streamMutexUnlock(&execInfo.lock);

  taosArrayDestroy(pStreamList);
  mDebug("end to process consensus-checkpointId in tmr");
  return code;
}
2750

2751
// Run the create-stream request through mndProcessCreateStreamReq().
// On success or TSDB_CODE_ACTION_IN_PROGRESS the code is returned untouched. On any other failure, a 1-byte response
// body is attached to the request, a response is enabled, and the error code is stored on the request before being
// returned (terrno is returned instead if the response body cannot be allocated).
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t code = mndProcessCreateStreamReq(pReq);
  bool    failed = (code != 0) && (code != TSDB_CODE_ACTION_IN_PROGRESS);
  if (!failed) {
    return code;
  }

  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->code = code;
  pReq->info.noResp = false;
  pReq->info.rspLen = 1;
  return code;
}
2765

2766
// Run the drop-stream request through mndProcessDropStreamReq().
// Success and TSDB_CODE_ACTION_IN_PROGRESS pass straight through. Any other error gets a 1-byte response body,
// response enabled, and the code recorded on the request; terrno is returned if that body cannot be allocated.
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t code = mndProcessDropStreamReq(pReq);
  if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    return code;
  }

  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->code = code;
  pReq->info.rspLen = 1;
  pReq->info.noResp = false;
  return code;
}
2780

2781
// Lazily fill pExecInfo with the tasks of all streams stored in the sdb.
// Does nothing if the buffer was already loaded (initTaskList set) or pMnode is NULL; otherwise loads the tasks via
// addAllStreamTasksIntoBuf() and marks the buffer as loaded.
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  bool needLoad = (!pExecInfo->initTaskList) && (pMnode != NULL);
  if (needLoad) {
    addAllStreamTasksIntoBuf(pMnode, pExecInfo);
    pExecInfo->initTaskList = true;
  }
}
2789

2790
void mndStreamResetInitTaskListLoadFlag() {
1,353✔
2791
  mInfo("reset task list buffer init flag for leader");
1,353!
2792
  execInfo.initTaskList = false;
1,353✔
2793
}
1,353✔
2794

2795
// Track the mnode role in execInfo.role and flag a follower -> leader transition.
//
// execInfo.switchFromFollower is reset on every call and set to true only when the mnode moves from follower to
// leader. The first call (role still NODE_ROLE_UNINIT) just records the role. Any non-leader role is handled as the
// follower case. pMnode is not used.
void mndUpdateStreamExecInfoRole(SMnode *pMnode, int32_t role) {
  execInfo.switchFromFollower = false;

  if (execInfo.role == NODE_ROLE_UNINIT) {
    execInfo.role = role;
    if (role == NODE_ROLE_LEADER) {
      mInfo("init mnode is set to leader");
    } else {
      mInfo("init mnode is set to follower");
    }
    return;
  }

  if (role != NODE_ROLE_LEADER) {
    if (execInfo.role != NODE_ROLE_LEADER) {
      mInfo("mnode remain to be follower, do nothing");
      return;
    }

    execInfo.role = role;
    mInfo("mnode switch to be follower from leader");
    return;
  }

  if (execInfo.role != NODE_ROLE_FOLLOWER) {
    mInfo("mnode remain to be leader, do nothing");
    return;
  }

  execInfo.role = role;
  execInfo.switchFromFollower = true;
  mInfo("mnode switch to be leader from follower");
}
1,548✔
2824

2825
// Walk every SDB_STREAM object in the sdb and save its tasks and node info into pExecInfo.
// Each fetched stream object is released right after it has been processed.
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pObj = NULL;

  for (void *pIter = sdbFetch(pSdb, SDB_STREAM, NULL, (void **)&pObj); pIter != NULL;
       pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pObj)) {
    saveTaskAndNodeInfoIntoBuf(pObj, pExecInfo);
    sdbRelease(pSdb, pObj);
  }
}
125✔
2840

2841
// Create, register and prepare a transaction that updates the checkpoint-info of pStream.
//
// Ownership: the caller's pStream reference is always released (sdbRelease) before returning, on success and on
// every failure path. The local trans object is always dropped (mndTransDrop accepts NULL).
// pChkptInfoList is currently unused.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the trans has been prepared, otherwise the failing step's code.
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList) {
  STrans *pTrans = NULL;
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME,
                               "update checkpoint-info", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:0x%" PRIx64 " failed to create update checkpoint-info trans since %s", pStream->uid,
           tstrerror(code));
    goto _end;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid);
  if (code) {
    mError("trans:%d, failed to register update checkpoint-info trans since %s", pTrans->id, tstrerror(code));
    goto _end;
  }

  code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream);
  if (code) {
    mError("trans:%d, failed to set update checkpoint-info action since %s", pTrans->id, tstrerror(code));
    goto _end;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    mError("trans:%d, failed to persist update checkpoint-info trans log since %s", pTrans->id, tstrerror(code));
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, tstrerror(code));
    goto _end;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_end:
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
2884

2885
// Handle a drop-orphan-task request: build a drop-stream transaction that drops every task listed in the msg.
//
// The stream is identified by the first non-NULL entry of the list, and a placeholder stream object (dummyObj) with
// that uid is used to create the trans and to persist it as SDB_STATUS_DROPPED.
// Returns 0 when there is nothing to drop (empty list or no usable entry), TSDB_CODE_SUCCESS or
// TSDB_CODE_ACTION_IN_PROGRESS when the trans has been prepared, otherwise the failing step's code.
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  int32_t      code = 0;
  SOrphanTask *pTask = NULL;
  STrans      *pTrans = NULL;
  int32_t      numOfTasks = 0;

  SMStreamDropOrphanMsg msg = {0};
  code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg);
  if (code) {
    // NOTE(review): msg may be partially decoded here; confirm tDestroyDropOrphanTaskMsg is not needed on this path.
    return code;
  }

  numOfTasks = taosArrayGetSize(msg.pList);
  if (numOfTasks == 0) {
    mDebug("no orphan tasks to drop, no need to create trans");
    goto _end;
  }

  mDebug("create trans to drop %d orphan tasks", numOfTasks);

  // the first valid entry identifies the stream the orphan tasks belong to
  for (int32_t i = 0; (i < numOfTasks) && (pTask == NULL); ++i) {
    pTask = taosArrayGet(msg.pList, i);
  }

  if (pTask == NULL) {
    mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task");
    goto _end;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
  if (code) {
    goto _end;
  }

  SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};

  code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code != 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _end;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
  if (code) {
    goto _end;
  }

  // drop all tasks
  if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _end;
  }

  // drop stream
  if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) {
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    goto _end;
  }

  // only reached when the trans really has been prepared, never on the early no-op exits above
  mDebug("create drop %d orphan tasks trans succ", numOfTasks);

_end:
  tDestroyDropOrphanTaskMsg(&msg);
  mndTransDrop(pTrans);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc