• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3591

24 Jan 2025 08:57AM UTC coverage: 63.688% (+0.1%) from 63.566%
#3591

push

travis-ci

web-flow
Merge pull request #29638 from taosdata/docs/TS-5846-3.0

enh: TDengine modify taosBenchmark new query rule cases and add doc

141693 of 285630 branches covered (49.61%)

Branch coverage included in aggregate %.

220359 of 282844 relevant lines covered (77.91%)

19402441.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.67
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "audit.h"
17
#include "mndDb.h"
18
#include "mndPrivilege.h"
19
#include "mndScheduler.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndStream.h"
23
#include "mndTrans.h"
24
#include "osMemory.h"
25
#include "parser.h"
26
#include "taoserror.h"
27
#include "tmisce.h"
28
#include "tname.h"
29

30
#define MND_STREAM_MAX_NUM 60
31

32
typedef struct {
33
  int8_t placeHolder;  // placeholder member, defined only to fix a Windows compile error
34
} SMStreamNodeCheckMsg;
35

36
static int32_t  mndNodeCheckSentinel = 0;
37
SStreamExecInfo execInfo;
38

39
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
41
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
42
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
43
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
44

45
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
46
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
47

48
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq);
49
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
50
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
51
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
52
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
53
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
54
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
55
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq);
56
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
57
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
58
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
59
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
60
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList);
61
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
62
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
63
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
64
static void    doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
65
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq);
66
static void    saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
67

68
static void     addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
69
static void     removeExpiredNodeInfo(const SArray *pNodeSnapshot);
70
static int32_t  doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
71
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
72

73
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
74
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
75
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
76
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
77
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
78

79
/*
 * Initialise the stream module of the mnode.
 *
 * Binds every stream related message type to its handler through mndSetMsgHandle, installs the
 * retrieve/free-iterator callbacks used by the "show" tables, initialises the global execution
 * info via mndInitExecInfo and finally installs the SDB_STREAM and SDB_STREAM_SEQ tables.
 *
 * Returns 0 on success, otherwise the first non-zero code reported by mndInitExecInfo or
 * sdbSetTable.
 */
int32_t mndInitStream(SMnode *pMnode) {
  // sdb callbacks for stream objects
  SSdbTable streamTable = {
      .sdbType = SDB_STREAM,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
  };

  // sdb callbacks for stream sequence objects
  SSdbTable streamSeqTable = {
      .sdbType = SDB_STREAM_SEQ,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamSeqActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamSeqActionDecode,
      .insertFp = (SdbInsertFp)mndStreamSeqActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamSeqActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamSeqActionDelete,
  };

  // create/drop requests and the node check timer
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck);

  // task level responses, all routed to the transaction response handler
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CONSEN_CHKPT_RSP, mndTransProcessRsp);

  // for msgs inside mnode
  // TODO change the name
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE, mndProcessCreateStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp);

  // checkpoint, heartbeat, node change and consensus handling
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_DROP_ORPHANTASKS, mndProcessDropOrphanTaskReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_TASK_RESET, mndProcessResetStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr);

  // pause/resume/reset requests
  mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESET_STREAM, mndProcessResetStreamReq);

  // callbacks behind the streams / stream tasks show tables
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);

  int32_t code = mndInitExecInfo();
  if (code != 0) {
    return code;
  }

  if ((code = sdbSetTable(pMnode->pSdb, streamTable)) != 0) {
    return code;
  }

  return sdbSetTable(pMnode->pSdb, streamSeqTable);
}
154

155
/*
 * Release the containers held by the global execInfo (arrays, hash tables) and destroy its
 * mutex. pMnode is not used.
 */
void mndCleanupStream(SMnode *pMnode) {
  SStreamExecInfo *pInfo = &execInfo;

  // arrays
  taosArrayDestroy(pInfo->pTaskList);
  taosArrayDestroy(pInfo->pNodeList);
  taosArrayDestroy(pInfo->pKilledChkptTrans);

  // hash tables
  taosHashCleanup(pInfo->pTaskMap);
  taosHashCleanup(pInfo->transMgmt.pDBTrans);
  taosHashCleanup(pInfo->pTransferStateStreams);
  taosHashCleanup(pInfo->pChkptStreams);
  taosHashCleanup(pInfo->pStreamConsensus);

  // the mutex goes last; its result is deliberately ignored
  (void)taosThreadMutexDestroy(&pInfo->lock);

  mDebug("mnd stream exec info cleanup");
}
1,911✔
167

168
/*
 * Decode a raw sdb record into a newly allocated row holding a SStreamObj.
 *
 * Steps: read the soft version of the record, validate it against [1, MND_STREAM_VER_NUMBER],
 * allocate the row, read the length-prefixed binary payload into a temporary buffer and decode
 * it with tDecodeSStreamObj.
 *
 * Returns the row with terrno = 0 on success. On failure the row is freed, terrno is set to the
 * error code and NULL is returned.
 */
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSdbRow    *pRow = NULL;
  SStreamObj *pStream = NULL;
  void       *buf = NULL;
  int8_t      sver = 0;
  int32_t     tlen = 0;
  int32_t     dataPos = 0;

  code = sdbGetRawSoftVer(pRaw, &sver);
  TSDB_CHECK_CODE(code, lino, _over);

  if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
    // record an error code before leaving: with code == 0 the success branch below would
    // dereference the still-NULL pStream and report success to the caller
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _over;
  }

  pRow = sdbAllocRow(sizeof(SStreamObj));
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);

  pStream = sdbGetRowObj(pRow);
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);

  // NOTE(review): the SDB_GET_* macros are only given the label, not `code`; confirm that a
  // failed read cannot reach _over with code still equal to 0.
  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);

  // one spare byte is allocated beyond the payload length
  buf = taosMemoryMalloc(tlen + 1);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);

  SDecoder decoder;
  tDecoderInit(&decoder, buf, tlen + 1);
  code = tDecodeSStreamObj(&decoder, pStream, sver);
  tDecoderClear(&decoder);

  if (code < 0) {
    // drop whatever the decoder attached to the object before the failure
    tFreeStreamObj(pStream);
  }

_over:
  taosMemoryFreeClear(buf);

  if (code != TSDB_CODE_SUCCESS) {
    char *p = (pStream == NULL) ? "null" : pStream->name;
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
    taosMemoryFreeClear(pRow);

    terrno = code;
    return NULL;
  } else {
    mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
           pStream->checkpointId);

    terrno = 0;
    return pRow;
  }
}
226

227
/*
 * sdb insert callback for stream objects: only emits a trace line, nothing else is done.
 * Always returns 0.
 */
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
  (void)pSdb;  // not needed by this callback

  mTrace("stream:%s, perform insert action", pStream->name);

  return 0;
}
231

232
/*
 * sdb delete callback for stream objects: frees the stream object's contents with
 * tFreeStreamObj while holding the object's write latch. Always returns 0.
 */
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
  (void)pSdb;  // not needed by this callback

  mTrace("stream:%s, perform delete action", pStream->name);

  // the free runs under the write latch of the object itself
  taosWLockLatch(&pStream->lock);
  tFreeStreamObj(pStream);
  taosWUnLockLatch(&pStream->lock);

  return 0;
}
239

240
/*
 * sdb update callback for stream objects: copies the mutable fields of pNewStream into the
 * stored object pOldStream. The version is swapped atomically; status, updateTime, checkpointId
 * and checkpointFreq are copied under the write latch of the old object. Always returns 0.
 */
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
  (void)pSdb;  // not needed by this callback

  mTrace("stream:%s, perform update action", pOldStream->name);

  // version is exchanged atomically, before the latch is taken
  (void)atomic_exchange_32(&pOldStream->version, pNewStream->version);

  taosWLockLatch(&pOldStream->lock);
  pOldStream->checkpointFreq = pNewStream->checkpointFreq;
  pOldStream->checkpointId = pNewStream->checkpointId;
  pOldStream->updateTime = pNewStream->updateTime;
  pOldStream->status = pNewStream->status;
  taosWUnLockLatch(&pOldStream->lock);

  return 0;
}
254

255
/*
 * Look up a stream by name in the sdb and store the result in *pStream.
 *
 * Returns TSDB_CODE_MND_STREAM_NOT_EXIST when the lookup yields NULL and terrno is
 * TSDB_CODE_SDB_OBJ_NOT_THERE; returns 0 in every other case.
 *
 * NOTE(review): a NULL result with any other terrno also returns 0, so callers have to check
 * *pStream as well as the return value - confirm this is intended.
 */
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
  *pStream = sdbAcquire(pMnode->pSdb, SDB_STREAM, streamName);
  if (*pStream != NULL) {
    return 0;
  }

  return (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) ? TSDB_CODE_MND_STREAM_NOT_EXIST : 0;
}
264

265
/* Hand a stream object back to the sdb of the given mnode through sdbRelease. */
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
  sdbRelease(pMnode->pSdb, pStream);
}
14,441✔
269

270
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
271
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
272
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
273
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
274
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
275

276
/*
 * Validate the mandatory fields of a create-stream request: name, sql, sourceDB and
 * targetStbFullName must all be non-empty (sql must also be non-NULL).
 *
 * Returns TSDB_CODE_SUCCESS when all are present, TSDB_CODE_MND_INVALID_STREAM_OPTION otherwise.
 */
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
  // sql is a pointer, so it is checked for NULL before its first character is read
  const bool hasSql = (pCreate->sql != NULL) && (pCreate->sql[0] != 0);
  const bool valid = (pCreate->name[0] != 0) && hasSql && (pCreate->sourceDB[0] != 0) &&
                     (pCreate->targetStbFullName[0] != 0);

  return valid ? TSDB_CODE_SUCCESS : TSDB_CODE_MND_INVALID_STREAM_OPTION;
}
283

284
/*
 * Build pWrapper->pSchema from an array of SField entries.
 *
 * One SSchema is produced per field, with colId numbered from 1 in array order. A field of type
 * TSDB_DATA_TYPE_NULL is stored as TSDB_DATA_TYPE_VARCHAR with VARSTR_HEADER_SIZE bytes; every
 * other field keeps its own type and byte width. Name and flags are copied unchanged.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation or an element lookup fails.
 */
static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) {
  pWrapper->nCols = taosArrayGetSize(pFields);
  pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema));
  if (pWrapper->pSchema == NULL) {
    return terrno;
  }

  for (int32_t i = 0; i < pWrapper->nCols; ++i) {
    SField *pField = (SField *)taosArrayGet(pFields, i);
    if (pField == NULL) {
      return terrno;
    }

    SSchema   *pCol = &pWrapper->pSchema[i];
    const bool isNullType = (TSDB_DATA_TYPE_NULL == pField->type);

    if (isNullType) {
      pCol->type = TSDB_DATA_TYPE_VARCHAR;
      pCol->bytes = VARSTR_HEADER_SIZE;
    } else {
      pCol->type = pField->type;
      pCol->bytes = pField->bytes;
    }

    pCol->colId = i + 1;
    tstrncpy(pCol->name, pField->name, sizeof(pCol->name));
    pCol->flags = pField->flags;
  }

  return TSDB_CODE_SUCCESS;
}
313

314
/*
 * Report whether any column other than the first one carries the COL_IS_KEY flag.
 * The scan starts at index 1, so a schema with fewer than two columns always yields false.
 */
static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) {
  int32_t col = 1;  // column 0 is never examined

  while (col < pWrapper->nCols) {
    if ((pWrapper->pSchema[col].flags & COL_IS_KEY) != 0) {
      return true;
    }
    ++col;
  }

  return false;
}
325

326
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
1,710✔
327
  SNode      *pAst = NULL;
1,710✔
328
  SQueryPlan *pPlan = NULL;
1,710✔
329
  int32_t     code = 0;
1,710✔
330

331
  mInfo("stream:%s to create", pCreate->name);
1,710!
332
  memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
1,710✔
333
  pObj->createTime = taosGetTimestampMs();
1,710✔
334
  pObj->updateTime = pObj->createTime;
1,710✔
335
  pObj->version = 1;
1,710✔
336

337
  if (pCreate->smaId > 0) {
1,710✔
338
    pObj->subTableWithoutMd5 = 1;
228✔
339
  }
340

341
  pObj->smaId = pCreate->smaId;
1,710✔
342
  pObj->indexForMultiAggBalance = -1;
1,710✔
343

344
  pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
1,710✔
345

346
  char p[TSDB_STREAM_FNAME_LEN + 32] = {0};
1,710✔
347
  snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory");
1,710✔
348

349
  pObj->hTaskUid = mndGenerateUid(pObj->name, strlen(pObj->name));
1,710✔
350
  pObj->status = 0;
1,710✔
351

352
  pObj->conf.igExpired = pCreate->igExpired;
1,710✔
353
  pObj->conf.trigger = pCreate->triggerType;
1,710✔
354
  pObj->conf.triggerParam = pCreate->maxDelay;
1,710✔
355
  pObj->conf.watermark = pCreate->watermark;
1,710✔
356
  pObj->conf.fillHistory = pCreate->fillHistory;
1,710✔
357
  pObj->deleteMark = pCreate->deleteMark;
1,710✔
358
  pObj->igCheckUpdate = pCreate->igUpdate;
1,710✔
359

360
  memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
1,710✔
361
  SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
1,710✔
362
  if (pSourceDb == NULL) {
1,710!
363
    code = terrno;
×
364
    mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb,
×
365
          tstrerror(code));
366
    goto FAIL;
×
367
  }
368

369
  pObj->sourceDbUid = pSourceDb->uid;
1,710✔
370
  mndReleaseDb(pMnode, pSourceDb);
1,710✔
371

372
  memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
1,710✔
373

374
  SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
1,710✔
375
  if (pTargetDb == NULL) {
1,710!
376
    code = terrno;
×
377
    mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb,
×
378
           tstrerror(code));
379
    goto FAIL;
×
380
  }
381

382
  tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
1,710✔
383

384
  if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
1,710✔
385
    pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
1,557✔
386
  } else {
387
    pObj->targetStbUid = pCreate->targetStbUid;
153✔
388
  }
389
  pObj->targetDbUid = pTargetDb->uid;
1,710✔
390
  mndReleaseDb(pMnode, pTargetDb);
1,710✔
391

392
  pObj->sql = pCreate->sql;
1,710✔
393
  pObj->ast = pCreate->ast;
1,710✔
394

395
  pCreate->sql = NULL;
1,710✔
396
  pCreate->ast = NULL;
1,710✔
397

398
  // deserialize ast
399
  if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) {
1,710!
400
    goto FAIL;
×
401
  }
402

403
  // create output schema
404
  if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) {
1,710!
405
    goto FAIL;
×
406
  }
407

408
  int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
1,710✔
409
  if (numOfNULL > 0) {
1,710✔
410
    pObj->outputSchema.nCols += numOfNULL;
26✔
411
    SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
26!
412
    if (!pFullSchema) {
26!
413
      code = terrno;
×
414
      goto FAIL;
×
415
    }
416

417
    int32_t nullIndex = 0;
26✔
418
    int32_t dataIndex = 0;
26✔
419
    for (int32_t i = 0; i < pObj->outputSchema.nCols; i++) {
332✔
420
      if (nullIndex >= numOfNULL) {
306!
421
        pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
×
422
        pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
×
423
        pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
×
424
        tstrncpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name, sizeof(pFullSchema[i].name));
×
425
        pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
×
426
        dataIndex++;
×
427
      } else {
428
        SColLocation *pos = NULL;
306✔
429
        if (nullIndex < taosArrayGetSize(pCreate->fillNullCols)) {
306!
430
          pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
306✔
431
        }
432

433
        if (pos == NULL) {
306!
434
          mError("invalid null column index, %d", nullIndex);
×
435
          continue;
×
436
        }
437

438
        if (i < pos->slotId) {
306✔
439
          pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
79✔
440
          pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
79✔
441
          pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
79✔
442
          tstrncpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name, sizeof(pFullSchema[i].name));
79✔
443
          pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
79✔
444
          dataIndex++;
79✔
445
        } else {
446
          pFullSchema[i].bytes = 0;
227✔
447
          pFullSchema[i].colId = pos->colId;
227✔
448
          pFullSchema[i].flags = COL_SET_NULL;
227✔
449
          memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
227✔
450
          pFullSchema[i].type = pos->type;
227✔
451
          nullIndex++;
227✔
452
        }
453
      }
454
    }
455

456
    taosMemoryFree(pObj->outputSchema.pSchema);
26!
457
    pObj->outputSchema.pSchema = pFullSchema;
26✔
458
  }
459

460
  SPlanContext cxt = {
1,710✔
461
      .pAstRoot = pAst,
462
      .topicQuery = false,
463
      .streamQuery = true,
464
      .triggerType =
465
          (pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
1,710✔
466
      .watermark = pObj->conf.watermark,
1,710✔
467
      .igExpired = pObj->conf.igExpired,
1,710✔
468
      .deleteMark = pObj->deleteMark,
1,710✔
469
      .igCheckUpdate = pObj->igCheckUpdate,
1,710✔
470
      .destHasPrimaryKey = hasDestPrimaryKey(&pObj->outputSchema),
1,710✔
471
  };
472

473
  // using ast and param to build physical plan
474
  if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) {
1,710!
475
    goto FAIL;
×
476
  }
477

478
  // save physcial plan
479
  if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) {
1,710!
480
    goto FAIL;
×
481
  }
482

483
  pObj->tagSchema.nCols = pCreate->numOfTags;
1,710✔
484
  if (pCreate->numOfTags) {
1,710✔
485
    pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
253!
486
    if (pObj->tagSchema.pSchema == NULL) {
253!
487
      code = terrno;
×
488
      goto FAIL;
×
489
    }
490
  }
491

492
  /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
493
  for (int32_t i = 0; i < pCreate->numOfTags; i++) {
3,129✔
494
    SField *pField = taosArrayGet(pCreate->pTags, i);
1,419✔
495
    if (pField == NULL) {
1,419!
496
      continue;
×
497
    }
498

499
    pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
1,419✔
500
    pObj->tagSchema.pSchema[i].bytes = pField->bytes;
1,419✔
501
    pObj->tagSchema.pSchema[i].flags = pField->flags;
1,419✔
502
    pObj->tagSchema.pSchema[i].type = pField->type;
1,419✔
503
    memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN);
1,419✔
504
  }
505

506
FAIL:
1,710✔
507
  if (pAst != NULL) nodesDestroyNode(pAst);
1,710!
508
  if (pPlan != NULL) qDestroyQueryPlan(pPlan);
1,710!
509
  return code;
1,710✔
510
}
511

512
/*
 * Serialise a stream task and attach it to the transaction as a TDMT_STREAM_TASK_DEPLOY action.
 *
 * The task is encoded twice: first with an encoder that has no buffer, to obtain the encoded
 * size from encoder.pos, then into a buffer of sizeof(SMsgHead) + size bytes whose header
 * carries the task's node id in network byte order.
 *
 * A task whose ver is below SSTREAM_TASK_SUBTABLE_CHANGED_VER is upgraded to SSTREAM_TASK_VER
 * before encoding.
 *
 * Returns 0 on success; TSDB_CODE_INVALID_MSG when the sizing pass fails, terrno when the buffer
 * cannot be allocated, or the code reported by the second encoding pass / setTransAction. The
 * buffer is freed here on every failure path.
 */
int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
  SEncoder encoder;
  tEncoderInit(&encoder, NULL, 0);

  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  // sizing pass. Any non-zero result is a failure, same rule as the real pass below; the
  // original only caught -1 and would have continued with a partial size on other errors.
  int32_t code = tEncodeStreamTask(&encoder, pTask);
  if (code != 0) {
    tEncoderClear(&encoder);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t size = encoder.pos;
  int32_t tlen = sizeof(SMsgHead) + size;
  tEncoderClear(&encoder);

  void *buf = taosMemoryCalloc(1, tlen);
  if (buf == NULL) {
    return terrno;
  }

  ((SMsgHead *)buf)->vgId = htonl(pTask->info.nodeId);

  // real pass: encode right behind the message header
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, size);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  if (code != 0) {
    mError("failed to encode stream task, code:%s", tstrerror(code));
    taosMemoryFree(buf);
    return code;
  }

  code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code) {
    taosMemoryFree(buf);
  }

  return code;
}
556

557
/*
 * Add a deploy action to the transaction for every task of the stream.
 *
 * The regular tasks are visited through a stream task iterator. When conf.fillHistory is set,
 * the tasks stored level by level in pHTasksList are added as well. Processing stops at the
 * first failure.
 *
 * Returns 0 on success or the first non-zero code from the iterator or mndPersistTaskDeployReq.
 */
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pTaskIter = NULL;

  int32_t code = createStreamTaskIter(pStream, &pTaskIter);
  if (code != 0) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return code;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCurrent = NULL;

    code = streamTaskIterGetCurrent(pTaskIter, &pCurrent);
    if (code == 0) {
      code = mndPersistTaskDeployReq(pTrans, pCurrent);
    }

    if (code != 0) {
      break;
    }
  }

  // the iterator is released on both the success and the failure path
  destroyStreamTaskIter(pTaskIter);
  if (code != 0) {
    return code;
  }

  // persistent stream task for already stored ts data
  if (!pStream->conf.fillHistory) {
    return code;
  }

  int32_t numOfLevels = taosArrayGetSize(pStream->pHTasksList);
  for (int32_t lv = 0; lv < numOfLevels; ++lv) {
    SArray *pTasksInLevel = taosArrayGetP(pStream->pHTasksList, lv);
    int32_t numInLevel = taosArrayGetSize(pTasksInLevel);

    for (int32_t t = 0; t < numInLevel; ++t) {
      SStreamTask *pHistoryTask = taosArrayGetP(pTasksInLevel, t);

      code = mndPersistTaskDeployReq(pTrans, pHistoryTask);
      if (code != 0) {
        return code;
      }
    }
  }

  return code;
}
602

603
/*
 * Persist a stream within a transaction: first add the deploy actions of all its tasks, then
 * append the stream object itself through mndPersistTransLog with status SDB_STATUS_READY.
 *
 * Returns the negative code from mndPersistStreamTasks if that step fails, otherwise the result
 * of mndPersistTransLog.
 */
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
  int32_t code = mndPersistStreamTasks(pTrans, pStream);
  if (code < 0) {
    return code;
  }

  return mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
}
611

612
/*
 * Add the creation of the stream's target super table to the transaction.
 *
 * A create-stable request is assembled from the stream's output schema (columns) and tag schema.
 * When the stream has no tag schema, a single UBIGINT tag named "group_id" is used. The request
 * is validated, the stable must not exist yet, the owning database must be resolvable and, for a
 * database configured with numOfStables == 1, must not hold any stable already. The stable
 * object is then built, given the stream's targetStbUid and added to the transaction.
 *
 * Returns 0 on success, otherwise the error code of the failing step. Every failing step now
 * reports a non-zero code; previously a failure of mndGetNumOfStbs, mndBuildStbFromReq or
 * mndAddStbToTrans left code at 0 and the function reported success.
 * The `user` parameter is not used.
 */
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
  SStbObj *pStb = NULL;
  SDbObj  *pDb = NULL;
  int32_t  code = 0;
  int32_t  lino = 0;

  SMCreateStbReq createReq = {0};
  tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
  createReq.numOfColumns = pStream->outputSchema.nCols;
  createReq.numOfTags = 1;  // group id
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);

  // build fields
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    pField->flags = pStream->outputSchema.pSchema[i].flags;
    pField->type = pStream->outputSchema.pSchema[i].type;
    pField->bytes = pStream->outputSchema.pSchema[i].bytes;
    pField->compress = createDefaultColCmprByType(pField->type);
  }

  if (pStream->tagSchema.nCols == 0) {
    createReq.numOfTags = 1;
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    // build tags
    SField *pField = taosArrayGet(createReq.pTags, 0);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, "group_id", sizeof(pField->name));
    pField->type = TSDB_DATA_TYPE_UBIGINT;
    pField->flags = 0;
    pField->bytes = 8;
  } else {
    createReq.numOfTags = pStream->tagSchema.nCols;
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    for (int32_t i = 0; i < createReq.numOfTags; i++) {
      SField *pField = taosArrayGet(createReq.pTags, i);
      if (pField == NULL) {
        continue;
      }

      pField->bytes = pStream->tagSchema.pSchema[i].bytes;
      pField->flags = pStream->tagSchema.pSchema[i].flags;
      pField->type = pStream->tagSchema.pSchema[i].type;
      tstrncpy(pField->name, pStream->tagSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    }
  }

  if ((code = mndCheckCreateStbReq(&createReq)) != 0) {
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  int32_t numOfStbs = -1;
  code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
  if (code != 0) {
    goto _OVER;
  }

  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    goto _OVER;
  }

  SStbObj stbObj = {0};

  code = mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb);
  if (code != 0) {
    goto _OVER;
  }

  stbObj.uid = pStream->targetStbUid;

  // kept in a separate variable so that a non-negative result still leaves code == 0
  int32_t ret = mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj);
  if (ret < 0) {
    code = ret;
    mndFreeStb(&stbObj);
    goto _OVER;
  }

  tFreeSMCreateStbReq(&createReq);
  mndFreeStb(&stbObj);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);
  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols);
  return code;

_OVER:
  tFreeSMCreateStbReq(&createReq);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);

  mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->targetSTbName, lino,
         tstrerror(code));
  return code;
}
723

724
// 1. stream number check
725
// 2. target stable can not be target table of other existed streams.
726
/*
 * Check a new stream against the streams already stored in the sdb.
 *
 * Walks all SDB_STREAM objects and
 *   - counts those sharing the new stream's source database; once the count exceeds
 *     MND_STREAM_MAX_NUM the check fails with TSDB_CODE_MND_TOO_MANY_STREAMS;
 *   - fails with TSDB_CODE_MND_INVALID_TARGET_TABLE when an existing stream has the same
 *     targetStbUid as the new one.
 *
 * Each fetched object is released only after its fields have been read (the original released it
 * first and then read targetStbUid and name from it). On early exit the fetch is cancelled.
 *
 * Returns TSDB_CODE_SUCCESS when neither condition is hit.
 */
static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
  int32_t     numOfStream = 0;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;

  while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
      ++numOfStream;
    }

    if (numOfStream > MND_STREAM_MAX_NUM) {
      mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
             pStreamObj->name);
      sdbRelease(pMnode->pSdb, pStream);
      sdbCancelFetch(pMnode->pSdb, pIter);
      return TSDB_CODE_MND_TOO_MANY_STREAMS;
    }

    if (pStream->targetStbUid == pStreamObj->targetStbUid) {
      mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
             pStreamObj->name);
      sdbRelease(pMnode->pSdb, pStream);
      sdbCancelFetch(pMnode->pSdb, pIter);
      return TSDB_CODE_MND_INVALID_TARGET_TABLE;
    }

    // nothing reads pStream past this point
    sdbRelease(pMnode->pSdb, pStream);
  }

  return TSDB_CODE_SUCCESS;
}
755

756
// Element-copy callback: returns a taosStrdup() copy of the string that p points to.
static void *notifyAddrDup(void *p) {
  char *pAddr = (char *)p;
  return taosStrdup(pAddr);
}
×
757

758
static int32_t addStreamTaskNotifyInfo(const SCMCreateStreamReq *createReq, const SStreamObj *pStream,
×
759
                                       SStreamTask *pTask) {
760
  int32_t code = TSDB_CODE_SUCCESS;
×
761
  int32_t lino = 0;
×
762

763
  TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
764
  TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
765

766
  pTask->notifyInfo.pNotifyAddrUrls = taosArrayDup(createReq->pNotifyAddrUrls, notifyAddrDup);
×
767
  TSDB_CHECK_NULL(pTask->notifyInfo.pNotifyAddrUrls, code, lino, _end, terrno);
×
768
  pTask->notifyInfo.notifyEventTypes = createReq->notifyEventTypes;
×
769
  pTask->notifyInfo.notifyErrorHandle = createReq->notifyErrorHandle;
×
770
  pTask->notifyInfo.streamName = taosStrdup(createReq->name);
×
771
  TSDB_CHECK_NULL(pTask->notifyInfo.streamName, code, lino, _end, terrno);
×
772
  pTask->notifyInfo.stbFullName = taosStrdup(createReq->targetStbFullName);
×
773
  TSDB_CHECK_NULL(pTask->notifyInfo.stbFullName, code, lino, _end, terrno);
×
774
  pTask->notifyInfo.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
×
775
  TSDB_CHECK_NULL(pTask->notifyInfo.pSchemaWrapper, code, lino, _end, terrno);
×
776

777
_end:
×
778
  if (code != TSDB_CODE_SUCCESS) {
×
779
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
780
  }
781
  return code;
×
782
}
783

784
/**
 * Attach the notify options of a create-stream request to every task of the stream.
 *
 * Nothing is done when the request carries no notify address. The regular tasks in
 * pStream->tasks are always processed; the tasks in pStream->pHTasksList are processed only
 * when the stream has fillHistory enabled and the request sets notifyHistory.
 *
 * @param createReq  create-stream request carrying the notify options
 * @param pStream    stream object whose tasks are updated
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA on a NULL argument, or the first error
 *         returned by addStreamTaskNotifyInfo()
 */
static int32_t addStreamNotifyInfo(SCMCreateStreamReq *createReq, SStreamObj *pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t level = 0;
  int32_t nTasks = 0;
  SArray *pLevel = NULL;

  TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pStream, code, lino, _end, TSDB_CODE_INVALID_PARA);

  if (taosArrayGetSize(createReq->pNotifyAddrUrls) == 0) {
    goto _end;
  }

  level = taosArrayGetSize(pStream->tasks);
  for (int32_t i = 0; i < level; ++i) {
    pLevel = taosArrayGetP(pStream->tasks, i);
    nTasks = taosArrayGetSize(pLevel);
    for (int32_t j = 0; j < nTasks; ++j) {
      code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
      TSDB_CHECK_CODE(code, lino, _end);
    }
  }

  if (pStream->conf.fillHistory && createReq->notifyHistory) {
    level = taosArrayGetSize(pStream->pHTasksList);
    for (int32_t i = 0; i < level; ++i) {
      pLevel = taosArrayGetP(pStream->pHTasksList, i);
      nTasks = taosArrayGetSize(pLevel);
      for (int32_t j = 0; j < nTasks; ++j) {
        code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
        TSDB_CHECK_CODE(code, lino, _end);
      }
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // pStream may be NULL here (the argument check above jumps to this label), so never
    // dereference it unconditionally while formatting the error.
    mError("%s for stream %s failed at line %d since %s", __func__, (pStream != NULL) ? pStream->name : "NULL", lino,
           tstrerror(code));
  }
  return code;
}
826

827
/**
 * Handle a create-stream request received by the mnode.
 *
 * Steps, in order: deserialize and validate the request, reject/ignore an already existing
 * stream, check the grant and the snode requirement of the source db, build the stream object,
 * run doStreamCheck(), create the transaction, optionally create the target stable, schedule
 * the tasks, attach notify info, persist the stream into the transaction, check read/write
 * privileges, register the tasks in execInfo, prepare the transaction and finally write an
 * audit record.
 *
 * @param pReq  rpc message whose payload is a serialized SCMCreateStreamReq
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the creation was started, 0 when the stream already
 *         exists and igExists is set, otherwise an error code
 */
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SStreamObj  streamObj = {0};
  char       *sql = NULL;
  int32_t     sqlLen = 0;
  const char *pMsg = "create stream tasks on dnodes";
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STrans     *pTrans = NULL;

  SCMCreateStreamReq createReq = {0};
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  goto _OVER;
#endif

  mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
  if ((code = mndCheckCreateStreamReq(&createReq)) != 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = mndAcquireStream(pMnode, createReq.name, &pStream);
  if (pStream != NULL && code == 0) {
    if (createReq.igExists) {
      mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
      mndReleaseStream(pMnode, pStream);
      tFreeSCMCreateStreamReq(&createReq);
      return code;
    } else {
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
      goto _OVER;
    }
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    goto _OVER;
  }

  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
    goto _OVER;
  }

  if (createReq.sql != NULL) {
    sql = taosStrdup(createReq.sql);
    TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
    // record the length so that the audit record below carries the sql text; without this
    // sqlLen stays 0 and the sql branch of the audit is never taken
    sqlLen = (int32_t)strlen(sql);
  }

  SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB);
  if (pSourceDb == NULL) {
    code = terrno;
    mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB,
          tstrerror(code));
    goto _OVER;
  }

  code = mndCheckForSnode(pMnode, pSourceDb);
  mndReleaseDb(pMnode, pSourceDb);
  if (code != 0) {
    goto _OVER;
  }

  // build stream obj from request
  if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = doStreamCheck(pMnode, &streamObj);
  TSDB_CHECK_CODE(code, lino, _OVER);

  code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans);
  if (pTrans == NULL || code) {
    goto _OVER;
  }

  // create stb for stream
  if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) {
    if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) {
      mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code));
      mndTransDrop(pTrans);
      goto _OVER;
    }
  } else {
    mDebug("stream:%s no need create stable", createReq.name);
  }

  // schedule stream task for stream obj
  code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  // add notify info into all stream tasks
  code = addStreamNotifyInfo(&createReq, &streamObj);
  if (code != TSDB_CODE_SUCCESS) {
    mError("stream:%s failed to add stream notify info since %s", createReq.name, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  // add stream to trans
  code = mndPersistStream(pTrans, &streamObj);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb)) != 0) {
    mndTransDrop(pTrans);
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb)) != 0) {
    mndTransDrop(pTrans);
    goto _OVER;
  }

  // add into buffer firstly
  // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
  streamMutexLock(&execInfo.lock);
  mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
  saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  // execute creation
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  mndTransDrop(pTrans);

  SName dbname = {0};
  code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code) {
    mError("invalid source dbname:%s in create stream, code:%s", createReq.sourceDB, tstrerror(code));
    goto _OVER;
  }

  SName name = {0};
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE);
  if (code) {
    mError("invalid stream name:%s in create strem, code:%s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  // reuse this function for stream
  if (sql != NULL && sqlLen > 0) {
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen);
  } else {
    // no sql text available: fall back to a short synthesized description
    char detail[1000] = {0};
    snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
  }

_OVER:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to create at line:%d since %s", createReq.name, lino, tstrerror(code));
  } else {
    mDebug("stream:%s create stream completed", createReq.name);
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  mndReleaseStream(pMnode, pStream);
  tFreeSCMCreateStreamReq(&createReq);
  tFreeStreamObj(&streamObj);

  if (sql != NULL) {
    taosMemoryFreeClear(sql);
  }

  return code;
}
1008

1009
/**
 * Handle a restart-stream request.
 *
 * The request payload is decoded with the pause-stream request layout (SMPauseStreamReq).
 * After the write-privilege, transaction-conflict and node-update checks pass, a restart
 * transaction is created, registered, filled with the restart action and prepared.
 *
 * @param pReq  rpc message carrying the serialized request
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the restart transaction was prepared, 0 when the
 *         stream does not exist and igNotExists is set, otherwise an error code
 */
static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SStreamObj      *pStream = NULL;
  int32_t          code = 0;
  SMPauseStreamReq pauseReq = {0};

  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (pauseReq.igNotExists) {
      mInfo("stream:%s, not exist, not restart stream", pauseReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to restart stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to restart stream", pauseReq.name, pStream->uid);
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESTART_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // if nodeUpdate happened, the restart trans is not created
  bool updated = mndStreamNodeIsUpdated(pMnode);
  if (updated) {
    mError("tasks are not ready for restart, node update detected");
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  STrans *pTrans = NULL;
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream",
                       &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s failed to restart stream since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESTART_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // append the restart action for this stream to the trans
  code = mndStreamSetRestartAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s, failed to restart task since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare restart stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1088

1089
/**
 * Produce the next checkpoint id.
 *
 * The result is one greater than the largest checkpoint id found in (a) the stream objects
 * stored in sdb and (b) the latestId of the task status entries in execInfo whose checkpoint
 * is not flagged as failed.
 *
 * @param pMnode  mnode whose sdb is scanned
 * @param lock    when true, execInfo.lock is taken while the task entries are scanned
 * @return the new checkpoint id
 */
int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int64_t     maxChkptId = 0;
  int64_t     maxCheckpointId = -1;

  // pass 1: largest checkpoint id recorded in the stream objects
  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    if (pStream->checkpointId > maxChkptId) {
      maxChkptId = pStream->checkpointId;
    }

    mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
           pStream->checkpointId);
    sdbRelease(pSdb, pStream);
  }

  // pass 2: check the max checkpoint id from all vnodes.
  if (lock) {
    streamMutexLock(&execInfo.lock);
  }

  for (int32_t idx = 0; idx < taosArrayGetSize(execInfo.pTaskList); ++idx) {
    STaskId          *pId = taosArrayGet(execInfo.pTaskList, idx);
    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));

    // skip missing entries and entries whose checkpoint is marked failed
    if (pId == NULL || pEntry == NULL || pEntry->checkpointInfo.failed) {
      continue;
    }

    if (pEntry->checkpointInfo.latestId > maxCheckpointId) {
      maxCheckpointId = pEntry->checkpointInfo.latestId;
    }
  }

  if (lock) {
    streamMutexUnlock(&execInfo.lock);
  }

  if (maxCheckpointId > maxChkptId) {
    mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId,
           maxCheckpointId);
    maxChkptId = maxCheckpointId;
  }

  int64_t newChkptId = maxChkptId + 1;
  mDebug("generate new checkpointId:%" PRId64, newChkptId);
  return newChkptId;
}
1141

1142
/**
 * Build and prepare the checkpoint transaction of one stream.
 *
 * @param pMnode        mnode handle
 * @param pStream       stream to checkpoint; its checkpointId, checkpointFreq, currentTick and
 *                      version fields are updated under pStream->lock
 * @param checkpointId  id assigned to the new checkpoint
 * @param mndTrigger    when 1, nothing is done (and 0 is returned) if less than
 *                      tsStreamCheckpointInterval seconds passed since pStream->checkpointFreq
 * @param lock          forwarded to mndStreamTransConflictCheck()
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the transaction was prepared, TSDB_CODE_SUCCESS when
 *         skipped by the interval check, otherwise an error code
 */
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
                                               int8_t mndTrigger, bool lock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t now = taosGetTimestampMs();
  STrans *pTrans = NULL;
  int32_t numOfLevels = 0;

  if (mndTrigger == 1 && (now - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
    return code;
  }

  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
  if (code) {
    mWarn("checkpoint conflict with other trans in %s, code:%s ignore the checkpoint for stream:%s %" PRIx64,
          pStream->sourceDb, tstrerror(code), pStream->name, pStream->uid);
    goto _ERR;
  }

  code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME,
                       "gen checkpoint for stream", &pTrans);
  if (code) {
    mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
           tstrerror(code));
    goto _ERR;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
  if (code) {
    mError("failed to register checkpoint trans for stream:%s, checkpointId:%" PRId64, pStream->name, checkpointId);
    goto _ERR;
  }

  mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);

  taosWLockLatch(&pStream->lock);
  pStream->currentTick = 1;

  // 1. redo action: broadcast checkpoint source msg for all source vg
  numOfLevels = taosArrayGetSize(pStream->tasks);
  for (int32_t lv = 0; lv < numOfLevels; lv++) {
    SArray      *pTasks = taosArrayGetP(pStream->tasks, lv);
    SStreamTask *pFirst = taosArrayGetP(pTasks, 0);

    // the level of the first task decides whether this level is handled
    if (pFirst->info.taskLevel != TASK_LEVEL__SOURCE) {
      continue;
    }

    int32_t numOfTasks = taosArrayGetSize(pTasks);
    for (int32_t k = 0; k < numOfTasks; k++) {
      SStreamTask *pTask = taosArrayGetP(pTasks, k);

      code = mndStreamSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);
      if (code != TSDB_CODE_SUCCESS) {
        taosWUnLockLatch(&pStream->lock);
        goto _ERR;
      }
    }
  }

  // 2. reset tick
  pStream->checkpointId = checkpointId;
  pStream->checkpointFreq = taosGetTimestampMs();
  pStream->currentTick = 0;

  // 3. commit log: stream checkpoint info
  pStream->version += 1;
  taosWUnLockLatch(&pStream->lock);

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS) {
    goto _ERR;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  } else {
    mError("failed to prepare checkpoint trans since %s", tstrerror(code));
  }

_ERR:
  // reached on success and on failure; pTrans may still be NULL here
  mndTransDrop(pTrans);
  return code;
}
1223

1224
/**
 * Make sure execInfo.pNodeList is populated and report its size.
 *
 * When the list is empty it is refreshed from the existing streams first.
 *
 * @param pMnode  mnode handle passed to refreshNodeListFromExistedStreams()
 * @return the number of entries in execInfo.pNodeList, or the error code of a failed refresh
 */
int32_t extractStreamNodeList(SMnode *pMnode) {
  if (taosArrayGetSize(execInfo.pNodeList) != 0) {
    return taosArrayGetSize(execInfo.pNodeList);
  }

  int32_t code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
  if (code) {
    mError("Failed to extract node list from stream, code:%s", tstrerror(code));
    return code;
  }

  return taosArrayGetSize(execInfo.pNodeList);
}
1235

1236
/**
 * Decide whether a checkpoint may be issued, based on node and task state in execInfo.
 *
 * @param pMnode  mnode handle
 * @return 0 when every task entry is in TASK_STATUS__READY and none has a related fill-history
 *         task; -1 when some task is not eligible; TSDB_CODE_STREAM_TASK_IVLD_STATUS when a node
 *         update is detected; TSDB_CODE_FAILED when the node list is empty but the task list is not
 */
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
  if (mndStreamNodeIsUpdated(pMnode)) {
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  int32_t result = 0;

  streamMutexLock(&execInfo.lock);

  if (taosArrayGetSize(execInfo.pNodeList) == 0) {
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
    if (taosArrayGetSize(execInfo.pTaskList) != 0) {
      streamMutexUnlock(&execInfo.lock);
      mError("stream task node change checking done, no vgroups exist, but task list is not empty");
      return TSDB_CODE_FAILED;
    }
  }

  for (int32_t idx = 0; idx < taosArrayGetSize(execInfo.pTaskList); ++idx) {
    STaskId *pId = taosArrayGet(execInfo.pTaskList, idx);
    if (pId == NULL) {
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
    if (pEntry == NULL) {
      continue;
    }

    // the first task that is not ready ends the scan
    if (pEntry->status != TASK_STATUS__READY) {
      mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId,
             (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
      result = -1;
      break;
    }

    // so does the first task that still has a related fill-history task
    if (pEntry->hTaskId != 0) {
      mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
             " exists, checkpoint not issued",
             pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
             pEntry->hTaskId);
      result = -1;
      break;
    }
  }

  streamMutexUnlock(&execInfo.lock);
  return result;
}
1283

1284
/**
 * Find the latest startTime among the READY tasks of one stream.
 *
 * Each id in pTaskList is looked up in execInfo.pTaskMap; this function does not take
 * execInfo.lock itself.
 *
 * @param pTaskList  array of STaskId to inspect
 * @param streamId   stream whose tasks are considered
 * @return the largest startTime of a task in TASK_STATUS__READY, or -1 when there is none
 */
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
  int64_t latestTs = -1;
  int32_t latestTaskId = -1;

  for (int32_t idx = 0; idx < taosArrayGetSize(pTaskList); ++idx) {
    STaskId          *pId = taosArrayGet(pTaskList, idx);
    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));

    if (pId == NULL || pEntry == NULL || pEntry->id.streamId != streamId) {
      continue;
    }

    // only a READY task with a strictly newer start time replaces the current candidate
    if (pEntry->status != TASK_STATUS__READY || pEntry->startTime <= latestTs) {
      continue;
    }

    latestTs = pEntry->startTime;
    latestTaskId = pEntry->id.taskId;
  }

  mDebug("stream:0x%" PRIx64 " last ready ts:%" PRId64 " s-task:0x%x", streamId, latestTs, latestTaskId);
  return latestTs;
}
1304

1305
// One checkpoint candidate: a stream id paired with a duration value used for ordering.
typedef struct {
  int64_t streamId;  // id of the stream
  int64_t duration;  // duration associated with the stream; the sort key
} SCheckpointInterval;

// Comparator over SCheckpointInterval that orders by duration, largest first:
// returns -1 when p1 has the longer duration, 1 when it has the shorter one, 0 when equal.
static int32_t streamWaitComparFn(const void *p1, const void *p2) {
  const int64_t lhs = ((const SCheckpointInterval *)p1)->duration;
  const int64_t rhs = ((const SCheckpointInterval *)p2)->duration;

  if (lhs > rhs) {
    return -1;
  }

  return (lhs < rhs) ? 1 : 0;
}
1319

1320
// all tasks of this stream should be ready, otherwise do nothing
1321
static bool isStreamReadyHelp(int64_t now, SStreamObj* pStream) {
1,788✔
1322
  bool ready = false;
1,788✔
1323

1324
  streamMutexLock(&execInfo.lock);
1,788✔
1325

1326
  int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
1,788✔
1327
  if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) {
1,788!
1328
    if (lastReadyTs != -1) {
432✔
1329
      mInfo("not start checkpoint, stream:0x%"PRIx64" last ready ts:%"PRId64" ready duration:%"PRId64" less than threshold",
430!
1330
            pStream->uid, lastReadyTs, now - lastReadyTs);
1331
    } else {
1332
      mInfo("not start checkpoint, stream:0x%"PRIx64" not ready now", pStream->uid);
2!
1333
    }
1334

1335
    ready = false;
432✔
1336
  } else {
1337
    ready = true;
1,356✔
1338
  }
1339

1340
  streamMutexUnlock(&execInfo.lock);
1,788✔
1341
  return ready;
1,788✔
1342
}
1343

1344
/**
 * Checkpoint message handler: start checkpoint transactions for the streams that are due.
 *
 * Streams whose time since checkpointFreq reached tsStreamCheckpointInterval seconds and that
 * pass isStreamReadyHelp() are collected, sorted with streamWaitComparFn, and checkpoint
 * transactions are started for them until the concurrency budget
 * (tsMaxConcurrentCheckpoint minus the ongoing checkpoint transactions) is used up.
 *
 * @param pReq  rpc message; only pReq->info.node is read
 * @return TSDB_CODE_STREAM_TASK_IVLD_STATUS when tasks/nodes are not ready, terrno when the
 *         candidate list cannot be allocated, otherwise the code of the last operation performed
 */
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  void       *pIter = NULL;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;
  int32_t     numOfCheckpointTrans = 0;
  int32_t     numOfQual = 0;
  int32_t     capacity = 0;
  int32_t     started = 0;
  int64_t     checkpointId = 0;

  code = mndCheckTaskAndNodeStatus(pMnode);
  if (code != 0) {
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));
  if (pList == NULL) {
    return terrno;
  }

  int64_t now = taosGetTimestampMs();

  // collect the streams that are beyond the checkpoint interval and ready
  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    int64_t duration = now - pStream->checkpointFreq;

    if (duration < tsStreamCheckpointInterval * 1000 || !isStreamReadyHelp(now, pStream)) {
      sdbRelease(pSdb, pStream);
      continue;
    }

    SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
    if (taosArrayPush(pList, &in) == NULL) {
      mError("failed to record the checkpoint interval info, stream:0x%" PRIx64, pStream->uid);
    } else {
      int32_t currentSize = taosArrayGetSize(pList);
      mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chkpt interval threshold: %ds(%" PRId64
             "s), concurrently launch threshold:%d",
             pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
             tsMaxConcurrentCheckpoint);
    }

    sdbRelease(pSdb, pStream);
  }

  numOfQual = taosArrayGetSize(pList);
  if (numOfQual == 0) {
    goto _end;
  }

  taosArraySort(pList, streamWaitComparFn);

  code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans);
  if (code) {
    mError("failed to clear finish trans, code:%s", tstrerror(code));
    goto _end;
  }

  if (numOfCheckpointTrans > tsMaxConcurrentCheckpoint) {
    mDebug(
        "%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new "
        "checkpoint trans are not allowed, wait for 30s",
        numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint);
    goto _end;
  }

  capacity = tsMaxConcurrentCheckpoint - numOfCheckpointTrans;
  mDebug(
      "%d stream(s) checkpoint interval longer than %ds, %d ongoing checkpoint trans, %d new checkpoint trans allowed, "
      "concurrent trans threshold:%d",
      numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, capacity, tsMaxConcurrentCheckpoint);

  // every trans started in this round uses the same checkpoint id
  checkpointId = mndStreamGenChkptId(pMnode, true);

  for (int32_t idx = 0; idx < numOfQual; ++idx) {
    SCheckpointInterval *pInfo = taosArrayGet(pList, idx);
    if (pInfo == NULL) {
      continue;
    }

    SStreamObj *pObj = NULL;
    code = mndGetStreamObj(pMnode, pInfo->streamId, &pObj);
    if (pObj == NULL || code != 0) {
      continue;
    }

    code = mndProcessStreamCheckpointTrans(pMnode, pObj, checkpointId, 1, true);
    sdbRelease(pSdb, pObj);

    if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
      mError("failed to start checkpoint trans, code:%s", tstrerror(code));
      continue;
    }

    started += 1;
    if (started >= capacity) {
      mDebug("already start %d new checkpoint trans, current active checkpoint trans:%d", started,
             (started + numOfCheckpointTrans));
      break;
    }
  }

_end:
  taosArrayDestroy(pList);
  return code;
}
1452

1453
// Handle a "drop stream" request.
//
// Steps, in order:
//   1. decode the request and acquire the stream object by name;
//   2. reject the request if the stream is still referenced by an existing SMA (pStream->smaId);
//   3. check the write privilege on the target db and the conflict with other stream trans;
//   4. create the drop trans, register it, add the drop-task actions and the SDB_STATUS_DROPPED log, and
//      prepare the trans;
//   5. kill the related active trans (if any), remove the tasks from execInfo and write the audit record.
//
// Returns 0 when the stream does not exist and igNotExists is set, TSDB_CODE_ACTION_IN_PROGRESS when the drop
// trans has been prepared, otherwise an error code.
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  SMDropStreamReq dropReq = {0};
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
    mError("invalid drop stream msg recv, discarded");
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  mDebug("recv drop stream:%s msg", dropReq.name);

  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (dropReq.igNotExists) {
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      tFreeMDropStreamReq(&dropReq);
      return 0;
    } else {
      mError("stream:%s not exist failed to drop it", dropReq.name);
      tFreeMDropStreamReq(&dropReq);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  if (pStream->smaId != 0) {
    mDebug("stream:%s, uid:0x%" PRIx64 " try to drop sma related stream", dropReq.name, pStream->uid);

    // scan all SMA objects; if one of them still owns this stream, the stream must be dropped through the SMA
    void    *pIter = NULL;
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    while (pIter) {
      if (pSma && pSma->uid == pStream->smaId) {
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;

        // report the failure while pStream is still referenced, and print the code that is actually returned
        mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma",
               dropReq.name, pStream->uid, tstrerror(code));

        sdbRelease(pMnode->pSdb, pSma);
        sdbRelease(pMnode->pSdb, pStream);

        sdbCancelFetch(pMnode->pSdb, pIter);
        tFreeMDropStreamReq(&dropReq);
        TAOS_RETURN(code);
      }

      if (pSma) {
        sdbRelease(pMnode->pSdb, pSma);
      }

      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    }
  }

  if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    return -1;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    return code;
  }

  STrans *pTrans = NULL;
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
  if (code) {
    mError("failed to register drop stream trans, code:%s", tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // drop all tasks
  code = mndStreamSetDropAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // drop stream
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // kill the related checkpoint trans
  int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
  if (transId != 0) {
    mDebug("drop active transId:%d due to stream:%s uid:0x%" PRIx64 " dropped", transId, pStream->name, pStream->uid);
    mndKillTransImpl(pMnode, transId, pStream->sourceDb);
  }

  mDebug("stream:%s uid:0x%" PRIx64 " transId:%d start to drop related task when dropping stream", dropReq.name,
         pStream->uid, transId);

  removeStreamTasksInBuf(pStream, &execInfo);

  // NOTE(review): from here on "code" is the result of parsing the stream name for the audit record, so a
  // parse failure is returned to the caller although the drop trans has already been prepared - confirm intended
  SName name = {0};
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  auditRecord(pReq, pMnode->clusterId, "dropStream", "", name.dbname, dropReq.sql, dropReq.sqlLen);

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  tFreeMDropStreamReq(&dropReq);

  if (code == 0) {
    return TSDB_CODE_ACTION_IN_PROGRESS;
  } else {
    TAOS_RETURN(code);
  }
}
1595

1596
// Add the removal of every stream that uses pDb to the caller supplied trans.
//
// A stream is handled when pDb is its source or its target db. If the source db and the target db of such a
// stream are different, nothing is dropped and TSDB_CODE_MND_STREAM_MUST_BE_DELETED is returned. Otherwise
// the related active trans is killed, the tasks are removed from execInfo, and the stream object is appended
// to pTrans with SDB_STATUS_DROPPED.
//
// Returns 0 on success, or the error code of the first failure.
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    // stream is not related to this db, skip it
    if (pStream->sourceDbUid != pDb->uid && pStream->targetDbUid != pDb->uid) {
      sdbRelease(pSdb, pStream);
      continue;
    }

    if (pStream->sourceDbUid != pStream->targetDbUid) {
      // write the log while the stream object is still referenced
      mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
             pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
      sdbRelease(pSdb, pStream);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_MUST_BE_DELETED);
    }

    // kill the related checkpoint trans
    int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
    if (transId != 0) {
      mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
      mndKillTransImpl(pMnode, transId, pStream->sourceDb);
    }

    // drop the stream obj in execInfo
    removeStreamTasksInBuf(pStream, &execInfo);

    code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
      sdbRelease(pSdb, pStream);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }

    sdbRelease(pSdb, pStream);
  }

  return 0;
}
1638

1639
// Fill pBlock with one row per stream object, resuming from the cursor kept in pShow->pIter.
// At most "rows" rows are produced; a stream whose attributes cannot be written is skipped.
// Returns the number of rows added by this call, which is also accumulated into pShow->numOfRows.
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t filled = 0;

  for (;;) {
    if (filled >= rows) {
      break;
    }

    SStreamObj *pObj = NULL;
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) {
      break;
    }

    // the row index only advances when the row has been written successfully
    if (setStreamAttrInResBlock(pObj, pBlock, filled) == 0) {
      filled += 1;
    }

    sdbRelease(pSdb, pObj);
  }

  pShow->numOfRows += filled;
  return filled;
}
1660

1661
// Cancel an unfinished iteration over the SDB_STREAM objects.
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1665

1666
// Fill pBlock with one row per stream task, resuming from the cursor kept in pShow->pIter.
//
// Streams are processed one at a time under their read latch. All tasks of a stream are emitted together:
// when they do not fit into rowsCapacity the block is enlarged, so the returned count may exceed
// rowsCapacity. Returns the number of rows added by this call, which is also accumulated into
// pShow->numOfRows.
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  int32_t     numOfRows = 0;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) {
      break;
    }

    // lock
    taosRLockLatch(&pStream->lock);

    int32_t count = mndGetNumOfStreamTasks(pStream);
    if (numOfRows + count > rowsCapacity) {
      code = blockDataEnsureCapacity(pBlock, numOfRows + count);
      if (code) {
        mError("failed to prepare the result block buffer, quit return value");
        taosRUnLockLatch(&pStream->lock);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    // use the precision of the source db if it can be acquired, milliseconds otherwise
    int32_t precision = TSDB_TIME_PRECISION_MILLI;
    SDbObj *pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
    if (pSourceDb != NULL) {
      precision = pSourceDb->cfg.precision;
      mndReleaseDb(pMnode, pSourceDb);
    }

    // add row for each task
    SStreamTaskIter *pIter = NULL;
    code = createStreamTaskIter(pStream, &pIter);
    if (code) {
      // write the log before the stream object is released
      mError("failed to create task iter for stream:%s", pStream->name);
      taosRUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pIter, &pTask);
      if (code) {
        // the iterator is destroyed exactly once, right after this loop
        break;
      }

      code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows, precision);
      if (code == TSDB_CODE_SUCCESS) {
        numOfRows++;
      }
    }

    pBlock->info.rows = numOfRows;

    destroyStreamTaskIter(pIter);
    taosRUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1739

1740
// Cancel an unfinished iteration over the SDB_STREAM objects started by the stream task retrieval.
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1744

1745
// Handle a "pause stream" request.
//
// The request is rejected when the stream does not exist (unless igNotExists is set), the user has no write
// privilege on the target db, a conflicting trans exists, a node update has been detected, or any task of the
// stream recorded in execInfo is not ready to be paused. Otherwise the pause trans is created and prepared.
//
// Returns 0 when the stream is missing and igNotExists is set, TSDB_CODE_ACTION_IN_PROGRESS when the pause
// trans has been prepared, otherwise an error code.
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  SMPauseStreamReq pauseReq = {0};
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (code != 0 || pStream == NULL) {
    if (!pauseReq.igNotExists) {
      mError("stream:%s not exist, failed to pause stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }

    mInfo("stream:%s, not exist, not pause stream", pauseReq.name);
    return 0;
  }

  mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid);

  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // the pause trans must not conflict with other trans in the source db and the target db
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(code);
  }

  if (mndStreamNodeIsUpdated(pMnode)) {
    mError("tasks are not ready for pause, node update detected");
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  // inspect the status of every task of this stream kept in execInfo; pausing is refused when no task has
  // been recorded yet, or when any task is still in the uninit/checkpoint status
  bool hasTask = false;
  bool allReady = true;

  streamMutexLock(&execInfo.lock);
  for (int32_t idx = 0; idx < taosArrayGetSize(execInfo.pTaskList); ++idx) {
    STaskId *pId = taosArrayGet(execInfo.pTaskList, idx);
    if (pId == NULL) {
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
    if (pEntry == NULL || pEntry->id.streamId != pStream->uid) {
      continue;
    }

    hasTask = true;

    bool notReady = (pEntry->status == TASK_STATUS__UNINIT) || (pEntry->status == TASK_STATUS__CK);
    if (notReady) {
      mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name,
             pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status));
      allReady = false;
    }
  }
  streamMutexUnlock(&execInfo.lock);

  if (!hasTask) {
    mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  if (!allReady) {
    mError("stream:%s task not ready for pause yet", pauseReq.name);
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  STrans *pTrans = NULL;
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans);
  if (code != 0 || pTrans == NULL) {
    mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // add the pause action of the tasks into the trans
  code = mndStreamSetPauseAction(pMnode, pTrans, pStream);
  if (code != 0) {
    mError("stream:%s, failed to pause task since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // append the stream object to the trans while holding the write latch of the stream
  taosWLockLatch(&pStream->lock);
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  taosWUnLockLatch(&pStream->lock);

  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);

  bool prepared = (code == TSDB_CODE_SUCCESS) || (code == TSDB_CODE_ACTION_IN_PROGRESS);
  if (!prepared) {
    mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1879

1880
// Handle a "resume stream" request.
//
// After the grant check, the stream is acquired by name, the write privilege on the target db and the trans
// conflict are checked, and the resume trans is created, registered, filled with the resume actions and the
// stream object (status set to STREAM_STATUS__NORMAL), and prepared.
//
// Returns 0 when the stream is missing and igNotExists is set, TSDB_CODE_ACTION_IN_PROGRESS when the resume
// trans has been prepared, otherwise an error code.
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResumeStreamReq resumeReq = {0};
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resumeReq.igNotExists) {
      mInfo("stream:%s not exist, not resume stream", resumeReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      return 0;
    } else {
      mError("stream:%s not exist, failed to resume stream", resumeReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);
  if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return -1;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  STrans *pTrans = NULL;
  code =
      doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // set the resume action
  code = mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated);
  if (code) {
    mError("stream:%s, failed to resume task since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // resume stream
  taosWLockLatch(&pStream->lock);
  pStream->status = STREAM_STATUS__NORMAL;

  // keep the result in "code": otherwise the failure branch would return the 0 left by the previous step
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code < 0) {
    taosWUnLockLatch(&pStream->lock);

    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  taosWUnLockLatch(&pStream->lock);
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare resume stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1969

1970
// Handle a "reset stream" request. Only the validation part is implemented: grant check, request decoding and
// stream lookup; the reset itself is still a todo.
//
// Returns 0 when the stream is missing and igNotExists is set, TSDB_CODE_MND_STREAM_NOT_EXIST when it is
// missing otherwise, and TSDB_CODE_ACTION_IN_PROGRESS when the stream exists.
// NOTE(review): no trans is created before TSDB_CODE_ACTION_IN_PROGRESS is returned - confirm that the caller
// gets a response in this case.
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResetStreamReq resetReq = {0};
  if (tDeserializeSMResetStreamReq(pReq->pCont, pReq->contLen, &resetReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  mDebug("recv reset stream req, stream:%s", resetReq.name);

  code = mndAcquireStream(pMnode, resetReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resetReq.igNotExists) {
      mInfo("stream:%s, not exist, not reset stream", resetReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to reset stream", resetReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  //todo(liao hao jun)

  // give back the reference taken by mndAcquireStream, nothing else uses the stream object here
  sdbRelease(pMnode->pSdb, pStream);
  return TSDB_CODE_ACTION_IN_PROGRESS;
}
2000

2001
// Build and prepare one "update task epsets" trans that covers the streams affected by a vgroup change.
//
// pChangeInfo      the changed nodes and the names of the involved dbs (pDBMap)
// includeAllNodes  when true every stream is added; otherwise only the streams whose source or target db is
//                  found in pChangeInfo->pDBMap
//
// The function first checks every stream for a conflicting trans and gives up on the first conflict. Then,
// for each selected stream, the trans is registered, the update-epset actions are added and the stream
// object is appended to the trans. A stream whose actions cannot be built is skipped.
//
// Returns the result of mndTransPrepare, 0 if there is no stream at all, or the error code of the failure.
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  STrans     *pTrans = NULL;
  int32_t     code = 0;

  // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool
  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
    sdbRelease(pSdb, pStream);

    if (code) {
      mError("nodeUpdate conflict with other trans, current nodeUpdate ignored, code:%s", tstrerror(code));
      sdbCancelFetch(pSdb, pIter);
      return code;
    }
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    // here create only one trans
    if (pTrans == NULL) {
      code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME,
                           "update task epsets", &pTrans);
      if (pTrans == NULL || code) {
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        return terrno = code;
      }
    }

    if (!includeAllNodes) {
      void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
      void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
      if (p1 == NULL && p2 == NULL) {
        mDebug("stream:0x%" PRIx64 " %s not involved nodeUpdate, ignore", pStream->uid, pStream->name);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
           pStream->name, pTrans->id);

    // NOTE: for each stream, we register one trans entry for task update
    code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
    if (code) {
      mError("failed to register trans, transId:%d, and continue", pTrans->id);
    }

    code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);

    // todo: not continue, drop all and retry again
    if (code != TSDB_CODE_SUCCESS) {
      mError("stream:0x%" PRIx64 " build nodeUpdate trans failed, ignore and continue, code:%s", pStream->uid,
             tstrerror(code));
      sdbRelease(pSdb, pStream);
      continue;
    }

    code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
    sdbRelease(pSdb, pStream);

    if (code != TSDB_CODE_SUCCESS) {
      sdbCancelFetch(pSdb, pIter);
      mndTransDrop(pTrans);  // the trans is abandoned here, do not leak it
      return code;
    }
  }

  // no need to build the trans to handle the vgroup update
  if (pTrans == NULL) {
    return 0;
  }

  // every stream fetched above has been released inside the loop, so only the trans is left to clean up
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
    mndTransDrop(pTrans);
    return code;
  }

  mndTransDrop(pTrans);
  return code;
}
2097

2098
// Rebuild pNodeList from the tasks of all existing streams.
//
// The node id and epset of every task are collected into a temporary hash keyed by node id, so that each
// node appears once; pNodeList is then cleared and refilled from that hash. New entries start with
// hbTimestamp and lastHbMsgId set to -1.
//
// Returns 0 on success. Failures while iterating the tasks or while appending to pNodeList do not stop the
// rebuild, but the error code is returned at the end.
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int32_t     code = 0;

  mDebug("start to refresh node list by existed streams");

  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    return terrno;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    taosWLockLatch(&pStream->lock);

    SStreamTaskIter *pTaskIter = NULL;
    code = createStreamTaskIter(pStream, &pTaskIter);
    if (code) {
      // write the log before the stream object is released
      mError("failed to create task iter for stream:%s", pStream->name);
      taosWUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pTaskIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pTaskIter, &pTask);
      if (code) {
        break;
      }

      SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
      epsetAssign(&entry.epset, &pTask->info.epSet);

      // several tasks may run on the same node, a duplicated key is therefore not an error
      int32_t ret = taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
      if (ret != 0 && ret != TSDB_CODE_DUP_KEY) {
        mError("failed to put entry into hash map, nodeId:%d, code:%s", entry.nodeId, tstrerror(ret));
      }
    }

    destroyStreamTaskIter(pTaskIter);
    taosWUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  taosArrayClear(pNodeList);

  // convert to list
  pIter = NULL;
  while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
    SNodeEntry *pEntry = (SNodeEntry *)pIter;

    void *p = taosArrayPush(pNodeList, pEntry);
    if (p == NULL) {
      mError("failed to put entry into node list, nodeId:%d, code: out of memory", pEntry->nodeId);
      if (code == 0) {
        code = terrno;
      }
      continue;
    }

    char    buf[256] = {0};
    int32_t ret = epsetToStr(&pEntry->epset, buf, tListLen(buf));  // ignore this error since it is only for log file
    if (ret != 0) {                                                // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(ret));
    }

    mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
  }

  taosHashCleanup(pHash);

  mDebug("numOfNodes:%d for stream after extract nodeInfo from stream", (int32_t)taosArrayGetSize(pNodeList));
  return code;
}
2179

2180
// Put the db name of every vgroup into pDBMap (key only, no value).
// A failed insertion is not reported; only a successful one is logged.
static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
  void   *pIter = NULL;
  int32_t code = 0;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    code = taosHashPut(pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
    if (code == 0) {
      int32_t size = taosHashGetSize(pDBMap);
      mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size);
    }

    // release only after the last access to pVgroup->dbName
    sdbRelease(pSdb, pVgroup);
  }
}
×
2199

2200
// this function runs by only one thread, so it is not multi-thread safe
2201
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
1,564✔
2202
  int32_t code = 0;
1,564✔
2203
  bool    allReady = true;
1,564✔
2204
  SArray *pNodeSnapshot = NULL;
1,564✔
2205
  SMnode *pMnode = pMsg->info.node;
1,564✔
2206
  int64_t ts = taosGetTimestampSec();
1,564✔
2207
  bool    updateAllVgroups = false;
1,564✔
2208

2209
  int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
1,564✔
2210
  if (old != 0) {
1,564!
2211
    mDebug("still in checking node change");
×
2212
    return 0;
×
2213
  }
2214

2215
  mDebug("start to do node changing check");
1,564✔
2216

2217
  streamMutexLock(&execInfo.lock);
1,564✔
2218
  int32_t numOfNodes = extractStreamNodeList(pMnode);
1,564✔
2219
  streamMutexUnlock(&execInfo.lock);
1,564✔
2220

2221
  if (numOfNodes == 0) {
1,564!
2222
    mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing");
×
2223
    execInfo.ts = ts;
×
2224
    atomic_store_32(&mndNodeCheckSentinel, 0);
×
2225
    return 0;
×
2226
  }
2227

2228
  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
1,564✔
2229
  if (code) {
1,564!
2230
    mError("failed to take the vgroup snapshot, ignore it and continue");
×
2231
  }
2232

2233
  if (!allReady) {
1,564✔
2234
    taosArrayDestroy(pNodeSnapshot);
40✔
2235
    atomic_store_32(&mndNodeCheckSentinel, 0);
40✔
2236
    mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
40!
2237
    return 0;
40✔
2238
  }
2239

2240
  streamMutexLock(&execInfo.lock);
1,524✔
2241

2242
  code = removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
1,524✔
2243
  if (code) {
1,524!
2244
    goto _end;
×
2245
  }
2246

2247
  SVgroupChangeInfo changeInfo = {0};
1,524✔
2248
  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo);
1,524✔
2249
  if (code) {
1,524!
2250
    goto _end;
×
2251
  }
2252

2253
  {
2254
    if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) {
1,524!
2255
      mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
×
2256
      updateAllVgroups = true;
×
2257
      execInfo.switchFromFollower = false;  // reset the flag
×
2258
      addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb);
×
2259
    }
2260
  }
2261

2262
  if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) {
1,524!
2263
    // kill current active checkpoint transaction, since the transaction is vnode wide.
2264
    killAllCheckpointTrans(pMnode, &changeInfo);
11✔
2265
    code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups);
11✔
2266

2267
    // keep the new vnode snapshot if success
2268
    if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
11!
2269
      code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
11✔
2270
      if (code) {
11!
2271
        mError("failed to extract node list from stream, code:%s", tstrerror(code));
×
2272
        goto _end;
×
2273
      }
2274

2275
      execInfo.ts = ts;
11✔
2276
      mDebug("create trans successfully, update cached node list, numOfNodes:%d",
11✔
2277
             (int)taosArrayGetSize(execInfo.pNodeList));
2278
    } else {
2279
      mError("unexpected code during create nodeUpdate trans, code:%s", tstrerror(code));
×
2280
    }
2281
  } else {
2282
    mDebug("no update found in nodeList");
1,513✔
2283
  }
2284

2285
  mndDestroyVgroupChangeInfo(&changeInfo);
1,524✔
2286

2287
_end:
1,524✔
2288
  streamMutexUnlock(&execInfo.lock);
1,524✔
2289
  taosArrayDestroy(pNodeSnapshot);
1,524✔
2290

2291
  mDebug("end to do stream task node change checking");
1,524✔
2292
  atomic_store_32(&mndNodeCheckSentinel, 0);
1,524✔
2293
  return 0;
1,524✔
2294
}
2295

2296
// Put a TDMT_MND_STREAM_NODECHANGE_CHECK message on the mnode write queue when at least one stream object exists
// in sdb. Returns 0 when there is nothing to check, terrno when the message buffer cannot be allocated, otherwise
// the result of tmsgPutToQueue().
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;

  // no stream stored, no need to issue the check message
  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) <= 0) {
    return 0;
  }

  int32_t contLen = sizeof(SMStreamNodeCheckMsg);
  void   *pCont = rpcMallocCont(contLen);
  if (pCont == NULL) {
    return terrno;
  }

  SRpcMsg checkMsg = {0};
  checkMsg.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK;
  checkMsg.pCont = pCont;
  checkMsg.contLen = contLen;

  return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &checkMsg);
}
2312

2313
// Register every task of pStream in the execution buffer pExecNode.
//
// For each task that is not yet present in pExecNode->pTaskMap:
//   1. a freshly initialized STaskStatusEntry is put into pTaskMap and the task id is appended to pTaskList;
//   2. the node hosting the task is appended to pNodeList unless an entry with the same nodeId already exists.
//
// All failures are logged only; nothing is reported back to the caller. The iteration stops at the first task
// that cannot be fetched from the iterator.
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
  SStreamTaskIter *pIter = NULL;
  int32_t          code = createStreamTaskIter(pStream, &pIter);
  if (code) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return;
  }

  while (streamTaskIterNextTask(pIter)) {
    SStreamTask *pTask = NULL;
    code = streamTaskIterGetCurrent(pIter, &pTask);
    if (code) {
      break;
    }

    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    if (taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)) != NULL) {
      continue;  // task is already registered, nothing to do
    }

    STaskStatusEntry entry = {0};
    streamTaskStatusInit(&entry, pTask);

    code = taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
    if (code == 0) {
      void   *px = taosArrayPush(pExecNode->pTaskList, &id);
      int32_t num = (int32_t)taosArrayGetSize(pExecNode->pTaskList);
      if (px) {
        mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
      } else {
        mError("s-task:0x%x failed to add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
      }
    } else {
      mError("s-task:0x%x failed to add into task map, since out of memory", (int32_t)entry.id.taskId);
    }

    // add the new vgroups if not added yet; the list is not modified while scanning, so read its size once
    bool    exist = false;
    int32_t numOfNodes = (int32_t)taosArrayGetSize(pExecNode->pNodeList);
    for (int32_t j = 0; j < numOfNodes; ++j) {
      SNodeEntry *pEntry = taosArrayGet(pExecNode->pNodeList, j);
      if ((pEntry != NULL) && (pEntry->nodeId == pTask->info.nodeId)) {
        exist = true;
        break;
      }
    }

    if (!exist) {
      SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
      epsetAssign(&nodeEntry.epset, &pTask->info.epSet);

      void *px = taosArrayPush(pExecNode->pNodeList, &nodeEntry);
      if (px) {
        mInfo("vgId:%d added into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList));
      } else {
        // the statement terminator was missing here and the code only compiled thanks to the macro expansion
        mError("vgId:%d failed to add into nodeList, total:%d", nodeEntry.nodeId,
               (int)taosArrayGetSize(pExecNode->pNodeList));
      }
    }
  }

  destroyStreamTaskIter(pIter);
}
2374

2375
// Append taskId to pList unless it is already present.
//
// pList       list of int32_t task ids that already sent the checkpoint request
// taskId      id of the task that sent the current request
// uid         stream id, used for logging only
// numOfTotal  total number of tasks of the stream, used to log how many requests are still missing
//
// The "received/remain" numbers are taken after the push, so the logged count includes the current request.
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
  int32_t num = taosArrayGetSize(pList);
  for (int32_t i = 0; i < num; ++i) {
    int32_t *pId = taosArrayGet(pList, i);
    if (pId == NULL) {
      continue;
    }

    if (taskId == *pId) {
      return;  // duplicated request, already recorded
    }
  }

  void *p = taosArrayPush(pList, &taskId);
  if (p) {
    int32_t numOfTasks = taosArrayGetSize(pList);
    mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, numOfTasks, numOfTotal - numOfTasks);
  } else {
    // nothing was added, report the number of requests recorded so far
    mError("stream:0x%" PRIx64 " receive %d reqs for checkpoint, failed to added into task list, since out of memory",
           uid, num);
  }
}
2397

2398
// Handle a checkpoint request sent by one stream task.
//
// The task id is recorded in execInfo.pTransferStateStreams (streamId -> SArray* of task ids). Once the number of
// recorded tasks equals the number of tasks of the stream, a checkpoint trans is started and the record is removed.
//
// Returns:
//   TSDB_CODE_INVALID_MSG           the request cannot be decoded
//   TSDB_CODE_MND_STREAM_NOT_EXIST  the stream is neither in the meta-store nor in the exec buffer
//   terrno / taosHashPut() code     the request cannot be recorded (out of memory) or the rsp cannot be allocated
//   0                               otherwise; in this case the rsp has been sent here and auto rsp is disabled
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
  SMnode                  *pMnode = pReq->info.node;
  SStreamTaskCheckpointReq req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskCheckpointReq(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint req msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);

  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf",
          req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the create stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  // failure to record the request is tracked separately, since 'code' may legitimately be non-zero at this point
  // (stream not in meta-store yet) without being an error of this request
  int32_t regCode = TSDB_CODE_SUCCESS;

  SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
  if (pReqTaskList == NULL) {
    SArray *pList = taosArrayInit(4, sizeof(int32_t));
    if (pList == NULL) {
      regCode = terrno;
      mError("stream:0x%" PRIx64 " failed to create the checkpoint req task list, code:%s", req.streamId,
             tstrerror(regCode));
    } else {
      doAddTaskId(pList, req.taskId, req.streamId, numOfTasks);
      regCode = taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
      if (regCode) {
        mError("failed to put into transfer state stream map, code: out of memory");
        taosArrayDestroy(pList);  // the map did not take the list, release it here
      } else {
        pReqTaskList =
            (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
      }
    }
  } else {
    doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks);
  }

  // pReqTaskList stays NULL when the request could not be recorded; never dereference it in that case
  if (pReqTaskList != NULL) {
    int32_t total = taosArrayGetSize(*pReqTaskList);
    if (total == numOfTasks) {  // all tasks have sent the reqs
      int64_t checkpointId = mndStreamGenChkptId(pMnode, false);
      mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);

      if (pStream != NULL) {  // TODO:handle error
        code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
        // NOTE(review): TSDB_CODE_ACTION_IN_PROGRESS is treated as success by the other trans callers in this file;
        // confirm that mndProcessStreamCheckpointTrans follows the same convention.
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          mError("failed to create checkpoint trans, code:%s", tstrerror(code));
        }
      } else {
        // todo: wait for the create stream trans completed, and launch the checkpoint trans
        // SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
        // sleep(500ms)
      }

      // remove this entry
      (void)taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));

      int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
      mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
    }
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  if (regCode != TSDB_CODE_SUCCESS) {
    return regCode;  // rsp handle is untouched, the error code is returned to the caller
  }

  {
    SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)};
    rsp.pCont = rpcMallocCont(rsp.contLen);
    if (rsp.pCont == NULL) {
      return terrno;
    }

    SMsgHead *pHead = rsp.pCont;
    pHead->vgId = htonl(req.nodeId);

    tmsgSendRsp(&rsp);
    pReq->info.handle = NULL;  // disable auto rsp
  }

  return 0;
}
2497

2498
// valid the info according to the HbMsg
2499
static bool validateChkptReport(const SCheckpointReport *pReport, int64_t reportChkptId) {
6,237✔
2500
  STaskId           id = {.streamId = pReport->streamId, .taskId = pReport->taskId};
6,237✔
2501
  STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
6,237✔
2502
  if (pTaskEntry == NULL) {
6,237✔
2503
    mError("invalid checkpoint-report msg from task:0x%x, discard", pReport->taskId);
17!
2504
    return false;
17✔
2505
  }
2506

2507
  if (pTaskEntry->checkpointInfo.latestId >= pReport->checkpointId) {
6,220!
2508
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " saved checkpointId:%" PRId64 " discard",
×
2509
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2510
    return false;
×
2511
  }
2512

2513
  // now the task in checkpoint procedure
2514
  if ((pTaskEntry->checkpointInfo.activeId != 0) && (pTaskEntry->checkpointInfo.activeId > pReport->checkpointId)) {
6,220!
2515
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " active checkpointId:%" PRId64
×
2516
           " discard",
2517
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2518
    return false;
×
2519
  }
2520

2521
  if (reportChkptId >= pReport->checkpointId) {
6,220!
2522
    mError("s-task:0x%x expired checkpoint-report msg, checkpointId:%" PRId64 " already update checkpointId:%" PRId64
×
2523
           " discard",
2524
           pReport->taskId, pReport->checkpointId, reportChkptId);
2525
    return false;
×
2526
  }
2527

2528
  return true;
6,220✔
2529
}
2530

2531
// Record a validated checkpoint-report in pList (array of STaskChkptInfo).
//
// An invalid report is dropped. When the task already has an entry, the entry is refreshed only if the incoming
// report carries a greater checkpointId; otherwise the report is logged and ignored. A task without an entry gets
// a new one appended.
static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SCheckpointReport *pReport) {
  if (!validateChkptReport(pReport, reportChkptId)) {
    return;
  }

  for (int32_t idx = 0; idx < taosArrayGetSize(pList); ++idx) {
    STaskChkptInfo *pExisted = taosArrayGet(pList, idx);
    if (pExisted == NULL || pExisted->taskId != pReport->taskId) {
      continue;
    }

    // found the entry of this task, handle it and leave
    if (pExisted->checkpointId > pReport->checkpointId) {
      mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
             pReport->taskId, pExisted->checkpointId, pReport->checkpointId);
    } else if (pExisted->checkpointId == pReport->checkpointId) {
      mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
    } else {  // the stored entry is older than the incoming report, overwrite it
      mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64,
             pReport->taskId, pExisted->checkpointId, pReport->checkpointId);

      pExisted->checkpointId = pReport->checkpointId;
      pExisted->ts = pReport->checkpointTs;
      pExisted->version = pReport->checkpointVer;
      pExisted->transId = pReport->transId;
      pExisted->dropHTask = pReport->dropHTask;
    }
    return;
  }

  // first report of this task
  STaskChkptInfo newInfo = {
      .streamId = pReport->streamId,
      .taskId = pReport->taskId,
      .nodeId = pReport->nodeId,
      .checkpointId = pReport->checkpointId,
      .version = pReport->checkpointVer,
      .ts = pReport->checkpointTs,
      .transId = pReport->transId,
      .dropHTask = pReport->dropHTask,
  };

  if (taosArrayPush(pList, &newInfo) != NULL) {
    int32_t numOfReported = taosArrayGetSize(pList);
    mDebug("stream:0x%" PRIx64 " %d tasks has send checkpoint-report", pReport->streamId, numOfReported);
  } else {
    mError("failed to put into task list, taskId:0x%x", pReport->taskId);
  }
}
2583

2584
// Handle a checkpoint-report sent by one stream task.
//
// The report is recorded in execInfo.pChkptStreams (streamId -> SChkptReportInfo). A quick rsp with
// TSDB_CODE_SUCCESS is sent to the reporting vnode once the report has been handled.
//
// Returns:
//   TSDB_CODE_INVALID_MSG           the report cannot be decoded
//   TSDB_CODE_MND_STREAM_NOT_EXIST  the stream is neither in the meta-store nor in the exec buffer
//   otherwise the last code produced while looking up the stream / recording the report
int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SCheckpointReport req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskChkptReport(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint-report msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64
         " checkpointVer:%" PRId64 " transId:%d",
         req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);

  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the create stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SChkptReportInfo *pInfo =
      (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
  if (pInfo == NULL) {
    SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId};
    if (info.pTaskList != NULL) {
      doAddReportStreamTask(info.pTaskList, info.reportChkpt, &req);
      code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &info, sizeof(info));
      if (code) {
        mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId);
        taosArrayDestroy(info.pTaskList);  // the map did not take the list, release it here
      } else {
        pInfo = (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
      }
    } else {
      code = terrno;
      mError("stream:0x%" PRIx64 " failed to create the checkpoint-report task list, code:%s", req.streamId,
             tstrerror(code));
    }
  } else {
    doAddReportStreamTask(pInfo->pTaskList, pInfo->reportChkpt, &req);
  }

  // pInfo is NULL when the report could not be recorded; pStream is NULL when the stream only exists in the exec
  // buffer (numOfTasks is 0 then, which would match an empty task list). Neither may be dereferenced.
  if (pInfo != NULL && pStream != NULL) {
    int32_t total = taosArrayGetSize(pInfo->pTaskList);
    if (total == numOfTasks) {  // all tasks has send the reqs
      mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
            " will be issued soon",
            req.streamId, pStream->name, total, req.checkpointId);
    }
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS);
  return code;
}
2663

2664
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t *pExistedTasks, bool *pAllSame) {
189✔
2665
  int32_t num = 0;
189✔
2666
  int64_t chkId = INT64_MAX;
189✔
2667
  *pExistedTasks = 0;
189✔
2668
  *pAllSame = true;
189✔
2669

2670
  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
6,506✔
2671
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
6,317✔
2672
    if (p == NULL) {
6,317!
2673
      continue;
×
2674
    }
2675

2676
    if (p->streamId != streamId) {
6,317✔
2677
      continue;
5,060✔
2678
    }
2679

2680
    num += 1;
1,257✔
2681
    STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
1,257✔
2682
    if (chkId > pe->checkpointInfo.latestId) {
1,257✔
2683
      if (chkId != INT64_MAX) {
197✔
2684
        *pAllSame = false;
8✔
2685
      }
2686
      chkId = pe->checkpointInfo.latestId;
197✔
2687
    }
2688
  }
2689

2690
  *pExistedTasks = num;
189✔
2691
  if (num < numOfTasks) {  // not all task send info to mnode through hbMsg, no valid checkpoint Id
189!
2692
    return -1;
×
2693
  }
2694

2695
  return chkId;
189✔
2696
}
2697

2698
// Send a response of msgSize bytes carrying 'code' for the request described by pInfo. The vgId is written, in
// network byte order, into the message head. After the rsp is sent, pInfo->handle is cleared to disable the auto
// rsp. Nothing is sent, and pInfo is left untouched, when the rsp buffer cannot be allocated.
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) {
  SRpcMsg quickRsp = {.code = code, .info = *pInfo, .contLen = msgSize};

  quickRsp.pCont = rpcMallocCont(quickRsp.contLen);
  if (quickRsp.pCont == NULL) {
    return;
  }

  ((SMsgHead *)quickRsp.pCont)->vgId = htonl(vgId);

  tmsgSendRsp(&quickRsp);
  pInfo->handle = NULL;  // disable auto rsp
}
6,237✔
2709

2710
// Walk through execInfo.pStreamConsensus and, for every pending consensus-checkpointId request, create the
// set-consensus-checkpointId trans once either all tasks of the stream carry the same checkpoint id or the request
// has been waiting for at least 10 seconds. Handled requests are removed from the per-stream list, and streams whose
// list becomes empty are removed from execInfo.pStreamConsensus.
//
// Returns terrno when the working list cannot be allocated, 0 when not all vnodes are ready, TSDB_CODE_FAILED when
// an inconsistent entry is met, otherwise the last code produced during the processing.
//
// Every early exit inside the hash iteration releases what the current round acquired (stream reference, task id
// list) and cancels the hash iterator before the lock is released.
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  int64_t now = taosGetTimestampMs();
  SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
  if (pStreamList == NULL) {
    return terrno;
  }

  mDebug("start to process consensus-checkpointId in tmr");

  bool    allReady = true;
  SArray *pNodeSnapshot = NULL;

  // only the readiness flag is needed here, the snapshot itself is dropped immediately
  int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
  taosArrayDestroy(pNodeSnapshot);
  if (code) {
    mError("failed to get the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
    taosArrayDestroy(pStreamList);
    return 0;
  }

  streamMutexLock(&execInfo.lock);

  void *pIter = NULL;
  while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
    SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;

    int64_t streamId = -1;
    int32_t num = taosArrayGetSize(pInfo->pTaskList);
    SArray *pList = taosArrayInit(4, sizeof(int32_t));  // ids of the tasks handled in this round
    if (pList == NULL) {
      continue;
    }

    SStreamObj *pStream = NULL;
    code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
    if (pStream == NULL || code != 0) {  // stream has been dropped already
      mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
      taosArrayDestroy(pList);
      continue;
    }

    for (int32_t j = 0; j < num; ++j) {
      SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
      if (pe == NULL) {
        continue;
      }

      streamId = pe->req.streamId;

      int32_t existed = 0;
      bool    allSame = true;
      int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame);
      if (chkId == -1) {
        mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed,
               pInfo->numOfTasks, pe->req.taskId);
        break;
      }

      if (((now - pe->ts) >= 10 * 1000) || allSame) {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
        if (chkId > pe->req.checkpointId) {
          mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
                 pe->req.checkpointId, chkId);

          // release everything acquired in this round before bailing out
          mndReleaseStream(pMnode, pStream);
          taosArrayDestroy(pList);
          taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
          streamMutexUnlock(&execInfo.lock);
          taosArrayDestroy(pStreamList);
          return TSDB_CODE_FAILED;
        }
        code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
        }

        void *p = taosArrayPush(pList, &pe->req.taskId);
        if (p == NULL) {
          mError("failed to put into task list, taskId:0x%x", pe->req.taskId);
        }
        streamId = pe->req.streamId;
      } else {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
      }
    }

    mndReleaseStream(pMnode, pStream);

    // remove the handled requests from the pending list of this stream
    if (taosArrayGetSize(pList) > 0) {
      for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
        int32_t *taskId = taosArrayGet(pList, i);
        if (taskId == NULL) {
          continue;
        }

        for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) {
          SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, k);
          if ((pe != NULL) && (pe->req.taskId == *taskId)) {
            taosArrayRemove(pInfo->pTaskList, k);
            break;
          }
        }
      }
    }

    taosArrayDestroy(pList);

    if (taosArrayGetSize(pInfo->pTaskList) == 0) {
      mndClearConsensusRspEntry(pInfo);
      if (streamId == -1) {
        mError("streamId is -1, streamId:%" PRIx64, pInfo->streamId);

        taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
        streamMutexUnlock(&execInfo.lock);
        taosArrayDestroy(pStreamList);
        return TSDB_CODE_FAILED;
      }
      void *p = taosArrayPush(pStreamList, &streamId);
      if (p == NULL) {
        mError("failed to put into stream list, stream:0x%" PRIx64, streamId);
      }
    }
  }

  // the entries are removed after the iteration, not while iterating the hash map
  for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
    int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i);
    if (pStreamId == NULL) {
      continue;
    }

    code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId);
  }

  streamMutexUnlock(&execInfo.lock);

  taosArrayDestroy(pStreamList);
  mDebug("end to process consensus-checkpointId in tmr");
  return code;
}
2850

2851
// Run mndProcessCreateStreamReq() for the request. When it fails (any code other than 0 and
// TSDB_CODE_ACTION_IN_PROGRESS), attach a one-byte rsp carrying the error code to the request.
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t code = mndProcessCreateStreamReq(pReq);
  if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    return code;
  }

  // failed: build the rsp that carries the error code
  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->code = code;
  pReq->info.noResp = false;
  pReq->info.rspLen = 1;

  return code;
}
2865

2866
// Run mndProcessDropStreamReq() for the request. When it fails (any code other than 0 and
// TSDB_CODE_ACTION_IN_PROGRESS), attach a one-byte rsp carrying the error code to the request.
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t code = mndProcessDropStreamReq(pReq);
  if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    return code;
  }

  // failed: build the rsp that carries the error code
  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->code = code;
  pReq->info.noResp = false;
  pReq->info.rspLen = 1;

  return code;
}
2880

2881
// Load the tasks of all streams into pExecInfo once: skipped when the task list is flagged as loaded already or
// when no mnode is given.
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  if (!pExecInfo->initTaskList && pMnode != NULL) {
    addAllStreamTasksIntoBuf(pMnode, pExecInfo);
    pExecInfo->initTaskList = true;
  }
}
2889

2890
void mndStreamResetInitTaskListLoadFlag() {
1,681✔
2891
  mInfo("reset task list buffer init flag for leader");
1,681!
2892
  execInfo.initTaskList = false;
1,681✔
2893
}
1,681✔
2894

2895
// Record the role of this mnode in the global execInfo.
//
// execInfo.switchFromFollower is cleared on entry and set again only for a follower -> leader transition.
// The stored role is updated on the first call (NODE_ROLE_UNINIT), on follower -> leader and on leader -> follower;
// in every other case it is left as it is.
void mndUpdateStreamExecInfoRole(SMnode *pMnode, int32_t role) {
  execInfo.switchFromFollower = false;

  // first time the role is reported
  if (execInfo.role == NODE_ROLE_UNINIT) {
    execInfo.role = role;
    if (role == NODE_ROLE_LEADER) {
      mInfo("init mnode is set to leader");
    } else {
      mInfo("init mnode is set to follower");
    }
    return;
  }

  // the new role is leader
  if (role == NODE_ROLE_LEADER) {
    if (execInfo.role != NODE_ROLE_FOLLOWER) {
      mInfo("mnode remain to be leader, do nothing");
      return;
    }

    execInfo.role = role;
    execInfo.switchFromFollower = true;
    mInfo("mnode switch to be leader from follower");
    return;
  }

  // the new role is follower
  if (execInfo.role != NODE_ROLE_LEADER) {
    mInfo("mnode remain to be follower, do nothing");
    return;
  }

  execInfo.role = role;
  mInfo("mnode switch to be follower from leader");
}
1,995✔
2924

2925
// Iterate all stream objects stored in sdb and register the tasks and nodes of each of them in pExecInfo.
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pCursor = NULL;

  while ((pCursor = sdbFetch(pSdb, SDB_STREAM, pCursor, (void **)&pStream)) != NULL) {
    saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo);
    sdbRelease(pSdb, pStream);  // drop the reference obtained from sdbFetch
  }
}
159✔
2940

2941
// Create, register, fill, persist and prepare the "update checkpoint-info" trans for pStream.
//
// pStream is released through sdbRelease() on every path. Returns TSDB_CODE_ACTION_IN_PROGRESS when the trans has
// been prepared, otherwise the code of the step that failed.
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList) {
  STrans *pTrans = NULL;
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME,
                               "update checkpoint-info", &pTrans);
  if (pTrans == NULL || code) {
    // no trans to drop in this case
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid);
  if (code) {
    goto _over;
  }

  code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream);
  if (code) {
    goto _over;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    goto _over;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, tstrerror(code));
    goto _over;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_over:
  // shared by the success and the failure paths
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
2984

2985
// Handle a drop-orphan-task request by creating and preparing one transaction that drops
// every task listed in the message.
//
// pReq - incoming rpc message; pReq->info.node is the mnode, pReq->pCont/contLen carry the
//        serialized SMStreamDropOrphanMsg.
//
// Returns the deserialization error if the message cannot be decoded, the code of the first
// failing step otherwise, or the result of mndTransPrepare() (TSDB_CODE_SUCCESS or
// TSDB_CODE_ACTION_IN_PROGRESS) when the transaction has been prepared.
// An empty task list returns 0 without creating a transaction.
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  int32_t      code = 0;
  SOrphanTask *pTask = NULL;
  STrans      *pTrans = NULL;
  int32_t      numOfTasks = 0;

  SMStreamDropOrphanMsg msg = {0};
  code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg);
  if (code) {
    // NOTE(review): msg is not destroyed on this path; confirm the deserializer leaves nothing allocated on failure
    return code;
  }

  numOfTasks = taosArrayGetSize(msg.pList);
  if (numOfTasks == 0) {
    mDebug("no orphan tasks to drop, no need to create trans");
    goto _err;
  }

  mDebug("create trans to drop %d orphan tasks", numOfTasks);

  // the first retrievable entry supplies the stream id used for the conflict check and the trans
  for (int32_t i = 0; i < numOfTasks; ++i) {
    pTask = taosArrayGet(msg.pList, i);
    if (pTask != NULL) {
      break;
    }
  }

  if (pTask == NULL) {
    // NOTE(review): code is still 0 here, so the caller sees success although nothing was dropped
    mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task");
    goto _err;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
  if (code) {
    goto _err;
  }

  // stand-in stream object carrying only the stream id; db and stable names are empty
  SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};

  code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code != 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _err;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
  if (code) {
    goto _err;
  }

  // drop all tasks
  if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _err;
  }

  // drop stream
  if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) {
    goto _err;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    goto _err;
  }

  // logged only here, so bail-out paths that never created a trans no longer report success
  mDebug("create drop %d orphan tasks trans succ", numOfTasks);

_err:
  // shared exit for success and failure; pTrans may still be NULL when reached early
  tDestroyDropOrphanTaskMsg(&msg);
  mndTransDrop(pTrans);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc