• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.14
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "audit.h"
17
#include "mndDb.h"
18
#include "mndPrivilege.h"
19
#include "mndScheduler.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndStream.h"
23
#include "mndTrans.h"
24
#include "osMemory.h"
25
#include "parser.h"
26
#include "taoserror.h"
27
#include "tmisce.h"
28
#include "tname.h"
29

30
#define MND_STREAM_MAX_NUM 60
31

32
typedef struct {
33
  int8_t placeHolder;  // // to fix windows compile error, define place holder
34
} SMStreamNodeCheckMsg;
35

36
static int32_t  mndNodeCheckSentinel = 0;
37
SStreamExecInfo execInfo;
38

39
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
41
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
42
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
43
static int32_t mndProcessFailedStreamReq(SRpcMsg *pReq);
44
static int32_t mndProcessCheckStreamStatusReq(SRpcMsg *pReq);
45
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
46

47
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
48
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
49

50
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq);
51
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
52
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
53
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
54
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
55
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
56
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
57
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq);
58
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
59
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
60
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
61
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
62
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList);
63
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
64
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
65
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
66
static void    doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
67
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq);
68
static void    saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
69

70
static void     addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
71
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
72

73
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
74
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
75
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
76
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
77
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
78

79
/*
 * Sets up the stream module of the mnode:
 *   - registers the handlers for stream related requests, timers and
 *     transaction responses,
 *   - registers the show handlers for TSDB_MGMT_TABLE_STREAMS and
 *     TSDB_MGMT_TABLE_STREAM_TASKS,
 *   - initializes the shared exec info (mndInitExecInfo),
 *   - registers the SDB_STREAM and SDB_STREAM_SEQ sdb tables.
 *
 * Returns 0 on success, otherwise the first non-zero code returned by
 * mndInitExecInfo() or sdbSetTable().
 */
int32_t mndInitStream(SMnode *pMnode) {
  // requests and timers
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_FAILED_STREAM, mndProcessFailedStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CHECK_STREAM_TIMER, mndProcessCheckStreamStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck);

  // task responses are routed to the transaction framework
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CONSEN_CHKPT_RSP, mndTransProcessRsp);

  // messages exchanged inside the mnode
  // TODO change the name
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE, mndProcessCreateStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp);

  // checkpoint, heartbeat and node-change handling
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_ALL_STOP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_DROP_ORPHANTASKS, mndProcessDropOrphanTaskReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_TASK_RESET, mndProcessResetStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr);

  // pause / resume / reset
  mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESET_STREAM, mndProcessResetStreamReq);

  // show handlers
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);

  int32_t code = mndInitExecInfo();
  if (code != 0) {
    return code;
  }

  SSdbTable streamTable = {
      .sdbType = SDB_STREAM,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
  };
  if ((code = sdbSetTable(pMnode->pSdb, streamTable)) != 0) {
    return code;
  }

  SSdbTable streamSeqTable = {
      .sdbType = SDB_STREAM_SEQ,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamSeqActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamSeqActionDecode,
      .insertFp = (SdbInsertFp)mndStreamSeqActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamSeqActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamSeqActionDelete,
  };
  return sdbSetTable(pMnode->pSdb, streamSeqTable);
}
157

158
/*
 * Releases the containers and the mutex held by the global execInfo.
 *
 * Each container pointer is reset to NULL after it is destroyed, so the global
 * does not keep dangling pointers once cleanup has run.
 */
void mndCleanupStream(SMnode *pMnode) {
  taosArrayDestroy(execInfo.pTaskList);
  execInfo.pTaskList = NULL;

  taosArrayDestroy(execInfo.pNodeList);
  execInfo.pNodeList = NULL;

  taosArrayDestroy(execInfo.pKilledChkptTrans);
  execInfo.pKilledChkptTrans = NULL;

  taosHashCleanup(execInfo.pTaskMap);
  execInfo.pTaskMap = NULL;

  taosHashCleanup(execInfo.transMgmt.pDBTrans);
  execInfo.transMgmt.pDBTrans = NULL;

  taosHashCleanup(execInfo.pTransferStateStreams);
  execInfo.pTransferStateStreams = NULL;

  taosHashCleanup(execInfo.pChkptStreams);
  execInfo.pChkptStreams = NULL;

  taosHashCleanup(execInfo.pStreamConsensus);
  execInfo.pStreamConsensus = NULL;

  (void)taosThreadMutexDestroy(&execInfo.lock);
  mDebug("mnd stream exec info cleanup");
}
170

171
/*
 * sdb decode callback for SDB_STREAM: builds an sdb row holding an SStreamObj
 * from a raw sdb record.
 *
 * The record is read as an int32 payload length followed by that many bytes,
 * which are decoded with tDecodeSStreamObj() using the record's soft version.
 *
 * Returns the new row on success and sets terrno to 0. On failure it frees
 * everything allocated here, sets terrno to the error code and returns NULL.
 */
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSdbRow    *pRow = NULL;
  SStreamObj *pStream = NULL;
  void       *buf = NULL;
  int8_t      sver = 0;
  int32_t     tlen = 0;
  int32_t     dataPos = 0;

  code = sdbGetRawSoftVer(pRaw, &sver);
  TSDB_CHECK_CODE(code, lino, _over);

  if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
    // an explicit error is required: with code == 0 the caller would get NULL with terrno == 0
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    lino = __LINE__;
    goto _over;
  }

  pRow = sdbAllocRow(sizeof(SStreamObj));
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);

  pStream = sdbGetRowObj(pRow);
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);

  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);

  // tlen comes from stored data; reject values that would make tlen + 1 invalid
  if (tlen < 0 || tlen == INT32_MAX) {
    code = TSDB_CODE_INVALID_MSG;
    lino = __LINE__;
    goto _over;
  }

  buf = taosMemoryMalloc(tlen + 1);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);

  SDecoder decoder;
  tDecoderInit(&decoder, buf, tlen + 1);
  code = tDecodeSStreamObj(&decoder, pStream, sver);
  tDecoderClear(&decoder);

  if (code < 0) {
    tFreeStreamObj(pStream);
    lino = __LINE__;
    goto _over;
  }

  // success: only reached when every step above succeeded
  taosMemoryFreeClear(buf);
  mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
         pStream->checkpointId);
  terrno = 0;
  return pRow;

_over:
  // every path that reaches this label is a failure
  taosMemoryFreeClear(buf);

  if (code == TSDB_CODE_SUCCESS) {
    // NOTE(review): an SDB_GET_* accessor may jump here without setting code;
    // make sure the failure is still reported to the caller
    code = TSDB_CODE_INVALID_MSG;
  }

  const char *p = (pStream == NULL) ? "null" : pStream->name;
  mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
  taosMemoryFreeClear(pRow);

  terrno = code;
  return NULL;
}
229

230
/*
 * sdb insert callback for SDB_STREAM. Nothing beyond a trace log is done here;
 * the decoded object is used as is.
 */
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
  (void)pSdb;

  mTrace("stream:%s, perform insert action", pStream->name);
  return TSDB_CODE_SUCCESS;
}
234

235
/*
 * sdb delete callback for SDB_STREAM: releases the resources owned by pStream
 * via tFreeStreamObj(), holding the object's write latch while doing so.
 * Always returns 0.
 */
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
  (void)pSdb;
  mInfo("stream:%s, perform delete action", pStream->name);

  taosWLockLatch(&pStream->lock);
  tFreeStreamObj(pStream);
  taosWUnLockLatch(&pStream->lock);

  return TSDB_CODE_SUCCESS;
}
242

243
/*
 * sdb update callback for SDB_STREAM: refreshes the cached pOldStream from
 * pNewStream.
 *
 * version is exchanged atomically before the latch is taken. status,
 * updateTime, checkpointId and checkpointFreq are copied under the write latch.
 * Each task list of pNewStream is adopted only when pOldStream has none, and the
 * source pointer is cleared in that case. NOTE(review): presumably this is so
 * that freeing pNewStream later does not free the adopted list.
 * Always returns 0.
 */
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
  SStreamObj *pDst = pOldStream;
  SStreamObj *pSrc = pNewStream;

  mTrace("stream:%s, perform update action", pDst->name);
  (void)atomic_exchange_32(&pDst->version, pSrc->version);

  taosWLockLatch(&pDst->lock);

  pDst->status = pSrc->status;
  pDst->updateTime = pSrc->updateTime;
  pDst->checkpointId = pSrc->checkpointId;
  pDst->checkpointFreq = pSrc->checkpointFreq;

  if (pDst->pTaskList == NULL) {
    pDst->pTaskList = pSrc->pTaskList;
    pSrc->pTaskList = NULL;
  }

  if (pDst->pHTaskList == NULL) {
    pDst->pHTaskList = pSrc->pHTaskList;
    pSrc->pHTaskList = NULL;
  }

  taosWUnLockLatch(&pDst->lock);
  return TSDB_CODE_SUCCESS;
}
264

265
/*
 * Looks up a stream by name in the sdb and acquires a reference to it.
 *
 * On success *pStream is set and 0 is returned; release it with
 * mndReleaseStream(). On failure *pStream is NULL and the return value is:
 *   - TSDB_CODE_MND_STREAM_NOT_EXIST when the sdb reports the object is not there,
 *   - otherwise the terrno set by sdbAcquire() (or TSDB_CODE_MND_STREAM_NOT_EXIST
 *     if terrno is 0), so a NULL stream is never reported as success.
 */
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
  SSdb *pSdb = pMnode->pSdb;

  (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
  if ((*pStream) != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    return TSDB_CODE_MND_STREAM_NOT_EXIST;
  }

  // any other acquire failure must not look like success to the caller
  return (terrno != 0) ? terrno : TSDB_CODE_MND_STREAM_NOT_EXIST;
}
274

275
/*
 * Gives a stream reference back to the sdb via sdbRelease(). This is the
 * counterpart of the sdbAcquire() done in mndAcquireStream().
 */
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) { sdbRelease(pMnode->pSdb, pStream); }
279

280
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
281
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
282
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
283
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
284
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
285

286
/*
 * Validates the mandatory fields of a create-stream request: stream name, SQL
 * text, source database and target super table name must all be non-empty.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_MND_INVALID_STREAM_OPTION if any of
 * them is missing.
 */
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
  const bool hasName = (pCreate->name[0] != 0);
  const bool hasSql = (pCreate->sql != NULL) && (pCreate->sql[0] != 0);
  const bool hasSourceDb = (pCreate->sourceDB[0] != 0);
  const bool hasTarget = (pCreate->targetStbFullName[0] != 0);

  if (hasName && hasSql && hasSourceDb && hasTarget) {
    return TSDB_CODE_SUCCESS;
  }
  return TSDB_CODE_MND_INVALID_STREAM_OPTION;
}
293

294
/*
 * Builds pWrapper from an array of SField: one SSchema per field, with column
 * ids assigned sequentially starting at 1.
 *
 * A field of type TSDB_DATA_TYPE_NULL becomes a VARCHAR column whose size is
 * just VARSTR_HEADER_SIZE.
 *
 * pWrapper->nCols and pWrapper->pSchema are assigned before the fields are
 * processed, so on error the partially filled schema stays in pWrapper.
 * Returns TSDB_CODE_SUCCESS, or terrno if the allocation or an element lookup
 * fails.
 */
static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) {
  pWrapper->nCols = taosArrayGetSize(pFields);
  pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema));
  if (pWrapper->pSchema == NULL) {
    return terrno;
  }

  for (int32_t col = 0; col < pWrapper->nCols; ++col) {
    const SField *pField = taosArrayGet(pFields, col);
    if (pField == NULL) {
      return terrno;
    }

    SSchema   *pCol = pWrapper->pSchema + col;
    const bool isNullType = (pField->type == TSDB_DATA_TYPE_NULL);

    pCol->type = isNullType ? TSDB_DATA_TYPE_VARCHAR : pField->type;
    pCol->bytes = isNullType ? (int32_t)VARSTR_HEADER_SIZE : pField->bytes;
    pCol->colId = col + 1;
    tstrncpy(pCol->name, pField->name, sizeof(pCol->name));
    pCol->flags = pField->flags;
  }

  return TSDB_CODE_SUCCESS;
}
323

324
/*
 * Returns true when any column other than column 0 has the COL_IS_KEY flag set.
 * Column 0 is never inspected, so schemas with fewer than two columns always
 * yield false.
 */
static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) {
  for (int32_t col = 1; col < pWrapper->nCols; ++col) {
    if ((pWrapper->pSchema[col].flags & COL_IS_KEY) != 0) {
      return true;
    }
  }
  return false;
}
335

336
/*
 * Fills pObj from a create-stream request. The steps are:
 *   - copy the name, timestamps and trigger options;
 *   - resolve the source and target database uids;
 *   - take ownership of pCreate->sql and pCreate->ast (the request fields are
 *     set to NULL);
 *   - build the output schema, inserting the NULL-filled columns listed in
 *     pCreate->fillNullCols;
 *   - create the physical plan and serialize it into pObj->physicalPlan;
 *   - build the tag schema.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered. On error pObj may be
 * partially filled. NOTE(review): presumably the caller frees it.
 */
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
  SNode      *pAst = NULL;
  SQueryPlan *pPlan = NULL;
  int32_t     code = 0;

  mInfo("stream:%s to create", pCreate->name);
  memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
  pObj->createTime = taosGetTimestampMs();
  pObj->updateTime = pObj->createTime;
  pObj->version = 1;

  if (pCreate->smaId > 0) {
    pObj->subTableWithoutMd5 = 1;
  }

  pObj->smaId = pCreate->smaId;
  pObj->indexForMultiAggBalance = -1;

  pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));

  // NOTE(review): hTaskUid is generated from exactly the same input as uid, so the two
  // ids are equal. An unused "<name>_fillhistory" string used to be formatted here,
  // which hints that a distinct id may have been intended. Confirm before changing:
  // a different input would alter the ids of newly created streams.
  pObj->hTaskUid = mndGenerateUid(pObj->name, strlen(pObj->name));
  pObj->status = STREAM_STATUS__NORMAL;

  pObj->conf.igExpired = pCreate->igExpired;
  pObj->conf.trigger = pCreate->triggerType;
  pObj->conf.triggerParam = pCreate->maxDelay;
  pObj->conf.watermark = pCreate->watermark;
  pObj->conf.fillHistory = pCreate->fillHistory;
  pObj->deleteMark = pCreate->deleteMark;
  pObj->igCheckUpdate = pCreate->igUpdate;

  memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
  SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
  if (pSourceDb == NULL) {
    code = terrno;
    mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb,
          tstrerror(code));
    goto _ERR;
  }

  pObj->sourceDbUid = pSourceDb->uid;
  mndReleaseDb(pMnode, pSourceDb);

  memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);

  SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
  if (pTargetDb == NULL) {
    code = terrno;
    // pObj->targetDb is only filled in below, so report the target table name instead
    mError("stream:%s failed to create, target db of %s not exist since %s", pCreate->name, pObj->targetSTbName,
           tstrerror(code));
    goto _ERR;
  }

  tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);

  if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
    pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
  } else {
    pObj->targetStbUid = pCreate->targetStbUid;
  }
  pObj->targetDbUid = pTargetDb->uid;
  mndReleaseDb(pMnode, pTargetDb);

  // the stream object takes over sql and ast; the request no longer references them
  pObj->sql = pCreate->sql;
  pObj->ast = pCreate->ast;

  pCreate->sql = NULL;
  pCreate->ast = NULL;

  // deserialize ast
  if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) {
    goto _ERR;
  }

  // create output schema
  if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) {
    goto _ERR;
  }

  // merge the NULL-filled columns into the output schema at their slot positions
  int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
  if (numOfNULL > 0) {
    int32_t  numOfData = pObj->outputSchema.nCols;
    int32_t  numOfCols = numOfData + numOfNULL;
    SSchema *pFullSchema = taosMemoryCalloc(numOfCols, sizeof(SSchema));
    if (pFullSchema == NULL) {
      // outputSchema is untouched here, so nCols still matches pSchema
      code = terrno;
      goto _ERR;
    }

    int32_t nullIndex = 0;
    int32_t dataIndex = 0;
    for (int32_t i = 0; i < numOfCols; i++) {
      SColLocation *pos = NULL;
      if (nullIndex < numOfNULL) {
        pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
        if (pos == NULL) {
          mError("invalid null column index, %d", nullIndex);
          continue;
        }
      }

      if (pos != NULL && i >= pos->slotId) {
        // NULL-filled column: no name, no bytes; id and type come from the request
        pFullSchema[i].bytes = 0;
        pFullSchema[i].colId = pos->colId;
        pFullSchema[i].flags = COL_SET_NULL;
        memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
        pFullSchema[i].type = pos->type;
        nullIndex++;
        continue;
      }

      // regular output column; malformed slot ids must not read past the data columns
      if (dataIndex >= numOfData) {
        mError("stream:%s failed to create, invalid null column slot, data column index:%d, total:%d", pCreate->name,
               dataIndex, numOfData);
        taosMemoryFree(pFullSchema);
        code = TSDB_CODE_MND_INVALID_STREAM_OPTION;
        goto _ERR;
      }

      const SSchema *pSrc = &pObj->outputSchema.pSchema[dataIndex];
      pFullSchema[i].bytes = pSrc->bytes;
      pFullSchema[i].colId = i + 1;
      pFullSchema[i].flags = pSrc->flags;
      tstrncpy(pFullSchema[i].name, pSrc->name, sizeof(pFullSchema[i].name));
      pFullSchema[i].type = pSrc->type;
      dataIndex++;
    }

    taosMemoryFree(pObj->outputSchema.pSchema);
    pObj->outputSchema.pSchema = pFullSchema;
    pObj->outputSchema.nCols = numOfCols;
  }

  SPlanContext cxt = {
      .pAstRoot = pAst,
      .topicQuery = false,
      .streamQuery = true,
      .triggerType =
          (pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
      .watermark = pObj->conf.watermark,
      .igExpired = pObj->conf.igExpired,
      .deleteMark = pObj->deleteMark,
      .igCheckUpdate = pObj->igCheckUpdate,
      .destHasPrimaryKey = hasDestPrimaryKey(&pObj->outputSchema),
      .recalculateInterval = pCreate->recalculateInterval,
  };

  // the planner receives the target name with everything up to and including the
  // first '.' removed; fall back to the whole name if there is no '.', since
  // passing strchr's NULL result on would dereference NULL
  const char *pTargetFStable = strchr(pCreate->targetStbFullName, '.');
  pTargetFStable = (pTargetFStable != NULL) ? (pTargetFStable + 1) : pCreate->targetStbFullName;
  tstrncpy(cxt.pStbFullName, pTargetFStable, TSDB_TABLE_FNAME_LEN);
  tstrncpy(cxt.pWstartName, pCreate->pWstartName, TSDB_COL_NAME_LEN);
  tstrncpy(cxt.pWendName, pCreate->pWendName, TSDB_COL_NAME_LEN);
  tstrncpy(cxt.pGroupIdName, pCreate->pGroupIdName, TSDB_COL_NAME_LEN);
  tstrncpy(cxt.pIsWindowFilledName, pCreate->pIsWindowFilledName, TSDB_COL_NAME_LEN);

  // using ast and param to build physical plan
  if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) {
    goto _ERR;
  }

  // save physical plan
  if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) {
    goto _ERR;
  }

  pObj->tagSchema.nCols = pCreate->numOfTags;
  if (pCreate->numOfTags) {
    pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
    if (pObj->tagSchema.pSchema == NULL) {
      code = terrno;
      goto _ERR;
    }
  }

  // tag column ids continue after the output columns
  for (int32_t i = 0; i < pCreate->numOfTags; i++) {
    SField *pField = taosArrayGet(pCreate->pTags, i);
    if (pField == NULL) {
      continue;
    }

    pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
    pObj->tagSchema.pSchema[i].bytes = pField->bytes;
    pObj->tagSchema.pSchema[i].flags = pField->flags;
    pObj->tagSchema.pSchema[i].type = pField->type;
    memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN);
  }

_ERR:
  if (pAst != NULL) nodesDestroyNode(pAst);
  if (pPlan != NULL) qDestroyQueryPlan(pPlan);
  return code;
}
531

532
/*
 * Serializes pTask into a deploy message (an SMsgHead followed by the encoded
 * task) and adds it to pTrans as a TDMT_STREAM_TASK_DEPLOY action addressed to
 * the task's epset.
 *
 * If pTask->ver is older than SSTREAM_TASK_SUBTABLE_CHANGED_VER, it is set to
 * SSTREAM_TASK_VER before encoding (this modifies pTask).
 *
 * Returns 0 on success. Returns TSDB_CODE_INVALID_MSG if the size pass fails,
 * otherwise the failing code. The message buffer is freed here on failure;
 * NOTE(review): on success it is presumably owned by the transaction.
 */
int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
  SEncoder encoder;

  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  // pass 1: encode without a buffer; encoder.pos then holds the encoded length
  tEncoderInit(&encoder, NULL, 0);
  int32_t code = tEncodeStreamTask(&encoder, pTask);
  if (code != 0) {
    // any failure (not only -1) leaves encoder.pos unreliable as a size
    tEncoderClear(&encoder);
    mError("failed to calc the encoded size of stream task, code:%s", tstrerror(code));
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t size = encoder.pos;
  int32_t tlen = sizeof(SMsgHead) + size;
  tEncoderClear(&encoder);

  void *buf = taosMemoryCalloc(1, tlen);
  if (buf == NULL) {
    return terrno;
  }

  ((SMsgHead *)buf)->vgId = htonl(pTask->info.nodeId);

  // pass 2: encode the task right after the message head
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, size);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  if (code != 0) {
    mError("failed to encode stream task, code:%s", tstrerror(code));
    taosMemoryFree(buf);
    return code;
  }

  code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code) {
    taosMemoryFree(buf);
  }

  return code;
}
576

577
/*
 * Adds a deploy action to pTrans for every task of pStream.
 *
 * The regular tasks are walked with the stream task iterator. When fillHistory
 * is set, or the trigger is STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE, the tasks
 * in pStream->pHTaskList (tasks for already stored ts data) are deployed too.
 *
 * Returns 0 on success or the first error. A NULL entry in the history task
 * list is reported as TSDB_CODE_INVALID_PARA instead of being dereferenced.
 */
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pIter = NULL;
  int32_t          code = createStreamTaskIter(pStream, &pIter);
  if (code) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return code;
  }

  while (streamTaskIterNextTask(pIter)) {
    SStreamTask *pTask = NULL;
    code = streamTaskIterGetCurrent(pIter, &pTask);
    if (code == 0) {
      code = mndPersistTaskDeployReq(pTrans, pTask);
    }

    if (code) {
      destroyStreamTaskIter(pIter);
      return code;
    }
  }

  destroyStreamTaskIter(pIter);

  // persistent stream task for already stored ts data
  if (pStream->conf.fillHistory || (pStream->conf.trigger == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE)) {
    int32_t numOfLevels = taosArrayGetSize(pStream->pHTaskList);

    for (int32_t i = 0; i < numOfLevels; i++) {
      SArray *pLevel = taosArrayGetP(pStream->pHTaskList, i);
      int32_t numOfTasks = taosArrayGetSize(pLevel);

      for (int32_t j = 0; j < numOfTasks; j++) {
        SStreamTask *pTask = taosArrayGetP(pLevel, j);
        if (pTask == NULL) {
          mError("stream:%s, invalid history task at level:%d, index:%d", pStream->name, i, j);
          return TSDB_CODE_INVALID_PARA;
        }

        code = mndPersistTaskDeployReq(pTrans, pTask);
        if (code) {
          return code;
        }
      }
    }
  }

  return code;
}
622

623
/*
 * Adds everything needed to persist pStream to pTrans: first the deploy actions
 * for all of its tasks, then the stream row itself as a transaction log entry
 * with SDB_STATUS_READY. If adding the task actions returns a negative code,
 * that code is returned and the log entry is not added.
 */
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
  int32_t code = mndPersistStreamTasks(pTrans, pStream);
  if (code < 0) {
    return code;
  }

  return mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
}
631

632
/*
 * Add the creation of the stream's destination super table (pStream->targetSTbName) to pTrans.
 *
 * Columns are taken from pStream->outputSchema; decimal columns additionally get a typeMod
 * derived from their schema bytes. Tags are taken from pStream->tagSchema; when the stream
 * defines no tags, a single UBIGINT tag named "group_id" is used. The uid of the new stable
 * is forced to pStream->targetStbUid.
 *
 * Fails when the stable already exists, its database cannot be acquired, or the database is
 * in single-stable mode and already contains a stable. `user` is currently not used.
 *
 * Returns 0 on success, otherwise a TSDB error code. Every failure path reports a non-zero
 * code: failures of mndGetNumOfStbs, mndBuildStbFromReq and mndAddStbToTrans used to reach
 * the error label with code still 0, which the caller took for success.
 */
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
  SStbObj       *pStb = NULL;
  SDbObj        *pDb = NULL;
  SStbObj        stbObj = {0};  // zero-initialized so that mndFreeStb() is safe on every exit path
  SMCreateStbReq createReq = {0};
  int32_t        numOfStbs = -1;
  int32_t        code = 0;
  int32_t        lino = 0;

  tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
  createReq.numOfColumns = pStream->outputSchema.nCols;
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);

  // build fields
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    pField->flags = pStream->outputSchema.pSchema[i].flags;
    pField->type = pStream->outputSchema.pSchema[i].type;
    pField->bytes = pStream->outputSchema.pSchema[i].bytes;
    pField->compress = createDefaultColCmprByType(pField->type);
    if (IS_DECIMAL_TYPE(pField->type)) {
      uint8_t prec = 0, scale = 0;
      extractDecimalTypeInfoFromBytes(&pField->bytes, &prec, &scale);
      pField->typeMod = decimalCalcTypeMod(prec, scale);
    }
  }

  // build tags
  if (pStream->tagSchema.nCols == 0) {
    // the stream declares no tags: use a single group id tag
    createReq.numOfTags = 1;
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    SField *pField = taosArrayGet(createReq.pTags, 0);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, "group_id", sizeof(pField->name));
    pField->type = TSDB_DATA_TYPE_UBIGINT;
    pField->flags = 0;
    pField->bytes = 8;
  } else {
    createReq.numOfTags = pStream->tagSchema.nCols;
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    for (int32_t i = 0; i < createReq.numOfTags; i++) {
      SField *pField = taosArrayGet(createReq.pTags, i);
      if (pField == NULL) {
        continue;
      }

      pField->bytes = pStream->tagSchema.pSchema[i].bytes;
      pField->flags = pStream->tagSchema.pSchema[i].flags;
      pField->type = pStream->tagSchema.pSchema[i].type;
      tstrncpy(pField->name, pStream->tagSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    }
  }

  code = mndCheckCreateStbReq(&createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
    lino = __LINE__;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  TSDB_CHECK_NULL(pDb, code, lino, _OVER, TSDB_CODE_MND_DB_NOT_SELECTED);

  code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
  TSDB_CHECK_CODE(code, lino, _OVER);

  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    lino = __LINE__;
    goto _OVER;
  }

  code = mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb);
  TSDB_CHECK_CODE(code, lino, _OVER);

  stbObj.uid = pStream->targetStbUid;

  code = mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj);
  TSDB_CHECK_CODE(code, lino, _OVER);

  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols);

_OVER:
  if (code != TSDB_CODE_SUCCESS) {
    mError("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->targetSTbName, lino,
           tstrerror(code));
  }

  // single cleanup path for success and failure; all release/free helpers accept NULL / zeroed input
  tFreeSMCreateStbReq(&createReq);
  mndFreeStb(&stbObj);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);
  return code;
}
748

749
// 1. stream number check
750
// 2. target stable can not be target table of other existed streams.
751
static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
1,781✔
752
  int32_t     numOfStream = 0;
1,781✔
753
  SStreamObj *pStream = NULL;
1,781✔
754
  void       *pIter = NULL;
1,781✔
755

756
  while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
4,144✔
757
    if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
2,364✔
758
      ++numOfStream;
1,644✔
759
    }
760

761

762
    if (numOfStream > MND_STREAM_MAX_NUM) {
2,364!
UNCOV
763
      mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
×
764
             pStreamObj->name);
UNCOV
765
      sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
766
      sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
767
      return TSDB_CODE_MND_TOO_MANY_STREAMS;
×
768
    }
769

770
    if (pStream->targetStbUid == pStreamObj->targetStbUid) {
2,364✔
771
      mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
1!
772
             pStreamObj->name);
773
      sdbRelease(pMnode->pSdb, pStream);
1✔
774
      sdbCancelFetch(pMnode->pSdb, pIter);
1✔
775
      return TSDB_CODE_MND_INVALID_TARGET_TABLE;
1✔
776
    }
777
    sdbRelease(pMnode->pSdb, pStream);
2,363✔
778
  }
779

780
  return TSDB_CODE_SUCCESS;
1,780✔
781
}
782

783
/* Element copier handed to taosArrayDup: returns a taosStrdup'ed copy of the string p. */
static void *notifyAddrDup(void *p) {
  char *pAddr = (char *)p;
  return taosStrdup(pAddr);
}
×
784

785
static int32_t addStreamTaskNotifyInfo(const SCMCreateStreamReq *createReq, const SStreamObj *pStream,
×
786
                                       SStreamTask *pTask) {
787
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
788
  int32_t lino = 0;
×
789

790
  TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
791
  TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
792

793
  pTask->notifyInfo.pNotifyAddrUrls = taosArrayDup(createReq->pNotifyAddrUrls, notifyAddrDup);
×
UNCOV
794
  TSDB_CHECK_NULL(pTask->notifyInfo.pNotifyAddrUrls, code, lino, _end, terrno);
×
UNCOV
795
  pTask->notifyInfo.notifyEventTypes = createReq->notifyEventTypes;
×
UNCOV
796
  pTask->notifyInfo.notifyErrorHandle = createReq->notifyErrorHandle;
×
UNCOV
797
  pTask->notifyInfo.streamName = taosStrdup(mndGetDbStr(createReq->name));
×
UNCOV
798
  TSDB_CHECK_NULL(pTask->notifyInfo.streamName, code, lino, _end, terrno);
×
UNCOV
799
  pTask->notifyInfo.stbFullName = taosStrdup(createReq->targetStbFullName);
×
UNCOV
800
  TSDB_CHECK_NULL(pTask->notifyInfo.stbFullName, code, lino, _end, terrno);
×
UNCOV
801
  pTask->notifyInfo.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
×
UNCOV
802
  TSDB_CHECK_NULL(pTask->notifyInfo.pSchemaWrapper, code, lino, _end, terrno);
×
803

UNCOV
804
_end:
×
UNCOV
805
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
806
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
807
  }
UNCOV
808
  return code;
×
809
}
810

811
/*
 * Attach the request's notification settings to the tasks of a stream.
 *
 * Does nothing when the request carries no notify URLs. Otherwise every task in
 * pStream->pTaskList is updated, and the tasks in pStream->pHTaskList are updated too when the
 * stream fills history and the request sets notifyHistory.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the first error returned by
 * addStreamTaskNotifyInfo.
 */
static int32_t addStreamNotifyInfo(SCMCreateStreamReq *createReq, SStreamObj *pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t level = 0;
  int32_t nTasks = 0;
  SArray *pLevel = NULL;

  TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pStream, code, lino, _end, TSDB_CODE_INVALID_PARA);

  if (taosArrayGetSize(createReq->pNotifyAddrUrls) == 0) {
    goto _end;
  }

  level = taosArrayGetSize(pStream->pTaskList);
  for (int32_t i = 0; i < level; ++i) {
    pLevel = taosArrayGetP(pStream->pTaskList, i);
    nTasks = taosArrayGetSize(pLevel);
    for (int32_t j = 0; j < nTasks; ++j) {
      code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
      TSDB_CHECK_CODE(code, lino, _end);
    }
  }

  if (pStream->conf.fillHistory && createReq->notifyHistory) {
    level = taosArrayGetSize(pStream->pHTaskList);
    for (int32_t i = 0; i < level; ++i) {
      pLevel = taosArrayGetP(pStream->pHTaskList, i);
      nTasks = taosArrayGetSize(pLevel);
      for (int32_t j = 0; j < nTasks; ++j) {
        code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
        TSDB_CHECK_CODE(code, lino, _end);
      }
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // pStream may be the NULL argument that brought us here, so it must not be dereferenced blindly
    mError("%s for stream %s failed at line %d since %s", __func__, (pStream != NULL) ? pStream->name : "(null)", lino,
           tstrerror(code));
  }
  return code;
}
853

854
/*
 * Periodic sweep over all streams. A stream still in STREAM_STATUS__INIT whose age exceeds
 * tsStreamFailedTimeout, or whose createTime lies ahead of the local clock, is switched to
 * STREAM_STATUS__FAILED with reason "timeout". The update is done under the stream's write latch.
 * Always returns 0.
 */
static int32_t mndProcessCheckStreamStatusReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;

  while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    taosWLockLatch(&pStream->lock);
    if (pStream->status == STREAM_STATUS__INIT) {
      // read the clock once so that both bounds are checked against the same instant
      int64_t elapsed = taosGetTimestampMs() - pStream->createTime;

      // a negative age means createTime is in the future (clock moved); treat it as a timeout too
      if (elapsed > tsStreamFailedTimeout || elapsed < 0) {
        pStream->status = STREAM_STATUS__FAILED;
        tstrncpy(pStream->reserve, "timeout", sizeof(pStream->reserve));
        mInfo("stream:%s, set status to failed success because of timeout", pStream->name);
      }
    }
    taosWUnLockLatch(&pStream->lock);
    sdbRelease(pMnode->pSdb, pStream);
  }

  return 0;
}
873

874
/*
 * Mark a stream as failed.
 *
 * Message layout (as read below): an int32 error code followed by the stream name, which is
 * not required to be NUL-terminated and is truncated to TSDB_STREAM_FNAME_LEN - 1 bytes.
 * The stream's status is set to STREAM_STATUS__FAILED and tstrerror(errCode) is stored in
 * its reserve field.
 *
 * Returns TSDB_CODE_INVALID_MSG for a message shorter than the error-code header (previously
 * this underflowed the memcpy length), the lookup error if the stream cannot be acquired, or
 * the result of mndAcquireStream otherwise.
 */
static int32_t mndProcessFailedStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     errCode = 0;
  char        streamName[TSDB_STREAM_FNAME_LEN] = {0};

  if (pReq->pCont == NULL || pReq->contLen < INT_BYTES) {
    mError("invalid set-stream-failed msg, contLen:%d", pReq->contLen);
    return TSDB_CODE_INVALID_MSG;
  }

  // memcpy rather than *(int32_t *) so the read does not depend on the alignment of pCont
  memcpy(&errCode, pReq->pCont, sizeof(errCode));
  memcpy(streamName, POINTER_SHIFT(pReq->pCont, INT_BYTES), TMIN(pReq->contLen - INT_BYTES, TSDB_STREAM_FNAME_LEN - 1));

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  return code;
#endif

  mInfo("stream:%s, start to set stream failed", streamName);

  code = mndAcquireStream(pMnode, streamName, &pStream);
  if (pStream == NULL) {
    mError("stream:%s, failed to get stream when failed stream since %s", streamName, tstrerror(code));
    return code;
  }

  taosWLockLatch(&pStream->lock);
  pStream->status = STREAM_STATUS__FAILED;
  tstrncpy(pStream->reserve, tstrerror(errCode), sizeof(pStream->reserve));
  taosWUnLockLatch(&pStream->lock);
  mndReleaseStream(pMnode, pStream);

  mInfo("stream:%s, end to set stream failed success", streamName);

  return code;
}
905

906
/*
 * Handle a create-stream request.
 *
 * Flow:
 * 1. Deserialize and validate the request.
 * 2. Handle duplicates: an existing stream with tasks is an error unless igExists is set, in
 *    which case the handler returns early.
 * 3. Run the grant, node-update, snode and privilege checks, then the per-database stream
 *    checks in doStreamCheck.
 * 4. Build the stream object and, unless an empty INIT stream has to be created first,
 *    schedule its tasks and register them in execInfo.
 * 5. Create the destination stable when requested, persist everything in one transaction and
 *    write an audit record. The record carries the request SQL when present; previously
 *    sqlLen was never set, so the SQL was never audited.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared, 0 when the stream
 * already exists and igExists is set, otherwise an error code.
 */
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SStreamObj  streamObj = {0};
  char       *sql = NULL;
  int32_t     sqlLen = 0;
  const char *pMsg = "create stream tasks on dnodes";
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STrans     *pTrans = NULL;

  SCMCreateStreamReq createReq = {0};
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  goto _OVER;
#endif

  mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
  if ((code = mndCheckCreateStreamReq(&createReq)) != 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = mndAcquireStream(pMnode, createReq.name, &pStream);
  if (pStream != NULL && code == 0) {
    if (pStream->pTaskList != NULL) {
      if (createReq.igExists) {
        mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
        mndReleaseStream(pMnode, pStream);
        tFreeSCMCreateStreamReq(&createReq);
        return code;
      } else {
        code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
        goto _OVER;
      }
    }
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    goto _OVER;
  }

  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
    goto _OVER;
  }

  if (createReq.sql != NULL) {
    sql = taosStrdup(createReq.sql);
    TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
    // remember the length so that the audit record below can carry the original SQL text
    sqlLen = (int32_t)strlen(sql);
  }

  // check for the taskEp update trans
  if (isNodeUpdateTransActive()) {
    mError("stream:%s failed to create stream, node update trans is active", createReq.name);
    code = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
    goto _OVER;
  }

  SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB);
  if (pSourceDb == NULL) {
    code = terrno;
    mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB,
          tstrerror(code));
    goto _OVER;
  }

  code = mndCheckForSnode(pMnode, pSourceDb);
  mndReleaseDb(pMnode, pSourceDb);
  if (code != 0) {
    goto _OVER;
  }

  // build stream obj from request
  if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  bool buildEmptyStream = false;
  if (createReq.lastTs == 0 && createReq.fillHistory != STREAM_FILL_HISTORY_OFF) {
    streamObj.status = STREAM_STATUS__INIT;
    buildEmptyStream = true;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb)) != 0) {
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb)) != 0) {
    goto _OVER;
  }

  code = doStreamCheck(pMnode, &streamObj);
  TSDB_CHECK_CODE(code, lino, _OVER);

  // schedule stream task for stream obj
  if (!buildEmptyStream) {
    // pTrans is still NULL here; _OVER performs the (no-op) drop, so no explicit mndTransDrop is needed
    code = mndScheduleStream(pMnode, &streamObj, &createReq);
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
      mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code));
      goto _OVER;
    }

    // add notify info into all stream tasks
    code = addStreamNotifyInfo(&createReq, &streamObj);
    if (code != TSDB_CODE_SUCCESS) {
      mError("stream:%s failed to add stream notify info since %s", createReq.name, tstrerror(code));
      goto _OVER;
    }

    // add into buffer firstly
    // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
    streamMutexLock(&execInfo.lock);
    mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
    saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
    streamMutexUnlock(&execInfo.lock);
  }

  code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans);
  if (pTrans == NULL || code) {
    goto _OVER;
  }

  // create stb for stream
  if (createReq.createStb == STREAM_CREATE_STABLE_TRUE && !buildEmptyStream) {
    if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) {
      mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code));
      goto _OVER;
    }
  } else {
    mDebug("stream:%s no need create stable", createReq.name);
  }

  // add stream to trans
  code = mndPersistStream(pTrans, &streamObj);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  // execute creation
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    goto _OVER;
  }

  // name parsing only feeds the audit record; report its own result code rather than the unrelated `code`
  SName   dbname = {0};
  int32_t nameCode = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (nameCode != 0) {
    mError("invalid source dbname:%s in create stream, code:%s", createReq.sourceDB, tstrerror(nameCode));
  }

  SName name = {0};
  nameCode = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE);
  if (nameCode != 0) {
    mError("invalid stream name:%s in create stream, code:%s", createReq.name, tstrerror(nameCode));
  }

  // reuse this function for stream
  if (sql != NULL && sqlLen > 0) {
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen);
  } else {
    char detail[1000] = {0};
    snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
  }

_OVER:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to create at line:%d since %s", createReq.name, lino, tstrerror(code));
  } else {
    mDebug("stream:%s create stream completed", createReq.name);
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  mndTransDrop(pTrans);
  mndReleaseStream(pMnode, pStream);
  tFreeSCMCreateStreamReq(&createReq);
  tFreeStreamObj(&streamObj);
  taosMemoryFreeClear(sql);

  return code;
}
1094

UNCOV
1095
/*
 * Handle a restart-stream request. The payload is decoded as an SMPauseStreamReq.
 *
 * If the stream is missing, returns 0 when igNotExists is set and
 * TSDB_CODE_MND_STREAM_NOT_EXIST otherwise. Otherwise it requires:
 * - write privilege on the stream's target db,
 * - no conflicting stream transaction,
 * - no pending node update.
 * It then builds, registers and prepares a restart transaction.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared, otherwise an error
 * code. All post-acquire failures now share one cleanup path.
 */
static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SStreamObj      *pStream = NULL;
  STrans          *pTrans = NULL;
  int32_t          code = 0;
  SMPauseStreamReq pauseReq = {0};

  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (pauseReq.igNotExists) {
      mInfo("stream:%s, not exist, not restart stream", pauseReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to restart stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to restart stream", pauseReq.name, pStream->uid);
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
    goto _end;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESTART_NAME, true);
  if (code) {
    goto _end;
  }

  if (mndStreamNodeIsUpdated(pMnode)) {
    mError("tasks are not ready for restart, node update detected");
    code = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
    terrno = code;  // keep the terrno side effect the former TAOS_RETURN had on this path
    goto _end;
  }

  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream",
                       &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s failed to restart stream since %s", pauseReq.name, tstrerror(code));
    goto _end;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESTART_NAME, pStream->uid);
  if (code) {
    goto _end;
  }

  // add the restart actions for the stream's tasks to the transaction
  code = mndStreamSetRestartAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s, failed to restart task since %s", pauseReq.name, tstrerror(code));
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare restart stream trans since %s", pTrans->id, tstrerror(code));
    goto _end;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_end:
  // pStream is always acquired here; mndTransDrop accepts a NULL transaction
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
1174

1175
/*
 * Generate the next checkpoint id: one greater than the largest checkpoint id known either
 * from the stream objects in the sdb or from the latest checkpoint ids of the non-failed task
 * entries in execInfo. `lock` selects whether execInfo.lock is taken here (presumably false
 * when the caller already holds it — confirm at call sites).
 */
int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  SSdb       *pSdb = pMnode->pSdb;
  int64_t     maxChkptId = 0;

  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    maxChkptId = TMAX(maxChkptId, pStream->checkpointId);
    mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
           pStream->checkpointId);
    sdbRelease(pSdb, pStream);
  }

  {  // check the max checkpoint id from all vnodes.
    int64_t maxCheckpointId = -1;
    if (lock) {
      streamMutexLock(&execInfo.lock);
    }

    for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
      STaskId *p = taosArrayGet(execInfo.pTaskList, i);
      if (p == NULL) {
        continue;
      }

      // look up only after p is known to be valid (it used to be hashed before its NULL check)
      STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
      if (pEntry == NULL || pEntry->checkpointInfo.failed) {
        continue;
      }

      if (maxCheckpointId < pEntry->checkpointInfo.latestId) {
        maxCheckpointId = pEntry->checkpointInfo.latestId;
      }
    }

    if (lock) {
      streamMutexUnlock(&execInfo.lock);
    }

    if (maxCheckpointId > maxChkptId) {
      mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId,
             maxCheckpointId);
      maxChkptId = maxCheckpointId;
    }
  }

  mDebug("generate new checkpointId:%" PRId64, maxChkptId + 1);
  return maxChkptId + 1;
}
1227

1228
/*
 * Build and prepare a checkpoint transaction for one stream.
 *
 * When mndTrigger == 1, the call returns 0 without doing anything if the previous checkpoint
 * was issued less than tsStreamCheckpointInterval seconds ago. Otherwise it:
 * - runs a conflict check (`lock` is forwarded to mndStreamTransConflictCheck),
 * - creates and registers a transaction,
 * - adds a checkpoint action for every task of each source level,
 * - updates checkpointId, checkpointFreq and version under the stream's write latch,
 * - appends the stream commit log and prepares the transaction.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS after a successful prepare, otherwise an error code.
 * Empty levels and NULL task slots are now skipped instead of being dereferenced.
 */
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
                                               int8_t mndTrigger, bool lock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t ts = taosGetTimestampMs();
  STrans *pTrans = NULL;

  if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
    return code;
  }

  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
  if (code) {
    mWarn("checkpoint conflict with other trans in %s, code:%s ignore the checkpoint for stream:%s %" PRIx64,
          pStream->sourceDb, tstrerror(code), pStream->name, pStream->uid);
    goto _ERR;
  }

  code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME,
                       "gen checkpoint for stream", &pTrans);
  if (code) {
    mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
           tstrerror(code));
    goto _ERR;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
  if (code) {
    mError("failed to register checkpoint trans for stream:%s, checkpointId:%" PRId64, pStream->name, checkpointId);
    goto _ERR;
  }

  mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);

  taosWLockLatch(&pStream->lock);
  pStream->currentTick = 1;

  // 1. redo action: broadcast checkpoint source msg for all source vg
  int32_t totalLevel = taosArrayGetSize(pStream->pTaskList);
  for (int32_t i = 0; i < totalLevel; i++) {
    SArray      *pLevel = taosArrayGetP(pStream->pTaskList, i);
    SStreamTask *p = taosArrayGetP(pLevel, 0);

    // the first task identifies the level; an empty level yields NULL and is skipped
    if (p == NULL || p->info.taskLevel != TASK_LEVEL__SOURCE) {
      continue;
    }

    int32_t sz = taosArrayGetSize(pLevel);
    for (int32_t j = 0; j < sz; j++) {
      SStreamTask *pTask = taosArrayGetP(pLevel, j);
      if (pTask == NULL) {
        continue;
      }

      code = mndStreamSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);
      if (code != TSDB_CODE_SUCCESS) {
        taosWUnLockLatch(&pStream->lock);
        goto _ERR;
      }
    }
  }

  // 2. reset tick
  pStream->checkpointId = checkpointId;
  pStream->checkpointFreq = taosGetTimestampMs();
  pStream->currentTick = 0;

  // 3. commit log: stream checkpoint info
  pStream->version = pStream->version + 1;
  taosWUnLockLatch(&pStream->lock);

  if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY)) != TSDB_CODE_SUCCESS) {
    goto _ERR;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to prepare checkpoint trans since %s", tstrerror(code));
  } else {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_ERR:
  mndTransDrop(pTrans);
  return code;
}
1309

1310
/*
 * Return the number of entries in execInfo.pNodeList. When the list is empty it is first
 * rebuilt from the existing streams. If that rebuild fails, its error code is returned in
 * place of the count.
 */
int32_t extractStreamNodeList(SMnode *pMnode) {
  if (taosArrayGetSize(execInfo.pNodeList) != 0) {
    return taosArrayGetSize(execInfo.pNodeList);
  }

  int32_t ret = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
  if (ret != 0) {
    mError("Failed to extract node list from stream, code:%s", tstrerror(ret));
    return ret;
  }

  return taosArrayGetSize(execInfo.pNodeList);
}
1321

1322
/*
 * Pre-checkpoint sanity check.
 *
 * Fails with TSDB_CODE_STREAM_TASK_IVLD_STATUS in two cases:
 * - a stream node update has been detected;
 * - execInfo has no nodes but still holds tasks (checked under execInfo.lock).
 *
 * Returns 0 otherwise.
 */
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
  if (mndStreamNodeIsUpdated(pMnode)) {
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  int32_t ret = 0;

  streamMutexLock(&execInfo.lock);

  bool noNodes = (taosArrayGetSize(execInfo.pNodeList) == 0);
  if (noNodes) {
    mDebug("stream task node change checking done, no vgroups exist, do nothing");

    bool hasTasks = (taosArrayGetSize(execInfo.pTaskList) != 0);
    if (hasTasks) {
      mError("stream task node change checking done, no vgroups exist, but task list is not empty");
      ret = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
    }
  }

  streamMutexUnlock(&execInfo.lock);
  return ret;
}
1340

1341
/*
 * Return the latest startTime among the tasks of stream `streamId` listed in pTaskList,
 * looking each one up in execInfo.pTaskMap.
 *
 * Returns -1 when any task of the stream still has a related fill-history task or is not in
 * TASK_STATUS__READY, and also when no task of the stream is found. The caller is expected to
 * hold execInfo.lock; isStreamReadyHelp does so.
 */
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
  int64_t ts = -1;
  int32_t taskId = -1;

  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
    STaskId *p = taosArrayGet(pTaskList, i);
    if (p == NULL) {
      continue;
    }

    // look up only after p is known to be valid (it used to be hashed before its NULL check)
    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
    if (pEntry == NULL || pEntry->id.streamId != streamId) {
      continue;
    }

    // -1 denote not ready now or never ready till now
    if (pEntry->hTaskId != 0) {
      mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
            " exists, checkpoint not issued",
            pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
            pEntry->hTaskId);
      return -1;
    }

    if (pEntry->status != TASK_STATUS__READY) {
      mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, not ready for checkpoint", pEntry->id.streamId,
            (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
      return -1;
    }

    if (ts < pEntry->startTime) {
      ts = pEntry->startTime;
      taskId = pEntry->id.taskId;
    }
  }

  mDebug("stream:0x%" PRIx64 " last ready ts:%" PRId64 " s-task:0x%x", streamId, ts, taskId);
  return ts;
}
1376

1377
/* A (stream uid, duration) pair; ordered by duration through streamWaitComparFn. */
typedef struct {
  int64_t streamId;  // stream uid
  int64_t duration;  // sort key; presumably elapsed milliseconds — TODO confirm at the producer
} SCheckpointInterval;
1381

1382
static int32_t streamWaitComparFn(const void *p1, const void *p2) {
716✔
1383
  const SCheckpointInterval *pInt1 = p1;
716✔
1384
  const SCheckpointInterval *pInt2 = p2;
716✔
1385
  if (pInt1->duration == pInt2->duration) {
716✔
1386
    return 0;
54✔
1387
  }
1388

1389
  return pInt1->duration > pInt2->duration ? -1 : 1;
662✔
1390
}
1391

1392
// all tasks of this stream should be ready, otherwise do nothing
1393
static bool isStreamReadyHelp(int64_t now, SStreamObj* pStream) {
1,945✔
1394
  bool ready = false;
1,945✔
1395

1396
  streamMutexLock(&execInfo.lock);
1,945✔
1397

1398
  int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
1,945✔
1399
  if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) {
1,945!
1400

1401
    if (lastReadyTs != -1) {
589✔
1402
      mInfo("not start checkpoint, stream:0x%" PRIx64 " last ready ts:%" PRId64 " ready duration:%" PRId64
480!
1403
            "ms less than threshold",
1404
            pStream->uid, lastReadyTs, (now - lastReadyTs));
1405
    }
1406

1407
    ready = false;
589✔
1408
  } else {
1409
    ready = true;
1,356✔
1410
  }
1411

1412
  streamMutexUnlock(&execInfo.lock);
1,945✔
1413
  return ready;
1,945✔
1414
}
1415

1416
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
1,860✔
1417
  SMnode     *pMnode = pReq->info.node;
1,860✔
1418
  SSdb       *pSdb = pMnode->pSdb;
1,860✔
1419
  void       *pIter = NULL;
1,860✔
1420
  SStreamObj *pStream = NULL;
1,860✔
1421
  int32_t     code = 0;
1,860✔
1422
  int32_t     numOfCheckpointTrans = 0;
1,860✔
1423
  SArray     *pLongChkpts = NULL;
1,860✔
1424
  SArray     *pList = NULL;
1,860✔
1425
  int64_t     now = taosGetTimestampMs();
1,860✔
1426

1427
  if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
1,860✔
1428
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
42✔
1429
  }
1430

1431
  pList = taosArrayInit(4, sizeof(SCheckpointInterval));
1,818✔
1432
  if (pList == NULL) {
1,818!
UNCOV
1433
    mError("failed to init chkptInterval info, not handle stream checkpoint, code:%s", tstrerror(terrno));
×
1434
    return terrno;
×
1435
  }
1436

1437
  pLongChkpts = taosArrayInit(4, sizeof(SStreamTransInfo));
1,818✔
1438
  if (pLongChkpts == NULL) {
1,818!
UNCOV
1439
    mError("failed to init long checkpoint list, not handle stream checkpoint, code:%s", tstrerror(terrno));
×
UNCOV
1440
    taosArrayDestroy(pList);
×
1441
    return terrno;
×
1442
  }
1443

1444
  // check if ongong checkpoint trans or long chkpt trans exist.
1445
  code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans, pLongChkpts);
1,818✔
1446
  if (code) {
1,818!
UNCOV
1447
    mError("failed to clear finish trans, code:%s", tstrerror(code));
×
1448

UNCOV
1449
    taosArrayDestroy(pList);
×
UNCOV
1450
    taosArrayDestroy(pLongChkpts);
×
UNCOV
1451
    return code;
×
1452
  }
1453

1454
  // kill long exec checkpoint and set task status
1455
  if (taosArrayGetSize(pLongChkpts) > 0) {
1,818!
UNCOV
1456
    killChkptAndResetStreamTask(pMnode, pLongChkpts);
×
1457

UNCOV
1458
    taosArrayDestroy(pList);
×
UNCOV
1459
    taosArrayDestroy(pLongChkpts);
×
UNCOV
1460
    return TSDB_CODE_SUCCESS;
×
1461
  }
1462

1463
  taosArrayDestroy(pLongChkpts);
1,818✔
1464

1465
  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
4,676✔
1466
    int64_t duration = now - pStream->checkpointFreq;
2,858✔
1467
    if (duration < tsStreamCheckpointInterval * 1000) {
2,858✔
1468
      sdbRelease(pSdb, pStream);
913✔
1469
      continue;
1,502✔
1470
    }
1471

1472
    bool ready = isStreamReadyHelp(now, pStream);
1,945✔
1473
    if (!ready) {
1,945✔
1474
      sdbRelease(pSdb, pStream);
589✔
1475
      continue;
589✔
1476
    }
1477

1478
    SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
1,356✔
1479
    void               *p = taosArrayPush(pList, &in);
1,356✔
1480
    if (p) {
1,356!
1481
      int32_t currentSize = taosArrayGetSize(pList);
1,356✔
1482
      mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chkpt interval threshold: %ds(%" PRId64
1,356✔
1483
             "s), concurrently launch threshold:%d",
1484
             pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
1485
             tsMaxConcurrentCheckpoint);
1486
    } else {
1487
      mError("failed to record the checkpoint interval info, stream:0x%" PRIx64, pStream->uid);
×
1488
    }
1489
    sdbRelease(pSdb, pStream);
1,356✔
1490
  }
1491

1492
  int32_t size = taosArrayGetSize(pList);
1,818✔
1493
  if (size == 0) {
1,818✔
1494
    taosArrayDestroy(pList);
1,149✔
1495
    return code;
1,149✔
1496
  }
1497

1498
  taosArraySort(pList, streamWaitComparFn);
669✔
1499

1500
  int32_t numOfQual = taosArrayGetSize(pList);
669✔
1501
  if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) {
669!
UNCOV
1502
    mDebug(
×
1503
        "%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new "
1504
        "checkpoint trans are not allowed, wait for 30s",
1505
        numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint);
UNCOV
1506
    taosArrayDestroy(pList);
×
1507
    return code;
×
1508
  }
1509

1510
  int32_t capacity = tsMaxConcurrentCheckpoint - numOfCheckpointTrans;
669✔
1511
  mDebug(
669✔
1512
      "%d stream(s) checkpoint interval longer than %ds, %d ongoing checkpoint trans, %d new checkpoint trans allowed, "
1513
      "concurrent trans threshold:%d",
1514
      numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, capacity, tsMaxConcurrentCheckpoint);
1515

1516
  int32_t started = 0;
669✔
1517
  int64_t checkpointId = mndStreamGenChkptId(pMnode, true);
669✔
1518

1519
  for (int32_t i = 0; i < numOfQual; ++i) {
673✔
1520
    SCheckpointInterval *pCheckpointInfo = taosArrayGet(pList, i);
671✔
1521
    if (pCheckpointInfo == NULL) {
671!
UNCOV
1522
      continue;
×
1523
    }
1524

1525
    SStreamObj *p = NULL;
671✔
1526
    code = mndGetStreamObj(pMnode, pCheckpointInfo->streamId, &p);
671✔
1527
    if (p != NULL && code == 0) {
671!
1528
      code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
671✔
1529
      sdbRelease(pSdb, p);
671✔
1530

1531
      if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
671!
1532
        started += 1;
671✔
1533

1534
        if (started >= capacity) {
671✔
1535
          mDebug("already start %d new checkpoint trans, current active checkpoint trans:%d", started,
667✔
1536
                 (started + numOfCheckpointTrans));
1537
          break;
667✔
1538
        }
1539
      } else {
UNCOV
1540
        mError("failed to start checkpoint trans, code:%s", tstrerror(code));
×
1541
      }
1542
    }
1543
  }
1544

1545
  taosArrayDestroy(pList);
669✔
1546
  return code;
669✔
1547
}
1548

1549
/*
 * Handle a "drop stream" request.
 *
 * Steps:
 *   1. Deserialize SMDropStreamReq and acquire the stream by name.
 *   2. Refuse to drop a stream that belongs to an existing SMA (TSDB_CODE_TSMA_MUST_BE_DROPPED).
 *   3. Check write privilege on the target db and conflicts with other stream trans.
 *   4. Build and prepare a drop trans that drops all tasks and marks the stream SDB_STATUS_DROPPED.
 *   5. Kill the related trans (mndStreamGetRelTrans), remove the stream's tasks from execInfo, write the audit
 *      record.
 *
 * Returns 0 when the stream does not exist and igNotExists is set, TSDB_CODE_ACTION_IN_PROGRESS once the drop
 * trans has been prepared, or an error code otherwise.
 */
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  SMDropStreamReq dropReq = {0};
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
    mError("invalid drop stream msg recv, discarded");
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  mDebug("recv drop stream:%s msg", dropReq.name);

  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    // sdbRelease is given a possibly-NULL pStream here, as in the original igNotExists path
    sdbRelease(pMnode->pSdb, pStream);
    if (dropReq.igNotExists) {
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
      tFreeMDropStreamReq(&dropReq);
      return 0;
    } else {
      mError("stream:%s not exist failed to drop it", dropReq.name);
      tFreeMDropStreamReq(&dropReq);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  if (pStream->smaId != 0) {
    mDebug("stream:%s, uid:0x%" PRIx64 " try to drop sma related stream", dropReq.name, pStream->uid);

    void    *pIter = NULL;
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    while (pIter) {
      if (pSma && pSma->uid == pStream->smaId) {
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;
        // log while pStream is still referenced: it must not be read after sdbRelease()
        mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma",
               dropReq.name, pStream->uid, tstrerror(code));

        sdbRelease(pMnode->pSdb, pSma);
        sdbRelease(pMnode->pSdb, pStream);
        sdbCancelFetch(pMnode->pSdb, pIter);
        tFreeMDropStreamReq(&dropReq);
        TAOS_RETURN(code);
      }

      if (pSma) {
        sdbRelease(pMnode->pSdb, pSma);
      }

      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    }
  }

  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    return code;
  }

  STrans *pTrans = NULL;
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
  if (code) {
    mError("failed to register drop stream trans, code:%s", tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // drop all tasks
  code = mndStreamSetDropAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // drop stream
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // kill the related checkpoint trans
  int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
  if (transId != 0) {
    mDebug("drop active transId:%d due to stream:%s uid:0x%" PRIx64 " dropped", transId, pStream->name, pStream->uid);
    mndKillTransImpl(pMnode, transId, pStream->sourceDb);
  }

  mDebug("stream:%s uid:0x%" PRIx64 " transId:%d start to drop related task when dropping stream", dropReq.name,
         pStream->uid, transId);

  removeStreamTasksInBuf(pStream, &execInfo);

  // The drop trans is already prepared at this point. Parsing the name only feeds the audit record, so its
  // failure must not turn an in-progress drop into an error reply.
  SName   name = {0};
  int32_t ret = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (ret != 0) {
    mInfo("stream:%s failed to parse name for audit record, code:%s, ignore it", dropReq.name, tstrerror(ret));
  }
  auditRecord(pReq, pMnode->clusterId, "dropStream", "", name.dbname, dropReq.sql, dropReq.sqlLen);

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  tFreeMDropStreamReq(&dropReq);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1691

1692
/*
 * Add to pTrans the removal of every stream whose source or target db is pDb.
 *
 * A matching stream whose source and target dbs differ cannot be dropped along with the db:
 * TSDB_CODE_MND_STREAM_MUST_BE_DELETED is returned.
 * For every other matching stream:
 *   - its related trans (mndStreamGetRelTrans) is killed;
 *   - its tasks are removed from execInfo;
 *   - the stream is persisted into pTrans as SDB_STATUS_DROPPED.
 *
 * Returns 0 on success or the first error code.
 */
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
      if (pStream->sourceDbUid != pStream->targetDbUid) {
        // log before releasing: pStream must not be read after sdbRelease()
        mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
               pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(TSDB_CODE_MND_STREAM_MUST_BE_DELETED);
      }

      // kill the related checkpoint trans
      int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
      if (transId != 0) {
        mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
        mndKillTransImpl(pMnode, transId, pStream->sourceDb);
      }

      // drop the stream obj in execInfo
      removeStreamTasksInBuf(pStream, &execInfo);

      code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
      if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
        mError("db:%s, failed to drop stream:%s since %s", pDb->name, pStream->name, tstrerror(code));
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        return code;
      }
    }

    sdbRelease(pSdb, pStream);
  }

  return 0;
}
1734

1735
/*
 * Fill up to `rows` rows of pBlock, one per stream.
 *
 * Continues the SDB_STREAM iteration stored in pShow->pIter. A stream for which setStreamAttrInResBlock()
 * fails is skipped without consuming a row. Returns the number of rows written and also adds it to
 * pShow->numOfRows.
 */
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  int32_t     filled = 0;

  while (filled < rows &&
         (pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream)) != NULL) {
    filled += (setStreamAttrInResBlock(pStream, pBlock, filled) == 0) ? 1 : 0;
    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += filled;
  return filled;
}
1756

UNCOV
1757
// Cancel an unfinished SDB_STREAM fetch that was started with pIter.
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1761

1762
/*
 * Fill pBlock with one row per task of each stream.
 *
 * Continues the SDB_STREAM iteration stored in pShow->pIter. execInfo is initialized under its lock first.
 * A stream whose tasks do not fit grows pBlock via blockDataEnsureCapacity(). Timestamps use the source db's
 * precision, falling back to milliseconds when the db cannot be acquired.
 * Returns the number of rows written and also adds it to pShow->numOfRows.
 */
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  int32_t     numOfRows = 0;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) {
      break;
    }

    // lock
    taosRLockLatch(&pStream->lock);

    int32_t count = mndGetNumOfStreamTasks(pStream);
    if (numOfRows + count > rowsCapacity) {
      code = blockDataEnsureCapacity(pBlock, numOfRows + count);
      if (code) {
        mError("failed to prepare the result block buffer, quit return value");
        taosRUnLockLatch(&pStream->lock);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    int32_t precision = TSDB_TIME_PRECISION_MILLI;
    SDbObj *pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
    if (pSourceDb != NULL) {
      precision = pSourceDb->cfg.precision;
      mndReleaseDb(pMnode, pSourceDb);
    }

    // add row for each task
    SStreamTaskIter *pIter = NULL;
    code = createStreamTaskIter(pStream, &pIter);
    if (code) {
      // log before releasing: pStream must not be read after sdbRelease()
      mError("failed to create task iter for stream:%s", pStream->name);
      taosRUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pIter, &pTask);
      if (code) {
        // pIter is destroyed exactly once, right after this loop; destroying it here as well freed it twice
        break;
      }

      code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows, precision);
      if (code == TSDB_CODE_SUCCESS) {
        numOfRows++;
      }
    }

    pBlock->info.rows = numOfRows;

    destroyStreamTaskIter(pIter);
    taosRUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1835

UNCOV
1836
// Cancel an unfinished SDB_STREAM fetch (started by the stream-task retrieval) that was started with pIter.
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1840

1841
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
676✔
1842
  SMnode     *pMnode = pReq->info.node;
676✔
1843
  SStreamObj *pStream = NULL;
676✔
1844
  int32_t     code = 0;
676✔
1845

1846
  SMPauseStreamReq pauseReq = {0};
676✔
1847
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
676!
UNCOV
1848
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
1849
  }
1850

1851
  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
676✔
1852
  if (pStream == NULL || code != 0) {
676!
1853
    if (pauseReq.igNotExists) {
338✔
1854
      mInfo("stream:%s, not exist, not pause stream", pauseReq.name);
85!
1855
      return 0;
85✔
1856
    } else {
1857
      mError("stream:%s not exist, failed to pause stream", pauseReq.name);
253!
1858
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
253✔
1859
    }
1860
  }
1861

1862
  mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid);
338!
1863

1864
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
338!
1865
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1866
    return code;
×
1867
  }
1868

1869
  // check if it is conflict with other trans in both sourceDb and targetDb.
1870
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
338✔
1871
  if (code) {
338!
UNCOV
1872
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1873
    TAOS_RETURN(code);
×
1874
  }
1875

1876
  bool updated = mndStreamNodeIsUpdated(pMnode);
338✔
1877
  if (updated) {
338!
UNCOV
1878
    mError("tasks are not ready for pause, node update detected");
×
UNCOV
1879
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1880
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
×
1881
  }
1882

1883
  {  // check for tasks, if tasks are not ready, not allowed to pause
1884
    bool found = false;
338✔
1885
    bool readyToPause = true;
338✔
1886
    streamMutexLock(&execInfo.lock);
338✔
1887

1888
    for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
4,915✔
1889
      STaskId *p = taosArrayGet(execInfo.pTaskList, i);
4,577✔
1890
      if (p == NULL) {
4,577!
UNCOV
1891
        continue;
×
1892
      }
1893

1894
      STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
4,577✔
1895
      if (pEntry == NULL) {
4,577!
UNCOV
1896
        continue;
×
1897
      }
1898

1899
      if (pEntry->id.streamId != pStream->uid) {
4,577✔
1900
        continue;
2,959✔
1901
      }
1902

1903
      if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) {
1,618!
1904
        mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name,
160!
1905
               pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status));
1906
        readyToPause = false;
160✔
1907
      }
1908

1909
      found = true;
1,618✔
1910
    }
1911

1912
    streamMutexUnlock(&execInfo.lock);
338✔
1913
    if (!found) {
338!
1914
      mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
×
1915
      sdbRelease(pMnode->pSdb, pStream);
×
1916
      TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
×
1917
    }
1918

1919
    if (!readyToPause) {
338✔
1920
      mError("stream:%s task not ready for pause yet", pauseReq.name);
45!
1921
      sdbRelease(pMnode->pSdb, pStream);
45✔
1922
      TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
45✔
1923
    }
1924
  }
1925

1926
  STrans *pTrans = NULL;
293✔
1927
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans);
293✔
1928
  if (pTrans == NULL || code) {
293!
1929
    mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
×
1930
    sdbRelease(pMnode->pSdb, pStream);
×
1931
    return code;
×
1932
  }
1933

1934
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
293✔
1935
  if (code) {
293!
UNCOV
1936
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1937
    mndTransDrop(pTrans);
×
UNCOV
1938
    return code;
×
1939
  }
1940

1941
  // if nodeUpdate happened, not send pause trans
1942
  code = mndStreamSetPauseAction(pMnode, pTrans, pStream);
293✔
1943
  if (code) {
293!
UNCOV
1944
    mError("stream:%s, failed to pause task since %s", pauseReq.name, tstrerror(code));
×
UNCOV
1945
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1946
    mndTransDrop(pTrans);
×
UNCOV
1947
    return code;
×
1948
  }
1949

1950
  // pause stream
1951
  taosWLockLatch(&pStream->lock);
293✔
1952
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
293✔
1953
  if (code) {
293!
UNCOV
1954
    taosWUnLockLatch(&pStream->lock);
×
UNCOV
1955
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1956
    mndTransDrop(pTrans);
×
UNCOV
1957
    return code;
×
1958
  }
1959

1960
  taosWUnLockLatch(&pStream->lock);
293✔
1961

1962
  code = mndTransPrepare(pMnode, pTrans);
293✔
1963
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
293!
UNCOV
1964
    mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
×
UNCOV
1965
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1966
    mndTransDrop(pTrans);
×
1967
    return code;
×
1968
  }
1969

1970
  sdbRelease(pMnode->pSdb, pStream);
293✔
1971
  mndTransDrop(pTrans);
293✔
1972

1973
  return TSDB_CODE_ACTION_IN_PROGRESS;
293✔
1974
}
1975

1976
/*
 * Handle a "resume stream" request.
 *
 * After the stream grant check, the handler:
 *   1. deserializes SMResumeStreamReq and acquires the stream;
 *   2. checks write privilege on the target db and trans conflicts;
 *   3. builds a resume trans (mndStreamSetResumeAction, honoring igUntreated);
 *   4. sets the stream status to STREAM_STATUS__NORMAL and persists it with SDB_STATUS_READY.
 *
 * Returns 0 when the stream does not exist and igNotExists is set, TSDB_CODE_ACTION_IN_PROGRESS once the trans
 * has been prepared, or an error code.
 */
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResumeStreamReq resumeReq = {0};
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    // released on both paths (the original already did this for igNotExists with a possibly-NULL pStream)
    sdbRelease(pMnode->pSdb, pStream);
    if (resumeReq.igNotExists) {
      mInfo("stream:%s not exist, not resume stream", resumeReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to resume stream", resumeReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);

  // propagate the real privilege error instead of a bare -1
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  STrans *pTrans = NULL;
  code =
      doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // set the resume action
  code = mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated);
  if (code) {
    mError("stream:%s, failed to set resume action since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // resume stream
  // NOTE(review): the status is changed on the in-memory object before the trans commits and is not restored
  // if persisting fails.
  taosWLockLatch(&pStream->lock);
  pStream->status = STREAM_STATUS__NORMAL;
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  taosWUnLockLatch(&pStream->lock);

  if (code != 0) {
    // previously this path returned the stale (zero) code, reporting a failed resume as success
    mError("stream:%s, failed to persist resume trans log since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare resume stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
2065

UNCOV
2066
/*
 * Handle a "reset stream" request.
 *
 * After the stream grant check, deserializes SMResetStreamReq and verifies that the stream exists. The reset
 * itself is not implemented yet (see the todo below).
 *
 * Returns 0 when the stream does not exist and igNotExists is set, TSDB_CODE_MND_STREAM_NOT_EXIST when it does
 * not exist otherwise, and TSDB_CODE_ACTION_IN_PROGRESS when it exists.
 */
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResetStreamReq resetReq = {0};
  if (tDeserializeSMResetStreamReq(pReq->pCont, pReq->contLen, &resetReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  mDebug("recv reset stream req, stream:%s", resetReq.name);

  code = mndAcquireStream(pMnode, resetReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    // same as the resume handler, which also releases a possibly-NULL pStream on this path
    sdbRelease(pMnode->pSdb, pStream);
    if (resetReq.igNotExists) {
      mInfo("stream:%s, not exist, not reset stream", resetReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to reset stream", resetReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  // todo(liao hao jun): implement the reset.
  // The acquired stream was previously never released, leaking a reference on every request.
  // NOTE(review): no trans is created here, so returning TSDB_CODE_ACTION_IN_PROGRESS presumably leaves the
  // request without any reply until the reset is implemented — confirm with the message dispatcher.
  sdbRelease(pMnode->pSdb, pStream);
  return TSDB_CODE_ACTION_IN_PROGRESS;
}
2096

2097
/*
 * Build a single MND_STREAM_TASK_UPDATE_NAME trans that updates task epsets after a vgroup/node change.
 *
 * First, every stream is checked for a conflicting trans; the first conflict aborts with its code.
 * Then one trans is created. Every stream involved in the change gets three things:
 *   - it is registered on the trans;
 *   - it gets the epset-update actions;
 *   - it is persisted into the trans with SDB_STATUS_READY.
 * "Involved" means its target or source db is in pChangeInfo->pDBMap; with includeAllNodes every stream is.
 * A stream whose epset action cannot be built is logged and skipped.
 *
 * On return, *pUpdateTrans holds the trans, or NULL when no stream exists or an error path was taken.
 * Presumably the caller prepares and drops it — TODO confirm.
 * NOTE(review): when the last stream's epset action fails, a non-zero code is returned while *pUpdateTrans is
 * still set.
 */
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes, STrans** pUpdateTrans) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  STrans *pTrans = NULL;
  int32_t code = 0;
  *pUpdateTrans = NULL;

  // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool
  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
    sdbRelease(pSdb, pStream);

    if (code) {
      mError("nodeUpdate conflict with other trans, current nodeUpdate ignored, code:%s", tstrerror(code));
      sdbCancelFetch(pSdb, pIter);
      return code;
    }
  }

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    // here create only one trans
    if (pTrans == NULL) {
      code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME,
                           "update task epsets", &pTrans);
      if (pTrans == NULL || code) {
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        return terrno = code;
      }
    }

    if (!includeAllNodes) {
      void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
      void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
      if (p1 == NULL && p2 == NULL) {
        mDebug("stream:0x%" PRIx64 " %s not involved in nodeUpdate, ignore", pStream->uid, pStream->name);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
           pStream->name, pTrans->id);

    // NOTE: for each stream, we register one trans entry for task update
    code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
    if (code) {
      mError("failed to register trans, transId:%d, and continue", pTrans->id);
    }

    code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);

    // todo: not continue, drop all and retry again
    if (code != TSDB_CODE_SUCCESS) {
      mError("stream:0x%" PRIx64 " build nodeUpdate trans failed, ignore and continue, code:%s", pStream->uid,
             tstrerror(code));
      sdbRelease(pSdb, pStream);
      continue;
    }

    code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
    if (code != TSDB_CODE_SUCCESS) {
      mError("stream:0x%" PRIx64 " failed to persist nodeUpdate trans log, code:%s", pStream->uid, tstrerror(code));
    }
    sdbRelease(pSdb, pStream);

    if (code != TSDB_CODE_SUCCESS) {
      sdbCancelFetch(pSdb, pIter);
      // the trans is not handed to the caller on this path (*pUpdateTrans stays NULL), so drop it here
      mndTransDrop(pTrans);
      return code;
    }
  }

  // hand the (possibly NULL) update trans over to the caller
  *pUpdateTrans = pTrans;
  return code;
}
2182

2183
/**
 * Rebuild pNodeList from the tasks of every stream currently stored in the sdb.
 *
 * Each stream is visited under its write latch. The nodeId/epset of every task is collected into a temporary
 * hash map keyed by nodeId (the map is created with update disabled, so a repeated nodeId is reported as
 * TSDB_CODE_DUP_KEY and ignored). The map is then flattened into pNodeList, which is cleared beforehand.
 *
 * @param pMnode    mnode handle, provides the sdb
 * @param pNodeList output array of SNodeEntry; cleared and refilled
 * @return 0 on success, otherwise a non-zero code left by a failing step (task iteration or array push)
 */
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int32_t     code = 0;

  mDebug("start to refresh node list by existed streams");

  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    return terrno;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    taosWLockLatch(&pStream->lock);

    SStreamTaskIter *pTaskIter = NULL;
    code = createStreamTaskIter(pStream, &pTaskIter);
    if (code) {
      // log before unlocking/releasing: pStream must not be touched after sdbRelease
      mError("failed to create task iter for stream:%s", pStream->name);
      taosWUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pTaskIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pTaskIter, &pTask);
      if (code) {
        break;
      }

      SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
      epsetAssign(&entry.epset, &pTask->info.epSet);
      int32_t ret = taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
      if (ret != 0 && ret != TSDB_CODE_DUP_KEY) {
        // report the code returned by the put itself, not the unrelated outer code
        mError("failed to put entry into hash map, nodeId:%d, code:%s", entry.nodeId, tstrerror(ret));
      }
    }

    destroyStreamTaskIter(pTaskIter);
    taosWUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  taosArrayClear(pNodeList);

  // convert to list
  pIter = NULL;
  while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
    SNodeEntry *pEntry = (SNodeEntry *)pIter;

    void *p = taosArrayPush(pNodeList, pEntry);
    if (p == NULL) {
      mError("failed to put entry into node list, nodeId:%d, code: out of memory", pEntry->nodeId);
      if (code == 0) {
        code = terrno;
      }
      continue;
    }

    char    buf[256] = {0};
    int32_t ret = epsetToStr(&pEntry->epset, buf, tListLen(buf));  // ignore this error since it is only for log file
    if (ret != 0) {                                                // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(ret));
    }

    mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
  }

  taosHashCleanup(pHash);

  mDebug("numOfvNodes:%d get after extracting nodeInfo from all streams", (int32_t)taosArrayGetSize(pNodeList));
  return code;
}
2264

2265
/**
 * Insert the database name of every vgroup in the sdb into pDBMap (key only, no value).
 *
 * Used when all vgroups must be treated as changed, so that every database's checkpoint transactions are
 * considered for killing.
 *
 * @param pDBMap hash map keyed by db name; entries are added in place
 * @param pSdb   sdb to iterate for SDB_VGROUP objects
 */
static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
  void *pIter = NULL;

  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    int32_t code = taosHashPut(pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
    if (code == 0) {
      int32_t size = taosHashGetSize(pDBMap);
      mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size);
    }

    // release only after the last access to pVgroup->dbName (the log above reads it)
    sdbRelease(pSdb, pVgroup);
  }
}
2284

2285
/**
 * Compare the current vgroup snapshot with the cached node list and collect the changes.
 *
 * Steps: drop expired node entries/tasks from the exec buffer, compute the changed nodes into pChangeInfo and,
 * if this mnode just switched from follower to leader, force an update of all vgroups. When any update is
 * pending, all active checkpoint transactions for the affected dbs are killed.
 *
 * @param pNodeSnapshot      freshly taken vgroup snapshot
 * @param pMnode             mnode handle
 * @param pChangeInfo        output: changed nodes and affected db names
 * @param pUpdateAllVgroups  in/out: set to true when every vgroup must be updated
 * @return 0 on success, or the error code of the first failing step
 */
static int32_t doProcessNodeCheckHelp(SArray *pNodeSnapshot, SMnode *pMnode, SVgroupChangeInfo *pChangeInfo,
                                      bool *pUpdateAllVgroups) {
  int32_t code = removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
  if (code != 0) {
    mDebug("failed to remove expired node entry in buf, code:%s", tstrerror(code));
    return code;
  }

  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, pChangeInfo);
  if (code != 0) {
    mDebug("failed to find changed vnode(s) during vnode(s) check, code:%s", tstrerror(code));
    return code;
  }

  // a follower->leader switch forces a full rollback via the nodeUpdate trans
  bool switchedToLeader = (execInfo.role == NODE_ROLE_LEADER) && execInfo.switchFromFollower;
  if (switchedToLeader) {
    mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
    *pUpdateAllVgroups = true;
    execInfo.switchFromFollower = false;  // reset the flag
    addAllDbsIntoHashmap(pChangeInfo->pDBMap, pMnode->pSdb);
  }

  bool hasUpdate = (taosArrayGetSize(pChangeInfo->pUpdateNodeList) > 0) || (*pUpdateAllVgroups);
  if (!hasUpdate) {
    mDebug("no update found in vnode(s) list");
    return code;
  }

  // the checkpoint transaction is vnode wide, so any active one must be killed before updating
  killAllCheckpointTrans(pMnode, pChangeInfo);
  return code;
}
2317

2318
// this function runs by only one thread, so it is not multi-thread safe
2319
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
1,642✔
2320
  int32_t           code = 0;
1,642✔
2321
  bool              allReady = true;
1,642✔
2322
  SArray           *pNodeSnapshot = NULL;
1,642✔
2323
  SMnode           *pMnode = pMsg->info.node;
1,642✔
2324
  int64_t           tsms = taosGetTimestampMs();
1,642✔
2325
  int64_t           ts = tsms / 1000;
1,642✔
2326
  bool              updateAllVgroups = false;
1,642✔
2327
  SVgroupChangeInfo changeInfo = {0};
1,642✔
2328

2329
  int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
1,642✔
2330
  if (old != 0) {
1,642!
UNCOV
2331
    mDebug("still in checking node change");
×
UNCOV
2332
    return 0;
×
2333
  }
2334

2335
  mDebug("start to do node changing check, ts:%" PRId64, tsms);
1,642✔
2336

2337
  streamMutexLock(&execInfo.lock);
1,642✔
2338
  int32_t numOfNodes = extractStreamNodeList(pMnode);
1,642✔
2339
  streamMutexUnlock(&execInfo.lock);
1,642✔
2340

2341
  if (numOfNodes == 0) {
1,642!
UNCOV
2342
    mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing");
×
UNCOV
2343
    execInfo.ts = ts;
×
UNCOV
2344
    atomic_store_32(&mndNodeCheckSentinel, 0);
×
UNCOV
2345
    return 0;
×
2346
  }
2347

2348
  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
1,642✔
2349
  if (code) {
1,642!
2350
    mError("failed to take the vgroup snapshot, ignore it and continue");
×
2351
  }
2352

2353
  if (!allReady) {
1,642✔
2354
    taosArrayDestroy(pNodeSnapshot);
41✔
2355
    atomic_store_32(&mndNodeCheckSentinel, 0);
41✔
2356
    mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
41!
2357
    return 0;
41✔
2358
  }
2359

2360
  streamMutexLock(&execInfo.lock);
1,601✔
2361
  code = doProcessNodeCheckHelp(pNodeSnapshot, pMnode, &changeInfo, &updateAllVgroups);
1,601✔
2362
  streamMutexUnlock(&execInfo.lock);
1,601✔
2363

2364
  if (code) {
1,601!
UNCOV
2365
    goto _end;
×
2366
  }
2367

2368
  if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) {
1,601!
2369
    mDebug("vnode(s) change detected, build trans to update stream task epsets");
9✔
2370

2371
    STrans *pTrans = NULL;
9✔
2372

2373
    streamMutexLock(&execInfo.lock);
9✔
2374
    code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups, &pTrans);
9✔
2375
    streamMutexUnlock(&execInfo.lock);
9✔
2376

2377
    // NOTE: sync trans out of lock
2378
    if (code == 0 && pTrans != NULL) {
9!
2379
      code = mndTransPrepare(pMnode, pTrans);
9✔
2380
      if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
9!
UNCOV
2381
        mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
×
2382
      }
2383

2384
      mndTransDrop(pTrans);
9✔
2385
    }
2386

2387
    // keep the new vnode snapshot if success
2388
    if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
9!
2389
      streamMutexLock(&execInfo.lock);
9✔
2390

2391
      code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
9✔
2392
      int32_t num = (int)taosArrayGetSize(execInfo.pNodeList);
9✔
2393
      if (code == 0) {
9!
2394
        execInfo.ts = ts;
9✔
2395
        mDebug("create trans successfully, update cached node list, numOfNodes:%d", num);
9✔
2396
      }
2397

2398
      streamMutexUnlock(&execInfo.lock);
9✔
2399

2400
      if (code) {
9!
UNCOV
2401
        mError("failed to extract node list from stream, code:%s", tstrerror(code));
×
UNCOV
2402
        goto _end;
×
2403
      }
2404
    }
2405
  }
2406

2407
  mndDestroyVgroupChangeInfo(&changeInfo);
1,601✔
2408

2409
_end:
1,601✔
2410
  taosArrayDestroy(pNodeSnapshot);
1,601✔
2411

2412
  mDebug("end to do stream task node change checking, elapsed time:%" PRId64 "ms", taosGetTimestampMs() - tsms);
2,001✔
2413
  atomic_store_32(&mndNodeCheckSentinel, 0);
1,601✔
2414

2415
  return 0;
1,601✔
2416
}
2417

2418
/**
 * Timer entry: enqueue a TDMT_MND_STREAM_NODECHANGE_CHECK message on the write queue if any stream exists.
 *
 * @return 0 when there are no streams, terrno on allocation failure, otherwise the result of tmsgPutToQueue
 */
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;

  // nothing to check without any stream
  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) <= 0) {
    return 0;
  }

  int32_t               contLen = sizeof(SMStreamNodeCheckMsg);
  SMStreamNodeCheckMsg *pCont = rpcMallocCont(contLen);
  if (pCont == NULL) {
    return terrno;
  }

  SRpcMsg checkMsg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pCont, .contLen = contLen};
  return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &checkMsg);
}
2434

UNCOV
2435
/**
 * Enqueue a TDMT_MND_STREAM_NODECHANGE_CHECK message on the write queue when at least one stream exists.
 *
 * NOTE(review): this sends the same message type as mndProcessNodeCheck; confirm whether a distinct
 * status-check message was intended.
 *
 * @return 0 when there are no streams, terrno on allocation failure, otherwise the result of tmsgPutToQueue
 */
static int32_t mndProcessStatusCheck(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t numOfStreams = sdbGetSize(pMnode->pSdb, SDB_STREAM);

  if (numOfStreams > 0) {
    int32_t               msgLen = sizeof(SMStreamNodeCheckMsg);
    SMStreamNodeCheckMsg *pCheck = rpcMallocCont(msgLen);
    if (pCheck == NULL) {
      return terrno;
    }

    SRpcMsg statusMsg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pCheck, .contLen = msgLen};
    return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &statusMsg);
  }

  return 0;
}
2451

2452
/**
 * Register every task of pStream into the exec buffer.
 *
 * For each task not yet in pExecNode->pTaskMap, a status entry is added to the map and its id appended to
 * pExecNode->pTaskList. If the task's node is not yet present in pExecNode->pNodeList, a node entry built from
 * the task's nodeId/epset is appended as well. Failures are logged and processing continues.
 *
 * @param pStream   stream whose tasks are registered
 * @param pExecNode exec info buffer updated in place
 */
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
  SStreamTaskIter *pIter = NULL;
  int32_t          code = createStreamTaskIter(pStream, &pIter);
  if (code) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return;
  }

  while (streamTaskIterNextTask(pIter)) {
    SStreamTask *pTask = NULL;
    code = streamTaskIterGetCurrent(pIter, &pTask);
    if (code) {
      break;
    }

    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    void   *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      STaskStatusEntry entry = {0};
      streamTaskStatusInit(&entry, pTask);

      code = taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
      if (code == 0) {
        void   *px = taosArrayPush(pExecNode->pTaskList, &id);
        int32_t num = (int32_t)taosArrayGetSize(pExecNode->pTaskList);
        if (px) {
          mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
        } else {
          mError("s-task:0x%x failed to add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
        }
      } else {
        mError("s-task:0x%x failed to add into task map, since out of memory", (int32_t)entry.id.taskId);
      }

      // add the new vgroups if not added yet
      bool exist = false;
      for (int32_t j = 0; j < taosArrayGetSize(pExecNode->pNodeList); ++j) {
        SNodeEntry *pEntry = taosArrayGet(pExecNode->pNodeList, j);
        if ((pEntry != NULL) && (pEntry->nodeId == pTask->info.nodeId)) {
          exist = true;
          break;
        }
      }

      if (!exist) {
        SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
        epsetAssign(&nodeEntry.epset, &pTask->info.epSet);

        void *px = taosArrayPush(pExecNode->pNodeList, &nodeEntry);
        if (px) {
          mInfo("vgId:%d added into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList));
        } else {
          // statement terminator was missing here; do not rely on the macro's expansion to absorb it
          mError("vgId:%d failed to add into nodeList, total:%d", nodeEntry.nodeId,
                 (int)taosArrayGetSize(pExecNode->pNodeList));
        }
      }
    }
  }

  destroyStreamTaskIter(pIter);
}
2513

2514
/**
 * Append taskId to pList unless it is already present.
 *
 * @param pList      array of int32_t task ids that have requested a checkpoint
 * @param taskId     task id to add
 * @param uid        stream id, used for logging only
 * @param numOfTotal total number of tasks of the stream, used to log the remaining count
 */
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
  int32_t num = taosArrayGetSize(pList);
  for (int32_t i = 0; i < num; ++i) {
    int32_t *pId = taosArrayGet(pList, i);
    if (pId == NULL) {
      continue;
    }

    if (taskId == *pId) {
      return;
    }
  }

  void *p = taosArrayPush(pList, &taskId);
  if (p) {
    // report the count including the request just recorded
    int32_t numOfRecv = num + 1;
    mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, numOfRecv, numOfTotal - numOfRecv);
  } else {
    mError("stream:0x%" PRIx64 " receive %d reqs for checkpoint, failed to added into task list, since out of memory",
           uid, num);
  }
}
2536

2537
/**
 * Handle a task's request to start a checkpoint.
 *
 * The requesting task id is recorded per stream in execInfo.pTransferStateStreams. Once every task of the stream
 * has sent the request, a checkpoint transaction is launched and the stream's entry is removed. A response
 * carrying the task's vgId is always sent unless decoding fails, the stream is unknown, or the response cannot
 * be allocated.
 *
 * @return 0 on success; TSDB_CODE_INVALID_MSG, TSDB_CODE_MND_STREAM_NOT_EXIST, or terrno on failure
 */
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
  SMnode                  *pMnode = pReq->info.node;
  SStreamTaskCheckpointReq req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskCheckpointReq(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint req msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);

  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf",
          req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the create stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
  if (pReqTaskList == NULL) {
    SArray *pList = taosArrayInit(4, sizeof(int32_t));
    if (pList == NULL) {
      mError("stream:0x%" PRIx64 " failed to create checkpoint req task list, code:%s", req.streamId,
             tstrerror(terrno));
    } else {
      doAddTaskId(pList, req.taskId, req.streamId, numOfTasks);
      code = taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
      if (code) {
        mError("failed to put into transfer state stream map, code: out of memory");
        taosArrayDestroy(pList);  // the map did not take ownership
      } else {
        pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
      }
    }
  } else {
    doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks);
  }

  // pReqTaskList stays NULL when the per-stream list could not be created/registered; skip the completion check
  int32_t total = (pReqTaskList != NULL) ? (int32_t)taosArrayGetSize(*pReqTaskList) : -1;
  if (total == numOfTasks) {  // all tasks have sent the reqs
    int64_t checkpointId = mndStreamGenChkptId(pMnode, false);
    mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);

    if (pStream != NULL) {  // TODO:handle error
      code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
      if (code) {
        mError("failed to create checkpoint trans, code:%s", tstrerror(code));
      }
    } else {
      // todo: wait for the create stream trans completed, and launch the checkpoint trans
    }

    // remove this entry
    (void)taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));

    int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
    mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  {
    SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)};
    rsp.pCont = rpcMallocCont(rsp.contLen);
    if (rsp.pCont == NULL) {
      return terrno;
    }

    SMsgHead *pHead = rsp.pCont;
    pHead->vgId = htonl(req.nodeId);

    tmsgSendRsp(&rsp);
    pReq->info.handle = NULL;  // disable auto rsp
  }

  return 0;
}
2636

2637
// valid the info according to the HbMsg
2638
static bool validateChkptReport(const SCheckpointReport *pReport, int64_t reportChkptId) {
6,499✔
2639
  STaskId           id = {.streamId = pReport->streamId, .taskId = pReport->taskId};
6,499✔
2640
  STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
6,499✔
2641
  if (pTaskEntry == NULL) {
6,499✔
2642
    mError("invalid checkpoint-report msg from task:0x%x, discard", pReport->taskId);
20!
2643
    return false;
20✔
2644
  }
2645

2646
  if (pTaskEntry->checkpointInfo.latestId >= pReport->checkpointId) {
6,479!
UNCOV
2647
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " saved checkpointId:%" PRId64 " discard",
×
2648
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2649
    return false;
×
2650
  }
2651

2652
  // now the task in checkpoint procedure
2653
  if ((pTaskEntry->checkpointInfo.activeId != 0) && (pTaskEntry->checkpointInfo.activeId > pReport->checkpointId)) {
6,479!
UNCOV
2654
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " active checkpointId:%" PRId64
×
2655
           " discard",
2656
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
UNCOV
2657
    return false;
×
2658
  }
2659

2660
  if (reportChkptId >= pReport->checkpointId) {
6,479!
UNCOV
2661
    mError("s-task:0x%x expired checkpoint-report msg, checkpointId:%" PRId64 " already update checkpointId:%" PRId64
×
2662
           " discard",
2663
           pReport->taskId, pReport->checkpointId, reportChkptId);
2664
    return false;
×
2665
  }
2666

2667
  return true;
6,479✔
2668
}
2669

2670
/**
 * Record a validated checkpoint-report for one task in pList.
 *
 * If the task already has an entry, a newer report overwrites it, an older one is rejected and an equal one is
 * ignored with a warning. Otherwise a new STaskChkptInfo built from the report is appended.
 *
 * @param pList           array of STaskChkptInfo for one stream
 * @param reportedChkptId checkpointId already recorded for the stream (passed to validateChkptReport)
 * @param pReport         the decoded checkpoint-report
 */
static void doAddReportStreamTask(SArray *pList, int64_t reportedChkptId, const SCheckpointReport *pReport) {
  if (!validateChkptReport(pReport, reportedChkptId)) {
    return;
  }

  // locate an existing entry of the same task, if any
  STaskChkptInfo *pExist = NULL;
  for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
    STaskChkptInfo *pItem = taosArrayGet(pList, i);
    if (pItem != NULL && pItem->taskId == pReport->taskId) {
      pExist = pItem;
      break;
    }
  }

  if (pExist != NULL) {
    if (pExist->checkpointId > pReport->checkpointId) {
      mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
             pReport->taskId, pExist->checkpointId, pReport->checkpointId);
    } else if (pExist->checkpointId < pReport->checkpointId) {  // expired checkpoint-report msg, update it
      mInfo("s-task:0x%x expired checkpoint-report info in checkpoint-report list update from %" PRId64 "->%" PRId64,
            pReport->taskId, pExist->checkpointId, pReport->checkpointId);

      pExist->checkpointId = pReport->checkpointId;
      pExist->ts = pReport->checkpointTs;
      pExist->version = pReport->checkpointVer;
      pExist->transId = pReport->transId;
      pExist->dropHTask = pReport->dropHTask;
    } else {
      mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
    }
    return;
  }

  STaskChkptInfo newInfo = {
      .streamId = pReport->streamId,
      .taskId = pReport->taskId,
      .nodeId = pReport->nodeId,
      .checkpointId = pReport->checkpointId,
      .version = pReport->checkpointVer,
      .ts = pReport->checkpointTs,
      .transId = pReport->transId,
      .dropHTask = pReport->dropHTask,
  };

  if (taosArrayPush(pList, &newInfo) == NULL) {
    mError("failed to put into task list, taskId:0x%x", pReport->taskId);
    return;
  }

  int32_t numOfReported = taosArrayGetSize(pList);
  mDebug("stream:0x%" PRIx64 " taskId:0x%x checkpoint-report recv, %d tasks has send checkpoint-report",
         pReport->streamId, pReport->taskId, numOfReported);
}
2723

2724
/**
 * Handle a checkpoint-report sent by a stream task after it completed a checkpoint.
 *
 * The report is validated and stored per stream in execInfo.pChkptStreams. When all tasks of a known stream have
 * reported, an info log is emitted (the meta-info update is issued elsewhere). A quick response carrying the
 * task's vgId is sent unless decoding fails or the stream is unknown.
 *
 * @return TSDB_CODE_INVALID_MSG / TSDB_CODE_MND_STREAM_NOT_EXIST on early failure, otherwise the last code
 *         observed while processing (0 on the normal path)
 */
int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SCheckpointReport req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskChkptReport(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint-report msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64
         " checkpointVer:%" PRId64 " transId:%d",
         req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);

  // register to the stream task done map, if all tasks has sent these kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the creation of stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SChkptReportInfo *pInfo =
      (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
  if (pInfo == NULL) {
    SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId};
    if (info.pTaskList == NULL) {
      code = terrno;
      mError("stream:0x%" PRIx64 " failed to create checkpoint-report task list, code:%s", req.streamId,
             tstrerror(code));
    } else {
      doAddReportStreamTask(info.pTaskList, info.reportChkpt, &req);
      code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &info, sizeof(info));
      if (code) {
        mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId);
        taosArrayDestroy(info.pTaskList);  // the map did not take ownership
      } else {
        pInfo = (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
      }
    }
  } else {
    doAddReportStreamTask(pInfo->pTaskList, pInfo->reportChkpt, &req);
  }

  // pInfo may still be NULL after an allocation failure; pStream is NULL when the stream is only in the buffer
  // (numOfTasks == 0), in which case an empty task list must not be mistaken for "all tasks reported".
  if (pInfo != NULL && pStream != NULL) {
    int32_t total = taosArrayGetSize(pInfo->pTaskList);
    if (total == numOfTasks) {  // all tasks have sent the reqs
      mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
            " will be issued soon",
            req.streamId, pStream->name, total, req.checkpointId);
    }
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS);
  return code;
}
2803

2804
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t *pExistedTasks, bool *pAllSame) {
248✔
2805
  int32_t num = 0;
248✔
2806
  int64_t chkId = INT64_MAX;
248✔
2807
  *pExistedTasks = 0;
248✔
2808
  *pAllSame = true;
248✔
2809

2810
  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
6,952✔
2811
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
6,704✔
2812
    if (p == NULL) {
6,704!
UNCOV
2813
      continue;
×
2814
    }
2815

2816
    if (p->streamId != streamId) {
6,704✔
2817
      continue;
5,118✔
2818
    }
2819

2820
    num += 1;
1,586✔
2821
    STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
1,586✔
2822
    if (chkId > pe->checkpointInfo.latestId) {
1,586✔
2823
      if (chkId != INT64_MAX) {
260✔
2824
        *pAllSame = false;
12✔
2825
      }
2826
      chkId = pe->checkpointInfo.latestId;
260✔
2827
    }
2828
  }
2829

2830
  *pExistedTasks = num;
248✔
2831
  if (num < numOfTasks) {  // not all task send info to mnode through hbMsg, no valid checkpoint Id
248!
UNCOV
2832
    return -1;
×
2833
  }
2834

2835
  return chkId;
248✔
2836
}
2837

2838
/**
 * Send an immediate response whose body starts with an SMsgHead carrying vgId (network byte order).
 *
 * On success the handle in pInfo is cleared so the framework does not send an automatic response. If the
 * response buffer cannot be allocated, nothing is sent and pInfo is left untouched.
 *
 * @param pInfo   handle info of the request being answered
 * @param msgSize size of the response body
 * @param vgId    vgroup id written into the message head
 * @param code    response code
 */
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) {
  SRpcMsg rsp = {.code = code, .info = *pInfo, .contLen = msgSize};

  rsp.pCont = rpcMallocCont(rsp.contLen);
  if (rsp.pCont == NULL) {
    return;
  }

  ((SMsgHead *)rsp.pCont)->vgId = htonl(vgId);

  tmsgSendRsp(&rsp);
  pInfo->handle = NULL;  // disable auto rsp
}
2849

2850
/*
 * Drop the consensus requests whose trans has just been issued.
 *
 * For every task id stored in `pList`, removes the first entry of
 * pInfo->pTaskList whose req.taskId matches. Ids without a matching entry are ignored.
 *
 * @return number of ids in `pList` (the number of requests handled this round).
 */
static int32_t doCleanReqList(SArray* pList, SCheckpointConsensusInfo* pInfo) {
  int32_t numOfSent = taosArrayGetSize(pList);

  for (int32_t idx = 0; idx < numOfSent; ++idx) {
    int32_t *pTaskId = taosArrayGet(pList, idx);
    if (pTaskId == NULL) {
      continue;
    }

    // locate the first matching entry; the list is not modified during the scan
    int32_t total = taosArrayGetSize(pInfo->pTaskList);
    int32_t pos = 0;
    while (pos < total) {
      SCheckpointConsensusEntry *pEntry = taosArrayGet(pInfo->pTaskList, pos);
      if (pEntry != NULL && pEntry->req.taskId == *pTaskId) {
        break;
      }
      ++pos;
    }

    if (pos < total) {
      taosArrayRemove(pInfo->pTaskList, pos);
    }
  }

  return numOfSent;
}
2870

2871
/*
 * Timer handler that resolves pending consensus-checkpointId requests.
 *
 * If all vnodes are ready, walks execInfo.pStreamConsensus under execInfo.lock.
 * For each stream:
 *  - if the stream object is gone, the entry is scheduled for removal;
 *  - otherwise each pending task request is checked against the minimum checkpoint
 *    id reported by the stream's tasks (getConsensusId). When the request has
 *    waited >= 10s, or all tasks report the same id, a set-consensus-chkptId trans
 *    is created for it and the request is removed from the pending list.
 * Streams whose pending list becomes empty are removed from the hash after the
 * iteration. At most ~maxAllowedTrans trans are issued per invocation.
 *
 * @return 0 when vnodes are not all ready; TSDB_CODE_FAILED when a task's
 *         requested checkpoint id is below the computed consensus id; otherwise
 *         the last status code produced while processing.
 */
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  int64_t now = taosGetTimestampMs();
  bool    allReady = true;
  SArray *pNodeSnapshot = NULL;
  int32_t maxAllowedTrans = 20;
  int32_t numOfTrans = 0;
  int32_t code = 0;
  void   *pIter = NULL;

  // task ids of the current stream whose consensus trans was issued in this round
  SArray *pList = taosArrayInit(4, sizeof(int32_t));
  if (pList == NULL) {
    return terrno;
  }

  // stream ids whose consensus entry is removed after the hash iteration completes
  SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
  if (pStreamList == NULL) {
    taosArrayDestroy(pList);
    return terrno;
  }

  mDebug("start to process consensus-checkpointId in tmr");

  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
  taosArrayDestroy(pNodeSnapshot);
  if (code) {
    mError("failed to get the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
    taosArrayDestroy(pStreamList);
    taosArrayDestroy(pList);
    return 0;
  }

  streamMutexLock(&execInfo.lock);

  while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
    SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;

    taosArrayClear(pList);

    int64_t     streamId = -1;
    int32_t     num = taosArrayGetSize(pInfo->pTaskList);
    SStreamObj *pStream = NULL;

    code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
    if (pStream == NULL || code != 0) {  // stream has been dropped already
      mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
      void *p = taosArrayPush(pStreamList, &pInfo->streamId);
      if (p == NULL) {
        mError("failed to record the missing stream id in concensus-stream list, streamId:%" PRId64
               " code:%s, continue",
               pInfo->streamId, tstrerror(terrno));
      }
      continue;
    }

    for (int32_t j = 0; j < num; ++j) {
      SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
      if (pe == NULL) {
        continue;
      }

      if (streamId == -1) {
        streamId = pe->req.streamId;
      }

      int32_t existed = 0;
      bool    allSame = true;
      int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame);
      if (chkId == -1) {
        mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed,
               pInfo->numOfTasks, pe->req.taskId);
        break;
      }

      if (((now - pe->ts) >= 10 * 1000) || allSame) {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
        if (chkId > pe->req.checkpointId) {
          // Log while still holding execInfo.lock and the iterator reference: `pe` lives
          // inside the hash value and must not be read after either is released.
          mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
                 pe->req.checkpointId, chkId);

          mndReleaseStream(pMnode, pStream);
          taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
          streamMutexUnlock(&execInfo.lock);

          taosArrayDestroy(pStreamList);
          taosArrayDestroy(pList);
          return TSDB_CODE_FAILED;
        }

        // todo: check for redundant consensus-checkpoint trans, if this kinds of trans repeatly failed.
        code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
        }

        void *p = taosArrayPush(pList, &pe->req.taskId);
        if (p == NULL) {
          mError("failed to put into task list, taskId:0x%x", pe->req.taskId);
        }
      } else {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
      }
    }

    mndReleaseStream(pMnode, pStream);

    int32_t alreadySend = doCleanReqList(pList, pInfo);

    // clear request stream item with empty task list
    if (taosArrayGetSize(pInfo->pTaskList) == 0) {
      mndClearConsensusRspEntry(pInfo);
      if (streamId == -1) {
        mError("streamId is -1, streamId:%" PRIx64 " in consensus-checkpointId hashMap, cont", pInfo->streamId);
        // fall back to the entry's own id; pushing -1 would never remove this entry
        streamId = pInfo->streamId;
      }

      void *p = taosArrayPush(pStreamList, &streamId);
      if (p == NULL) {
        mError("failed to put into stream list, stream:0x%" PRIx64 " not remove it in consensus-chkpt list", streamId);
      }
    }

    numOfTrans += alreadySend;
    if (numOfTrans > maxAllowedTrans) {
      // report the running total, which is what exceeded the limit
      mInfo("already send consensus-checkpointId trans:%d, try next time", numOfTrans);
      taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
      break;
    }
  }

  for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
    int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i);
    if (pStreamId == NULL) {
      continue;
    }

    code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId);
  }

  streamMutexUnlock(&execInfo.lock);

  taosArrayDestroy(pStreamList);
  taosArrayDestroy(pList);

  mDebug("end to process consensus-checkpointId in tmr, send consensus-checkpoint trans:%d", numOfTrans);
  return code;
}
3021

3022
/*
 * Handle a create-stream request issued by the mnode itself.
 *
 * Delegates to mndProcessCreateStreamReq. On a real failure (neither 0 nor
 * TSDB_CODE_ACTION_IN_PROGRESS), attaches a 1-byte response body, enables the
 * response and records the error code on the request.
 *
 * @return the delegate's code, or terrno if the response body cannot be allocated.
 */
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t code = mndProcessCreateStreamReq(pReq);

  // success and "still running" need no explicit error response
  if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    return code;
  }

  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->info.rspLen = 1;
  pReq->info.noResp = false;
  pReq->code = code;
  return code;
}
3036

3037
/*
 * Handle a drop-stream request issued by the mnode itself.
 *
 * Delegates to mndProcessDropStreamReq. On a real failure (neither 0 nor
 * TSDB_CODE_ACTION_IN_PROGRESS), attaches a 1-byte response body, enables the
 * response and records the error code on the request.
 *
 * @return the delegate's code, or terrno if the response body cannot be allocated.
 */
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t code = mndProcessDropStreamReq(pReq);

  // success and "still running" need no explicit error response
  if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    return code;
  }

  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->info.rspLen = 1;
  pReq->info.noResp = false;
  pReq->code = code;
  return code;
}
3051

3052
/*
 * Lazily fill the exec-info task buffer from the stored stream objects.
 *
 * Does nothing if the buffer was already loaded (initTaskList) or no mnode is
 * given; otherwise loads every stream's tasks and marks the buffer as loaded.
 */
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  bool needLoad = !pExecInfo->initTaskList && (pMnode != NULL);

  if (needLoad) {
    addAllStreamTasksIntoBuf(pMnode, pExecInfo);
    pExecInfo->initTaskList = true;
  }
}
3060

3061
void mndStreamResetInitTaskListLoadFlag() {
1,516✔
3062
  mInfo("reset task list buffer init flag for leader");
1,516!
3063
  execInfo.initTaskList = false;
1,516✔
3064
}
1,516✔
3065

3066
/*
 * Record this mnode's role in execInfo and log the transition.
 *
 * switchFromFollower is reset on every call and set to true only when the
 * role changes from follower to leader. The first call (role still
 * NODE_ROLE_UNINIT) simply stores the given role. `pMnode` is not used.
 */
void mndUpdateStreamExecInfoRole(SMnode *pMnode, int32_t role) {
  execInfo.switchFromFollower = false;

  // first assignment since start-up
  if (execInfo.role == NODE_ROLE_UNINIT) {
    execInfo.role = role;
    if (role == NODE_ROLE_LEADER) {
      mInfo("init mnode is set to leader");
    } else {
      mInfo("init mnode is set to follower");
    }
    return;
  }

  bool toLeader = (role == NODE_ROLE_LEADER);

  if (toLeader && execInfo.role == NODE_ROLE_FOLLOWER) {
    execInfo.role = role;
    execInfo.switchFromFollower = true;
    mInfo("mnode switch to be leader from follower");
  } else if (toLeader) {
    mInfo("mnode remain to be leader, do nothing");
  } else if (execInfo.role == NODE_ROLE_LEADER) {
    execInfo.role = role;
    mInfo("mnode switch to be follower from leader");
  } else {
    mInfo("mnode remain to be follower, do nothing");
  }
}
1,837✔
3095

3096
/*
 * Iterate all SDB_STREAM objects and add each stream's task and node info to
 * `pExecInfo` via saveTaskAndNodeInfoIntoBuf. Each fetched object is released
 * right after it is processed.
 */
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pObj = NULL;
  void       *pCursor = NULL;

  while ((pCursor = sdbFetch(pSdb, SDB_STREAM, pCursor, (void **)&pObj)) != NULL) {
    saveTaskAndNodeInfoIntoBuf(pObj, pExecInfo);
    sdbRelease(pSdb, pObj);
  }
}
164✔
3111

3112
/*
 * Create and prepare an "update checkpoint-info" transaction for `pStream`.
 *
 * Ownership: this function always releases `pStream` via sdbRelease before
 * returning, on success and on every failure path.
 *
 * `pChkptInfoList` is currently not read by this function.
 *
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the trans has been prepared,
 *         otherwise the failing step's code.
 */
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList) {
  STrans *pTrans = NULL;
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME,
                               "update checkpoint-info", &pTrans);
  if (pTrans == NULL || code) {
    goto _end;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid);
  if (code) {
    goto _end;
  }

  code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream);
  if (code) {
    goto _end;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, tstrerror(code));
    goto _end;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_end:
  // Single cleanup path. mndTransDrop(NULL) is a no-op (this file already relies on
  // that in mndProcessDropOrphanTaskReq). A trans returned together with an error
  // code from doCreateTrans is therefore no longer leaked.
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
3155

3156
/*
 * Handle a request to drop orphan stream tasks.
 *
 * Decodes the orphan-task list from the request. If the list is non-empty,
 * creates a "drop stream" transaction keyed on the stream id of the first
 * non-NULL entry. The trans drops every listed task and persists a dummy
 * stream object with SDB_STATUS_DROPPED.
 *
 * @return 0 / TSDB_CODE_ACTION_IN_PROGRESS on success (including "nothing to
 *         drop"), otherwise the failing step's code.
 */
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  int32_t      code = 0;
  SOrphanTask *pTask = NULL;
  int32_t      i = 0;
  STrans      *pTrans = NULL;
  int32_t      numOfTasks = 0;

  SMStreamDropOrphanMsg msg = {0};
  code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg);
  if (code) {
    return code;
  }

  numOfTasks = taosArrayGetSize(msg.pList);
  if (numOfTasks == 0) {
    mDebug("no orphan tasks to drop, no need to create trans");
    goto _end;
  }

  mDebug("create trans to drop %d orphan tasks", numOfTasks);

  // the first non-NULL entry supplies the stream id used for the trans
  for (i = 0; i < numOfTasks; ++i) {
    pTask = taosArrayGet(msg.pList, i);
    if (pTask != NULL) {
      break;
    }
  }

  if (pTask == NULL) {
    mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task");
    goto _end;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
  if (code) {
    goto _end;
  }

  SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};

  code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code != 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _end;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
  if (code) {
    goto _end;
  }

  // drop all tasks
  if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _end;
  }

  // drop stream
  if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) {
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    goto _end;
  }

_end:
  // Report success only if a trans was actually built. The "nothing to drop" and
  // "no usable entry" paths also reach here with code == 0.
  if (pTrans != NULL && (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    mDebug("create drop %d orphan tasks trans succ", numOfTasks);
  }

  tDestroyDropOrphanTaskMsg(&msg);
  mndTransDrop(pTrans);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc