• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3541

26 Nov 2024 03:56AM UTC coverage: 60.776% (-0.07%) from 60.846%
#3541

push

travis-ci

web-flow
Merge pull request #28920 from taosdata/fix/TD-33008-3.0

fix(query)[TD-33008]. fix error handling in tsdbCacheRead

120076 of 252763 branches covered (47.51%)

Branch coverage included in aggregate %.

0 of 2 new or added lines in 1 file covered. (0.0%)

1395 existing lines in 154 files now uncovered.

200995 of 275526 relevant lines covered (72.95%)

19612328.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.94
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "audit.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndScheduler.h"
21
#include "mndShow.h"
22
#include "mndStb.h"
23
#include "mndTrans.h"
24
#include "mndVgroup.h"
25
#include "osMemory.h"
26
#include "parser.h"
27
#include "taoserror.h"
28
#include "tmisce.h"
29
#include "tname.h"
30

31
#define MND_STREAM_MAX_NUM 60
32

33
typedef struct {
34
  int8_t placeHolder;  // // to fix windows compile error, define place holder
35
} SMStreamNodeCheckMsg;
36

37
static int32_t  mndNodeCheckSentinel = 0;
38
SStreamExecInfo execInfo;
39

40
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
41
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
42
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
43
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
44
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
45

46
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
47
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
48

49
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq);
50
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
51
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
52
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
53
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
54
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
55
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
56
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
57
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
58
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
59
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
60
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList);
61
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
62
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
63
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
64
static void    doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
65
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq);
66
static void    saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
67

68
static void     addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
69
static void     removeExpiredNodeInfo(const SArray *pNodeSnapshot);
70
static int32_t  doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
71
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
72

73
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
74
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
75
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
76
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
77
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
78

79
int32_t mndInitStream(SMnode *pMnode) {
1,996✔
80
  SSdbTable table = {
1,996✔
81
      .sdbType = SDB_STREAM,
82
      .keyType = SDB_KEY_BINARY,
83
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
84
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
85
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
86
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
87
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
88
  };
89
  SSdbTable tableSeq = {
1,996✔
90
      .sdbType = SDB_STREAM_SEQ,
91
      .keyType = SDB_KEY_BINARY,
92
      .encodeFp = (SdbEncodeFp)mndStreamSeqActionEncode,
93
      .decodeFp = (SdbDecodeFp)mndStreamSeqActionDecode,
94
      .insertFp = (SdbInsertFp)mndStreamSeqActionInsert,
95
      .updateFp = (SdbUpdateFp)mndStreamSeqActionUpdate,
96
      .deleteFp = (SdbDeleteFp)mndStreamSeqActionDelete,
97
  };
98

99
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
1,996✔
100
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
1,996✔
101
  mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck);
1,996✔
102

103
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
1,996✔
104
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
1,996✔
105
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
1,996✔
106
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
1,996✔
107
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp);
1,996✔
108
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
1,996✔
109
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
1,996✔
110
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp);
1,996✔
111
  mndSetMsgHandle(pMnode, TDMT_STREAM_CONSEN_CHKPT_RSP, mndTransProcessRsp);
1,996✔
112

113
  // for msgs inside mnode
114
  // TODO change the name
115
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE, mndProcessCreateStreamReqFromMNode);
1,996✔
116
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE_RSP, mndTransProcessRsp);
1,996✔
117
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode);
1,996✔
118
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp);
1,996✔
119

120
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
1,996✔
121
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
1,996✔
122
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_DROP_ORPHANTASKS, mndProcessDropOrphanTaskReq);
1,996✔
123
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_TASK_RESET, mndProcessResetStatusReq);
1,996✔
124
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
1,996✔
125
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
1,996✔
126
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
1,996✔
127
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
1,996✔
128
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
1,996✔
129
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
1,996✔
130
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr);
1,996✔
131

132
  mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
1,996✔
133
  mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq);
1,996✔
134

135
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
1,996✔
136
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
1,996✔
137
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
1,996✔
138
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
1,996✔
139

140
  int32_t code = mndInitExecInfo();
1,996✔
141
  if (code) {
1,996!
142
    return code;
×
143
  }
144

145
  code = sdbSetTable(pMnode->pSdb, table);
1,996✔
146
  if (code) {
1,996!
147
    return code;
×
148
  }
149

150
  code = sdbSetTable(pMnode->pSdb, tableSeq);
1,996✔
151
  return code;
1,996✔
152
}
153

154
void mndCleanupStream(SMnode *pMnode) {
1,995✔
155
  taosArrayDestroy(execInfo.pTaskList);
1,995✔
156
  taosArrayDestroy(execInfo.pNodeList);
1,995✔
157
  taosArrayDestroy(execInfo.pKilledChkptTrans);
1,995✔
158
  taosHashCleanup(execInfo.pTaskMap);
1,995✔
159
  taosHashCleanup(execInfo.transMgmt.pDBTrans);
1,995✔
160
  taosHashCleanup(execInfo.pTransferStateStreams);
1,995✔
161
  taosHashCleanup(execInfo.pChkptStreams);
1,995✔
162
  taosHashCleanup(execInfo.pStreamConsensus);
1,995✔
163
  (void) taosThreadMutexDestroy(&execInfo.lock);
1,995✔
164
  mDebug("mnd stream exec info cleanup");
1,995✔
165
}
1,995✔
166

167
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
6,257✔
168
  int32_t     code = 0;
6,257✔
169
  int32_t     lino = 0;
6,257✔
170
  SSdbRow *   pRow = NULL;
6,257✔
171
  SStreamObj *pStream = NULL;
6,257✔
172
  void *      buf = NULL;
6,257✔
173
  int8_t      sver = 0;
6,257✔
174
  int32_t     tlen;
175
  int32_t     dataPos = 0;
6,257✔
176

177
  code = sdbGetRawSoftVer(pRaw, &sver);
6,257✔
178
  TSDB_CHECK_CODE(code, lino, _over);
6,257!
179

180
  if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
6,257!
181
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
×
182
    goto _over;
×
183
  }
184

185
  pRow = sdbAllocRow(sizeof(SStreamObj));
6,257✔
186
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);
6,257!
187

188
  pStream = sdbGetRowObj(pRow);
6,257✔
189
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);
6,257!
190

191
  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);
6,257!
192

193
  buf = taosMemoryMalloc(tlen + 1);
6,257✔
194
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
6,257!
195

196
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);
6,257!
197

198
  SDecoder decoder;
199
  tDecoderInit(&decoder, buf, tlen + 1);
6,257✔
200
  code = tDecodeSStreamObj(&decoder, pStream, sver);
6,257✔
201
  tDecoderClear(&decoder);
6,257✔
202

203
  if (code < 0) {
6,257!
204
    tFreeStreamObj(pStream);
×
205
  }
206

207
_over:
6,257✔
208
  taosMemoryFreeClear(buf);
6,257!
209

210
  if (code != TSDB_CODE_SUCCESS) {
6,257!
211
    char *p = (pStream == NULL) ? "null" : pStream->name;
×
212
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
×
213
    taosMemoryFreeClear(pRow);
×
214

215
    terrno = code;
×
216
    return NULL;
×
217
  } else {
218
    mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
6,257✔
219
           pStream->checkpointId);
220

221
    terrno = 0;
6,257✔
222
    return pRow;
6,257✔
223
  }
224
}
225

226
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
1,691✔
227
  mTrace("stream:%s, perform insert action", pStream->name);
1,691✔
228
  return 0;
1,691✔
229
}
230

231
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
6,255✔
232
  mTrace("stream:%s, perform delete action", pStream->name);
6,255✔
233
  taosWLockLatch(&pStream->lock);
6,255✔
234
  tFreeStreamObj(pStream);
6,255✔
235
  taosWUnLockLatch(&pStream->lock);
6,255✔
236
  return 0;
6,255✔
237
}
238

239
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
3,211✔
240
  mTrace("stream:%s, perform update action", pOldStream->name);
3,211✔
241
  (void) atomic_exchange_32(&pOldStream->version, pNewStream->version);
3,211✔
242

243
  taosWLockLatch(&pOldStream->lock);
3,211✔
244

245
  pOldStream->status = pNewStream->status;
3,211✔
246
  pOldStream->updateTime = pNewStream->updateTime;
3,211✔
247
  pOldStream->checkpointId = pNewStream->checkpointId;
3,211✔
248
  pOldStream->checkpointFreq = pNewStream->checkpointFreq;
3,211✔
249

250
  taosWUnLockLatch(&pOldStream->lock);
3,211✔
251
  return 0;
3,211✔
252
}
253

254
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
6,729✔
255
  int32_t code = 0;
6,729✔
256
  SSdb *pSdb = pMnode->pSdb;
6,729✔
257
  (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
6,729✔
258
  if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
6,729!
259
    code = TSDB_CODE_MND_STREAM_NOT_EXIST;
2,729✔
260
  }
261
  return code;
6,729✔
262
}
263

264
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
15,299✔
265
  SSdb *pSdb = pMnode->pSdb;
15,299✔
266
  sdbRelease(pSdb, pStream);
15,299✔
267
}
15,299✔
268

269
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
270
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
271
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
272
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
273
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
274

275
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
1,601✔
276
  if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->sourceDB[0] == 0 ||
1,601!
277
      pCreate->targetStbFullName[0] == 0) {
1,601!
278
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
×
279
  }
280
  return TSDB_CODE_SUCCESS;
1,601✔
281
}
282

283
static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) {
1,597✔
284
  pWrapper->nCols = taosArrayGetSize(pFields);
1,597✔
285
  pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema));
1,597✔
286
  if (NULL == pWrapper->pSchema) {
1,597!
287
    return terrno;
×
288
  }
289

290
  int32_t index = 0;
1,597✔
291
  for (int32_t i = 0; i < pWrapper->nCols; i++) {
61,698✔
292
    SField *pField = (SField *)taosArrayGet(pFields, i);
60,101✔
293
    if (pField == NULL) {
60,101!
294
      return terrno;
×
295
    }
296

297
    if (TSDB_DATA_TYPE_NULL == pField->type) {
60,101!
298
      pWrapper->pSchema[index].type = TSDB_DATA_TYPE_VARCHAR;
×
299
      pWrapper->pSchema[index].bytes = VARSTR_HEADER_SIZE;
×
300
    } else {
301
      pWrapper->pSchema[index].type = pField->type;
60,101✔
302
      pWrapper->pSchema[index].bytes = pField->bytes;
60,101✔
303
    }
304
    pWrapper->pSchema[index].colId = index + 1;
60,101✔
305
    strcpy(pWrapper->pSchema[index].name, pField->name);
60,101✔
306
    pWrapper->pSchema[index].flags = pField->flags;
60,101✔
307
    index += 1;
60,101✔
308
  }
309

310
  return TSDB_CODE_SUCCESS;
1,597✔
311
}
312

313
static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) {
1,597✔
314
  if (pWrapper->nCols < 2) {
1,597!
315
    return false;
×
316
  }
317
  for (int32_t i = 1; i < pWrapper->nCols; i++) {
60,314✔
318
    if (pWrapper->pSchema[i].flags & COL_IS_KEY) {
58,724✔
319
      return true;
7✔
320
    }
321
  }
322
  return false;
1,590✔
323
}
324

325
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
1,597✔
326
  SNode      *pAst = NULL;
1,597✔
327
  SQueryPlan *pPlan = NULL;
1,597✔
328
  int32_t     code = 0;
1,597✔
329

330
  mInfo("stream:%s to create", pCreate->name);
1,597!
331
  memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
1,597✔
332
  pObj->createTime = taosGetTimestampMs();
1,597✔
333
  pObj->updateTime = pObj->createTime;
1,597✔
334
  pObj->version = 1;
1,597✔
335

336
  if (pCreate->smaId > 0) {
1,597✔
337
    pObj->subTableWithoutMd5 = 1;
259✔
338
  }
339

340
  pObj->smaId = pCreate->smaId;
1,597✔
341
  pObj->indexForMultiAggBalance = -1;
1,597✔
342

343
  pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
1,597✔
344

345
  char p[TSDB_STREAM_FNAME_LEN + 32] = {0};
1,597✔
346
  snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory");
1,597✔
347

348
  pObj->hTaskUid = mndGenerateUid(pObj->name, strlen(pObj->name));
1,597✔
349
  pObj->status = 0;
1,597✔
350

351
  pObj->conf.igExpired = pCreate->igExpired;
1,597✔
352
  pObj->conf.trigger = pCreate->triggerType;
1,597✔
353
  pObj->conf.triggerParam = pCreate->maxDelay;
1,597✔
354
  pObj->conf.watermark = pCreate->watermark;
1,597✔
355
  pObj->conf.fillHistory = pCreate->fillHistory;
1,597✔
356
  pObj->deleteMark = pCreate->deleteMark;
1,597✔
357
  pObj->igCheckUpdate = pCreate->igUpdate;
1,597✔
358

359
  memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
1,597✔
360
  SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
1,597✔
361
  if (pSourceDb == NULL) {
1,597!
362
    code = terrno;
×
363
    mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, tstrerror(code));
×
364
    goto FAIL;
×
365
  }
366

367
  pObj->sourceDbUid = pSourceDb->uid;
1,597✔
368
  mndReleaseDb(pMnode, pSourceDb);
1,597✔
369

370
  memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
1,597✔
371

372
  SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
1,597✔
373
  if (pTargetDb == NULL) {
1,597!
374
    code = terrno;
×
375
    mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, tstrerror(code));
×
376
    goto FAIL;
×
377
  }
378

379
  tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
1,597✔
380

381
  if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
1,597✔
382
    pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
1,456✔
383
  } else {
384
    pObj->targetStbUid = pCreate->targetStbUid;
141✔
385
  }
386
  pObj->targetDbUid = pTargetDb->uid;
1,597✔
387
  mndReleaseDb(pMnode, pTargetDb);
1,597✔
388

389
  pObj->sql = pCreate->sql;
1,597✔
390
  pObj->ast = pCreate->ast;
1,597✔
391

392
  pCreate->sql = NULL;
1,597✔
393
  pCreate->ast = NULL;
1,597✔
394

395
  // deserialize ast
396
  if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) {
1,597!
397
    goto FAIL;
×
398
  }
399

400
  // create output schema
401
  if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) {
1,597!
402
    goto FAIL;
×
403
  }
404

405
  int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
1,597✔
406
  if (numOfNULL > 0) {
1,597✔
407
    pObj->outputSchema.nCols += numOfNULL;
26✔
408
    SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
26✔
409
    if (!pFullSchema) {
26!
410
      code = terrno;
×
411
      goto FAIL;
×
412
    }
413

414
    int32_t nullIndex = 0;
26✔
415
    int32_t dataIndex = 0;
26✔
416
    for (int32_t i = 0; i < pObj->outputSchema.nCols; i++) {
332✔
417
      if (nullIndex >= numOfNULL) {
306!
418
        pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
×
419
        pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
×
420
        pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
×
421
        strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name);
×
422
        pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
×
423
        dataIndex++;
×
424
      } else {
425
        SColLocation *pos = NULL;
306✔
426
        if (nullIndex < taosArrayGetSize(pCreate->fillNullCols)) {
306!
427
          pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
306✔
428
        }
429

430
        if (pos == NULL) {
306!
431
          mError("invalid null column index, %d", nullIndex);
×
432
          continue;
×
433
        }
434

435
        if (i < pos->slotId) {
306✔
436
          pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
79✔
437
          pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
79✔
438
          pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
79✔
439
          strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name);
79✔
440
          pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
79✔
441
          dataIndex++;
79✔
442
        } else {
443
          pFullSchema[i].bytes = 0;
227✔
444
          pFullSchema[i].colId = pos->colId;
227✔
445
          pFullSchema[i].flags = COL_SET_NULL;
227✔
446
          memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
227✔
447
          pFullSchema[i].type = pos->type;
227✔
448
          nullIndex++;
227✔
449
        }
450
      }
451
    }
452

453
    taosMemoryFree(pObj->outputSchema.pSchema);
26✔
454
    pObj->outputSchema.pSchema = pFullSchema;
26✔
455
  }
456

457
  SPlanContext cxt = {
1,597✔
458
      .pAstRoot = pAst,
459
      .topicQuery = false,
460
      .streamQuery = true,
461
      .triggerType = (pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY)? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
1,597✔
462
      .watermark = pObj->conf.watermark,
1,597✔
463
      .igExpired = pObj->conf.igExpired,
1,597✔
464
      .deleteMark = pObj->deleteMark,
1,597✔
465
      .igCheckUpdate = pObj->igCheckUpdate,
1,597✔
466
      .destHasPrimaryKey = hasDestPrimaryKey(&pObj->outputSchema),
1,597✔
467
  };
468

469
  // using ast and param to build physical plan
470
  if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) {
1,597!
471
    goto FAIL;
×
472
  }
473

474
  // save physcial plan
475
  if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) {
1,597!
476
    goto FAIL;
×
477
  }
478

479
  pObj->tagSchema.nCols = pCreate->numOfTags;
1,597✔
480
  if (pCreate->numOfTags) {
1,597✔
481
    pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
276✔
482
    if (pObj->tagSchema.pSchema == NULL) {
276!
483
      code = terrno;
×
484
      goto FAIL;
×
485
    }
486
  }
487

488
  /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
489
  for (int32_t i = 0; i < pCreate->numOfTags; i++) {
3,179✔
490
    SField *pField = taosArrayGet(pCreate->pTags, i);
1,582✔
491
    if (pField == NULL) {
1,582!
492
      continue;
×
493
    }
494

495
    pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
1,582✔
496
    pObj->tagSchema.pSchema[i].bytes = pField->bytes;
1,582✔
497
    pObj->tagSchema.pSchema[i].flags = pField->flags;
1,582✔
498
    pObj->tagSchema.pSchema[i].type = pField->type;
1,582✔
499
    memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN);
1,582✔
500
  }
501

502
FAIL:
1,597✔
503
  if (pAst != NULL) nodesDestroyNode(pAst);
1,597!
504
  if (pPlan != NULL) qDestroyQueryPlan(pPlan);
1,597!
505
  return code;
1,597✔
506
}
507

508
int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
13,395✔
509
  SEncoder encoder;
510
  tEncoderInit(&encoder, NULL, 0);
13,395✔
511

512
  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
13,395!
513
    pTask->ver = SSTREAM_TASK_VER;
×
514
  }
515

516
  int32_t code = tEncodeStreamTask(&encoder, pTask);
13,395✔
517
  if (code == -1) {
13,395!
518
    tEncoderClear(&encoder);
×
519
    return TSDB_CODE_INVALID_MSG;
×
520
  }
521

522
  int32_t size = encoder.pos;
13,395✔
523
  int32_t tlen = sizeof(SMsgHead) + size;
13,395✔
524
  tEncoderClear(&encoder);
13,395✔
525

526
  void *buf = taosMemoryCalloc(1, tlen);
13,395✔
527
  if (buf == NULL) {
13,395!
528
    return terrno;
×
529
  }
530

531
  ((SMsgHead *)buf)->vgId = htonl(pTask->info.nodeId);
13,395✔
532

533
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
13,395✔
534
  tEncoderInit(&encoder, abuf, size);
13,395✔
535
  code = tEncodeStreamTask(&encoder, pTask);
13,395✔
536
  tEncoderClear(&encoder);
13,395✔
537

538
  if (code != 0) {
13,395!
539
    mError("failed to encode stream task, code:%s", tstrerror(code));
×
540
    taosMemoryFree(buf);
×
541
    return code;
×
542
  }
543

544
  code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
13,395✔
545
  if (code) {
13,395!
546
    taosMemoryFree(buf);
×
547
  }
548

549
  return code;
13,395✔
550
}
551

552
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
1,627✔
553
  SStreamTaskIter *pIter = NULL;
1,627✔
554
  int32_t code = createStreamTaskIter(pStream, &pIter);
1,627✔
555
  if (code) {
1,627!
556
    mError("failed to create task iter for stream:%s", pStream->name);
×
557
    return code;
×
558
  }
559

560
  while (streamTaskIterNextTask(pIter)) {
10,200✔
561
    SStreamTask *pTask = NULL;
8,573✔
562
    code = streamTaskIterGetCurrent(pIter, &pTask);
8,573✔
563
    if (code) {
8,573!
564
      destroyStreamTaskIter(pIter);
×
565
      return code;
×
566
    }
567

568
    code = mndPersistTaskDeployReq(pTrans, pTask);
8,573✔
569
    if (code) {
8,573!
570
      destroyStreamTaskIter(pIter);
×
571
      return code;
×
572
    }
573
  }
574

575
  destroyStreamTaskIter(pIter);
1,627✔
576

577
  // persistent stream task for already stored ts data
578
  if (pStream->conf.fillHistory) {
1,627✔
579
    int32_t level = taosArrayGetSize(pStream->pHTasksList);
825✔
580

581
    for (int32_t i = 0; i < level; i++) {
2,546✔
582
      SArray *pLevel = taosArrayGetP(pStream->pHTasksList, i);
1,721✔
583

584
      int32_t numOfTasks = taosArrayGetSize(pLevel);
1,721✔
585
      for (int32_t j = 0; j < numOfTasks; j++) {
6,543✔
586
        SStreamTask *pTask = taosArrayGetP(pLevel, j);
4,822✔
587
        code = mndPersistTaskDeployReq(pTrans, pTask);
4,822✔
588
        if (code) {
4,822!
589
          return code;
×
590
        }
591
      }
592
    }
593
  }
594

595
  return code;
1,627✔
596
}
597

598
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
1,627✔
599
  int32_t code = 0;
1,627✔
600
  if ((code = mndPersistStreamTasks(pTrans, pStream)) < 0) {
1,627!
601
    return code;
×
602
  }
603

604
  return mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
1,627✔
605
}
606

607
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
1,456✔
608
  SStbObj *pStb = NULL;
1,456✔
609
  SDbObj  *pDb = NULL;
1,456✔
610
  int32_t  code = 0;
1,456✔
611
  int32_t  lino = 0;
1,456✔
612

613
  SMCreateStbReq createReq = {0};
1,456✔
614
  tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
1,456✔
615
  createReq.numOfColumns = pStream->outputSchema.nCols;
1,456✔
616
  createReq.numOfTags = 1;  // group id
1,456✔
617
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
1,456✔
618
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);
1,456!
619

620
  // build fields
621
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
59,940✔
622
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
58,484✔
623
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
58,484!
624

625
    tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
58,484✔
626
    pField->flags = pStream->outputSchema.pSchema[i].flags;
58,484✔
627
    pField->type = pStream->outputSchema.pSchema[i].type;
58,484✔
628
    pField->bytes = pStream->outputSchema.pSchema[i].bytes;
58,484✔
629
    pField->compress = createDefaultColCmprByType(pField->type);
58,484✔
630
  }
631

632
  if (pStream->tagSchema.nCols == 0) {
1,456✔
633
    createReq.numOfTags = 1;
1,180✔
634
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
1,180✔
635
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
1,180!
636

637
    // build tags
638
    SField *pField = taosArrayGet(createReq.pTags, 0);
1,180✔
639
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
1,180!
640

641
    strcpy(pField->name, "group_id");
1,180✔
642
    pField->type = TSDB_DATA_TYPE_UBIGINT;
1,180✔
643
    pField->flags = 0;
1,180✔
644
    pField->bytes = 8;
1,180✔
645
  } else {
646
    createReq.numOfTags = pStream->tagSchema.nCols;
276✔
647
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
276✔
648
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
276!
649

650
    for (int32_t i = 0; i < createReq.numOfTags; i++) {
1,858✔
651
      SField *pField = taosArrayGet(createReq.pTags, i);
1,582✔
652
      if (pField == NULL) {
1,582!
653
        continue;
×
654
      }
655

656
      pField->bytes = pStream->tagSchema.pSchema[i].bytes;
1,582✔
657
      pField->flags = pStream->tagSchema.pSchema[i].flags;
1,582✔
658
      pField->type = pStream->tagSchema.pSchema[i].type;
1,582✔
659
      tstrncpy(pField->name, pStream->tagSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
1,582✔
660
    }
661
  }
662

663
  if ((code = mndCheckCreateStbReq(&createReq)) != 0) {
1,456!
664
    goto _OVER;
×
665
  }
666

667
  pStb = mndAcquireStb(pMnode, createReq.name);
1,456✔
668
  if (pStb != NULL) {
1,456!
669
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
×
670
    goto _OVER;
×
671
  }
672

673
  pDb = mndAcquireDbByStb(pMnode, createReq.name);
1,456✔
674
  if (pDb == NULL) {
1,456!
675
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
676
    goto _OVER;
×
677
  }
678

679
  int32_t numOfStbs = -1;
1,456✔
680
  if (mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs) != 0) {
1,456!
681
    goto _OVER;
×
682
  }
683

684
  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
1,456!
685
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
×
686
    goto _OVER;
×
687
  }
688

689
  SStbObj stbObj = {0};
1,456✔
690

691
  if (mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb) != 0) {
1,456!
692
    goto _OVER;
×
693
  }
694

695
  stbObj.uid = pStream->targetStbUid;
1,456✔
696

697
  if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) {
1,456!
698
    mndFreeStb(&stbObj);
×
699
    goto _OVER;
×
700
  }
701

702
  tFreeSMCreateStbReq(&createReq);
1,456✔
703
  mndFreeStb(&stbObj);
1,456✔
704
  mndReleaseStb(pMnode, pStb);
1,456✔
705
  mndReleaseDb(pMnode, pDb);
1,456✔
706
  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols);
1,456✔
707
  return code;
1,456✔
708

709
_OVER:
×
710
  tFreeSMCreateStbReq(&createReq);
×
711
  mndReleaseStb(pMnode, pStb);
×
712
  mndReleaseDb(pMnode, pDb);
×
713

714
  mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->targetSTbName, lino,
×
715
         tstrerror(code));
716
  return code;
×
717
}
718

719
// 1. stream number check
720
// 2. target stable can not be target table of other existed streams.
721
static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
1,597✔
722
  int32_t     numOfStream = 0;
1,597✔
723
  SStreamObj *pStream = NULL;
1,597✔
724
  void       *pIter = NULL;
1,597✔
725

726
  while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
3,372✔
727
    if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
1,776✔
728
      ++numOfStream;
1,327✔
729
    }
730

731
    sdbRelease(pMnode->pSdb, pStream);
1,776✔
732

733
    if (numOfStream > MND_STREAM_MAX_NUM) {
1,776!
734
      mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
×
735
             pStreamObj->name);
736
      sdbCancelFetch(pMnode->pSdb, pIter);
×
737
      return TSDB_CODE_MND_TOO_MANY_STREAMS;
×
738
    }
739

740
    if (pStream->targetStbUid == pStreamObj->targetStbUid) {
1,776✔
741
      mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
1!
742
             pStreamObj->name);
743
      sdbCancelFetch(pMnode->pSdb, pIter);
1✔
744
      return TSDB_CODE_MND_INVALID_TARGET_TABLE;
1✔
745
    }
746
  }
747

748
  return TSDB_CODE_SUCCESS;
1,596✔
749
}
750

751
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
1,601✔
752
  SMnode     *pMnode = pReq->info.node;
1,601✔
753
  SStreamObj *pStream = NULL;
1,601✔
754
  SStreamObj  streamObj = {0};
1,601✔
755
  char       *sql = NULL;
1,601✔
756
  int32_t     sqlLen = 0;
1,601✔
757
  const char *pMsg = "create stream tasks on dnodes";
1,601✔
758
  int32_t     code = TSDB_CODE_SUCCESS;
1,601✔
759
  int32_t     lino = 0;
1,601✔
760
  STrans     *pTrans = NULL;
1,601✔
761

762
  SCMCreateStreamReq createReq = {0};
1,601✔
763
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq);
1,601✔
764
  TSDB_CHECK_CODE(code, lino, _OVER);
1,601!
765

766
#ifdef WINDOWS
767
  code = TSDB_CODE_MND_INVALID_PLATFORM;
768
  goto _OVER;
769
#endif
770

771
  mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
1,601!
772
  if ((code = mndCheckCreateStreamReq(&createReq)) != 0) {
1,601!
773
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
×
774
    goto _OVER;
×
775
  }
776

777
  code = mndAcquireStream(pMnode, createReq.name, &pStream);
1,601✔
778
  if (pStream != NULL && code == 0) {
1,601!
779
    if (createReq.igExists) {
2✔
780
      mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
1!
781
      mndReleaseStream(pMnode, pStream);
1✔
782
      tFreeSCMCreateStreamReq(&createReq);
1✔
783
      return code;
1✔
784
    } else {
785
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
1✔
786
      goto _OVER;
1✔
787
    }
788
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
1,599!
789
    goto _OVER;
×
790
  }
791

792
  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
1,599!
793
    goto _OVER;
×
794
  }
795

796
  if (createReq.sql != NULL) {
1,599!
797
    sql = taosStrdup(createReq.sql);
1,599✔
798
    TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
1,599!
799
  }
800

801
  SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB);
1,599✔
802
  if (pSourceDb == NULL) {
1,599!
803
    code = terrno;
×
804
    mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB,
×
805
          tstrerror(code));
806
    goto _OVER;
×
807
  }
808

809
  code = mndCheckForSnode(pMnode, pSourceDb);
1,599✔
810
  mndReleaseDb(pMnode, pSourceDb);
1,599✔
811
  if (code != 0) {
1,599✔
812
    goto _OVER;
2✔
813
  }
814

815
  // build stream obj from request
816
  if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) {
1,597!
817
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
×
818
    goto _OVER;
×
819
  }
820

821
  code = doStreamCheck(pMnode, &streamObj);
1,597✔
822
  TSDB_CHECK_CODE(code, lino, _OVER);
1,597✔
823

824
  code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans);
1,596✔
825
  if (pTrans == NULL || code) {
1,596!
826
    goto _OVER;
×
827
  }
828

829
  // create stb for stream
830
  if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) {
1,596✔
831
    if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) {
1,456!
832
      mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code));
×
833
      mndTransDrop(pTrans);
×
834
      goto _OVER;
×
835
    }
836
  } else {
837
    mDebug("stream:%s no need create stable", createReq.name);
140✔
838
  }
839

840
  // schedule stream task for stream obj
841
  code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList);
1,596✔
842
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1,596!
843
    mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code));
×
844
    mndTransDrop(pTrans);
×
845
    goto _OVER;
×
846
  }
847

848
  // add stream to trans
849
  code = mndPersistStream(pTrans, &streamObj);
1,596✔
850
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1,596!
851
    mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code));
×
852
    mndTransDrop(pTrans);
×
853
    goto _OVER;
×
854
  }
855

856
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb)) != 0) {
1,596!
857
    mndTransDrop(pTrans);
×
858
    goto _OVER;
×
859
  }
860

861
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb)) != 0) {
1,596!
862
    mndTransDrop(pTrans);
×
863
    goto _OVER;
×
864
  }
865

866
  // add into buffer firstly
867
  // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
868
  streamMutexLock(&execInfo.lock);
1,596✔
869
  mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
1,596✔
870
  saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
1,596✔
871
  streamMutexUnlock(&execInfo.lock);
1,596✔
872

873
  // execute creation
874
  code = mndTransPrepare(pMnode, pTrans);
1,596✔
875
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1,596!
876
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
×
877
    mndTransDrop(pTrans);
×
878
    goto _OVER;
×
879
  }
880

881
  mndTransDrop(pTrans);
1,596✔
882

883
  SName dbname = {0};
1,596✔
884
  code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
1,596✔
885
  if (code) {
1,596!
886
    mError("invalid source dbname:%s in create stream, code:%s", createReq.sourceDB, tstrerror(code));
×
887
    goto _OVER;
×
888
  }
889

890
  SName name = {0};
1,596✔
891
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE);
1,596✔
892
  if (code) {
1,596!
893
    mError("invalid stream name:%s in create strem, code:%s", createReq.name, tstrerror(code));
×
894
    goto _OVER;
×
895
  }
896

897
  // reuse this function for stream
898
  if (sql != NULL && sqlLen > 0) {
1,596!
899
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen);
×
900
  } else {
901
    char detail[1000] = {0};
1,596✔
902
    snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
1,596✔
903
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
1,596✔
904
  }
905

906
_OVER:
1,600✔
907
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1,600!
908
    mError("stream:%s, failed to create at line:%d since %s", createReq.name, lino, tstrerror(code));
4!
909
  } else {
910
    mDebug("stream:%s create stream completed", createReq.name);
1,596✔
911
    code = TSDB_CODE_ACTION_IN_PROGRESS;
1,596✔
912
  }
913

914
  mndReleaseStream(pMnode, pStream);
1,600✔
915
  tFreeSCMCreateStreamReq(&createReq);
1,600✔
916
  tFreeStreamObj(&streamObj);
1,600✔
917

918
  if (sql != NULL) {
1,600✔
919
    taosMemoryFreeClear(sql);
1,599!
920
  }
921

922
  return code;
1,600✔
923
}
924

925
static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) {
×
926
  SMnode          *pMnode = pReq->info.node;
×
927
  SStreamObj      *pStream = NULL;
×
928
  int32_t          code = 0;
×
929
  SMPauseStreamReq pauseReq = {0};
×
930

931
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
×
932
    return TSDB_CODE_INVALID_MSG;
×
933
  }
934

935
  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
×
936
  if (pStream == NULL || code != 0) {
×
937
    if (pauseReq.igNotExists) {
×
938
      mInfo("stream:%s, not exist, not restart stream", pauseReq.name);
×
939
      return 0;
×
940
    } else {
941
      mError("stream:%s not exist, failed to restart stream", pauseReq.name);
×
942
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
943
    }
944
  }
945

946
  mInfo("stream:%s,%" PRId64 " start to restart stream", pauseReq.name, pStream->uid);
×
947
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
×
948
    sdbRelease(pMnode->pSdb, pStream);
×
949
    return code;
×
950
  }
951

952
  // check if it is conflict with other trans in both sourceDb and targetDb.
953
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESTART_NAME, true);
×
954
  if (code) {
×
955
    sdbRelease(pMnode->pSdb, pStream);
×
956
    return code;
×
957
  }
958

959
  bool updated = mndStreamNodeIsUpdated(pMnode);
×
960
  if (updated) {
×
961
    mError("tasks are not ready for restart, node update detected");
×
962
    sdbRelease(pMnode->pSdb, pStream);
×
963
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
×
964
  }
965

966
  STrans *pTrans = NULL;
×
967
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream", &pTrans);
×
968
  if (pTrans == NULL || code) {
×
969
    mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
×
970
    sdbRelease(pMnode->pSdb, pStream);
×
971
    return code;
×
972
  }
973

974
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESTART_NAME, pStream->uid);
×
975
  if (code) {
×
976
    sdbRelease(pMnode->pSdb, pStream);
×
977
    mndTransDrop(pTrans);
×
978
    return code;
×
979
  }
980

981
  // if nodeUpdate happened, not send pause trans
982
  code = mndStreamSetRestartAction(pMnode, pTrans, pStream);
×
983
  if (code) {
×
984
    mError("stream:%s, failed to restart task since %s", pauseReq.name, tstrerror(code));
×
985
    sdbRelease(pMnode->pSdb, pStream);
×
986
    mndTransDrop(pTrans);
×
987
    return code;
×
988
  }
989

990
  code = mndTransPrepare(pMnode, pTrans);
×
991
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
992
    mError("trans:%d, failed to prepare restart stream trans since %s", pTrans->id, tstrerror(code));
×
993
    sdbRelease(pMnode->pSdb, pStream);
×
994
    mndTransDrop(pTrans);
×
995
    return code;
×
996
  }
997

998
  sdbRelease(pMnode->pSdb, pStream);
×
999
  mndTransDrop(pTrans);
×
1000

1001
  return TSDB_CODE_ACTION_IN_PROGRESS;
×
1002
}
1003

1004
int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
1,409✔
1005
  SStreamObj *pStream = NULL;
1,409✔
1006
  void       *pIter = NULL;
1,409✔
1007
  SSdb       *pSdb = pMnode->pSdb;
1,409✔
1008
  int64_t     maxChkptId = 0;
1,409✔
1009

1010
  while (1) {
1011
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
4,914✔
1012
    if (pIter == NULL) break;
4,914✔
1013

1014
    maxChkptId = TMAX(maxChkptId, pStream->checkpointId);
3,505✔
1015
    mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
3,505✔
1016
           pStream->checkpointId);
1017
    sdbRelease(pSdb, pStream);
3,505✔
1018
  }
1019

1020
  {  // check the max checkpoint id from all vnodes.
1021
    int64_t maxCheckpointId = -1;
1,409✔
1022
    if (lock) {
1,409✔
1023
      streamMutexLock(&execInfo.lock);
611✔
1024
    }
1025

1026
    for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
17,111✔
1027
      STaskId          *p = taosArrayGet(execInfo.pTaskList, i);
15,702✔
1028
      STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
15,702✔
1029
      if (p == NULL || pEntry == NULL) {
15,702!
1030
        continue;
×
1031
      }
1032

1033
      if (pEntry->checkpointInfo.failed) {
15,702!
1034
        continue;
×
1035
      }
1036

1037
      if (maxCheckpointId < pEntry->checkpointInfo.latestId) {
15,702✔
1038
        maxCheckpointId = pEntry->checkpointInfo.latestId;
2,054✔
1039
      }
1040
    }
1041

1042
    if (lock) {
1,409✔
1043
      streamMutexUnlock(&execInfo.lock);
611✔
1044
    }
1045

1046
    if (maxCheckpointId > maxChkptId) {
1,409!
1047
      mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId,
×
1048
             maxCheckpointId);
1049
      maxChkptId = maxCheckpointId;
×
1050
    }
1051
  }
1052

1053
  mDebug("generate new checkpointId:%" PRId64, maxChkptId + 1);
1,409✔
1054
  return maxChkptId + 1;
1,409✔
1055
}
1056

1057
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
1,411✔
1058
                                               int8_t mndTrigger, bool lock) {
1059
  int32_t code = TSDB_CODE_SUCCESS;
1,411✔
1060
  bool    conflict = false;
1,411✔
1061
  int64_t ts = taosGetTimestampMs();
1,411✔
1062
  STrans *pTrans = NULL;
1,411✔
1063

1064
  if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
1,411!
1065
    return code;
×
1066
  }
1067

1068
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
1,411✔
1069
  if (code) {
1,411!
UNCOV
1070
    mWarn("checkpoint conflict with other trans in %s, code:%s ignore the checkpoint for stream:%s %" PRIx64,
×
1071
          pStream->sourceDb, tstrerror(code), pStream->name, pStream->uid);
UNCOV
1072
    goto _ERR;
×
1073
  }
1074

1075
  code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME,
1,411✔
1076
                       "gen checkpoint for stream", &pTrans);
1077
  if (code) {
1,411!
1078
    mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
×
1079
           tstrerror(code));
1080
    goto _ERR;
×
1081
  }
1082

1083
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
1,411✔
1084
  if (code) {
1,411!
1085
    mError("failed to register checkpoint trans for stream:%s, checkpointId:%" PRId64, pStream->name, checkpointId);
×
1086
    goto _ERR;
×
1087
  }
1088

1089
  mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);
1,411✔
1090

1091
  taosWLockLatch(&pStream->lock);
1,411✔
1092
  pStream->currentTick = 1;
1,411✔
1093

1094
  // 1. redo action: broadcast checkpoint source msg for all source vg
1095
  int32_t totalLevel = taosArrayGetSize(pStream->tasks);
1,411✔
1096
  for (int32_t i = 0; i < totalLevel; i++) {
4,283✔
1097
    SArray      *pLevel = taosArrayGetP(pStream->tasks, i);
2,872✔
1098
    SStreamTask *p = taosArrayGetP(pLevel, 0);
2,872✔
1099

1100
    if (p->info.taskLevel == TASK_LEVEL__SOURCE) {
2,872✔
1101
      int32_t sz = taosArrayGetSize(pLevel);
1,411✔
1102
      for (int32_t j = 0; j < sz; j++) {
4,770✔
1103
        SStreamTask *pTask = taosArrayGetP(pLevel, j);
3,359✔
1104
        code = mndStreamSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);
3,359✔
1105

1106
        if (code != TSDB_CODE_SUCCESS) {
3,359!
1107
          taosWUnLockLatch(&pStream->lock);
×
1108
          goto _ERR;
×
1109
        }
1110
      }
1111
    }
1112
  }
1113

1114
  // 2. reset tick
1115
  pStream->checkpointId = checkpointId;
1,411✔
1116
  pStream->checkpointFreq = taosGetTimestampMs();
1,411✔
1117
  pStream->currentTick = 0;
1,411✔
1118

1119
  // 3. commit log: stream checkpoint info
1120
  pStream->version = pStream->version + 1;
1,411✔
1121
  taosWUnLockLatch(&pStream->lock);
1,411✔
1122

1123
  if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY)) != TSDB_CODE_SUCCESS) {
1,411!
1124
    goto _ERR;
×
1125
  }
1126

1127
  code = mndTransPrepare(pMnode, pTrans);
1,411✔
1128
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1,411!
1129
    mError("failed to prepare checkpoint trans since %s", tstrerror(code));
×
1130
  } else {
1131
    code = TSDB_CODE_ACTION_IN_PROGRESS;
1,411✔
1132
  }
1133

1134
_ERR:
1,411✔
1135
  mndTransDrop(pTrans);
1,411✔
1136
  return code;
1,411✔
1137
}
1138

1139
int32_t extractStreamNodeList(SMnode *pMnode) {
3,337✔
1140
  if (taosArrayGetSize(execInfo.pNodeList) == 0) {
3,337✔
1141
    int32_t code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
753✔
1142
    if (code) {
753!
1143
      mError("Failed to extract node list from stream, code:%s", tstrerror(code));
×
1144
      return code;
×
1145
    }
1146
  }
1147

1148
  return taosArrayGetSize(execInfo.pNodeList);
3,337✔
1149
}
1150

1151
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
1,681✔
1152
  bool ready = true;
1,681✔
1153
  if (mndStreamNodeIsUpdated(pMnode)) {
1,681✔
1154
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
58✔
1155
  }
1156

1157
  streamMutexLock(&execInfo.lock);
1,623✔
1158
  if (taosArrayGetSize(execInfo.pNodeList) == 0) {
1,623✔
1159
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
753✔
1160
    if (taosArrayGetSize(execInfo.pTaskList) != 0) {
753!
1161
      streamMutexUnlock(&execInfo.lock);
×
1162
      mError("stream task node change checking done, no vgroups exist, but task list is not empty");
×
1163
      return TSDB_CODE_FAILED;
×
1164
    }
1165
  }
1166

1167
  SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId));
1,623✔
1168

1169
  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
9,499✔
1170
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
8,002✔
1171
    if (p == NULL) {
8,002!
1172
      continue;
×
1173
    }
1174

1175
    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
8,002✔
1176
    if (pEntry == NULL) {
8,002!
1177
      continue;
×
1178
    }
1179

1180
    if (pEntry->status == TASK_STATUS__STOP) {
8,002✔
1181
      for (int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) {
65!
1182
        STaskId *pId = taosArrayGet(pInvalidList, j);
×
1183
        if (pId == NULL) {
×
1184
          continue;
×
1185
        }
1186

1187
        if (pEntry->id.streamId == pId->streamId) {
×
1188
          void *px = taosArrayPush(pInvalidList, &pEntry->id);
×
1189
          if (px == NULL) {
×
1190
            mError("failed to put stream into invalid list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
×
1191
          }
1192
          break;
×
1193
        }
1194
      }
1195
    }
1196

1197
    if (pEntry->status != TASK_STATUS__READY) {
8,002✔
1198
      mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId,
113✔
1199
             (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
1200
      ready = false;
113✔
1201
      break;
113✔
1202
    }
1203

1204
    if (pEntry->hTaskId != 0) {
7,889✔
1205
      mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
13✔
1206
             " exists, checkpoint not issued",
1207
             pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
1208
             pEntry->hTaskId);
1209
      ready = false;
13✔
1210
      break;
13✔
1211
    }
1212
  }
1213

1214
  removeTasksInBuf(pInvalidList, &execInfo);
1,623✔
1215
  taosArrayDestroy(pInvalidList);
1,623✔
1216

1217
  streamMutexUnlock(&execInfo.lock);
1,623✔
1218
  return ready ? 0 : -1;
1,623✔
1219
}
1220

1221
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
1,419✔
1222
  int64_t ts = -1;
1,419✔
1223
  int32_t taskId = -1;
1,419✔
1224

1225
  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
20,278✔
1226
    STaskId          *p = taosArrayGet(pTaskList, i);
18,859✔
1227
    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
18,859✔
1228
    if (p == NULL || pEntry == NULL || pEntry->id.streamId != streamId) {
18,859!
1229
      continue;
13,764✔
1230
    }
1231

1232
    if (pEntry->status == TASK_STATUS__READY && ts < pEntry->startTime) {
5,095!
1233
      ts = pEntry->startTime;
3,066✔
1234
      taskId = pEntry->id.taskId;
3,066✔
1235
    }
1236
  }
1237

1238
  mDebug("stream:0x%" PRIx64 " last ready ts:%" PRId64 " s-task:0x%x", streamId, ts, taskId);
1,419✔
1239
  return ts;
1,419✔
1240
}
1241

1242
typedef struct {
1243
  int64_t streamId;
1244
  int64_t duration;
1245
} SCheckpointInterval;
1246

1247
static int32_t streamWaitComparFn(const void *p1, const void *p2) {
652✔
1248
  const SCheckpointInterval *pInt1 = p1;
652✔
1249
  const SCheckpointInterval *pInt2 = p2;
652✔
1250
  if (pInt1->duration == pInt2->duration) {
652✔
1251
    return 0;
51✔
1252
  }
1253

1254
  return pInt1->duration > pInt2->duration ? -1 : 1;
601✔
1255
}
1256

1257
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
1,681✔
1258
  SMnode     *pMnode = pReq->info.node;
1,681✔
1259
  SSdb       *pSdb = pMnode->pSdb;
1,681✔
1260
  void       *pIter = NULL;
1,681✔
1261
  SStreamObj *pStream = NULL;
1,681✔
1262
  int32_t     code = 0;
1,681✔
1263
  int32_t     numOfCheckpointTrans = 0;
1,681✔
1264

1265
  if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
1,681✔
1266
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
184✔
1267
  }
1268

1269
  SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));
1,497✔
1270
  if (pList == NULL) {
1,497!
1271
    return terrno;
×
1272
  }
1273

1274
  int64_t now = taosGetTimestampMs();
1,497✔
1275

1276
  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
3,580✔
1277
    int64_t duration = now - pStream->checkpointFreq;
2,083✔
1278
    if (duration < tsStreamCheckpointInterval * 1000) {
2,083✔
1279
      sdbRelease(pSdb, pStream);
664✔
1280
      continue;
845✔
1281
    }
1282

1283
    streamMutexLock(&execInfo.lock);
1,419✔
1284
    int64_t startTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
1,419✔
1285
    if (startTs != -1 && (now - startTs) < tsStreamCheckpointInterval * 1000) {
1,419!
1286
      streamMutexUnlock(&execInfo.lock);
181✔
1287
      sdbRelease(pSdb, pStream);
181✔
1288
      continue;
181✔
1289
    }
1290
    streamMutexUnlock(&execInfo.lock);
1,238✔
1291

1292
    SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
1,238✔
1293
    void* p = taosArrayPush(pList, &in);
1,238✔
1294
    if (p) {
1,238!
1295
      int32_t currentSize = taosArrayGetSize(pList);
1,238✔
1296
      mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chpt interval threshold: %ds(%" PRId64
1,238✔
1297
             "s), concurrently launch threshold:%d",
1298
             pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
1299
             tsMaxConcurrentCheckpoint);
1300
    } else {
1301
      mError("failed to record the checkpoint interval info, stream:0x%" PRIx64, pStream->uid);
×
1302
    }
1303
    sdbRelease(pSdb, pStream);
1,238✔
1304
  }
1305

1306
  int32_t size = taosArrayGetSize(pList);
1,497✔
1307
  if (size == 0) {
1,497✔
1308
    taosArrayDestroy(pList);
886✔
1309
    return code;
886✔
1310
  }
1311

1312
  taosArraySort(pList, streamWaitComparFn);
611✔
1313
  code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans);
611✔
1314
  if (code) {
611!
1315
    mError("failed to clear finish trans, code:%s", tstrerror(code));
×
1316
    taosArrayDestroy(pList);
×
1317
    return code;
×
1318
  }
1319

1320
  int32_t numOfQual = taosArrayGetSize(pList);
611✔
1321
  if (numOfCheckpointTrans > tsMaxConcurrentCheckpoint) {
611!
1322
    mDebug(
×
1323
        "%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new "
1324
        "checkpoint trans are not allowed, wait for 30s",
1325
        numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint);
1326
    taosArrayDestroy(pList);
×
1327
    return code;
×
1328
  }
1329

1330
  int32_t capacity = tsMaxConcurrentCheckpoint - numOfCheckpointTrans;
611✔
1331
  mDebug(
611✔
1332
      "%d stream(s) checkpoint interval longer than %ds, %d ongoing checkpoint trans, %d new checkpoint trans allowed, "
1333
      "concurrent trans threshold:%d",
1334
      numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, capacity, tsMaxConcurrentCheckpoint);
1335

1336
  int32_t started = 0;
611✔
1337
  int64_t checkpointId = mndStreamGenChkptId(pMnode, true);
611✔
1338

1339
  for (int32_t i = 0; i < numOfQual; ++i) {
615✔
1340
    SCheckpointInterval *pCheckpointInfo = taosArrayGet(pList, i);
613✔
1341
    if (pCheckpointInfo == NULL) {
613!
1342
      continue;
×
1343
    }
1344

1345
    SStreamObj *p = NULL;
613✔
1346
    code = mndGetStreamObj(pMnode, pCheckpointInfo->streamId, &p);
613✔
1347
    if (p != NULL && code == 0) {
613!
1348
      code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
613✔
1349
      sdbRelease(pSdb, p);
613✔
1350

1351
      if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
613!
1352
        started += 1;
613✔
1353

1354
        if (started >= capacity) {
613✔
1355
          mDebug("already start %d new checkpoint trans, current active checkpoint trans:%d", started,
609✔
1356
                 (started + numOfCheckpointTrans));
1357
          break;
609✔
1358
        }
1359
      } else {
1360
        mError("failed to start checkpoint trans, code:%s", tstrerror(code));
×
1361
      }
1362
    }
1363
  }
1364

1365
  taosArrayDestroy(pList);
611✔
1366
  return code;
611✔
1367
}
1368

1369
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
1,401✔
1370
  SMnode *    pMnode = pReq->info.node;
1,401✔
1371
  SStreamObj *pStream = NULL;
1,401✔
1372
  int32_t     code = 0;
1,401✔
1373

1374
  SMDropStreamReq dropReq = {0};
1,401✔
1375
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
1,401!
1376
    mError("invalid drop stream msg recv, discarded");
×
1377
    code = TSDB_CODE_INVALID_MSG;
×
1378
    TAOS_RETURN(code);
×
1379
  }
1380

1381
  mDebug("recv drop stream:%s msg", dropReq.name);
1,401✔
1382

1383
  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
1,401✔
1384
  if (pStream == NULL || code != 0) {
1,401!
1385
    if (dropReq.igNotExists) {
141✔
1386
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
131!
1387
      sdbRelease(pMnode->pSdb, pStream);
131✔
1388
      tFreeMDropStreamReq(&dropReq);
131✔
1389
      return 0;
131✔
1390
    } else {
1391
      mError("stream:%s not exist failed to drop it", dropReq.name);
10!
1392
      tFreeMDropStreamReq(&dropReq);
10✔
1393
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
10✔
1394
    }
1395
  }
1396

1397
  if (pStream->smaId != 0) {
1,260✔
1398
    mDebug("stream:%s, uid:0x%" PRIx64 " try to drop sma related stream", dropReq.name, pStream->uid);
220!
1399

1400
    void *   pIter = NULL;
220✔
1401
    SSmaObj *pSma = NULL;
220✔
1402
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
220✔
1403
    while (pIter) {
364✔
1404
      if (pSma && pSma->uid == pStream->smaId) {
149!
1405
        sdbRelease(pMnode->pSdb, pSma);
5✔
1406
        sdbRelease(pMnode->pSdb, pStream);
5✔
1407

1408
        sdbCancelFetch(pMnode->pSdb, pIter);
5✔
1409
        tFreeMDropStreamReq(&dropReq);
5✔
1410
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;
5✔
1411

1412
        mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma",
5!
1413
               dropReq.name, pStream->uid, tstrerror(terrno));
1414
        TAOS_RETURN(code);
5✔
1415
      }
1416

1417
      if (pSma) {
144!
1418
        sdbRelease(pMnode->pSdb, pSma);
144✔
1419
      }
1420

1421
      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
144✔
1422
    }
1423
  }
1424

1425
  if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
1,255!
1426
    sdbRelease(pMnode->pSdb, pStream);
×
1427
    tFreeMDropStreamReq(&dropReq);
×
1428
    return -1;
×
1429
  }
1430

1431
  // check if it is conflict with other trans in both sourceDb and targetDb.
1432
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
1,255✔
1433
  if (code) {
1,255✔
1434
    sdbRelease(pMnode->pSdb, pStream);
1✔
1435
    tFreeMDropStreamReq(&dropReq);
1✔
1436
    return code;
1✔
1437
  }
1438

1439
  STrans *pTrans = NULL;
1,254✔
1440
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
1,254✔
1441
  if (pTrans == NULL || code) {
1,254!
1442
    mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code));
×
1443
    sdbRelease(pMnode->pSdb, pStream);
×
1444
    tFreeMDropStreamReq(&dropReq);
×
1445
    TAOS_RETURN(code);
×
1446
  }
1447

1448
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
1,254✔
1449
  if (code) {
1,254!
1450
    mError("failed to register drop stream trans, code:%s", tstrerror(code));
×
1451
    sdbRelease(pMnode->pSdb, pStream);
×
1452
    mndTransDrop(pTrans);
×
1453
    tFreeMDropStreamReq(&dropReq);
×
1454
    TAOS_RETURN(code);
×
1455
  }
1456

1457
  // drop all tasks
1458
  code = mndStreamSetDropAction(pMnode, pTrans, pStream);
1,254✔
1459
  if (code) {
1,254!
1460
    mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, tstrerror(code));
×
1461
    sdbRelease(pMnode->pSdb, pStream);
×
1462
    mndTransDrop(pTrans);
×
1463
    tFreeMDropStreamReq(&dropReq);
×
1464
    TAOS_RETURN(code);
×
1465
  }
1466

1467
  // drop stream
1468
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
1,254✔
1469
  if (code) {
1,254!
1470
    sdbRelease(pMnode->pSdb, pStream);
×
1471
    mndTransDrop(pTrans);
×
1472
    tFreeMDropStreamReq(&dropReq);
×
1473
    TAOS_RETURN(code);
×
1474
  }
1475

1476
  code = mndTransPrepare(pMnode, pTrans);
1,254✔
1477
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1,254!
1478
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
×
1479
    sdbRelease(pMnode->pSdb, pStream);
×
1480
    mndTransDrop(pTrans);
×
1481
    tFreeMDropStreamReq(&dropReq);
×
1482
    TAOS_RETURN(code);
×
1483
  }
1484

1485
  // kill the related checkpoint trans
1486
  int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
1,254✔
1487
  if (transId != 0) {
1,254!
1488
    mDebug("drop active transId:%d due to stream:%s uid:0x%" PRIx64 " dropped", transId, pStream->name, pStream->uid);
×
1489
    mndKillTransImpl(pMnode, transId, pStream->sourceDb);
×
1490
  }
1491

1492
  mDebug("stream:%s uid:0x%" PRIx64 " transId:%d start to drop related task when dropping stream", dropReq.name,
1,254✔
1493
         pStream->uid, transId);
1494

1495
  removeStreamTasksInBuf(pStream, &execInfo);
1,254✔
1496

1497
  SName name = {0};
1,254✔
1498
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
1,254✔
1499
  auditRecord(pReq, pMnode->clusterId, "dropStream", "", name.dbname, dropReq.sql, dropReq.sqlLen);
1,254✔
1500

1501
  sdbRelease(pMnode->pSdb, pStream);
1,254✔
1502
  mndTransDrop(pTrans);
1,254✔
1503
  tFreeMDropStreamReq(&dropReq);
1,254✔
1504

1505
  if (code == 0) {
1,254✔
1506
    return TSDB_CODE_ACTION_IN_PROGRESS;
1,244✔
1507
  } else {
1508
    TAOS_RETURN(code);
10✔
1509
  }
1510
}
1511

1512
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
2,345✔
1513
  SSdb   *pSdb = pMnode->pSdb;
2,345✔
1514
  void   *pIter = NULL;
2,345✔
1515
  int32_t code = 0;
2,345✔
1516

1517
  while (1) {
578✔
1518
    SStreamObj *pStream = NULL;
2,923✔
1519
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
2,923✔
1520
    if (pIter == NULL) break;
2,923✔
1521

1522
    if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
579✔
1523
      if (pStream->sourceDbUid != pStream->targetDbUid) {
79✔
1524
        sdbRelease(pSdb, pStream);
1✔
1525
        sdbCancelFetch(pSdb, pIter);
1✔
1526
        mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
1!
1527
               pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
1528
        TAOS_RETURN(TSDB_CODE_MND_STREAM_MUST_BE_DELETED);
1✔
1529
      } else {
1530
        // kill the related checkpoint trans
1531
        int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
78✔
1532
        if (transId != 0) {
78!
1533
          mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
×
1534
          mndKillTransImpl(pMnode, transId, pStream->sourceDb);
×
1535
        }
1536

1537
        // drop the stream obj in execInfo
1538
        removeStreamTasksInBuf(pStream, &execInfo);
78✔
1539

1540
        code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
78✔
1541
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
78!
1542
          sdbRelease(pSdb, pStream);
×
1543
          sdbCancelFetch(pSdb, pIter);
×
1544
          return code;
×
1545
        }
1546
      }
1547
    }
1548

1549
    sdbRelease(pSdb, pStream);
578✔
1550
  }
1551

1552
  return 0;
2,344✔
1553
}
1554

1555
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
11,253✔
1556
  SMnode     *pMnode = pReq->info.node;
11,253✔
1557
  SSdb       *pSdb = pMnode->pSdb;
11,253✔
1558
  int32_t     numOfRows = 0;
11,253✔
1559
  SStreamObj *pStream = NULL;
11,253✔
1560
  int32_t     code = 0;
11,253✔
1561

1562
  while (numOfRows < rows) {
44,398!
1563
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
44,398✔
1564
    if (pShow->pIter == NULL) break;
44,399✔
1565

1566
    code = setStreamAttrInResBlock(pStream, pBlock, numOfRows);
33,142✔
1567
    if (code == 0) {
33,122!
1568
      numOfRows++;
33,123✔
1569
    }
1570
    sdbRelease(pSdb, pStream);
33,122✔
1571
  }
1572

1573
  pShow->numOfRows += numOfRows;
11,257✔
1574
  return numOfRows;
11,257✔
1575
}
1576

1577
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
×
1578
  SSdb *pSdb = pMnode->pSdb;
×
1579
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
×
1580
}
×
1581

1582
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
19,697✔
1583
  SMnode     *pMnode = pReq->info.node;
19,697✔
1584
  SSdb       *pSdb = pMnode->pSdb;
19,697✔
1585
  int32_t     numOfRows = 0;
19,697✔
1586
  SStreamObj *pStream = NULL;
19,697✔
1587
  int32_t     code = 0;
19,697✔
1588

1589
  streamMutexLock(&execInfo.lock);
19,697✔
1590
  mndInitStreamExecInfo(pMnode, &execInfo);
19,709✔
1591
  streamMutexUnlock(&execInfo.lock);
19,709✔
1592

1593
  while (numOfRows < rowsCapacity) {
77,907✔
1594
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
77,867✔
1595
    if (pShow->pIter == NULL) {
77,867✔
1596
      break;
19,669✔
1597
    }
1598

1599
    // lock
1600
    taosRLockLatch(&pStream->lock);
58,198✔
1601

1602
    int32_t count = mndGetNumOfStreamTasks(pStream);
58,198✔
1603
    if (numOfRows + count > rowsCapacity) {
58,151✔
1604
      code = blockDataEnsureCapacity(pBlock, numOfRows + count);
30✔
1605
      if (code) {
30!
1606
        mError("failed to prepare the result block buffer, quit return value");
×
1607
        taosRUnLockLatch(&pStream->lock);
×
1608
        sdbRelease(pSdb, pStream);
×
1609
        continue;
×
1610
      }
1611
    }
1612

1613
    // add row for each task
1614
    SStreamTaskIter *pIter = NULL;
58,151✔
1615
    code = createStreamTaskIter(pStream, &pIter);
58,151✔
1616
    if (code) {
58,178!
UNCOV
1617
      taosRUnLockLatch(&pStream->lock);
×
1618
      sdbRelease(pSdb, pStream);
×
1619
      mError("failed to create task iter for stream:%s", pStream->name);
×
1620
      continue;
×
1621
    }
1622

1623
    while (streamTaskIterNextTask(pIter)) {
257,441✔
1624
      SStreamTask *pTask = NULL;
199,142✔
1625
      code = streamTaskIterGetCurrent(pIter, &pTask);
199,142✔
1626
      if (code) {
199,273!
1627
        destroyStreamTaskIter(pIter);
×
1628
        break;
×
1629
      }
1630

1631
      code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows);
199,273✔
1632
      if (code == TSDB_CODE_SUCCESS) {
199,263!
1633
        numOfRows++;
199,267✔
1634
      }
1635
    }
1636

1637
    pBlock->info.rows = numOfRows;
57,642✔
1638

1639
    destroyStreamTaskIter(pIter);
57,642✔
1640
    taosRUnLockLatch(&pStream->lock);
58,160✔
1641

1642
    sdbRelease(pSdb, pStream);
58,195✔
1643
  }
1644

1645
  pShow->numOfRows += numOfRows;
19,709✔
1646
  return numOfRows;
19,709✔
1647
}
1648

1649
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
×
1650
  SSdb *pSdb = pMnode->pSdb;
×
1651
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
×
1652
}
×
1653

1654
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
718✔
1655
  SMnode     *pMnode = pReq->info.node;
718✔
1656
  SStreamObj *pStream = NULL;
718✔
1657
  int32_t     code = 0;
718✔
1658

1659
  SMPauseStreamReq pauseReq = {0};
718✔
1660
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
718!
1661
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
1662
  }
1663

1664
  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
718✔
1665
  if (pStream == NULL || code != 0) {
718!
1666
    if (pauseReq.igNotExists) {
401✔
1667
      mInfo("stream:%s, not exist, not pause stream", pauseReq.name);
148!
1668
      return 0;
148✔
1669
    } else {
1670
      mError("stream:%s not exist, failed to pause stream", pauseReq.name);
253!
1671
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
253✔
1672
    }
1673
  }
1674

1675
  mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid);
317!
1676

1677
  if (pStream->status == STREAM_STATUS__PAUSE) {
317!
1678
    sdbRelease(pMnode->pSdb, pStream);
×
1679
    return 0;
×
1680
  }
1681

1682
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
317!
1683
    sdbRelease(pMnode->pSdb, pStream);
×
1684
    return code;
×
1685
  }
1686

1687
  // check if it is conflict with other trans in both sourceDb and targetDb.
1688
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
317✔
1689
  if (code) {
317!
1690
    sdbRelease(pMnode->pSdb, pStream);
×
1691
    TAOS_RETURN(code);
×
1692
  }
1693

1694
  bool updated = mndStreamNodeIsUpdated(pMnode);
317✔
1695
  if (updated) {
317!
UNCOV
1696
    mError("tasks are not ready for pause, node update detected");
×
UNCOV
1697
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1698
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
×
1699
  }
1700

1701
  {  // check for tasks, if tasks are not ready, not allowed to pause
1702
    bool found = false;
317✔
1703
    bool readyToPause = true;
317✔
1704
    streamMutexLock(&execInfo.lock);
317✔
1705

1706
    for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
4,771✔
1707
      STaskId *p = taosArrayGet(execInfo.pTaskList, i);
4,454✔
1708
      if (p == NULL) {
4,454!
1709
        continue;
×
1710
      }
1711

1712
      STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
4,454✔
1713
      if (pEntry == NULL) {
4,454!
1714
        continue;
×
1715
      }
1716

1717
      if (pEntry->id.streamId != pStream->uid) {
4,454✔
1718
        continue;
2,949✔
1719
      }
1720

1721
      if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) {
1,505!
1722
        mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name,
160!
1723
               pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status));
1724
        readyToPause = false;
160✔
1725
      }
1726

1727
      found = true;
1,505✔
1728
    }
1729

1730
    streamMutexUnlock(&execInfo.lock);
317✔
1731
    if (!found) {
317!
1732
      mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
×
1733
      sdbRelease(pMnode->pSdb, pStream);
×
1734
      TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
×
1735
    }
1736

1737
    if (!readyToPause) {
317✔
1738
      mError("stream:%s task not ready for pause yet", pauseReq.name);
45!
1739
      sdbRelease(pMnode->pSdb, pStream);
45✔
1740
      TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
45✔
1741
    }
1742
  }
1743

1744
  STrans *pTrans = NULL;
272✔
1745
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans);
272✔
1746
  if (pTrans == NULL || code) {
272!
1747
    mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
×
1748
    sdbRelease(pMnode->pSdb, pStream);
×
1749
    return code;
×
1750
  }
1751

1752
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
272✔
1753
  if (code) {
272!
1754
    sdbRelease(pMnode->pSdb, pStream);
×
1755
    mndTransDrop(pTrans);
×
1756
    return code;
×
1757
  }
1758

1759
  // if nodeUpdate happened, not send pause trans
1760
  code = mndStreamSetPauseAction(pMnode, pTrans, pStream);
272✔
1761
  if (code) {
272!
1762
    mError("stream:%s, failed to pause task since %s", pauseReq.name, tstrerror(code));
×
1763
    sdbRelease(pMnode->pSdb, pStream);
×
1764
    mndTransDrop(pTrans);
×
1765
    return code;
×
1766
  }
1767

1768
  // pause stream
1769
  taosWLockLatch(&pStream->lock);
272✔
1770
  pStream->status = STREAM_STATUS__PAUSE;
272✔
1771
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
272✔
1772
  if (code) {
272!
1773
    taosWUnLockLatch(&pStream->lock);
×
1774
    sdbRelease(pMnode->pSdb, pStream);
×
1775
    mndTransDrop(pTrans);
×
1776
    return code;
×
1777
  }
1778

1779
  taosWUnLockLatch(&pStream->lock);
272✔
1780

1781
  code = mndTransPrepare(pMnode, pTrans);
272✔
1782
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
272!
1783
    mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
×
1784
    sdbRelease(pMnode->pSdb, pStream);
×
1785
    mndTransDrop(pTrans);
×
1786
    return code;
×
1787
  }
1788

1789
  sdbRelease(pMnode->pSdb, pStream);
272✔
1790
  mndTransDrop(pTrans);
272✔
1791

1792
  return TSDB_CODE_ACTION_IN_PROGRESS;
272✔
1793
}
1794

1795
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
819✔
1796
  SMnode     *pMnode = pReq->info.node;
819✔
1797
  SStreamObj *pStream = NULL;
819✔
1798
  int32_t     code = 0;
819✔
1799

1800
  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
819!
1801
    return code;
×
1802
  }
1803

1804
  SMResumeStreamReq resumeReq = {0};
819✔
1805
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
819!
1806
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
1807
  }
1808

1809
  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
819✔
1810
  if (pStream == NULL || code != 0) {
819!
1811
    if (resumeReq.igNotExists) {
296✔
1812
      mInfo("stream:%s not exist, not resume stream", resumeReq.name);
295!
1813
      sdbRelease(pMnode->pSdb, pStream);
295✔
1814
      return 0;
295✔
1815
    } else {
1816
      mError("stream:%s not exist, failed to resume stream", resumeReq.name);
1!
1817
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
1✔
1818
    }
1819
  }
1820

1821
  if (pStream->status != STREAM_STATUS__PAUSE) {
523✔
1822
    sdbRelease(pMnode->pSdb, pStream);
252✔
1823
    return 0;
252✔
1824
  }
1825

1826
  mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);
271!
1827
  if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
271!
1828
    sdbRelease(pMnode->pSdb, pStream);
×
1829
    return -1;
×
1830
  }
1831

1832
  // check if it is conflict with other trans in both sourceDb and targetDb.
1833
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
271✔
1834
  if (code) {
271!
1835
    sdbRelease(pMnode->pSdb, pStream);
×
1836
    return code;
×
1837
  }
1838

1839
  STrans *pTrans = NULL;
271✔
1840
  code =
1841
      doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans);
271✔
1842
  if (pTrans == NULL || code) {
271!
1843
    mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code));
×
1844
    sdbRelease(pMnode->pSdb, pStream);
×
1845
    return code;
×
1846
  }
1847

1848
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
271✔
1849
  if (code) {
271!
1850
    sdbRelease(pMnode->pSdb, pStream);
×
1851
    mndTransDrop(pTrans);
×
1852
    return code;
×
1853
  }
1854

1855
  // set the resume action
1856
  code = mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated);
271✔
1857
  if (code) {
271!
1858
    mError("stream:%s, failed to drop task since %s", resumeReq.name, tstrerror(code));
×
1859
    sdbRelease(pMnode->pSdb, pStream);
×
1860
    mndTransDrop(pTrans);
×
1861
    return code;
×
1862
  }
1863

1864
  // resume stream
1865
  taosWLockLatch(&pStream->lock);
271✔
1866
  pStream->status = STREAM_STATUS__NORMAL;
271✔
1867
  if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY) < 0) {
271!
1868
    taosWUnLockLatch(&pStream->lock);
×
1869

1870
    sdbRelease(pMnode->pSdb, pStream);
×
1871
    mndTransDrop(pTrans);
×
1872
    return code;
×
1873
  }
1874

1875
  taosWUnLockLatch(&pStream->lock);
271✔
1876
  code = mndTransPrepare(pMnode, pTrans);
271✔
1877
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
271!
1878
    mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
×
1879
    sdbRelease(pMnode->pSdb, pStream);
×
1880
    mndTransDrop(pTrans);
×
1881
    return code;
×
1882
  }
1883

1884
  sdbRelease(pMnode->pSdb, pStream);
271✔
1885
  mndTransDrop(pTrans);
271✔
1886

1887
  return TSDB_CODE_ACTION_IN_PROGRESS;
271✔
1888
}
1889

1890
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) {
2✔
1891
  SSdb       *pSdb = pMnode->pSdb;
2✔
1892
  SStreamObj *pStream = NULL;
2✔
1893
  void       *pIter = NULL;
2✔
1894
  STrans     *pTrans = NULL;
2✔
1895
  int32_t     code = 0;
2✔
1896

1897
  // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool
1898
  while (1) {
1899
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
4✔
1900
    if (pIter == NULL) {
4✔
1901
      break;
2✔
1902
    }
1903

1904
    code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
2✔
1905
    sdbRelease(pSdb, pStream);
2✔
1906

1907
    if (code) {
2!
1908
      mError("nodeUpdate conflict with other trans, current nodeUpdate ignored, code:%s", tstrerror(code));
×
1909
      sdbCancelFetch(pSdb, pIter);
×
1910
      return code;
×
1911
    }
1912
  }
1913

1914
  while (1) {
1915
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
4✔
1916
    if (pIter == NULL) {
4✔
1917
      break;
2✔
1918
    }
1919

1920
    // here create only one trans
1921
    if (pTrans == NULL) {
2!
1922
      code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME, "update task epsets", &pTrans);
2✔
1923
      if (pTrans == NULL || code) {
2!
1924
        sdbRelease(pSdb, pStream);
×
1925
        sdbCancelFetch(pSdb, pIter);
×
1926
        return terrno = code;
×
1927
      }
1928

1929
      code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
2✔
1930
      if (code) {
2!
1931
        mError("failed to register trans, transId:%d, and continue", pTrans->id);
×
1932
      }
1933
    }
1934

1935
    if (!includeAllNodes) {
2!
1936
      void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
2✔
1937
      void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
2✔
1938
      if (p1 == NULL && p2 == NULL) {
2!
1939
        mDebug("stream:0x%" PRIx64 " %s not involved nodeUpdate, ignore", pStream->uid, pStream->name);
×
1940
        sdbRelease(pSdb, pStream);
×
1941
        continue;
×
1942
      }
1943
    }
1944

1945
    mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
2✔
1946
           pStream->name, pTrans->id);
1947

1948
    code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);
2✔
1949

1950
    // todo: not continue, drop all and retry again
1951
    if (code != TSDB_CODE_SUCCESS) {
2!
1952
      mError("stream:0x%" PRIx64 " build nodeUpdate trans failed, ignore and continue, code:%s", pStream->uid,
×
1953
             tstrerror(code));
1954
      sdbRelease(pSdb, pStream);
×
1955
      continue;
×
1956
    }
1957

1958
    code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
2✔
1959
    sdbRelease(pSdb, pStream);
2✔
1960

1961
    if (code != TSDB_CODE_SUCCESS) {
2!
1962
      sdbCancelFetch(pSdb, pIter);
×
1963
      return code;
×
1964
    }
1965
  }
1966

1967
  // no need to build the trans to handle the vgroup update
1968
  if (pTrans == NULL) {
2!
1969
    return 0;
×
1970
  }
1971

1972
  code = mndTransPrepare(pMnode, pTrans);
2✔
1973
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
2!
1974
    mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
×
1975
    sdbRelease(pMnode->pSdb, pStream);
×
1976
    mndTransDrop(pTrans);
×
1977
    return code;
×
1978
  }
1979

1980
  sdbRelease(pMnode->pSdb, pStream);
2✔
1981
  mndTransDrop(pTrans);
2✔
1982
  return code;
2✔
1983
}
1984

1985
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList) {
755✔
1986
  SSdb       *pSdb = pMnode->pSdb;
755✔
1987
  SStreamObj *pStream = NULL;
755✔
1988
  void       *pIter = NULL;
755✔
1989
  int32_t     code = 0;
755✔
1990

1991
  mDebug("start to refresh node list by existed streams");
755✔
1992

1993
  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
755✔
1994
  if (pHash == NULL) {
755!
1995
    return terrno;
×
1996
  }
1997

1998
  while (1) {
2✔
1999
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
757✔
2000
    if (pIter == NULL) {
757✔
2001
      break;
755✔
2002
    }
2003

2004
    taosWLockLatch(&pStream->lock);
2✔
2005

2006
    SStreamTaskIter *pTaskIter = NULL;
2✔
2007
    code = createStreamTaskIter(pStream, &pTaskIter);
2✔
2008
    if (code) {
2!
2009
      taosWUnLockLatch(&pStream->lock);
×
2010
      sdbRelease(pSdb, pStream);
×
2011
      mError("failed to create task iter for stream:%s", pStream->name);
×
2012
      continue;
×
2013
    }
2014

2015
    while (streamTaskIterNextTask(pTaskIter)) {
13✔
2016
      SStreamTask *pTask = NULL;
11✔
2017
      code = streamTaskIterGetCurrent(pTaskIter, &pTask);
11✔
2018
      if (code) {
11!
2019
        break;
×
2020
      }
2021

2022
      SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
11✔
2023
      epsetAssign(&entry.epset, &pTask->info.epSet);
11✔
2024
      int32_t ret = taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
11✔
2025
      if (ret != 0 && ret != TSDB_CODE_DUP_KEY) {
11!
2026
        mError("failed to put entry into hash map, nodeId:%d, code:%s", entry.nodeId, tstrerror(code));
×
2027
      }
2028
    }
2029

2030
    destroyStreamTaskIter(pTaskIter);
2✔
2031
    taosWUnLockLatch(&pStream->lock);
2✔
2032

2033
    sdbRelease(pSdb, pStream);
2✔
2034
  }
2035

2036
  taosArrayClear(pNodeList);
755✔
2037

2038
  // convert to list
2039
  pIter = NULL;
755✔
2040
  while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
761✔
2041
    SNodeEntry *pEntry = (SNodeEntry *)pIter;
6✔
2042

2043
    void *p = taosArrayPush(pNodeList, pEntry);
6✔
2044
    if (p == NULL) {
6!
2045
      mError("failed to put entry into node list, nodeId:%d, code: out of memory", pEntry->nodeId);
×
2046
      if (code == 0) {
×
2047
        code = terrno;
×
2048
      }
2049
      continue;
×
2050
    }
2051

2052
    char buf[256] = {0};
6✔
2053
    int32_t ret = epsetToStr(&pEntry->epset, buf, tListLen(buf));  // ignore this error since it is only for log file
6✔
2054
    if (ret != 0) {  // print error and continue
6!
2055
      mError("failed to convert epset to str, code:%s", tstrerror(ret));
×
2056
    }
2057

2058
    mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
6✔
2059
  }
2060

2061
  taosHashCleanup(pHash);
755✔
2062

2063
  mDebug("numOfNodes:%d for stream after extract nodeInfo from stream", (int32_t)taosArrayGetSize(pNodeList));
755✔
2064
  return code;
755✔
2065
}
2066

2067
static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
×
2068
  void   *pIter = NULL;
×
2069
  int32_t code = 0;
×
2070
  while (1) {
×
2071
    SVgObj *pVgroup = NULL;
×
2072
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
×
2073
    if (pIter == NULL) {
×
2074
      break;
×
2075
    }
2076

2077
    code = taosHashPut(pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
×
2078
    sdbRelease(pSdb, pVgroup);
×
2079

2080
    if (code == 0) {
×
2081
      int32_t size = taosHashGetSize(pDBMap);
×
2082
      mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size);
×
2083
    }
2084
  }
2085
}
×
2086

2087
// this function runs by only one thread, so it is not multi-thread safe
2088
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
1,339✔
2089
  int32_t code = 0;
1,339✔
2090
  bool    allReady = true;
1,339✔
2091
  SArray *pNodeSnapshot = NULL;
1,339✔
2092
  SMnode *pMnode = pMsg->info.node;
1,339✔
2093
  int64_t ts = taosGetTimestampSec();
1,339✔
2094
  bool    updateAllVgroups = false;
1,339✔
2095

2096
  int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
1,339✔
2097
  if (old != 0) {
1,339!
2098
    mDebug("still in checking node change");
×
2099
    return 0;
×
2100
  }
2101

2102
  mDebug("start to do node changing check");
1,339✔
2103

2104
  streamMutexLock(&execInfo.lock);
1,339✔
2105
  int32_t numOfNodes = extractStreamNodeList(pMnode);
1,339✔
2106
  streamMutexUnlock(&execInfo.lock);
1,339✔
2107

2108
  if (numOfNodes == 0) {
1,339!
2109
    mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing");
×
2110
    execInfo.ts = ts;
×
2111
    atomic_store_32(&mndNodeCheckSentinel, 0);
×
2112
    return 0;
×
2113
  }
2114

2115
  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
1,339✔
2116
  if (code) {
1,339!
2117
    mError("failed to take the vgroup snapshot, ignore it and continue");
×
2118
  }
2119

2120
  if (!allReady) {
1,339✔
2121
    taosArrayDestroy(pNodeSnapshot);
43✔
2122
    atomic_store_32(&mndNodeCheckSentinel, 0);
43✔
2123
    mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
43!
2124
    return 0;
43✔
2125
  }
2126

2127
  streamMutexLock(&execInfo.lock);
1,296✔
2128

2129
  code = removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
1,296✔
2130
  if (code) {
1,296!
2131
    goto _end;
×
2132
  }
2133

2134
  SVgroupChangeInfo changeInfo = {0};
1,296✔
2135
  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo);
1,296✔
2136
  if (code) {
1,296!
2137
    goto _end;
×
2138
  }
2139

2140
  {
2141
    if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) {
1,296!
2142
      mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
×
2143
      updateAllVgroups = true;
×
2144
      execInfo.switchFromFollower = false;  // reset the flag
×
2145
      addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb);
×
2146
    }
2147
  }
2148

2149
  if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) {
1,296!
2150
    // kill current active checkpoint transaction, since the transaction is vnode wide.
2151
    killAllCheckpointTrans(pMnode, &changeInfo);
2✔
2152
    code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups);
2✔
2153

2154
    // keep the new vnode snapshot if success
2155
    if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
2!
2156
      code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
2✔
2157
      if (code) {
2!
2158
        mError("failed to extract node list from stream, code:%s", tstrerror(code));
×
2159
        goto _end;
×
2160
      }
2161

2162
      execInfo.ts = ts;
2✔
2163
      mDebug("create trans successfully, update cached node list, numOfNodes:%d",
2✔
2164
             (int)taosArrayGetSize(execInfo.pNodeList));
2165
    } else {
2166
      mError("unexpected code during create nodeUpdate trans, code:%s", tstrerror(code));
×
2167
    }
2168
  } else {
2169
    mDebug("no update found in nodeList");
1,294✔
2170
  }
2171

2172
  mndDestroyVgroupChangeInfo(&changeInfo);
1,296✔
2173

2174
  _end:
1,296✔
2175
  streamMutexUnlock(&execInfo.lock);
1,296✔
2176
  taosArrayDestroy(pNodeSnapshot);
1,296✔
2177

2178
  mDebug("end to do stream task node change checking");
1,296✔
2179
  atomic_store_32(&mndNodeCheckSentinel, 0);
1,296✔
2180
  return 0;
1,296✔
2181
}
2182

2183
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
2,689✔
2184
  SMnode *pMnode = pReq->info.node;
2,689✔
2185
  SSdb   *pSdb = pMnode->pSdb;
2,689✔
2186
  if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
2,689✔
2187
    return 0;
1,350✔
2188
  }
2189

2190
  int32_t               size = sizeof(SMStreamNodeCheckMsg);
1,339✔
2191
  SMStreamNodeCheckMsg *pMsg = rpcMallocCont(size);
1,339✔
2192
  if (pMsg == NULL) {
1,339!
2193
    return terrno;
×
2194
  }
2195

2196
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = size};
1,339✔
2197
  return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
1,339✔
2198
}
2199

2200
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
1,801✔
2201
  SStreamTaskIter *pIter = NULL;
1,801✔
2202
  int32_t          code = createStreamTaskIter(pStream, &pIter);
1,801✔
2203
  if (code) {
1,801!
2204
    mError("failed to create task iter for stream:%s", pStream->name);
×
2205
    return;
×
2206
  }
2207

2208
  while (streamTaskIterNextTask(pIter)) {
11,023✔
2209
    SStreamTask *pTask = NULL;
9,222✔
2210
    code = streamTaskIterGetCurrent(pIter, &pTask);
9,222✔
2211
    if (code) {
9,222!
2212
      break;
×
2213
    }
2214

2215
    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
9,222✔
2216
    void   *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
9,222✔
2217
    if (p == NULL) {
9,222✔
2218
      STaskStatusEntry entry = {0};
8,624✔
2219
      streamTaskStatusInit(&entry, pTask);
8,624✔
2220

2221
      code = taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
8,624✔
2222
      if (code == 0) {
8,624!
2223
        void *  px = taosArrayPush(pExecNode->pTaskList, &id);
8,624✔
2224
        int32_t num = (int32_t)taosArrayGetSize(pExecNode->pTaskList);
8,624✔
2225
        if (px) {
8,624!
2226
          mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
8,624!
2227
        } else {
2228
          mError("s-task:0x%x failed to add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
×
2229
        }
2230
      } else {
2231
        mError("s-task:0x%x failed to add into task map, since out of memory", (int32_t) entry.id.taskId);
×
2232
      }
2233

2234
      // add the new vgroups if not added yet
2235
      bool exist = false;
8,624✔
2236
      for (int32_t j = 0; j < taosArrayGetSize(pExecNode->pNodeList); ++j) {
47,592✔
2237
        SNodeEntry *pEntry = taosArrayGet(pExecNode->pNodeList, j);
45,873✔
2238
        if ((pEntry != NULL) && (pEntry->nodeId == pTask->info.nodeId)) {
45,873!
2239
          exist = true;
6,905✔
2240
          break;
6,905✔
2241
        }
2242
      }
2243

2244
      if (!exist) {
8,624✔
2245
        SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
1,719✔
2246
        epsetAssign(&nodeEntry.epset, &pTask->info.epSet);
1,719✔
2247

2248
        void* px = taosArrayPush(pExecNode->pNodeList, &nodeEntry);
1,719✔
2249
        if (px) {
1,719!
2250
          mInfo("vgId:%d added into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList));
1,719!
2251
        } else {
2252
          mError("vgId:%d failed to add into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList))
×
2253
        }
2254
      }
2255
    }
2256
  }
2257

2258
  destroyStreamTaskIter(pIter);
1,801✔
2259
}
2260

2261
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
4,710✔
2262
  int32_t num = taosArrayGetSize(pList);
4,710✔
2263
  for (int32_t i = 0; i < num; ++i) {
17,400✔
2264
    int32_t *pId = taosArrayGet(pList, i);
12,691✔
2265
    if (pId == NULL) {
12,691!
2266
      continue;
×
2267
    }
2268

2269
    if (taskId == *pId) {
12,691✔
2270
      return;
1✔
2271
    }
2272
  }
2273

2274
  int32_t numOfTasks = taosArrayGetSize(pList);
4,709✔
2275
  void   *p = taosArrayPush(pList, &taskId);
4,709✔
2276
  if (p) {
4,709!
2277
    mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, numOfTasks, numOfTotal - numOfTasks);
4,709✔
2278
  } else {
2279
    mError("stream:0x%" PRIx64 " receive %d reqs for checkpoint, failed to added into task list, since out of memory",
×
2280
           uid, numOfTasks);
2281
  }
2282
}
2283

2284
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
4,710✔
2285
  SMnode                  *pMnode = pReq->info.node;
4,710✔
2286
  SStreamTaskCheckpointReq req = {0};
4,710✔
2287

2288
  SDecoder decoder = {0};
4,710✔
2289
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
4,710✔
2290

2291
  if (tDecodeStreamTaskCheckpointReq(&decoder, &req)) {
4,710!
2292
    tDecoderClear(&decoder);
×
2293
    mError("invalid task checkpoint req msg received");
×
2294
    return TSDB_CODE_INVALID_MSG;
×
2295
  }
2296
  tDecoderClear(&decoder);
4,710✔
2297

2298
  mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);
4,710✔
2299

2300
  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
2301
  streamMutexLock(&execInfo.lock);
4,710✔
2302

2303
  SStreamObj *pStream = NULL;
4,710✔
2304
  int32_t code = mndGetStreamObj(pMnode, req.streamId, &pStream);
4,710✔
2305
  if (pStream == NULL || code != 0) {
4,710!
2306
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf",
×
2307
          req.streamId);
2308

2309
    // not in meta-store yet, try to acquire the task in exec buffer
2310
    // the checkpoint req arrives too soon before the completion of the create stream trans.
2311
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
×
2312
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
×
2313
    if (p == NULL) {
×
2314
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
×
2315
      streamMutexUnlock(&execInfo.lock);
×
2316
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
×
2317
    } else {
2318
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
×
2319
             req.streamId, req.taskId);
2320
    }
2321
  }
2322

2323
  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
4,710!
2324

2325
  SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
4,710✔
2326
  if (pReqTaskList == NULL) {
4,710✔
2327
    SArray *pList = taosArrayInit(4, sizeof(int32_t));
807✔
2328
    doAddTaskId(pList, req.taskId, req.streamId, numOfTasks);
807✔
2329
    code = taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
807✔
2330
    if (code) {
807!
2331
      mError("failed to put into transfer state stream map, code: out of memory");
×
2332
    }
2333
    pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
807✔
2334
  } else {
2335
    doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks);
3,903✔
2336
  }
2337

2338
  int32_t total = taosArrayGetSize(*pReqTaskList);
4,710✔
2339
  if (total == numOfTasks) {  // all tasks has send the reqs
4,710✔
2340
    int64_t checkpointId = mndStreamGenChkptId(pMnode, false);
798✔
2341
    mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);
798!
2342

2343
    if (pStream != NULL) {  // TODO:handle error
798!
2344
      code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
798✔
2345
      if (code) {
798!
2346
        mError("failed to create checkpoint trans, code:%s", tstrerror(code));
798!
2347
      }
2348
    } else {
2349
      // todo: wait for the create stream trans completed, and launch the checkpoint trans
2350
      // SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
2351
      // sleep(500ms)
2352
    }
2353

2354
    // remove this entry
2355
    (void) taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));
798✔
2356

2357
    int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
798✔
2358
    mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
798✔
2359
  }
2360

2361
  if (pStream != NULL) {
4,710!
2362
    mndReleaseStream(pMnode, pStream);
4,710✔
2363
  }
2364

2365
  streamMutexUnlock(&execInfo.lock);
4,710✔
2366

2367
  {
2368
    SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)};
4,710✔
2369
    rsp.pCont = rpcMallocCont(rsp.contLen);
4,710✔
2370
    if (rsp.pCont == NULL) {
4,710!
2371
      return terrno;
×
2372
    }
2373

2374
    SMsgHead *pHead = rsp.pCont;
4,710✔
2375
    pHead->vgId = htonl(req.nodeId);
4,710✔
2376

2377
    tmsgSendRsp(&rsp);
4,710✔
2378
    pReq->info.handle = NULL;  // disable auto rsp
4,710✔
2379
  }
2380

2381
  return 0;
4,710✔
2382
}
2383

2384
// valid the info according to the HbMsg
2385
static bool validateChkptReport(const SCheckpointReport *pReport, int64_t reportChkptId) {
6,633✔
2386
  STaskId           id = {.streamId = pReport->streamId, .taskId = pReport->taskId};
6,633✔
2387
  STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
6,633✔
2388
  if (pTaskEntry == NULL) {
6,633✔
2389
    mError("invalid checkpoint-report msg from task:0x%x, discard", pReport->taskId);
18!
2390
    return false;
18✔
2391
  }
2392

2393
  if (pTaskEntry->checkpointInfo.latestId >= pReport->checkpointId) {
6,615!
2394
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " saved checkpointId:%" PRId64 " discard",
×
2395
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2396
    return false;
×
2397
  }
2398

2399
  // now the task in checkpoint procedure
2400
  if ((pTaskEntry->checkpointInfo.activeId != 0) && (pTaskEntry->checkpointInfo.activeId > pReport->checkpointId)) {
6,615!
2401
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " active checkpointId:%" PRId64
×
2402
           " discard",
2403
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2404
    return false;
×
2405
  }
2406

2407
  if (reportChkptId >= pReport->checkpointId) {
6,615!
2408
    mError("s-task:0x%x expired checkpoint-report msg, checkpointId:%" PRId64 " already update checkpointId:%" PRId64
×
2409
           " discard",
2410
           pReport->taskId, pReport->checkpointId, reportChkptId);
2411
    return false;
×
2412
  }
2413

2414
  return true;
6,615✔
2415
}
2416

2417
static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SCheckpointReport *pReport) {
6,633✔
2418
  bool valid = validateChkptReport(pReport, reportChkptId);
6,633✔
2419
  if (!valid) {
6,633✔
2420
    return;
18✔
2421
  }
2422

2423
  for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
22,372✔
2424
    STaskChkptInfo *p = taosArrayGet(pList, i);
15,757✔
2425
    if (p == NULL) {
15,757!
2426
      continue;
×
2427
    }
2428

2429
    if (p->taskId == pReport->taskId) {
15,757!
2430
      if (p->checkpointId > pReport->checkpointId) {
×
2431
        mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
×
2432
               pReport->taskId, p->checkpointId, pReport->checkpointId);
2433
      } else if (p->checkpointId < pReport->checkpointId) {  // expired checkpoint-report msg, update it
×
2434
        mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64,
×
2435
               pReport->taskId, p->checkpointId, pReport->checkpointId);
2436

2437
        // update the checkpoint report info
2438
        p->checkpointId = pReport->checkpointId;
×
2439
        p->ts = pReport->checkpointTs;
×
2440
        p->version = pReport->checkpointVer;
×
2441
        p->transId = pReport->transId;
×
2442
        p->dropHTask = pReport->dropHTask;
×
2443
      } else {
2444
        mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
×
2445
      }
2446
      return;
×
2447
    }
2448
  }
2449

2450
  STaskChkptInfo info = {
6,615✔
2451
      .streamId = pReport->streamId,
6,615✔
2452
      .taskId = pReport->taskId,
6,615✔
2453
      .transId = pReport->transId,
6,615✔
2454
      .dropHTask = pReport->dropHTask,
6,615✔
2455
      .version = pReport->checkpointVer,
6,615✔
2456
      .ts = pReport->checkpointTs,
6,615✔
2457
      .checkpointId = pReport->checkpointId,
6,615✔
2458
      .nodeId = pReport->nodeId,
6,615✔
2459
  };
2460

2461
  void *p = taosArrayPush(pList, &info);
6,615✔
2462
  if (p == NULL) {
6,615!
2463
    mError("failed to put into task list, taskId:0x%x", pReport->taskId);
×
2464
  } else {
2465
    int32_t size = taosArrayGetSize(pList);
6,615✔
2466
    mDebug("stream:0x%"PRIx64" %d tasks has send checkpoint-report", pReport->streamId, size);
6,615✔
2467
  }
2468
}
2469

2470
int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
6,633✔
2471
  SMnode           *pMnode = pReq->info.node;
6,633✔
2472
  SCheckpointReport req = {0};
6,633✔
2473

2474
  SDecoder decoder = {0};
6,633✔
2475
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
6,633✔
2476

2477
  if (tDecodeStreamTaskChkptReport(&decoder, &req)) {
6,633!
2478
    tDecoderClear(&decoder);
×
2479
    mError("invalid task checkpoint-report msg received");
×
2480
    return TSDB_CODE_INVALID_MSG;
×
2481
  }
2482
  tDecoderClear(&decoder);
6,633✔
2483

2484
  streamMutexLock(&execInfo.lock);
6,633✔
2485
  mndInitStreamExecInfo(pMnode, &execInfo);
6,633✔
2486
  streamMutexUnlock(&execInfo.lock);
6,633✔
2487

2488
  mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64
6,633✔
2489
         " checkpointVer:%" PRId64 " transId:%d",
2490
         req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);
2491

2492
  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
2493
  streamMutexLock(&execInfo.lock);
6,633✔
2494

2495
  SStreamObj *pStream = NULL;
6,633✔
2496
  int32_t code = mndGetStreamObj(pMnode, req.streamId, &pStream);
6,633✔
2497
  if (pStream == NULL || code != 0) {
6,633!
2498
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);
×
2499

2500
    // not in meta-store yet, try to acquire the task in exec buffer
2501
    // the checkpoint req arrives too soon before the completion of the create stream trans.
2502
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
×
2503
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
×
2504
    if (p == NULL) {
×
2505
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
×
2506
      streamMutexUnlock(&execInfo.lock);
×
2507
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
×
2508
    } else {
2509
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
×
2510
             req.streamId, req.taskId);
2511
    }
2512
  }
2513

2514
  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
6,633!
2515

2516
  SChkptReportInfo *pInfo = (SChkptReportInfo*)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
6,633✔
2517
  if (pInfo == NULL) {
6,633✔
2518
    SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId};
802✔
2519
    if (info.pTaskList != NULL) {
802!
2520
      doAddReportStreamTask(info.pTaskList, info.reportChkpt, &req);
802✔
2521
      code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &info, sizeof(info));
802✔
2522
      if (code) {
802!
2523
        mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId);
×
2524
      }
2525

2526
      pInfo = (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
802✔
2527
    }
2528
  } else {
2529
    doAddReportStreamTask(pInfo->pTaskList, pInfo->reportChkpt, &req);
5,831✔
2530
  }
2531

2532
  int32_t total = taosArrayGetSize(pInfo->pTaskList);
6,633✔
2533
  if (total == numOfTasks) {  // all tasks has send the reqs
6,633✔
2534
    mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
1,374!
2535
          " will be issued soon",
2536
          req.streamId, pStream->name, total, req.checkpointId);
2537
  }
2538

2539
  if (pStream != NULL) {
6,633!
2540
    mndReleaseStream(pMnode, pStream);
6,633✔
2541
  }
2542

2543
  streamMutexUnlock(&execInfo.lock);
6,633✔
2544

2545
  doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS);
6,633✔
2546
  return code;
6,633✔
2547
}
2548

2549
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pExistedTasks, bool *pAllSame) {
95✔
2550
  int32_t num = 0;
95✔
2551
  int64_t chkId = INT64_MAX;
95✔
2552
  *pExistedTasks = 0;
95✔
2553
  *pAllSame = true;
95✔
2554

2555
  for(int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
1,494✔
2556
    STaskId* p = taosArrayGet(execInfo.pTaskList, i);
1,399✔
2557
    if (p == NULL) {
1,399!
2558
      continue;
×
2559
    }
2560

2561
    if (p->streamId != streamId) {
1,399✔
2562
      continue;
868✔
2563
    }
2564

2565
    num += 1;
531✔
2566
    STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
531✔
2567
    if (chkId > pe->checkpointInfo.latestId) {
531✔
2568
      if (chkId != INT64_MAX) {
99✔
2569
        *pAllSame = false;
4✔
2570
      }
2571
      chkId = pe->checkpointInfo.latestId;
99✔
2572
    }
2573
  }
2574

2575
  *pExistedTasks = num;
95✔
2576
  if (num < numOfTasks) { // not all task send info to mnode through hbMsg, no valid checkpoint Id
95!
2577
    return -1;
×
2578
  }
2579

2580
  return chkId;
95✔
2581
}
2582

2583
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) {
6,633✔
2584
  SRpcMsg rsp = {.code = code, .info = *pInfo, .contLen = msgSize};
6,633✔
2585
  rsp.pCont = rpcMallocCont(rsp.contLen);
6,633✔
2586
  if (rsp.pCont != NULL) {
6,633!
2587
    SMsgHead *pHead = rsp.pCont;
6,633✔
2588
    pHead->vgId = htonl(vgId);
6,633✔
2589

2590
    tmsgSendRsp(&rsp);
6,633✔
2591
    pInfo->handle = NULL;  // disable auto rsp
6,633✔
2592
  }
2593
}
6,633✔
2594

2595
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
12,398✔
2596
  SMnode *pMnode = pMsg->info.node;
12,398✔
2597
  int64_t now = taosGetTimestampMs();
12,398✔
2598
  SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
12,398✔
2599
  if (pStreamList == NULL) {
12,398!
2600
    return terrno;
×
2601
  }
2602

2603
  mDebug("start to process consensus-checkpointId in tmr");
12,398✔
2604

2605
  bool    allReady = true;
12,398✔
2606
  SArray *pNodeSnapshot = NULL;
12,398✔
2607

2608
  int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
12,398✔
2609
  taosArrayDestroy(pNodeSnapshot);
12,398✔
2610
  if (code) {
12,398!
2611
    mError("failed to get the vgroup snapshot, ignore it and continue");
×
2612
  }
2613

2614
  if (!allReady) {
12,398✔
2615
    mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
1,473!
2616
    taosArrayDestroy(pStreamList);
1,473✔
2617
    return 0;
1,473✔
2618
  }
2619

2620
  streamMutexLock(&execInfo.lock);
10,925✔
2621

2622
  void *pIter = NULL;
10,925✔
2623
  while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
10,946✔
2624
    SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;
21✔
2625

2626
    int64_t streamId = -1;
21✔
2627
    int32_t num = taosArrayGetSize(pInfo->pTaskList);
21✔
2628
    SArray *pList = taosArrayInit(4, sizeof(int32_t));
21✔
2629
    if (pList == NULL) {
21!
2630
      continue;
×
2631
    }
2632

2633
    SStreamObj *pStream = NULL;
21✔
2634
    code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
21✔
2635
    if (pStream == NULL || code != 0) {  // stream has been dropped already
21!
2636
      mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
×
2637
      taosArrayDestroy(pList);
×
2638
      continue;
×
2639
    }
2640

2641
    for (int32_t j = 0; j < num; ++j) {
116✔
2642
      SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
95✔
2643
      if (pe == NULL) {
95!
2644
        continue;
×
2645
      }
2646

2647
      streamId = pe->req.streamId;
95✔
2648

2649
      int32_t existed = 0;
95✔
2650
      bool    allSame = true;
95✔
2651
      int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame);
95✔
2652
      if (chkId == -1) {
95!
2653
        mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed,
×
2654
               pInfo->numOfTasks, pe->req.taskId);
2655
        break;
×
2656
      }
2657

2658
      if (((now - pe->ts) >= 10 * 1000) || allSame) {
95✔
2659
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
91✔
2660
               pe->req.startTs, (now - pe->ts) / 1000.0);
2661
        if (chkId > pe->req.checkpointId) {
91!
2662
          streamMutexUnlock(&execInfo.lock);
×
2663
          taosArrayDestroy(pStreamList);
×
2664
          mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
×
2665
                 pe->req.checkpointId, chkId);
2666
          return TSDB_CODE_FAILED;
×
2667
        }
2668
        code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
91✔
2669
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
91!
UNCOV
2670
          mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
×
2671
        }
2672

2673
        void* p = taosArrayPush(pList, &pe->req.taskId);
91✔
2674
        if (p == NULL) {
91!
2675
          mError("failed to put into task list, taskId:0x%x", pe->req.taskId);
×
2676
        }
2677
        streamId = pe->req.streamId;
91✔
2678
      } else {
2679
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
4!
2680
               pe->req.startTs, (now - pe->ts) / 1000.0);
2681
      }
2682
    }
2683

2684
    mndReleaseStream(pMnode, pStream);
21✔
2685

2686
    if (taosArrayGetSize(pList) > 0) {
21✔
2687
      for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
111✔
2688
        int32_t *taskId = taosArrayGet(pList, i);
91✔
2689
        if (taskId == NULL) {
91!
2690
          continue;
×
2691
        }
2692

2693
        for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) {
91!
2694
          SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, k);
91✔
2695
          if ((pe != NULL) && (pe->req.taskId == *taskId)) {
91!
2696
            taosArrayRemove(pInfo->pTaskList, k);
91✔
2697
            break;
91✔
2698
          }
2699
        }
2700
      }
2701
    }
2702

2703
    taosArrayDestroy(pList);
21✔
2704

2705
    if (taosArrayGetSize(pInfo->pTaskList) == 0) {
21✔
2706
      mndClearConsensusRspEntry(pInfo);
20✔
2707
      if (streamId == -1) {
20!
2708
        streamMutexUnlock(&execInfo.lock);
×
2709
        taosArrayDestroy(pStreamList);
×
2710
        mError("streamId is -1, streamId:%" PRIx64, pInfo->streamId);
×
2711
        return TSDB_CODE_FAILED;
×
2712
      }
2713
      void* p = taosArrayPush(pStreamList, &streamId);
20✔
2714
      if (p == NULL) {
20!
2715
        mError("failed to put into stream list, stream:0x%" PRIx64, streamId);
×
2716
      }
2717
    }
2718
  }
2719

2720
  for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
10,945✔
2721
    int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i);
20✔
2722
    if (pStreamId == NULL) {
20!
2723
      continue;
×
2724
    }
2725

2726
    code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId);
20✔
2727
  }
2728

2729
  streamMutexUnlock(&execInfo.lock);
10,925✔
2730

2731
  taosArrayDestroy(pStreamList);
10,925✔
2732
  mDebug("end to process consensus-checkpointId in tmr");
10,925✔
2733
  return code;
10,925✔
2734
}
2735

2736
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
259✔
2737
  int32_t code = mndProcessCreateStreamReq(pReq);
259✔
2738
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
259!
2739
    pReq->info.rsp = rpcMallocCont(1);
×
2740
    if (pReq->info.rsp == NULL) {
×
2741
      return terrno;
×
2742
    }
2743

2744
    pReq->info.rspLen = 1;
×
2745
    pReq->info.noResp = false;
×
2746
    pReq->code = code;
×
2747
  }
2748
  return code;
259✔
2749
}
2750

2751
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
225✔
2752
  int32_t code = mndProcessDropStreamReq(pReq);
225✔
2753
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
225!
2754
    pReq->info.rsp = rpcMallocCont(1);
21✔
2755
    if (pReq->info.rsp == NULL) {
21!
2756
      return terrno;
×
2757
    }
2758

2759
    pReq->info.rspLen = 1;
21✔
2760
    pReq->info.noResp = false;
21✔
2761
    pReq->code = code;
21✔
2762
  }
2763
  return code;
225✔
2764
}
2765

2766
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
53,376✔
2767
  if (pExecInfo->initTaskList || pMnode == NULL) {
53,376!
2768
    return;
53,246✔
2769
  }
2770

2771
  addAllStreamTasksIntoBuf(pMnode, pExecInfo);
130✔
2772
  pExecInfo->initTaskList = true;
130✔
2773
}
2774

2775
void mndStreamResetInitTaskListLoadFlag() {
1,778✔
2776
  mInfo("reset task list buffer init flag for leader");
1,778!
2777
  execInfo.initTaskList = false;
1,778✔
2778
}
1,778✔
2779

2780
void mndUpdateStreamExecInfoRole(SMnode* pMnode, int32_t role) {
2,088✔
2781
  execInfo.switchFromFollower = false;
2,088✔
2782

2783
  if (execInfo.role == NODE_ROLE_UNINIT) {
2,088✔
2784
    execInfo.role = role;
1,901✔
2785
    if (role == NODE_ROLE_LEADER) {
1,901✔
2786
      mInfo("init mnode is set to leader");
1,718!
2787
    } else {
2788
      mInfo("init mnode is set to follower");
183!
2789
    }
2790
  } else {
2791
    if (role == NODE_ROLE_LEADER) {
187✔
2792
      if (execInfo.role == NODE_ROLE_FOLLOWER) {
60!
2793
        execInfo.role = role;
60✔
2794
        execInfo.switchFromFollower = true;
60✔
2795
        mInfo("mnode switch to be leader from follower");
60!
2796
      } else {
2797
        mInfo("mnode remain to be leader, do nothing");
×
2798
      }
2799
    } else {  // follower's
2800
      if (execInfo.role == NODE_ROLE_LEADER) {
127✔
2801
        execInfo.role = role;
3✔
2802
        mInfo("mnode switch to be follower from leader");
3!
2803
      } else {
2804
        mInfo("mnode remain to be follower, do nothing");
124!
2805
      }
2806
    }
2807
  }
2808
}
2,088✔
2809

2810
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
130✔
2811
  SSdb       *pSdb = pMnode->pSdb;
130✔
2812
  SStreamObj *pStream = NULL;
130✔
2813
  void       *pIter = NULL;
130✔
2814

2815
  while (1) {
2816
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
335✔
2817
    if (pIter == NULL) {
335✔
2818
      break;
130✔
2819
    }
2820

2821
    saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo);
205✔
2822
    sdbRelease(pSdb, pStream);
205✔
2823
  }
2824
}
130✔
2825

2826
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList) {
1,165✔
2827
  STrans *pTrans = NULL;
1,165✔
2828
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME,
1,165✔
2829
                                 "update checkpoint-info", &pTrans);
2830
  if (pTrans == NULL || code) {
1,165!
2831
    sdbRelease(pMnode->pSdb, pStream);
×
2832
    return code;
×
2833
  }
2834

2835
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid);
1,165✔
2836
  if (code){
1,165!
2837
    sdbRelease(pMnode->pSdb, pStream);
×
2838
    mndTransDrop(pTrans);
×
2839
    return code;
×
2840
  }
2841

2842
  code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream);
1,165✔
2843
  if (code) {
1,165!
2844
    sdbRelease(pMnode->pSdb, pStream);
×
2845
    mndTransDrop(pTrans);
×
2846
    return code;
×
2847
  }
2848

2849
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
1,165✔
2850
  if (code) {
1,165!
2851
    sdbRelease(pMnode->pSdb, pStream);
×
2852
    mndTransDrop(pTrans);
×
2853
    return code;
×
2854
  }
2855

2856
  code = mndTransPrepare(pMnode, pTrans);
1,165✔
2857
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1,165!
2858
    mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, tstrerror(code));
×
2859
    sdbRelease(pMnode->pSdb, pStream);
×
2860
    mndTransDrop(pTrans);
×
2861
    return code;
×
2862
  }
2863

2864
  sdbRelease(pMnode->pSdb, pStream);
1,165✔
2865
  mndTransDrop(pTrans);
1,165✔
2866

2867
  return TSDB_CODE_ACTION_IN_PROGRESS;
1,165✔
2868
}
2869

2870
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
2✔
2871
  SMnode      *pMnode = pReq->info.node;
2✔
2872
  int32_t      code = 0;
2✔
2873
  SOrphanTask *pTask = NULL;
2✔
2874
  int32_t      i = 0;
2✔
2875
  STrans      *pTrans = NULL;
2✔
2876
  int32_t      numOfTasks = 0;
2✔
2877

2878
  SMStreamDropOrphanMsg msg = {0};
2✔
2879
  code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg);
2✔
2880
  if (code) {
2!
2881
    return code;
×
2882
  }
2883

2884
  numOfTasks = taosArrayGetSize(msg.pList);
2✔
2885
  if (numOfTasks == 0) {
2!
2886
    mDebug("no orphan tasks to drop, no need to create trans");
×
2887
    goto _err;
×
2888
  }
2889

2890
  mDebug("create trans to drop %d orphan tasks", numOfTasks);
2!
2891

2892
  i = 0;
2✔
2893
  while (i < numOfTasks && ((pTask = taosArrayGet(msg.pList, i)) == NULL)) {
2!
2894
    i += 1;
×
2895
  }
2896

2897
  if (pTask == NULL) {
2!
2898
    mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task");
×
2899
    goto _err;
×
2900
  }
2901

2902
  // check if it is conflict with other trans in both sourceDb and targetDb.
2903
  code = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
2✔
2904
  if (code) {
2!
2905
    goto _err;
×
2906
  }
2907

2908
  SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};
2✔
2909

2910
  code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
2✔
2911
  if (pTrans == NULL || code != 0) {
2!
2912
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
×
2913
    goto _err;
×
2914
  }
2915

2916
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
2✔
2917
  if (code) {
2!
2918
    goto _err;
×
2919
  }
2920

2921
  // drop all tasks
2922
  if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) {
2!
2923
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
×
2924
    goto _err;
×
2925
  }
2926

2927
  // drop stream
2928
  if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) {
2!
2929
    goto _err;
×
2930
  }
2931

2932
  code = mndTransPrepare(pMnode, pTrans);
2✔
2933
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
2!
2934
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
×
2935
    goto _err;
×
2936
  }
2937

2938
_err:
2✔
2939
  tDestroyDropOrphanTaskMsg(&msg);
2✔
2940
  mndTransDrop(pTrans);
2✔
2941

2942
  if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
2!
2943
    mDebug("create drop %d orphan tasks trans succ", numOfTasks);
2!
2944
  }
2945
  return code;
2✔
2946
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc