• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3523

06 Nov 2024 02:29AM UTC coverage: 55.861% (-2.4%) from 58.216%
#3523

push

travis-ci

web-flow
Merge pull request #28551 from taosdata/feat/TS-5215-2

test(blob): testing & fixes for blob

106075 of 245834 branches covered (43.15%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 2 files covered. (0.0%)

17003 existing lines in 254 files now uncovered.

181910 of 269703 relevant lines covered (67.45%)

1527639.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.75
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "audit.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndScheduler.h"
21
#include "mndShow.h"
22
#include "mndStb.h"
23
#include "mndTrans.h"
24
#include "mndVgroup.h"
25
#include "osMemory.h"
26
#include "parser.h"
27
#include "taoserror.h"
28
#include "tmisce.h"
29
#include "tname.h"
30

31
#define MND_STREAM_MAX_NUM 60
32

33
typedef struct {
34
  int8_t placeHolder;  // // to fix windows compile error, define place holder
35
} SMStreamNodeCheckMsg;
36

37
static int32_t  mndNodeCheckSentinel = 0;
38
SStreamExecInfo execInfo;
39

40
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
41
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
42
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
43
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
44
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
45

46
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
47
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
48

49
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq);
50
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
51
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
52
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
53
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
54
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
55
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
56
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
57
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
58
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
59
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
60
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList);
61
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
62
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
63
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
64
static void    doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
65
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq);
66
static void    saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
67

68
static void     addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
69
static void     removeExpiredNodeInfo(const SArray *pNodeSnapshot);
70
static int32_t  doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
71
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
72

73
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
74
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
75
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
76
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
77
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
78

79
int32_t mndInitStream(SMnode *pMnode) {
716✔
80
  SSdbTable table = {
716✔
81
      .sdbType = SDB_STREAM,
82
      .keyType = SDB_KEY_BINARY,
83
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
84
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
85
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
86
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
87
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
88
  };
89
  SSdbTable tableSeq = {
716✔
90
      .sdbType = SDB_STREAM_SEQ,
91
      .keyType = SDB_KEY_BINARY,
92
      .encodeFp = (SdbEncodeFp)mndStreamSeqActionEncode,
93
      .decodeFp = (SdbDecodeFp)mndStreamSeqActionDecode,
94
      .insertFp = (SdbInsertFp)mndStreamSeqActionInsert,
95
      .updateFp = (SdbUpdateFp)mndStreamSeqActionUpdate,
96
      .deleteFp = (SdbDeleteFp)mndStreamSeqActionDelete,
97
  };
98

99
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
716✔
100
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
716✔
101
  mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck);
716✔
102

103
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
716✔
104
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
716✔
105
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
716✔
106
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
716✔
107
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp);
716✔
108
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
716✔
109
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
716✔
110
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp);
716✔
111
  mndSetMsgHandle(pMnode, TDMT_STREAM_CONSEN_CHKPT_RSP, mndTransProcessRsp);
716✔
112

113
  // for msgs inside mnode
114
  // TODO change the name
115
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE, mndProcessCreateStreamReqFromMNode);
716✔
116
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE_RSP, mndTransProcessRsp);
716✔
117
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode);
716✔
118
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp);
716✔
119

120
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
716✔
121
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
716✔
122
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_DROP_ORPHANTASKS, mndProcessDropOrphanTaskReq);
716✔
123
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_TASK_RESET, mndProcessResetStatusReq);
716✔
124
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
716✔
125
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
716✔
126
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
716✔
127
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
716✔
128
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
716✔
129
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
716✔
130
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr);
716✔
131

132
  mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
716✔
133
  mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq);
716✔
134

135
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
716✔
136
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
716✔
137
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
716✔
138
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
716✔
139

140
  int32_t code = mndInitExecInfo();
716✔
141
  if (code) {
716!
142
    return code;
×
143
  }
144

145
  code = sdbSetTable(pMnode->pSdb, table);
716✔
146
  if (code) {
716!
147
    return code;
×
148
  }
149

150
  code = sdbSetTable(pMnode->pSdb, tableSeq);
716✔
151
  return code;
716✔
152
}
153

154
void mndCleanupStream(SMnode *pMnode) {
715✔
155
  taosArrayDestroy(execInfo.pTaskList);
715✔
156
  taosArrayDestroy(execInfo.pNodeList);
715✔
157
  taosArrayDestroy(execInfo.pKilledChkptTrans);
715✔
158
  taosHashCleanup(execInfo.pTaskMap);
715✔
159
  taosHashCleanup(execInfo.transMgmt.pDBTrans);
715✔
160
  taosHashCleanup(execInfo.pTransferStateStreams);
715✔
161
  taosHashCleanup(execInfo.pChkptStreams);
715✔
162
  taosHashCleanup(execInfo.pStreamConsensus);
715✔
163
  (void) taosThreadMutexDestroy(&execInfo.lock);
715✔
164
  mDebug("mnd stream exec info cleanup");
715✔
165
}
715✔
166

167
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
2,877✔
168
  int32_t     code = 0;
2,877✔
169
  int32_t     lino = 0;
2,877✔
170
  SSdbRow *   pRow = NULL;
2,877✔
171
  SStreamObj *pStream = NULL;
2,877✔
172
  void *      buf = NULL;
2,877✔
173
  int8_t      sver = 0;
2,877✔
174
  int32_t     tlen;
175
  int32_t     dataPos = 0;
2,877✔
176

177
  code = sdbGetRawSoftVer(pRaw, &sver);
2,877✔
178
  TSDB_CHECK_CODE(code, lino, _over);
2,877!
179

180
  if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
2,877!
181
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
×
182
    goto _over;
×
183
  }
184

185
  pRow = sdbAllocRow(sizeof(SStreamObj));
2,877✔
186
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);
2,877!
187

188
  pStream = sdbGetRowObj(pRow);
2,877✔
189
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);
2,877!
190

191
  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);
2,877!
192

193
  buf = taosMemoryMalloc(tlen + 1);
2,877✔
194
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
2,877!
195

196
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);
2,877!
197

198
  SDecoder decoder;
199
  tDecoderInit(&decoder, buf, tlen + 1);
2,877✔
200
  code = tDecodeSStreamObj(&decoder, pStream, sver);
2,877✔
201
  tDecoderClear(&decoder);
2,877✔
202

203
  if (code < 0) {
2,877!
204
    tFreeStreamObj(pStream);
×
205
  }
206

207
_over:
2,877✔
208
  taosMemoryFreeClear(buf);
2,877!
209

210
  if (code != TSDB_CODE_SUCCESS) {
2,877!
211
    char *p = (pStream == NULL) ? "null" : pStream->name;
×
212
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
×
213
    taosMemoryFreeClear(pRow);
×
214

215
    terrno = code;
×
216
    return NULL;
×
217
  } else {
218
    mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
2,877!
219
           pStream->checkpointId);
220

221
    terrno = 0;
2,877✔
222
    return pRow;
2,877✔
223
  }
224
}
225

226
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
779✔
227
  mTrace("stream:%s, perform insert action", pStream->name);
779!
228
  return 0;
779✔
229
}
230

231
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
2,875✔
232
  mTrace("stream:%s, perform delete action", pStream->name);
2,875!
233
  taosWLockLatch(&pStream->lock);
2,875✔
234
  tFreeStreamObj(pStream);
2,875✔
235
  taosWUnLockLatch(&pStream->lock);
2,875✔
236
  return 0;
2,875✔
237
}
238

239
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
1,523✔
240
  mTrace("stream:%s, perform update action", pOldStream->name);
1,523!
241
  (void) atomic_exchange_32(&pOldStream->version, pNewStream->version);
1,523✔
242

243
  taosWLockLatch(&pOldStream->lock);
1,523✔
244

245
  pOldStream->status = pNewStream->status;
1,523✔
246
  pOldStream->updateTime = pNewStream->updateTime;
1,523✔
247
  pOldStream->checkpointId = pNewStream->checkpointId;
1,523✔
248
  pOldStream->checkpointFreq = pNewStream->checkpointFreq;
1,523✔
249

250
  taosWUnLockLatch(&pOldStream->lock);
1,523✔
251
  return 0;
1,523✔
252
}
253

254
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
1,451✔
255
  int32_t code = 0;
1,451✔
256
  SSdb *pSdb = pMnode->pSdb;
1,451✔
257
  (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
1,451✔
258
  if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
1,451!
259
    code = TSDB_CODE_MND_STREAM_NOT_EXIST;
871✔
260
  }
261
  return code;
1,451✔
262
}
263

264
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
4,922✔
265
  SSdb *pSdb = pMnode->pSdb;
4,922✔
266
  sdbRelease(pSdb, pStream);
4,922✔
267
}
4,922✔
268

269
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
270
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
271
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
272
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
273
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
274

275
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
734✔
276
  if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->sourceDB[0] == 0 ||
734!
277
      pCreate->targetStbFullName[0] == 0) {
734!
278
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
×
279
  }
280
  return TSDB_CODE_SUCCESS;
734✔
281
}
282

283
static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) {
732✔
284
  pWrapper->nCols = taosArrayGetSize(pFields);
732✔
285
  pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema));
732✔
286
  if (NULL == pWrapper->pSchema) {
732!
287
    return terrno;
×
288
  }
289

290
  int32_t index = 0;
732✔
291
  for (int32_t i = 0; i < pWrapper->nCols; i++) {
9,905✔
292
    SField *pField = (SField *)taosArrayGet(pFields, i);
9,173✔
293
    if (pField == NULL) {
9,173!
294
      return terrno;
×
295
    }
296

297
    if (TSDB_DATA_TYPE_NULL == pField->type) {
9,173!
298
      pWrapper->pSchema[index].type = TSDB_DATA_TYPE_VARCHAR;
×
299
      pWrapper->pSchema[index].bytes = VARSTR_HEADER_SIZE;
×
300
    } else {
301
      pWrapper->pSchema[index].type = pField->type;
9,173✔
302
      pWrapper->pSchema[index].bytes = pField->bytes;
9,173✔
303
    }
304
    pWrapper->pSchema[index].colId = index + 1;
9,173✔
305
    strcpy(pWrapper->pSchema[index].name, pField->name);
9,173✔
306
    pWrapper->pSchema[index].flags = pField->flags;
9,173✔
307
    index += 1;
9,173✔
308
  }
309

310
  return TSDB_CODE_SUCCESS;
732✔
311
}
312

313
static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) {
732✔
314
  if (pWrapper->nCols < 2) {
732!
315
    return false;
×
316
  }
317
  for (int32_t i = 1; i < pWrapper->nCols; i++) {
9,156✔
318
    if (pWrapper->pSchema[i].flags & COL_IS_KEY) {
8,436✔
319
      return true;
12✔
320
    }
321
  }
322
  return false;
720✔
323
}
324

325
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
732✔
326
  SNode      *pAst = NULL;
732✔
327
  SQueryPlan *pPlan = NULL;
732✔
328
  int32_t     code = 0;
732✔
329

330
  mInfo("stream:%s to create", pCreate->name);
732!
331
  memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
732✔
332
  pObj->createTime = taosGetTimestampMs();
732✔
333
  pObj->updateTime = pObj->createTime;
732✔
334
  pObj->version = 1;
732✔
335

336
  if (pCreate->smaId > 0) {
732!
337
    pObj->subTableWithoutMd5 = 1;
×
338
  }
339

340
  pObj->smaId = pCreate->smaId;
732✔
341
  pObj->indexForMultiAggBalance = -1;
732✔
342

343
  pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
732✔
344

345
  char p[TSDB_STREAM_FNAME_LEN + 32] = {0};
732✔
346
  snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory");
732✔
347

348
  pObj->hTaskUid = mndGenerateUid(pObj->name, strlen(pObj->name));
732✔
349
  pObj->status = 0;
732✔
350

351
  pObj->conf.igExpired = pCreate->igExpired;
732✔
352
  pObj->conf.trigger = pCreate->triggerType;
732✔
353
  pObj->conf.triggerParam = pCreate->maxDelay;
732✔
354
  pObj->conf.watermark = pCreate->watermark;
732✔
355
  pObj->conf.fillHistory = pCreate->fillHistory;
732✔
356
  pObj->deleteMark = pCreate->deleteMark;
732✔
357
  pObj->igCheckUpdate = pCreate->igUpdate;
732✔
358

359
  memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
732✔
360
  SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
732✔
361
  if (pSourceDb == NULL) {
732!
362
    code = terrno;
×
363
    mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, tstrerror(code));
×
364
    goto FAIL;
×
365
  }
366

367
  pObj->sourceDbUid = pSourceDb->uid;
732✔
368
  mndReleaseDb(pMnode, pSourceDb);
732✔
369

370
  memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
732✔
371

372
  SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
732✔
373
  if (pTargetDb == NULL) {
732!
374
    code = terrno;
×
375
    mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, tstrerror(code));
×
376
    goto FAIL;
×
377
  }
378

379
  tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
732✔
380

381
  if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
732✔
382
    pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
711✔
383
  } else {
384
    pObj->targetStbUid = pCreate->targetStbUid;
21✔
385
  }
386
  pObj->targetDbUid = pTargetDb->uid;
732✔
387
  mndReleaseDb(pMnode, pTargetDb);
732✔
388

389
  pObj->sql = pCreate->sql;
732✔
390
  pObj->ast = pCreate->ast;
732✔
391

392
  pCreate->sql = NULL;
732✔
393
  pCreate->ast = NULL;
732✔
394

395
  // deserialize ast
396
  if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) {
732!
397
    goto FAIL;
×
398
  }
399

400
  // create output schema
401
  if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) {
732!
402
    goto FAIL;
×
403
  }
404

405
  int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
732✔
406
  if (numOfNULL > 0) {
732✔
407
    pObj->outputSchema.nCols += numOfNULL;
6✔
408
    SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
6✔
409
    if (!pFullSchema) {
6!
410
      code = terrno;
×
411
      goto FAIL;
×
412
    }
413

414
    int32_t nullIndex = 0;
6✔
415
    int32_t dataIndex = 0;
6✔
416
    for (int32_t i = 0; i < pObj->outputSchema.nCols; i++) {
32✔
417
      if (nullIndex >= numOfNULL) {
26!
418
        pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
×
419
        pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
×
420
        pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
×
421
        strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name);
×
422
        pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
×
423
        dataIndex++;
×
424
      } else {
425
        SColLocation *pos = NULL;
26✔
426
        if (nullIndex < taosArrayGetSize(pCreate->fillNullCols)) {
26!
427
          pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
26✔
428
        }
429

430
        if (pos == NULL) {
26!
431
          mError("invalid null column index, %d", nullIndex);
×
432
          continue;
×
433
        }
434

435
        if (i < pos->slotId) {
26✔
436
          pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
19✔
437
          pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
19✔
438
          pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
19✔
439
          strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name);
19✔
440
          pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
19✔
441
          dataIndex++;
19✔
442
        } else {
443
          pFullSchema[i].bytes = 0;
7✔
444
          pFullSchema[i].colId = pos->colId;
7✔
445
          pFullSchema[i].flags = COL_SET_NULL;
7✔
446
          memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
7✔
447
          pFullSchema[i].type = pos->type;
7✔
448
          nullIndex++;
7✔
449
        }
450
      }
451
    }
452

453
    taosMemoryFree(pObj->outputSchema.pSchema);
6✔
454
    pObj->outputSchema.pSchema = pFullSchema;
6✔
455
  }
456

457
  bool         hasKey = hasDestPrimaryKey(&pObj->outputSchema);
732✔
458
  SPlanContext cxt = {
732✔
459
      .pAstRoot = pAst,
460
      .topicQuery = false,
461
      .streamQuery = true,
462
      .triggerType = pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
732✔
463
      .watermark = pObj->conf.watermark,
732✔
464
      .igExpired = pObj->conf.igExpired,
732✔
465
      .deleteMark = pObj->deleteMark,
732✔
466
      .igCheckUpdate = pObj->igCheckUpdate,
732✔
467
      .destHasPrimaryKey = hasKey,
468
  };
469

470
  // using ast and param to build physical plan
471
  if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) {
732!
UNCOV
472
    goto FAIL;
×
473
  }
474

475
  // save physcial plan
476
  if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) {
732!
UNCOV
477
    goto FAIL;
×
478
  }
479

480
  pObj->tagSchema.nCols = pCreate->numOfTags;
732✔
481
  if (pCreate->numOfTags) {
732✔
482
    pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
13✔
483
    if (pObj->tagSchema.pSchema == NULL) {
13!
484
      code = terrno;
×
UNCOV
485
      goto FAIL;
×
486
    }
487
  }
488

489
  /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
490
  for (int32_t i = 0; i < pCreate->numOfTags; i++) {
745✔
491
    SField *pField = taosArrayGet(pCreate->pTags, i);
13✔
492
    if (pField == NULL) {
13!
UNCOV
493
      continue;
×
494
    }
495

496
    pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
13✔
497
    pObj->tagSchema.pSchema[i].bytes = pField->bytes;
13✔
498
    pObj->tagSchema.pSchema[i].flags = pField->flags;
13✔
499
    pObj->tagSchema.pSchema[i].type = pField->type;
13✔
500
    memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN);
13✔
501
  }
502

503
FAIL:
732✔
504
  if (pAst != NULL) nodesDestroyNode(pAst);
732!
505
  if (pPlan != NULL) qDestroyQueryPlan(pPlan);
732!
506
  return code;
732✔
507
}
508

509
int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
5,341✔
510
  SEncoder encoder;
511
  tEncoderInit(&encoder, NULL, 0);
5,341✔
512

513
  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
5,341!
UNCOV
514
    pTask->ver = SSTREAM_TASK_VER;
×
515
  }
516

517
  int32_t code = tEncodeStreamTask(&encoder, pTask);
5,341✔
518
  if (code == -1) {
5,341!
519
    tEncoderClear(&encoder);
×
UNCOV
520
    return TSDB_CODE_INVALID_MSG;
×
521
  }
522

523
  int32_t size = encoder.pos;
5,341✔
524
  int32_t tlen = sizeof(SMsgHead) + size;
5,341✔
525
  tEncoderClear(&encoder);
5,341✔
526

527
  void *buf = taosMemoryCalloc(1, tlen);
5,341✔
528
  if (buf == NULL) {
5,341!
UNCOV
529
    return terrno;
×
530
  }
531

532
  ((SMsgHead *)buf)->vgId = htonl(pTask->info.nodeId);
5,341✔
533

534
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
5,341✔
535
  tEncoderInit(&encoder, abuf, size);
5,341✔
536
  code = tEncodeStreamTask(&encoder, pTask);
5,341✔
537
  tEncoderClear(&encoder);
5,341✔
538

539
  if (code != 0) {
5,341!
540
    mError("failed to encode stream task, code:%s", tstrerror(code));
×
541
    taosMemoryFree(buf);
×
UNCOV
542
    return code;
×
543
  }
544

545
  code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
5,341✔
546
  if (code) {
5,341!
UNCOV
547
    taosMemoryFree(buf);
×
548
  }
549

550
  return code;
5,341✔
551
}
552

553
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
753✔
554
  SStreamTaskIter *pIter = NULL;
753✔
555
  int32_t code = createStreamTaskIter(pStream, &pIter);
753✔
556
  if (code) {
753!
557
    mError("failed to create task iter for stream:%s", pStream->name);
×
UNCOV
558
    return code;
×
559
  }
560

561
  while (streamTaskIterNextTask(pIter)) {
4,684✔
562
    SStreamTask *pTask = NULL;
3,931✔
563
    code = streamTaskIterGetCurrent(pIter, &pTask);
3,931✔
564
    if (code) {
3,931!
565
      destroyStreamTaskIter(pIter);
×
UNCOV
566
      return code;
×
567
    }
568

569
    code = mndPersistTaskDeployReq(pTrans, pTask);
3,931✔
570
    if (code) {
3,931!
571
      destroyStreamTaskIter(pIter);
×
UNCOV
572
      return code;
×
573
    }
574
  }
575

576
  destroyStreamTaskIter(pIter);
753✔
577

578
  // persistent stream task for already stored ts data
579
  if (pStream->conf.fillHistory) {
753✔
580
    int32_t level = taosArrayGetSize(pStream->pHTasksList);
247✔
581

582
    for (int32_t i = 0; i < level; i++) {
765✔
583
      SArray *pLevel = taosArrayGetP(pStream->pHTasksList, i);
518✔
584

585
      int32_t numOfTasks = taosArrayGetSize(pLevel);
518✔
586
      for (int32_t j = 0; j < numOfTasks; j++) {
1,928✔
587
        SStreamTask *pTask = taosArrayGetP(pLevel, j);
1,410✔
588
        code = mndPersistTaskDeployReq(pTrans, pTask);
1,410✔
589
        if (code) {
1,410!
UNCOV
590
          return code;
×
591
        }
592
      }
593
    }
594
  }
595

596
  return code;
753✔
597
}
598

599
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
753✔
600
  int32_t code = 0;
753✔
601
  if ((code = mndPersistStreamTasks(pTrans, pStream)) < 0) {
753!
UNCOV
602
    return code;
×
603
  }
604

605
  return mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
753✔
606
}
607

608
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
711✔
609
  SStbObj *pStb = NULL;
711✔
610
  SDbObj  *pDb = NULL;
711✔
611
  int32_t  code = 0;
711✔
612
  int32_t  lino = 0;
711✔
613

614
  SMCreateStbReq createReq = {0};
711✔
615
  tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
711✔
616
  createReq.numOfColumns = pStream->outputSchema.nCols;
711✔
617
  createReq.numOfTags = 1;  // group id
711✔
618
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
711✔
619
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);
711!
620

621
  // build fields
622
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
9,804✔
623
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
9,093✔
624
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
9,093!
625

626
    tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
9,093✔
627
    pField->flags = pStream->outputSchema.pSchema[i].flags;
9,093✔
628
    pField->type = pStream->outputSchema.pSchema[i].type;
9,093✔
629
    pField->bytes = pStream->outputSchema.pSchema[i].bytes;
9,093✔
630
    pField->compress = createDefaultColCmprByType(pField->type);
9,093✔
631
  }
632

633
  if (pStream->tagSchema.nCols == 0) {
711✔
634
    createReq.numOfTags = 1;
698✔
635
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
698✔
636
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
698!
637

638
    // build tags
639
    SField *pField = taosArrayGet(createReq.pTags, 0);
698✔
640
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
698!
641

642
    strcpy(pField->name, "group_id");
698✔
643
    pField->type = TSDB_DATA_TYPE_UBIGINT;
698✔
644
    pField->flags = 0;
698✔
645
    pField->bytes = 8;
698✔
646
  } else {
647
    createReq.numOfTags = pStream->tagSchema.nCols;
13✔
648
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
13✔
649
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
13!
650

651
    for (int32_t i = 0; i < createReq.numOfTags; i++) {
26✔
652
      SField *pField = taosArrayGet(createReq.pTags, i);
13✔
653
      if (pField == NULL) {
13!
UNCOV
654
        continue;
×
655
      }
656

657
      pField->bytes = pStream->tagSchema.pSchema[i].bytes;
13✔
658
      pField->flags = pStream->tagSchema.pSchema[i].flags;
13✔
659
      pField->type = pStream->tagSchema.pSchema[i].type;
13✔
660
      tstrncpy(pField->name, pStream->tagSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
13✔
661
    }
662
  }
663

664
  if ((code = mndCheckCreateStbReq(&createReq)) != 0) {
711!
UNCOV
665
    goto _OVER;
×
666
  }
667

668
  pStb = mndAcquireStb(pMnode, createReq.name);
711✔
669
  if (pStb != NULL) {
711!
670
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
×
UNCOV
671
    goto _OVER;
×
672
  }
673

674
  pDb = mndAcquireDbByStb(pMnode, createReq.name);
711✔
675
  if (pDb == NULL) {
711!
676
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
UNCOV
677
    goto _OVER;
×
678
  }
679

680
  int32_t numOfStbs = -1;
711✔
681
  if (mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs) != 0) {
711!
UNCOV
682
    goto _OVER;
×
683
  }
684

685
  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
711!
686
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
×
UNCOV
687
    goto _OVER;
×
688
  }
689

690
  SStbObj stbObj = {0};
711✔
691

692
  if (mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb) != 0) {
711!
UNCOV
693
    goto _OVER;
×
694
  }
695

696
  stbObj.uid = pStream->targetStbUid;
711✔
697

698
  if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) {
711!
699
    mndFreeStb(&stbObj);
×
UNCOV
700
    goto _OVER;
×
701
  }
702

703
  tFreeSMCreateStbReq(&createReq);
711✔
704
  mndFreeStb(&stbObj);
711✔
705
  mndReleaseStb(pMnode, pStb);
711✔
706
  mndReleaseDb(pMnode, pDb);
711✔
707
  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols);
711✔
708
  return code;
711✔
709

710
_OVER:
×
711
  tFreeSMCreateStbReq(&createReq);
×
712
  mndReleaseStb(pMnode, pStb);
×
UNCOV
713
  mndReleaseDb(pMnode, pDb);
×
714

UNCOV
715
  mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->targetSTbName, lino,
×
716
         tstrerror(code));
UNCOV
717
  return code;
×
718
}
719

720
// 1. stream number check
721
// 2. target stable can not be target table of other existed streams.
722
static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
732✔
723
  int32_t     numOfStream = 0;
732✔
724
  SStreamObj *pStream = NULL;
732✔
725
  void       *pIter = NULL;
732✔
726

727
  while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
1,902✔
728
    if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
1,171✔
729
      ++numOfStream;
776✔
730
    }
731

732
    sdbRelease(pMnode->pSdb, pStream);
1,171✔
733

734
    if (numOfStream > MND_STREAM_MAX_NUM) {
1,171!
UNCOV
735
      mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
×
736
             pStreamObj->name);
737
      sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
738
      return TSDB_CODE_MND_TOO_MANY_STREAMS;
×
739
    }
740

741
    if (pStream->targetStbUid == pStreamObj->targetStbUid) {
1,171✔
742
      mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
1!
743
             pStreamObj->name);
744
      sdbCancelFetch(pMnode->pSdb, pIter);
1✔
745
      return TSDB_CODE_MND_INVALID_TARGET_TABLE;
1✔
746
    }
747
  }
748

749
  return TSDB_CODE_SUCCESS;
731✔
750
}
751

752
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
734✔
753
  SMnode     *pMnode = pReq->info.node;
734✔
754
  SStreamObj *pStream = NULL;
734✔
755
  SStreamObj  streamObj = {0};
734✔
756
  char       *sql = NULL;
734✔
757
  int32_t     sqlLen = 0;
734✔
758
  const char *pMsg = "create stream tasks on dnodes";
734✔
759
  int32_t     code = TSDB_CODE_SUCCESS;
734✔
760
  int32_t     lino = 0;
734✔
761
  STrans     *pTrans = NULL;
734✔
762

763
  SCMCreateStreamReq createReq = {0};
734✔
764
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq);
734✔
765
  TSDB_CHECK_CODE(code, lino, _OVER);
734!
766

767
#ifdef WINDOWS
768
  code = TSDB_CODE_MND_INVALID_PLATFORM;
769
  goto _OVER;
770
#endif
771

772
  mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
734!
773
  if ((code = mndCheckCreateStreamReq(&createReq)) != 0) {
734!
774
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
×
UNCOV
775
    goto _OVER;
×
776
  }
777

778
  code = mndAcquireStream(pMnode, createReq.name, &pStream);
734✔
779
  if (pStream != NULL && code == 0) {
734!
780
    if (createReq.igExists) {
2✔
781
      mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
1!
782
      mndReleaseStream(pMnode, pStream);
1✔
783
      tFreeSCMCreateStreamReq(&createReq);
1✔
784
      return code;
1✔
785
    } else {
786
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
1✔
787
      goto _OVER;
1✔
788
    }
789
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
732!
UNCOV
790
    goto _OVER;
×
791
  }
792

793
  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
732!
UNCOV
794
    goto _OVER;
×
795
  }
796

797
  if (createReq.sql != NULL) {
732!
798
    sql = taosStrdup(createReq.sql);
732✔
799
    TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
732!
800
  }
801

802
  SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB);
732✔
803
  if (pSourceDb == NULL) {
732!
804
    code = terrno;
×
UNCOV
805
    mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB,
×
806
          tstrerror(code));
UNCOV
807
    goto _OVER;
×
808
  }
809

810
  code = mndCheckForSnode(pMnode, pSourceDb);
732✔
811
  mndReleaseDb(pMnode, pSourceDb);
732✔
812
  if (code != 0) {
732!
UNCOV
813
    goto _OVER;
×
814
  }
815

816
  // build stream obj from request
817
  if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) {
732!
818
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
×
UNCOV
819
    goto _OVER;
×
820
  }
821

822
  code = doStreamCheck(pMnode, &streamObj);
732✔
823
  TSDB_CHECK_CODE(code, lino, _OVER);
732✔
824

825
  code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans);
731✔
826
  if (pTrans == NULL || code) {
731!
UNCOV
827
    goto _OVER;
×
828
  }
829

830
  // create stb for stream
831
  if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) {
731✔
832
    if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) {
711!
833
      mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code));
×
834
      mndTransDrop(pTrans);
×
UNCOV
835
      goto _OVER;
×
836
    }
837
  } else {
838
    mDebug("stream:%s no need create stable", createReq.name);
20✔
839
  }
840

841
  // schedule stream task for stream obj
842
  code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList);
731✔
843
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
731!
844
    mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code));
×
845
    mndTransDrop(pTrans);
×
UNCOV
846
    goto _OVER;
×
847
  }
848

849
  // add stream to trans
850
  code = mndPersistStream(pTrans, &streamObj);
731✔
851
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
731!
852
    mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code));
×
853
    mndTransDrop(pTrans);
×
UNCOV
854
    goto _OVER;
×
855
  }
856

857
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb)) != 0) {
731!
858
    mndTransDrop(pTrans);
×
UNCOV
859
    goto _OVER;
×
860
  }
861

862
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb)) != 0) {
731!
863
    mndTransDrop(pTrans);
×
UNCOV
864
    goto _OVER;
×
865
  }
866

867
  // add into buffer firstly
868
  // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
869
  streamMutexLock(&execInfo.lock);
731✔
870
  mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
731✔
871
  saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
731✔
872
  streamMutexUnlock(&execInfo.lock);
731✔
873

874
  // execute creation
875
  code = mndTransPrepare(pMnode, pTrans);
731✔
876
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
731!
877
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
×
878
    mndTransDrop(pTrans);
×
UNCOV
879
    goto _OVER;
×
880
  }
881

882
  mndTransDrop(pTrans);
731✔
883

884
  SName dbname = {0};
731✔
885
  code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
731✔
886
  if (code) {
731!
887
    mError("invalid source dbname:%s in create stream, code:%s", createReq.sourceDB, tstrerror(code));
×
UNCOV
888
    goto _OVER;
×
889
  }
890

891
  SName name = {0};
731✔
892
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE);
731✔
893
  if (code) {
731!
894
    mError("invalid stream name:%s in create strem, code:%s", createReq.name, tstrerror(code));
×
UNCOV
895
    goto _OVER;
×
896
  }
897

898
  // reuse this function for stream
899
  if (sql != NULL && sqlLen > 0) {
731!
UNCOV
900
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen);
×
901
  } else {
902
    char detail[1000] = {0};
731✔
903
    snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
731✔
904
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
731✔
905
  }
906

907
_OVER:
733✔
908
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
733!
909
    mError("stream:%s, failed to create at line:%d since %s", createReq.name, lino, tstrerror(code));
2!
910
  } else {
911
    mDebug("stream:%s create stream completed", createReq.name);
731✔
912
    code = TSDB_CODE_ACTION_IN_PROGRESS;
731✔
913
  }
914

915
  mndReleaseStream(pMnode, pStream);
733✔
916
  tFreeSCMCreateStreamReq(&createReq);
733✔
917
  tFreeStreamObj(&streamObj);
733✔
918

919
  if (sql != NULL) {
733✔
920
    taosMemoryFreeClear(sql);
732!
921
  }
922

923
  return code;
733✔
924
}
925

926
static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) {
×
927
  SMnode          *pMnode = pReq->info.node;
×
928
  SStreamObj      *pStream = NULL;
×
929
  int32_t          code = 0;
×
UNCOV
930
  SMPauseStreamReq pauseReq = {0};
×
931

932
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
×
UNCOV
933
    return TSDB_CODE_INVALID_MSG;
×
934
  }
935

936
  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
×
937
  if (pStream == NULL || code != 0) {
×
938
    if (pauseReq.igNotExists) {
×
939
      mInfo("stream:%s, not exist, not restart stream", pauseReq.name);
×
UNCOV
940
      return 0;
×
941
    } else {
942
      mError("stream:%s not exist, failed to restart stream", pauseReq.name);
×
UNCOV
943
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
944
    }
945
  }
946

947
  mInfo("stream:%s,%" PRId64 " start to restart stream", pauseReq.name, pStream->uid);
×
948
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
×
949
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
950
    return code;
×
951
  }
952

953
  // check if it is conflict with other trans in both sourceDb and targetDb.
954
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESTART_NAME, true);
×
955
  if (code) {
×
956
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
957
    return code;
×
958
  }
959

960
  bool updated = mndStreamNodeIsUpdated(pMnode);
×
961
  if (updated) {
×
962
    mError("tasks are not ready for restart, node update detected");
×
963
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
964
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
×
965
  }
966

967
  STrans *pTrans = NULL;
×
968
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream", &pTrans);
×
969
  if (pTrans == NULL || code) {
×
970
    mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
×
971
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
972
    return code;
×
973
  }
974

975
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESTART_NAME, pStream->uid);
×
976
  if (code) {
×
977
    sdbRelease(pMnode->pSdb, pStream);
×
978
    mndTransDrop(pTrans);
×
UNCOV
979
    return code;
×
980
  }
981

982
  // if nodeUpdate happened, not send pause trans
983
  code = mndStreamSetRestartAction(pMnode, pTrans, pStream);
×
984
  if (code) {
×
985
    mError("stream:%s, failed to restart task since %s", pauseReq.name, tstrerror(code));
×
986
    sdbRelease(pMnode->pSdb, pStream);
×
987
    mndTransDrop(pTrans);
×
UNCOV
988
    return code;
×
989
  }
990

991
  code = mndTransPrepare(pMnode, pTrans);
×
992
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
993
    mError("trans:%d, failed to prepare restart stream trans since %s", pTrans->id, tstrerror(code));
×
994
    sdbRelease(pMnode->pSdb, pStream);
×
995
    mndTransDrop(pTrans);
×
UNCOV
996
    return code;
×
997
  }
998

999
  sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1000
  mndTransDrop(pTrans);
×
1001

UNCOV
1002
  return TSDB_CODE_ACTION_IN_PROGRESS;
×
1003
}
1004

1005
int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
771✔
1006
  SStreamObj *pStream = NULL;
771✔
1007
  void       *pIter = NULL;
771✔
1008
  SSdb       *pSdb = pMnode->pSdb;
771✔
1009
  int64_t     maxChkptId = 0;
771✔
1010

1011
  while (1) {
1012
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
2,482✔
1013
    if (pIter == NULL) break;
2,482✔
1014

1015
    maxChkptId = TMAX(maxChkptId, pStream->checkpointId);
1,711✔
1016
    mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
1,711✔
1017
           pStream->checkpointId);
1018
    sdbRelease(pSdb, pStream);
1,711✔
1019
  }
1020

1021
  {  // check the max checkpoint id from all vnodes.
1022
    int64_t maxCheckpointId = -1;
771✔
1023
    if (lock) {
771✔
1024
      streamMutexLock(&execInfo.lock);
535✔
1025
    }
1026

1027
    for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
7,149✔
1028
      STaskId          *p = taosArrayGet(execInfo.pTaskList, i);
6,378✔
1029
      STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
6,378✔
1030
      if (p == NULL || pEntry == NULL) {
6,378!
UNCOV
1031
        continue;
×
1032
      }
1033

1034
      if (pEntry->checkpointInfo.failed) {
6,378!
UNCOV
1035
        continue;
×
1036
      }
1037

1038
      if (maxCheckpointId < pEntry->checkpointInfo.latestId) {
6,378✔
1039
        maxCheckpointId = pEntry->checkpointInfo.latestId;
1,033✔
1040
      }
1041
    }
1042

1043
    if (lock) {
771✔
1044
      streamMutexUnlock(&execInfo.lock);
535✔
1045
    }
1046

1047
    if (maxCheckpointId > maxChkptId) {
771!
UNCOV
1048
      mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId,
×
1049
             maxCheckpointId);
UNCOV
1050
      maxChkptId = maxCheckpointId;
×
1051
    }
1052
  }
1053

1054
  mDebug("generate new checkpointId:%" PRId64, maxChkptId + 1);
771✔
1055
  return maxChkptId + 1;
771✔
1056
}
1057

1058
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
771✔
1059
                                               int8_t mndTrigger, bool lock) {
1060
  int32_t code = TSDB_CODE_SUCCESS;
771✔
1061
  bool    conflict = false;
771✔
1062
  int64_t ts = taosGetTimestampMs();
771✔
1063
  STrans *pTrans = NULL;
771✔
1064

1065
  if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
771!
UNCOV
1066
    return code;
×
1067
  }
1068

1069
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
771✔
1070
  if (code) {
771!
UNCOV
1071
    mWarn("checkpoint conflict with other trans in %s, code:%s ignore the checkpoint for stream:%s %" PRIx64,
×
1072
          pStream->sourceDb, tstrerror(code), pStream->name, pStream->uid);
UNCOV
1073
    goto _ERR;
×
1074
  }
1075

1076
  code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME,
771✔
1077
                       "gen checkpoint for stream", &pTrans);
1078
  if (code) {
771!
UNCOV
1079
    mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
×
1080
           tstrerror(code));
UNCOV
1081
    goto _ERR;
×
1082
  }
1083

1084
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
771✔
1085
  if (code) {
771!
1086
    mError("failed to register checkpoint trans for stream:%s, checkpointId:%" PRId64, pStream->name, checkpointId);
×
UNCOV
1087
    goto _ERR;
×
1088
  }
1089

1090
  mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);
771✔
1091

1092
  taosWLockLatch(&pStream->lock);
771✔
1093
  pStream->currentTick = 1;
771✔
1094

1095
  // 1. redo action: broadcast checkpoint source msg for all source vg
1096
  int32_t totalLevel = taosArrayGetSize(pStream->tasks);
771✔
1097
  for (int32_t i = 0; i < totalLevel; i++) {
2,059✔
1098
    SArray      *pLevel = taosArrayGetP(pStream->tasks, i);
1,288✔
1099
    SStreamTask *p = taosArrayGetP(pLevel, 0);
1,288✔
1100

1101
    if (p->info.taskLevel == TASK_LEVEL__SOURCE) {
1,288✔
1102
      int32_t sz = taosArrayGetSize(pLevel);
771✔
1103
      for (int32_t j = 0; j < sz; j++) {
2,259✔
1104
        SStreamTask *pTask = taosArrayGetP(pLevel, j);
1,488✔
1105
        code = mndStreamSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);
1,488✔
1106

1107
        if (code != TSDB_CODE_SUCCESS) {
1,488!
1108
          taosWUnLockLatch(&pStream->lock);
×
UNCOV
1109
          goto _ERR;
×
1110
        }
1111
      }
1112
    }
1113
  }
1114

1115
  // 2. reset tick
1116
  pStream->checkpointId = checkpointId;
771✔
1117
  pStream->checkpointFreq = taosGetTimestampMs();
771✔
1118
  pStream->currentTick = 0;
771✔
1119

1120
  // 3. commit log: stream checkpoint info
1121
  pStream->version = pStream->version + 1;
771✔
1122
  taosWUnLockLatch(&pStream->lock);
771✔
1123

1124
  if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY)) != TSDB_CODE_SUCCESS) {
771!
UNCOV
1125
    goto _ERR;
×
1126
  }
1127

1128
  code = mndTransPrepare(pMnode, pTrans);
771✔
1129
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
771!
UNCOV
1130
    mError("failed to prepare checkpoint trans since %s", tstrerror(code));
×
1131
  } else {
1132
    code = TSDB_CODE_ACTION_IN_PROGRESS;
771✔
1133
  }
1134

1135
_ERR:
771✔
1136
  mndTransDrop(pTrans);
771✔
1137
  return code;
771✔
1138
}
1139

1140
int32_t extractStreamNodeList(SMnode *pMnode) {
2,265✔
1141
  if (taosArrayGetSize(execInfo.pNodeList) == 0) {
2,265✔
1142
    int32_t code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
658✔
1143
    if (code) {
658!
1144
      mError("Failed to extract node list from stream, code:%s", tstrerror(code));
×
UNCOV
1145
      return code;
×
1146
    }
1147
  }
1148

1149
  return taosArrayGetSize(execInfo.pNodeList);
2,265✔
1150
}
1151

1152
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
1,286✔
1153
  bool ready = true;
1,286✔
1154
  if (mndStreamNodeIsUpdated(pMnode)) {
1,286✔
1155
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
6✔
1156
  }
1157

1158
  streamMutexLock(&execInfo.lock);
1,280✔
1159
  if (taosArrayGetSize(execInfo.pNodeList) == 0) {
1,280✔
1160
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
658✔
1161
    if (taosArrayGetSize(execInfo.pTaskList) != 0) {
658!
1162
      streamMutexUnlock(&execInfo.lock);
×
1163
      mError("stream task node change checking done, no vgroups exist, but task list is not empty");
×
UNCOV
1164
      return TSDB_CODE_FAILED;
×
1165
    }
1166
  }
1167

1168
  SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId));
1,280✔
1169

1170
  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
5,099✔
1171
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
3,857✔
1172
    if (p == NULL) {
3,857!
UNCOV
1173
      continue;
×
1174
    }
1175

1176
    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
3,857✔
1177
    if (pEntry == NULL) {
3,857!
UNCOV
1178
      continue;
×
1179
    }
1180

1181
    if (pEntry->status == TASK_STATUS__STOP) {
3,857✔
1182
      for (int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) {
33!
1183
        STaskId *pId = taosArrayGet(pInvalidList, j);
×
1184
        if (pId == NULL) {
×
UNCOV
1185
          continue;
×
1186
        }
1187

1188
        if (pEntry->id.streamId == pId->streamId) {
×
1189
          void *px = taosArrayPush(pInvalidList, &pEntry->id);
×
1190
          if (px == NULL) {
×
UNCOV
1191
            mError("failed to put stream into invalid list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
×
1192
          }
UNCOV
1193
          break;
×
1194
        }
1195
      }
1196
    }
1197

1198
    if (pEntry->status != TASK_STATUS__READY) {
3,857✔
1199
      mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId,
36✔
1200
             (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
1201
      ready = false;
36✔
1202
      break;
36✔
1203
    }
1204

1205
    if (pEntry->hTaskId != 0) {
3,821✔
1206
      mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
2!
1207
             " exists, checkpoint not issued",
1208
             pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
1209
             pEntry->hTaskId);
1210
      ready = false;
2✔
1211
      break;
2✔
1212
    }
1213
  }
1214

1215
  removeTasksInBuf(pInvalidList, &execInfo);
1,280✔
1216
  taosArrayDestroy(pInvalidList);
1,280✔
1217

1218
  streamMutexUnlock(&execInfo.lock);
1,280✔
1219
  return ready ? 0 : -1;
1,280✔
1220
}
1221

1222
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
680✔
1223
  int64_t ts = -1;
680✔
1224
  int32_t taskId = -1;
680✔
1225

1226
  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
9,673✔
1227
    STaskId          *p = taosArrayGet(pTaskList, i);
8,993✔
1228
    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
8,993✔
1229
    if (p == NULL || pEntry == NULL || pEntry->id.streamId != streamId) {
8,993!
1230
      continue;
6,828✔
1231
    }
1232

1233
    if (pEntry->status == TASK_STATUS__READY && ts < pEntry->startTime) {
2,165!
1234
      ts = pEntry->startTime;
1,197✔
1235
      taskId = pEntry->id.taskId;
1,197✔
1236
    }
1237
  }
1238

1239
  mDebug("stream:0x%" PRIx64 " last ready ts:%" PRId64 " s-task:0x%x", streamId, ts, taskId);
680✔
1240
  return ts;
680✔
1241
}
1242

1243
typedef struct {
1244
  int64_t streamId;
1245
  int64_t duration;
1246
} SCheckpointInterval;
1247

1248
static int32_t streamWaitComparFn(const void *p1, const void *p2) {
50✔
1249
  const SCheckpointInterval *pInt1 = p1;
50✔
1250
  const SCheckpointInterval *pInt2 = p2;
50✔
1251
  if (pInt1->duration == pInt2->duration) {
50✔
1252
    return 0;
45✔
1253
  }
1254

1255
  return pInt1->duration > pInt2->duration ? -1 : 1;
5✔
1256
}
1257

1258
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
1,286✔
1259
  SMnode     *pMnode = pReq->info.node;
1,286✔
1260
  SSdb       *pSdb = pMnode->pSdb;
1,286✔
1261
  void       *pIter = NULL;
1,286✔
1262
  SStreamObj *pStream = NULL;
1,286✔
1263
  int32_t     code = 0;
1,286✔
1264
  int32_t     numOfCheckpointTrans = 0;
1,286✔
1265

1266
  if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
1,286✔
1267
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
44✔
1268
  }
1269

1270
  SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));
1,242✔
1271
  if (pList == NULL) {
1,242!
UNCOV
1272
    return terrno;
×
1273
  }
1274

1275
  int64_t now = taosGetTimestampMs();
1,242✔
1276

1277
  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
2,473✔
1278
    int64_t duration = now - pStream->checkpointFreq;
1,231✔
1279
    if (duration < tsStreamCheckpointInterval * 1000) {
1,231✔
1280
      sdbRelease(pSdb, pStream);
551✔
1281
      continue;
665✔
1282
    }
1283

1284
    streamMutexLock(&execInfo.lock);
680✔
1285
    int64_t startTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
680✔
1286
    if (startTs != -1 && (now - startTs) < tsStreamCheckpointInterval * 1000) {
680!
1287
      streamMutexUnlock(&execInfo.lock);
114✔
1288
      sdbRelease(pSdb, pStream);
114✔
1289
      continue;
114✔
1290
    }
1291
    streamMutexUnlock(&execInfo.lock);
566✔
1292

1293
    SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
566✔
1294
    void* p = taosArrayPush(pList, &in);
566✔
1295
    if (p) {
566!
1296
      int32_t currentSize = taosArrayGetSize(pList);
566✔
1297
      mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chpt interval threshold: %ds(%" PRId64
566✔
1298
             "s), concurrently launch threshold:%d",
1299
             pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
1300
             tsMaxConcurrentCheckpoint);
1301
    } else {
UNCOV
1302
      mError("failed to record the checkpoint interval info, stream:0x%" PRIx64, pStream->uid);
×
1303
    }
1304
    sdbRelease(pSdb, pStream);
566✔
1305
  }
1306

1307
  int32_t size = taosArrayGetSize(pList);
1,242✔
1308
  if (size == 0) {
1,242✔
1309
    taosArrayDestroy(pList);
707✔
1310
    return code;
707✔
1311
  }
1312

1313
  taosArraySort(pList, streamWaitComparFn);
535✔
1314
  code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans);
535✔
1315
  if (code) {
535!
1316
    mError("failed to clear finish trans, code:%s", tstrerror(code));
×
1317
    taosArrayDestroy(pList);
×
UNCOV
1318
    return code;
×
1319
  }
1320

1321
  int32_t numOfQual = taosArrayGetSize(pList);
535✔
1322
  if (numOfCheckpointTrans > tsMaxConcurrentCheckpoint) {
535!
UNCOV
1323
    mDebug(
×
1324
        "%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new "
1325
        "checkpoint trans are not allowed, wait for 30s",
1326
        numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint);
1327
    taosArrayDestroy(pList);
×
UNCOV
1328
    return code;
×
1329
  }
1330

1331
  int32_t capacity = tsMaxConcurrentCheckpoint - numOfCheckpointTrans;
535✔
1332
  mDebug(
535✔
1333
      "%d stream(s) checkpoint interval longer than %ds, %d ongoing checkpoint trans, %d new checkpoint trans allowed, "
1334
      "concurrent trans threshold:%d",
1335
      numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, capacity, tsMaxConcurrentCheckpoint);
1336

1337
  int32_t started = 0;
535✔
1338
  int64_t checkpointId = mndStreamGenChkptId(pMnode, true);
535✔
1339

1340
  for (int32_t i = 0; i < numOfQual; ++i) {
535!
1341
    SCheckpointInterval *pCheckpointInfo = taosArrayGet(pList, i);
535✔
1342
    if (pCheckpointInfo == NULL) {
535!
UNCOV
1343
      continue;
×
1344
    }
1345

1346
    SStreamObj *p = NULL;
535✔
1347
    code = mndGetStreamObj(pMnode, pCheckpointInfo->streamId, &p);
535✔
1348
    if (p != NULL && code == 0) {
535!
1349
      code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
535✔
1350
      sdbRelease(pSdb, p);
535✔
1351

1352
      if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
535!
1353
        started += 1;
535✔
1354

1355
        if (started >= capacity) {
535!
1356
          mDebug("already start %d new checkpoint trans, current active checkpoint trans:%d", started,
535✔
1357
                 (started + numOfCheckpointTrans));
1358
          break;
535✔
1359
        }
1360
      } else {
UNCOV
1361
        mError("failed to start checkpoint trans, code:%s", tstrerror(code));
×
1362
      }
1363
    }
1364
  }
1365

1366
  taosArrayDestroy(pList);
535✔
1367
  return code;
535✔
1368
}
1369

1370
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
636✔
1371
  SMnode *    pMnode = pReq->info.node;
636✔
1372
  SStreamObj *pStream = NULL;
636✔
1373
  int32_t     code = 0;
636✔
1374

1375
  SMDropStreamReq dropReq = {0};
636✔
1376
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
636!
1377
    mError("invalid drop stream msg recv, discarded");
×
1378
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
1379
    TAOS_RETURN(code);
×
1380
  }
1381

1382
  mDebug("recv drop stream:%s msg", dropReq.name);
636✔
1383

1384
  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
636✔
1385
  if (pStream == NULL || code != 0) {
636!
1386
    if (dropReq.igNotExists) {
112!
1387
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
112!
1388
      sdbRelease(pMnode->pSdb, pStream);
112✔
1389
      tFreeMDropStreamReq(&dropReq);
112✔
1390
      return 0;
112✔
1391
    } else {
1392
      mError("stream:%s not exist failed to drop it", dropReq.name);
×
1393
      tFreeMDropStreamReq(&dropReq);
×
UNCOV
1394
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
1395
    }
1396
  }
1397

1398
  if (pStream->smaId != 0) {
524!
UNCOV
1399
    mDebug("stream:%s, uid:0x%" PRIx64 " try to drop sma related stream", dropReq.name, pStream->uid);
×
1400

1401
    void *   pIter = NULL;
×
1402
    SSmaObj *pSma = NULL;
×
1403
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
×
1404
    while (pIter) {
×
1405
      if (pSma && pSma->uid == pStream->smaId) {
×
1406
        sdbRelease(pMnode->pSdb, pSma);
×
UNCOV
1407
        sdbRelease(pMnode->pSdb, pStream);
×
1408

1409
        sdbCancelFetch(pMnode->pSdb, pIter);
×
1410
        tFreeMDropStreamReq(&dropReq);
×
UNCOV
1411
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;
×
1412

UNCOV
1413
        mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma",
×
1414
               dropReq.name, pStream->uid, tstrerror(terrno));
UNCOV
1415
        TAOS_RETURN(code);
×
1416
      }
1417

1418
      if (pSma) {
×
UNCOV
1419
        sdbRelease(pMnode->pSdb, pSma);
×
1420
      }
1421

UNCOV
1422
      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
×
1423
    }
1424
  }
1425

1426
  if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
524!
1427
    sdbRelease(pMnode->pSdb, pStream);
×
1428
    tFreeMDropStreamReq(&dropReq);
×
UNCOV
1429
    return -1;
×
1430
  }
1431

1432
  // check if it is conflict with other trans in both sourceDb and targetDb.
1433
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
524✔
1434
  if (code) {
524!
1435
    sdbRelease(pMnode->pSdb, pStream);
×
1436
    tFreeMDropStreamReq(&dropReq);
×
UNCOV
1437
    return code;
×
1438
  }
1439

1440
  STrans *pTrans = NULL;
524✔
1441
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
524✔
1442
  if (pTrans == NULL || code) {
524!
1443
    mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code));
×
1444
    sdbRelease(pMnode->pSdb, pStream);
×
1445
    tFreeMDropStreamReq(&dropReq);
×
UNCOV
1446
    TAOS_RETURN(code);
×
1447
  }
1448

1449
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
524✔
1450
  if (code) {
524!
1451
    mError("failed to register drop stream trans, code:%s", tstrerror(code));
×
1452
    sdbRelease(pMnode->pSdb, pStream);
×
1453
    mndTransDrop(pTrans);
×
1454
    tFreeMDropStreamReq(&dropReq);
×
UNCOV
1455
    TAOS_RETURN(code);
×
1456
  }
1457

1458
  // drop all tasks
1459
  code = mndStreamSetDropAction(pMnode, pTrans, pStream);
524✔
1460
  if (code) {
524!
1461
    mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, tstrerror(code));
×
1462
    sdbRelease(pMnode->pSdb, pStream);
×
1463
    mndTransDrop(pTrans);
×
1464
    tFreeMDropStreamReq(&dropReq);
×
UNCOV
1465
    TAOS_RETURN(code);
×
1466
  }
1467

1468
  // drop stream
1469
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
524✔
1470
  if (code) {
524!
1471
    sdbRelease(pMnode->pSdb, pStream);
×
1472
    mndTransDrop(pTrans);
×
1473
    tFreeMDropStreamReq(&dropReq);
×
UNCOV
1474
    TAOS_RETURN(code);
×
1475
  }
1476

1477
  code = mndTransPrepare(pMnode, pTrans);
524✔
1478
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
524!
1479
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
×
1480
    sdbRelease(pMnode->pSdb, pStream);
×
1481
    mndTransDrop(pTrans);
×
1482
    tFreeMDropStreamReq(&dropReq);
×
UNCOV
1483
    TAOS_RETURN(code);
×
1484
  }
1485

1486
  // kill the related checkpoint trans
1487
  int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
524✔
1488
  if (transId != 0) {
524!
1489
    mDebug("drop active transId:%d due to stream:%s uid:0x%" PRIx64 " dropped", transId, pStream->name, pStream->uid);
×
UNCOV
1490
    mndKillTransImpl(pMnode, transId, pStream->sourceDb);
×
1491
  }
1492

1493
  mDebug("stream:%s uid:0x%" PRIx64 " transId:%d start to drop related task when dropping stream", dropReq.name,
524✔
1494
         pStream->uid, transId);
1495

1496
  removeStreamTasksInBuf(pStream, &execInfo);
524✔
1497

1498
  SName name = {0};
524✔
1499
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
524✔
1500
  auditRecord(pReq, pMnode->clusterId, "dropStream", "", name.dbname, dropReq.sql, dropReq.sqlLen);
524✔
1501

1502
  sdbRelease(pMnode->pSdb, pStream);
524✔
1503
  mndTransDrop(pTrans);
524✔
1504
  tFreeMDropStreamReq(&dropReq);
524✔
1505

1506
  if (code == 0) {
524!
1507
    return TSDB_CODE_ACTION_IN_PROGRESS;
524✔
1508
  } else {
UNCOV
1509
    TAOS_RETURN(code);
×
1510
  }
1511
}
1512

1513
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
630✔
1514
  SSdb   *pSdb = pMnode->pSdb;
630✔
1515
  void   *pIter = NULL;
630✔
1516
  int32_t code = 0;
630✔
1517

1518
  while (1) {
100✔
1519
    SStreamObj *pStream = NULL;
730✔
1520
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
730✔
1521
    if (pIter == NULL) break;
730✔
1522

1523
    if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
101✔
1524
      if (pStream->sourceDbUid != pStream->targetDbUid) {
34✔
1525
        sdbRelease(pSdb, pStream);
1✔
1526
        sdbCancelFetch(pSdb, pIter);
1✔
1527
        mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
1!
1528
               pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
1529
        TAOS_RETURN(TSDB_CODE_MND_STREAM_MUST_BE_DELETED);
1✔
1530
      } else {
1531
        // kill the related checkpoint trans
1532
        int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
33✔
1533
        if (transId != 0) {
33!
1534
          mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
×
UNCOV
1535
          mndKillTransImpl(pMnode, transId, pStream->sourceDb);
×
1536
        }
1537

1538
        // drop the stream obj in execInfo
1539
        removeStreamTasksInBuf(pStream, &execInfo);
33✔
1540

1541
        code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
33✔
1542
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
33!
1543
          sdbRelease(pSdb, pStream);
×
1544
          sdbCancelFetch(pSdb, pIter);
×
UNCOV
1545
          return code;
×
1546
        }
1547
      }
1548
    }
1549

1550
    sdbRelease(pSdb, pStream);
100✔
1551
  }
1552

1553
  return 0;
629✔
1554
}
1555

1556
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
153✔
1557
  SMnode     *pMnode = pReq->info.node;
153✔
1558
  SSdb       *pSdb = pMnode->pSdb;
153✔
1559
  int32_t     numOfRows = 0;
153✔
1560
  SStreamObj *pStream = NULL;
153✔
1561
  int32_t     code = 0;
153✔
1562

1563
  while (numOfRows < rows) {
507!
1564
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
507✔
1565
    if (pShow->pIter == NULL) break;
507✔
1566

1567
    code = setStreamAttrInResBlock(pStream, pBlock, numOfRows);
354✔
1568
    if (code == 0) {
354!
1569
      numOfRows++;
354✔
1570
    }
1571
    sdbRelease(pSdb, pStream);
354✔
1572
  }
1573

1574
  pShow->numOfRows += numOfRows;
153✔
1575
  return numOfRows;
153✔
1576
}
1577

1578
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
×
1579
  SSdb *pSdb = pMnode->pSdb;
×
1580
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
×
UNCOV
1581
}
×
1582

1583
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
1,674✔
1584
  SMnode     *pMnode = pReq->info.node;
1,674✔
1585
  SSdb       *pSdb = pMnode->pSdb;
1,674✔
1586
  int32_t     numOfRows = 0;
1,674✔
1587
  SStreamObj *pStream = NULL;
1,674✔
1588
  int32_t     code = 0;
1,674✔
1589

1590
  streamMutexLock(&execInfo.lock);
1,674✔
1591
  mndInitStreamExecInfo(pMnode, &execInfo);
1,674✔
1592
  streamMutexUnlock(&execInfo.lock);
1,674✔
1593

1594
  while (numOfRows < rowsCapacity) {
6,689✔
1595
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
6,653✔
1596
    if (pShow->pIter == NULL) {
6,653✔
1597
      break;
1,638✔
1598
    }
1599

1600
    // lock
1601
    taosRLockLatch(&pStream->lock);
5,015✔
1602

1603
    int32_t count = mndGetNumOfStreamTasks(pStream);
5,015✔
1604
    if (numOfRows + count > rowsCapacity) {
5,015✔
1605
      code = blockDataEnsureCapacity(pBlock, numOfRows + count);
29✔
1606
      if (code) {
29!
1607
        mError("failed to prepare the result block buffer, quit return value");
×
1608
        taosRUnLockLatch(&pStream->lock);
×
1609
        sdbRelease(pSdb, pStream);
×
UNCOV
1610
        continue;
×
1611
      }
1612
    }
1613

1614
    // add row for each task
1615
    SStreamTaskIter *pIter = NULL;
5,015✔
1616
    code = createStreamTaskIter(pStream, &pIter);
5,015✔
1617
    if (code) {
5,015!
1618
      taosRUnLockLatch(&pStream->lock);
×
1619
      sdbRelease(pSdb, pStream);
×
1620
      mError("failed to create task iter for stream:%s", pStream->name);
×
UNCOV
1621
      continue;
×
1622
    }
1623

1624
    while (streamTaskIterNextTask(pIter)) {
27,424✔
1625
      SStreamTask *pTask = NULL;
22,409✔
1626
      code = streamTaskIterGetCurrent(pIter, &pTask);
22,409✔
1627
      if (code) {
22,409!
1628
        destroyStreamTaskIter(pIter);
×
UNCOV
1629
        break;
×
1630
      }
1631

1632
      code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows);
22,409✔
1633
      if (code == TSDB_CODE_SUCCESS) {
22,409!
1634
        numOfRows++;
22,409✔
1635
      }
1636
    }
1637

1638
    pBlock->info.rows = numOfRows;
5,015✔
1639

1640
    destroyStreamTaskIter(pIter);
5,015✔
1641
    taosRUnLockLatch(&pStream->lock);
5,015✔
1642

1643
    sdbRelease(pSdb, pStream);
5,015✔
1644
  }
1645

1646
  pShow->numOfRows += numOfRows;
1,674✔
1647
  return numOfRows;
1,674✔
1648
}
1649

1650
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
2✔
1651
  SSdb *pSdb = pMnode->pSdb;
2✔
1652
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
2✔
1653
}
2✔
1654

1655
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
22✔
1656
  SMnode     *pMnode = pReq->info.node;
22✔
1657
  SStreamObj *pStream = NULL;
22✔
1658
  int32_t     code = 0;
22✔
1659

1660
  SMPauseStreamReq pauseReq = {0};
22✔
1661
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
22!
UNCOV
1662
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
1663
  }
1664

1665
  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
22✔
1666
  if (pStream == NULL || code != 0) {
22!
1667
    if (pauseReq.igNotExists) {
2✔
1668
      mInfo("stream:%s, not exist, not pause stream", pauseReq.name);
1!
1669
      return 0;
1✔
1670
    } else {
1671
      mError("stream:%s not exist, failed to pause stream", pauseReq.name);
1!
1672
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
1✔
1673
    }
1674
  }
1675

1676
  mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid);
20!
1677

1678
  if (pStream->status == STREAM_STATUS__PAUSE) {
20!
1679
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1680
    return 0;
×
1681
  }
1682

1683
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
20!
1684
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1685
    return code;
×
1686
  }
1687

1688
  // check if it is conflict with other trans in both sourceDb and targetDb.
1689
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
20✔
1690
  if (code) {
20!
1691
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1692
    TAOS_RETURN(code);
×
1693
  }
1694

1695
  bool updated = mndStreamNodeIsUpdated(pMnode);
20✔
1696
  if (updated) {
20!
1697
    mError("tasks are not ready for pause, node update detected");
×
1698
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1699
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
×
1700
  }
1701

1702
  {  // check for tasks, if tasks are not ready, not allowed to pause
1703
    bool found = false;
20✔
1704
    bool readyToPause = true;
20✔
1705
    streamMutexLock(&execInfo.lock);
20✔
1706

1707
    for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
340✔
1708
      STaskId *p = taosArrayGet(execInfo.pTaskList, i);
320✔
1709
      if (p == NULL) {
320!
UNCOV
1710
        continue;
×
1711
      }
1712

1713
      STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
320✔
1714
      if (pEntry == NULL) {
320!
UNCOV
1715
        continue;
×
1716
      }
1717

1718
      if (pEntry->id.streamId != pStream->uid) {
320✔
1719
        continue;
193✔
1720
      }
1721

1722
      if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) {
127!
UNCOV
1723
        mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name,
×
1724
               pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status));
UNCOV
1725
        readyToPause = false;
×
1726
      }
1727

1728
      found = true;
127✔
1729
    }
1730

1731
    streamMutexUnlock(&execInfo.lock);
20✔
1732
    if (!found) {
20!
1733
      mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
×
1734
      sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1735
      TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
×
1736
    }
1737

1738
    if (!readyToPause) {
20!
UNCOV
1739
      mError("stream:%s task not ready for pause yet", pauseReq.name);
×
UNCOV
1740
      sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1741
      TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
×
1742
    }
1743
  }
1744

1745
  STrans *pTrans = NULL;
20✔
1746
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans);
20✔
1747
  if (pTrans == NULL || code) {
20!
1748
    mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
×
1749
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1750
    return code;
×
1751
  }
1752

1753
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
20✔
1754
  if (code) {
20!
1755
    sdbRelease(pMnode->pSdb, pStream);
×
1756
    mndTransDrop(pTrans);
×
UNCOV
1757
    return code;
×
1758
  }
1759

1760
  // if nodeUpdate happened, not send pause trans
1761
  code = mndStreamSetPauseAction(pMnode, pTrans, pStream);
20✔
1762
  if (code) {
20!
1763
    mError("stream:%s, failed to pause task since %s", pauseReq.name, tstrerror(code));
×
1764
    sdbRelease(pMnode->pSdb, pStream);
×
1765
    mndTransDrop(pTrans);
×
UNCOV
1766
    return code;
×
1767
  }
1768

1769
  // pause stream
1770
  taosWLockLatch(&pStream->lock);
20✔
1771
  pStream->status = STREAM_STATUS__PAUSE;
20✔
1772
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
20✔
1773
  if (code) {
20!
1774
    taosWUnLockLatch(&pStream->lock);
×
1775
    sdbRelease(pMnode->pSdb, pStream);
×
1776
    mndTransDrop(pTrans);
×
UNCOV
1777
    return code;
×
1778
  }
1779

1780
  taosWUnLockLatch(&pStream->lock);
20✔
1781

1782
  code = mndTransPrepare(pMnode, pTrans);
20✔
1783
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
20!
1784
    mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
×
1785
    sdbRelease(pMnode->pSdb, pStream);
×
1786
    mndTransDrop(pTrans);
×
UNCOV
1787
    return code;
×
1788
  }
1789

1790
  sdbRelease(pMnode->pSdb, pStream);
20✔
1791
  mndTransDrop(pTrans);
20✔
1792

1793
  return TSDB_CODE_ACTION_IN_PROGRESS;
20✔
1794
}
1795

1796
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
21✔
1797
  SMnode     *pMnode = pReq->info.node;
21✔
1798
  SStreamObj *pStream = NULL;
21✔
1799
  int32_t     code = 0;
21✔
1800

1801
  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
21!
UNCOV
1802
    return code;
×
1803
  }
1804

1805
  SMResumeStreamReq resumeReq = {0};
21✔
1806
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
21!
UNCOV
1807
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
1808
  }
1809

1810
  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
21✔
1811
  if (pStream == NULL || code != 0) {
21!
1812
    if (resumeReq.igNotExists) {
2✔
1813
      mInfo("stream:%s not exist, not resume stream", resumeReq.name);
1!
1814
      sdbRelease(pMnode->pSdb, pStream);
1✔
1815
      return 0;
1✔
1816
    } else {
1817
      mError("stream:%s not exist, failed to resume stream", resumeReq.name);
1!
1818
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
1✔
1819
    }
1820
  }
1821

1822
  if (pStream->status != STREAM_STATUS__PAUSE) {
19!
UNCOV
1823
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1824
    return 0;
×
1825
  }
1826

1827
  mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);
19!
1828
  if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
19!
1829
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1830
    return -1;
×
1831
  }
1832

1833
  // check if it is conflict with other trans in both sourceDb and targetDb.
1834
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
19✔
1835
  if (code) {
19!
1836
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1837
    return code;
×
1838
  }
1839

1840
  STrans *pTrans = NULL;
19✔
1841
  code =
1842
      doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans);
19✔
1843
  if (pTrans == NULL || code) {
19!
1844
    mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code));
×
1845
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1846
    return code;
×
1847
  }
1848

1849
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
19✔
1850
  if (code) {
19!
1851
    sdbRelease(pMnode->pSdb, pStream);
×
1852
    mndTransDrop(pTrans);
×
UNCOV
1853
    return code;
×
1854
  }
1855

1856
  // set the resume action
1857
  code = mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated);
19✔
1858
  if (code) {
19!
1859
    mError("stream:%s, failed to drop task since %s", resumeReq.name, tstrerror(code));
×
1860
    sdbRelease(pMnode->pSdb, pStream);
×
1861
    mndTransDrop(pTrans);
×
UNCOV
1862
    return code;
×
1863
  }
1864

1865
  // resume stream
1866
  taosWLockLatch(&pStream->lock);
19✔
1867
  pStream->status = STREAM_STATUS__NORMAL;
19✔
1868
  if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY) < 0) {
19!
UNCOV
1869
    taosWUnLockLatch(&pStream->lock);
×
1870

1871
    sdbRelease(pMnode->pSdb, pStream);
×
1872
    mndTransDrop(pTrans);
×
UNCOV
1873
    return code;
×
1874
  }
1875

1876
  taosWUnLockLatch(&pStream->lock);
19✔
1877
  code = mndTransPrepare(pMnode, pTrans);
19✔
1878
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
19!
1879
    mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
×
1880
    sdbRelease(pMnode->pSdb, pStream);
×
1881
    mndTransDrop(pTrans);
×
UNCOV
1882
    return code;
×
1883
  }
1884

1885
  sdbRelease(pMnode->pSdb, pStream);
19✔
1886
  mndTransDrop(pTrans);
19✔
1887

1888
  return TSDB_CODE_ACTION_IN_PROGRESS;
19✔
1889
}
1890

UNCOV
1891
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) {
×
UNCOV
1892
  SSdb       *pSdb = pMnode->pSdb;
×
UNCOV
1893
  SStreamObj *pStream = NULL;
×
UNCOV
1894
  void       *pIter = NULL;
×
UNCOV
1895
  STrans     *pTrans = NULL;
×
UNCOV
1896
  int32_t     code = 0;
×
1897

1898
  // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool
1899
  while (1) {
UNCOV
1900
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
×
UNCOV
1901
    if (pIter == NULL) {
×
UNCOV
1902
      break;
×
1903
    }
1904

UNCOV
1905
    code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
×
UNCOV
1906
    sdbRelease(pSdb, pStream);
×
1907

1908
    if (code) {
×
1909
      mError("nodeUpdate conflict with other trans, current nodeUpdate ignored, code:%s", tstrerror(code));
×
1910
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
1911
      return code;
×
1912
    }
1913
  }
1914

1915
  while (1) {
UNCOV
1916
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
×
UNCOV
1917
    if (pIter == NULL) {
×
UNCOV
1918
      break;
×
1919
    }
1920

1921
    // here create only one trans
UNCOV
1922
    if (pTrans == NULL) {
×
UNCOV
1923
      code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME, "update task epsets", &pTrans);
×
1924
      if (pTrans == NULL || code) {
×
1925
        sdbRelease(pSdb, pStream);
×
1926
        sdbCancelFetch(pSdb, pIter);
×
UNCOV
1927
        return terrno = code;
×
1928
      }
1929

UNCOV
1930
      code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
×
1931
      if (code) {
×
UNCOV
1932
        mError("failed to register trans, transId:%d, and continue", pTrans->id);
×
1933
      }
1934
    }
1935

UNCOV
1936
    if (!includeAllNodes) {
×
UNCOV
1937
      void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
×
UNCOV
1938
      void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
×
1939
      if (p1 == NULL && p2 == NULL) {
×
1940
        mDebug("stream:0x%" PRIx64 " %s not involved nodeUpdate, ignore", pStream->uid, pStream->name);
×
1941
        sdbRelease(pSdb, pStream);
×
UNCOV
1942
        continue;
×
1943
      }
1944
    }
1945

UNCOV
1946
    mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
×
1947
           pStream->name, pTrans->id);
1948

UNCOV
1949
    code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);
×
1950

1951
    // todo: not continue, drop all and retry again
1952
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1953
      mError("stream:0x%" PRIx64 " build nodeUpdate trans failed, ignore and continue, code:%s", pStream->uid,
×
1954
             tstrerror(code));
1955
      sdbRelease(pSdb, pStream);
×
UNCOV
1956
      continue;
×
1957
    }
1958

UNCOV
1959
    code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
×
UNCOV
1960
    sdbRelease(pSdb, pStream);
×
1961

1962
    if (code != TSDB_CODE_SUCCESS) {
×
1963
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
1964
      return code;
×
1965
    }
1966
  }
1967

1968
  // no need to build the trans to handle the vgroup update
1969
  if (pTrans == NULL) {
×
UNCOV
1970
    return 0;
×
1971
  }
1972

UNCOV
1973
  code = mndTransPrepare(pMnode, pTrans);
×
1974
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
1975
    mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
×
1976
    sdbRelease(pMnode->pSdb, pStream);
×
1977
    mndTransDrop(pTrans);
×
UNCOV
1978
    return code;
×
1979
  }
1980

UNCOV
1981
  sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1982
  mndTransDrop(pTrans);
×
UNCOV
1983
  return code;
×
1984
}
1985

1986
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList) {
658✔
1987
  SSdb       *pSdb = pMnode->pSdb;
658✔
1988
  SStreamObj *pStream = NULL;
658✔
1989
  void       *pIter = NULL;
658✔
1990
  int32_t     code = 0;
658✔
1991

1992
  mDebug("start to refresh node list by existed streams");
658✔
1993

1994
  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
658✔
1995
  if (pHash == NULL) {
658!
UNCOV
1996
    return terrno;
×
1997
  }
1998

UNCOV
1999
  while (1) {
×
2000
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
658✔
2001
    if (pIter == NULL) {
658!
2002
      break;
658✔
2003
    }
2004

UNCOV
2005
    taosWLockLatch(&pStream->lock);
×
2006

UNCOV
2007
    SStreamTaskIter *pTaskIter = NULL;
×
UNCOV
2008
    code = createStreamTaskIter(pStream, &pTaskIter);
×
2009
    if (code) {
×
2010
      taosWUnLockLatch(&pStream->lock);
×
2011
      sdbRelease(pSdb, pStream);
×
2012
      mError("failed to create task iter for stream:%s", pStream->name);
×
UNCOV
2013
      continue;
×
2014
    }
2015

UNCOV
2016
    while (streamTaskIterNextTask(pTaskIter)) {
×
UNCOV
2017
      SStreamTask *pTask = NULL;
×
UNCOV
2018
      code = streamTaskIterGetCurrent(pTaskIter, &pTask);
×
2019
      if (code) {
×
UNCOV
2020
        break;
×
2021
      }
2022

UNCOV
2023
      SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
×
UNCOV
2024
      epsetAssign(&entry.epset, &pTask->info.epSet);
×
UNCOV
2025
      int32_t ret = taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
×
2026
      if (ret != 0 && ret != TSDB_CODE_DUP_KEY) {
×
UNCOV
2027
        mError("failed to put entry into hash map, nodeId:%d, code:%s", entry.nodeId, tstrerror(code));
×
2028
      }
2029
    }
2030

UNCOV
2031
    destroyStreamTaskIter(pTaskIter);
×
UNCOV
2032
    taosWUnLockLatch(&pStream->lock);
×
2033

UNCOV
2034
    sdbRelease(pSdb, pStream);
×
2035
  }
2036

2037
  taosArrayClear(pNodeList);
658✔
2038

2039
  // convert to list
2040
  pIter = NULL;
658✔
2041
  while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
658!
UNCOV
2042
    SNodeEntry *pEntry = (SNodeEntry *)pIter;
×
2043

UNCOV
2044
    void *p = taosArrayPush(pNodeList, pEntry);
×
2045
    if (p == NULL) {
×
2046
      mError("failed to put entry into node list, nodeId:%d, code: out of memory", pEntry->nodeId);
×
2047
      if (code == 0) {
×
UNCOV
2048
        code = terrno;
×
2049
      }
UNCOV
2050
      continue;
×
2051
    }
2052

UNCOV
2053
    char buf[256] = {0};
×
UNCOV
2054
    int32_t ret = epsetToStr(&pEntry->epset, buf, tListLen(buf));  // ignore this error since it is only for log file
×
2055
    if (ret != 0) {  // print error and continue
×
UNCOV
2056
      mError("failed to convert epset to str, code:%s", tstrerror(ret));
×
2057
    }
2058

UNCOV
2059
    mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
×
2060
  }
2061

2062
  taosHashCleanup(pHash);
658✔
2063

2064
  mDebug("numOfNodes:%d for stream after extract nodeInfo from stream", (int32_t)taosArrayGetSize(pNodeList));
658✔
2065
  return code;
658✔
2066
}
2067

2068
static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
×
2069
  void   *pIter = NULL;
×
2070
  int32_t code = 0;
×
2071
  while (1) {
×
2072
    SVgObj *pVgroup = NULL;
×
2073
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
×
2074
    if (pIter == NULL) {
×
UNCOV
2075
      break;
×
2076
    }
2077

2078
    code = taosHashPut(pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
×
UNCOV
2079
    sdbRelease(pSdb, pVgroup);
×
2080

2081
    if (code == 0) {
×
2082
      int32_t size = taosHashGetSize(pDBMap);
×
UNCOV
2083
      mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size);
×
2084
    }
2085
  }
UNCOV
2086
}
×
2087

2088
// this function runs by only one thread, so it is not multi-thread safe
2089
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
959✔
2090
  int32_t code = 0;
959✔
2091
  bool    allReady = true;
959✔
2092
  SArray *pNodeSnapshot = NULL;
959✔
2093
  SMnode *pMnode = pMsg->info.node;
959✔
2094
  int64_t ts = taosGetTimestampSec();
959✔
2095
  bool    updateAllVgroups = false;
959✔
2096

2097
  int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
959✔
2098
  if (old != 0) {
959!
2099
    mDebug("still in checking node change");
×
UNCOV
2100
    return 0;
×
2101
  }
2102

2103
  mDebug("start to do node changing check");
959✔
2104

2105
  streamMutexLock(&execInfo.lock);
959✔
2106
  int32_t numOfNodes = extractStreamNodeList(pMnode);
959✔
2107
  streamMutexUnlock(&execInfo.lock);
959✔
2108

2109
  if (numOfNodes == 0) {
959!
2110
    mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing");
×
2111
    execInfo.ts = ts;
×
2112
    atomic_store_32(&mndNodeCheckSentinel, 0);
×
UNCOV
2113
    return 0;
×
2114
  }
2115

2116
  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
959✔
2117
  if (code) {
959!
UNCOV
2118
    mError("failed to take the vgroup snapshot, ignore it and continue");
×
2119
  }
2120

2121
  if (!allReady) {
959✔
2122
    taosArrayDestroy(pNodeSnapshot);
9✔
2123
    atomic_store_32(&mndNodeCheckSentinel, 0);
9✔
2124
    mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
9!
2125
    return 0;
9✔
2126
  }
2127

2128
  streamMutexLock(&execInfo.lock);
950✔
2129

2130
  code = removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
950✔
2131
  if (code) {
950!
UNCOV
2132
    goto _end;
×
2133
  }
2134

2135
  SVgroupChangeInfo changeInfo = {0};
950✔
2136
  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo);
950✔
2137
  if (code) {
950!
UNCOV
2138
    goto _end;
×
2139
  }
2140

2141
  {
2142
    if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) {
950!
2143
      mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
×
2144
      updateAllVgroups = true;
×
2145
      execInfo.switchFromFollower = false;  // reset the flag
×
UNCOV
2146
      addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb);
×
2147
    }
2148
  }
2149

2150
  if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) {
950!
2151
    // kill current active checkpoint transaction, since the transaction is vnode wide.
UNCOV
2152
    killAllCheckpointTrans(pMnode, &changeInfo);
×
UNCOV
2153
    code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups);
×
2154

2155
    // keep the new vnode snapshot if success
UNCOV
2156
    if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
×
UNCOV
2157
      code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
×
2158
      if (code) {
×
2159
        mError("failed to extract node list from stream, code:%s", tstrerror(code));
×
UNCOV
2160
        goto _end;
×
2161
      }
2162

UNCOV
2163
      execInfo.ts = ts;
×
UNCOV
2164
      mDebug("create trans successfully, update cached node list, numOfNodes:%d",
×
2165
             (int)taosArrayGetSize(execInfo.pNodeList));
2166
    } else {
UNCOV
2167
      mError("unexpected code during create nodeUpdate trans, code:%s", tstrerror(code));
×
2168
    }
2169
  } else {
2170
    mDebug("no update found in nodeList");
950✔
2171
  }
2172

2173
  mndDestroyVgroupChangeInfo(&changeInfo);
950✔
2174

2175
  _end:
950✔
2176
  streamMutexUnlock(&execInfo.lock);
950✔
2177
  taosArrayDestroy(pNodeSnapshot);
950✔
2178

2179
  mDebug("end to do stream task node change checking");
950✔
2180
  atomic_store_32(&mndNodeCheckSentinel, 0);
950✔
2181
  return 0;
950✔
2182
}
2183

2184
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
2,006✔
2185
  SMnode *pMnode = pReq->info.node;
2,006✔
2186
  SSdb   *pSdb = pMnode->pSdb;
2,006✔
2187
  if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
2,006✔
2188
    return 0;
1,047✔
2189
  }
2190

2191
  int32_t               size = sizeof(SMStreamNodeCheckMsg);
959✔
2192
  SMStreamNodeCheckMsg *pMsg = rpcMallocCont(size);
959✔
2193
  if (pMsg == NULL) {
959!
UNCOV
2194
    return terrno;
×
2195
  }
2196

2197
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = size};
959✔
2198
  return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
959✔
2199
}
2200

2201
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
849✔
2202
  SStreamTaskIter *pIter = NULL;
849✔
2203
  int32_t          code = createStreamTaskIter(pStream, &pIter);
849✔
2204
  if (code) {
849!
2205
    mError("failed to create task iter for stream:%s", pStream->name);
×
UNCOV
2206
    return;
×
2207
  }
2208

2209
  while (streamTaskIterNextTask(pIter)) {
5,101✔
2210
    SStreamTask *pTask = NULL;
4,252✔
2211
    code = streamTaskIterGetCurrent(pIter, &pTask);
4,252✔
2212
    if (code) {
4,252!
UNCOV
2213
      break;
×
2214
    }
2215

2216
    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
4,252✔
2217
    void   *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
4,252✔
2218
    if (p == NULL) {
4,252✔
2219
      STaskStatusEntry entry = {0};
3,961✔
2220
      streamTaskStatusInit(&entry, pTask);
3,961✔
2221

2222
      code = taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
3,961✔
2223
      if (code == 0) {
3,961!
2224
        void *  px = taosArrayPush(pExecNode->pTaskList, &id);
3,961✔
2225
        int32_t num = (int32_t)taosArrayGetSize(pExecNode->pTaskList);
3,961✔
2226
        if (px) {
3,961!
2227
          mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
3,961!
2228
        } else {
UNCOV
2229
          mError("s-task:0x%x failed to add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
×
2230
        }
2231
      } else {
UNCOV
2232
        mError("s-task:0x%x failed to add into task map, since out of memory", (int32_t) entry.id.taskId);
×
2233
      }
2234

2235
      // add the new vgroups if not added yet
2236
      bool exist = false;
3,961✔
2237
      for (int32_t j = 0; j < taosArrayGetSize(pExecNode->pNodeList); ++j) {
25,091✔
2238
        SNodeEntry *pEntry = taosArrayGet(pExecNode->pNodeList, j);
24,222✔
2239
        if ((pEntry != NULL) && (pEntry->nodeId == pTask->info.nodeId)) {
24,222!
2240
          exist = true;
3,092✔
2241
          break;
3,092✔
2242
        }
2243
      }
2244

2245
      if (!exist) {
3,961✔
2246
        SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
869✔
2247
        epsetAssign(&nodeEntry.epset, &pTask->info.epSet);
869✔
2248

2249
        void* px = taosArrayPush(pExecNode->pNodeList, &nodeEntry);
869✔
2250
        if (px) {
869!
2251
          mInfo("vgId:%d added into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList));
869!
2252
        } else {
UNCOV
2253
          mError("vgId:%d failed to add into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList))
×
2254
        }
2255
      }
2256
    }
2257
  }
2258

2259
  destroyStreamTaskIter(pIter);
849✔
2260
}
2261

2262
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
1,370✔
2263
  int32_t num = taosArrayGetSize(pList);
1,370✔
2264
  for (int32_t i = 0; i < num; ++i) {
4,761✔
2265
    int32_t *pId = taosArrayGet(pList, i);
3,391✔
2266
    if (pId == NULL) {
3,391!
UNCOV
2267
      continue;
×
2268
    }
2269

2270
    if (taskId == *pId) {
3,391!
UNCOV
2271
      return;
×
2272
    }
2273
  }
2274

2275
  int32_t numOfTasks = taosArrayGetSize(pList);
1,370✔
2276
  void   *p = taosArrayPush(pList, &taskId);
1,370✔
2277
  if (p) {
1,370!
2278
    mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, numOfTasks, numOfTotal - numOfTasks);
1,370✔
2279
  } else {
UNCOV
2280
    mError("stream:0x%" PRIx64 " receive %d reqs for checkpoint, failed to added into task list, since out of memory",
×
2281
           uid, numOfTasks);
2282
  }
2283
}
2284

2285
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
1,370✔
2286
  SMnode                  *pMnode = pReq->info.node;
1,370✔
2287
  SStreamTaskCheckpointReq req = {0};
1,370✔
2288

2289
  SDecoder decoder = {0};
1,370✔
2290
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
1,370✔
2291

2292
  if (tDecodeStreamTaskCheckpointReq(&decoder, &req)) {
1,370!
2293
    tDecoderClear(&decoder);
×
2294
    mError("invalid task checkpoint req msg received");
×
UNCOV
2295
    return TSDB_CODE_INVALID_MSG;
×
2296
  }
2297
  tDecoderClear(&decoder);
1,370✔
2298

2299
  mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);
1,370✔
2300

2301
  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
2302
  streamMutexLock(&execInfo.lock);
1,370✔
2303

2304
  SStreamObj *pStream = NULL;
1,370✔
2305
  int32_t code = mndGetStreamObj(pMnode, req.streamId, &pStream);
1,370✔
2306
  if (pStream == NULL || code != 0) {
1,370!
UNCOV
2307
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf",
×
2308
          req.streamId);
2309

2310
    // not in meta-store yet, try to acquire the task in exec buffer
2311
    // the checkpoint req arrives too soon before the completion of the create stream trans.
2312
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
×
2313
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
×
2314
    if (p == NULL) {
×
2315
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
×
2316
      streamMutexUnlock(&execInfo.lock);
×
UNCOV
2317
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
×
2318
    } else {
UNCOV
2319
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
×
2320
             req.streamId, req.taskId);
2321
    }
2322
  }
2323

2324
  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
1,370!
2325

2326
  SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
1,370✔
2327
  if (pReqTaskList == NULL) {
1,370✔
2328
    SArray *pList = taosArrayInit(4, sizeof(int32_t));
239✔
2329
    doAddTaskId(pList, req.taskId, req.streamId, numOfTasks);
239✔
2330
    code = taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
239✔
2331
    if (code) {
239!
UNCOV
2332
      mError("failed to put into transfer state stream map, code: out of memory");
×
2333
    }
2334
    pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
239✔
2335
  } else {
2336
    doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks);
1,131✔
2337
  }
2338

2339
  int32_t total = taosArrayGetSize(*pReqTaskList);
1,370✔
2340
  if (total == numOfTasks) {  // all tasks has send the reqs
1,370✔
2341
    int64_t checkpointId = mndStreamGenChkptId(pMnode, false);
236✔
2342
    mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);
236!
2343

2344
    if (pStream != NULL) {  // TODO:handle error
236!
2345
      code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
236✔
2346
      if (code) {
236!
2347
        mError("failed to create checkpoint trans, code:%s", tstrerror(code));
236!
2348
      }
2349
    } else {
2350
      // todo: wait for the create stream trans completed, and launch the checkpoint trans
2351
      // SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
2352
      // sleep(500ms)
2353
    }
2354

2355
    // remove this entry
2356
    (void) taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));
236✔
2357

2358
    int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
236✔
2359
    mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
236✔
2360
  }
2361

2362
  if (pStream != NULL) {
1,370!
2363
    mndReleaseStream(pMnode, pStream);
1,370✔
2364
  }
2365

2366
  streamMutexUnlock(&execInfo.lock);
1,370✔
2367

2368
  {
2369
    SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)};
1,370✔
2370
    rsp.pCont = rpcMallocCont(rsp.contLen);
1,370✔
2371
    if (rsp.pCont == NULL) {
1,370!
UNCOV
2372
      return terrno;
×
2373
    }
2374

2375
    SMsgHead *pHead = rsp.pCont;
1,370✔
2376
    pHead->vgId = htonl(req.nodeId);
1,370✔
2377

2378
    tmsgSendRsp(&rsp);
1,370✔
2379
    pReq->info.handle = NULL;  // disable auto rsp
1,370✔
2380
  }
2381

2382
  return 0;
1,370✔
2383
}
2384

2385
// valid the info according to the HbMsg
2386
static bool validateChkptReport(const SCheckpointReport *pReport, int64_t reportChkptId) {
2,681✔
2387
  STaskId           id = {.streamId = pReport->streamId, .taskId = pReport->taskId};
2,681✔
2388
  STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
2,681✔
2389
  if (pTaskEntry == NULL) {
2,681✔
2390
    mError("invalid checkpoint-report msg from task:0x%x, discard", pReport->taskId);
20!
2391
    return false;
20✔
2392
  }
2393

2394
  if (pTaskEntry->checkpointInfo.latestId >= pReport->checkpointId) {
2,661!
UNCOV
2395
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " saved checkpointId:%" PRId64 " discard",
×
2396
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
UNCOV
2397
    return false;
×
2398
  }
2399

2400
  // now the task in checkpoint procedure
2401
  if ((pTaskEntry->checkpointInfo.activeId != 0) && (pTaskEntry->checkpointInfo.activeId > pReport->checkpointId)) {
2,661!
UNCOV
2402
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " active checkpointId:%" PRId64
×
2403
           " discard",
2404
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
UNCOV
2405
    return false;
×
2406
  }
2407

2408
  if (reportChkptId >= pReport->checkpointId) {
2,661!
UNCOV
2409
    mError("s-task:0x%x expired checkpoint-report msg, checkpointId:%" PRId64 " already update checkpointId:%" PRId64
×
2410
           " discard",
2411
           pReport->taskId, pReport->checkpointId, reportChkptId);
UNCOV
2412
    return false;
×
2413
  }
2414

2415
  return true;
2,661✔
2416
}
2417

2418
static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SCheckpointReport *pReport) {
2,681✔
2419
  bool valid = validateChkptReport(pReport, reportChkptId);
2,681✔
2420
  if (!valid) {
2,681✔
2421
    return;
20✔
2422
  }
2423

2424
  for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
7,581✔
2425
    STaskChkptInfo *p = taosArrayGet(pList, i);
4,920✔
2426
    if (p == NULL) {
4,920!
UNCOV
2427
      continue;
×
2428
    }
2429

2430
    if (p->taskId == pReport->taskId) {
4,920!
2431
      if (p->checkpointId > pReport->checkpointId) {
×
UNCOV
2432
        mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
×
2433
               pReport->taskId, p->checkpointId, pReport->checkpointId);
2434
      } else if (p->checkpointId < pReport->checkpointId) {  // expired checkpoint-report msg, update it
×
UNCOV
2435
        mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64,
×
2436
               pReport->taskId, p->checkpointId, pReport->checkpointId);
2437

UNCOV
2438
        memcpy(p, pReport, sizeof(STaskChkptInfo));
×
2439
      } else {
UNCOV
2440
        mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
×
2441
      }
UNCOV
2442
      return;
×
2443
    }
2444
  }
2445

2446
  STaskChkptInfo info = {
2,661✔
2447
      .streamId = pReport->streamId,
2,661✔
2448
      .taskId = pReport->taskId,
2,661✔
2449
      .transId = pReport->transId,
2,661✔
2450
      .dropHTask = pReport->dropHTask,
2,661✔
2451
      .version = pReport->checkpointVer,
2,661✔
2452
      .ts = pReport->checkpointTs,
2,661✔
2453
      .checkpointId = pReport->checkpointId,
2,661✔
2454
      .nodeId = pReport->nodeId,
2,661✔
2455
  };
2456

2457
  void *p = taosArrayPush(pList, &info);
2,661✔
2458
  if (p == NULL) {
2,661!
UNCOV
2459
    mError("failed to put into task list, taskId:0x%x", pReport->taskId);
×
2460
  } else {
2461
    int32_t size = taosArrayGetSize(pList);
2,661✔
2462
    mDebug("stream:0x%"PRIx64" %d tasks has send checkpoint-report", pReport->streamId, size);
2,661✔
2463
  }
2464
}
2465

2466
int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
2,681✔
2467
  SMnode           *pMnode = pReq->info.node;
2,681✔
2468
  SCheckpointReport req = {0};
2,681✔
2469

2470
  SDecoder decoder = {0};
2,681✔
2471
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
2,681✔
2472

2473
  if (tDecodeStreamTaskChkptReport(&decoder, &req)) {
2,681!
2474
    tDecoderClear(&decoder);
×
2475
    mError("invalid task checkpoint-report msg received");
×
UNCOV
2476
    return TSDB_CODE_INVALID_MSG;
×
2477
  }
2478
  tDecoderClear(&decoder);
2,681✔
2479

2480
  streamMutexLock(&execInfo.lock);
2,681✔
2481
  mndInitStreamExecInfo(pMnode, &execInfo);
2,681✔
2482
  streamMutexUnlock(&execInfo.lock);
2,681✔
2483

2484
  mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64
2,681✔
2485
         " checkpointVer:%" PRId64 " transId:%d",
2486
         req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);
2487

2488
  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
2489
  streamMutexLock(&execInfo.lock);
2,681✔
2490

2491
  SStreamObj *pStream = NULL;
2,681✔
2492
  int32_t code = mndGetStreamObj(pMnode, req.streamId, &pStream);
2,681✔
2493
  if (pStream == NULL || code != 0) {
2,681!
UNCOV
2494
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);
×
2495

2496
    // not in meta-store yet, try to acquire the task in exec buffer
2497
    // the checkpoint req arrives too soon before the completion of the create stream trans.
2498
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
×
2499
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
×
2500
    if (p == NULL) {
×
2501
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
×
2502
      streamMutexUnlock(&execInfo.lock);
×
UNCOV
2503
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
×
2504
    } else {
UNCOV
2505
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
×
2506
             req.streamId, req.taskId);
2507
    }
2508
  }
2509

2510
  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
2,681!
2511

2512
  SChkptReportInfo *pInfo = (SChkptReportInfo*)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
2,681✔
2513
  if (pInfo == NULL) {
2,681✔
2514
    SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId};
238✔
2515
    if (info.pTaskList != NULL) {
238!
2516
      doAddReportStreamTask(info.pTaskList, info.reportChkpt, &req);
238✔
2517
      code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &info, sizeof(info));
238✔
2518
      if (code) {
238!
UNCOV
2519
        mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId);
×
2520
      }
2521

2522
      pInfo = (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
238✔
2523
    }
2524
  } else {
2525
    doAddReportStreamTask(pInfo->pTaskList, pInfo->reportChkpt, &req);
2,443✔
2526
  }
2527

2528
  int32_t total = taosArrayGetSize(pInfo->pTaskList);
2,681✔
2529
  if (total == numOfTasks) {  // all tasks has send the reqs
2,681✔
2530
    mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
761!
2531
          " will be issued soon",
2532
          req.streamId, pStream->name, total, req.checkpointId);
2533
  }
2534

2535
  if (pStream != NULL) {
2,681!
2536
    mndReleaseStream(pMnode, pStream);
2,681✔
2537
  }
2538

2539
  streamMutexUnlock(&execInfo.lock);
2,681✔
2540

2541
  doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS);
2,681✔
2542
  return code;
2,681✔
2543
}
2544

2545
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pExistedTasks, bool *pAllSame) {
76✔
2546
  int32_t num = 0;
76✔
2547
  int64_t chkId = INT64_MAX;
76✔
2548
  *pExistedTasks = 0;
76✔
2549
  *pAllSame = true;
76✔
2550

2551
  for(int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
1,380✔
2552
    STaskId* p = taosArrayGet(execInfo.pTaskList, i);
1,304✔
2553
    if (p == NULL) {
1,304!
UNCOV
2554
      continue;
×
2555
    }
2556

2557
    if (p->streamId != streamId) {
1,304✔
2558
      continue;
868✔
2559
    }
2560

2561
    num += 1;
436✔
2562
    STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
436✔
2563
    if (chkId > pe->checkpointInfo.latestId) {
436✔
2564
      if (chkId != INT64_MAX) {
76!
UNCOV
2565
        *pAllSame = false;
×
2566
      }
2567
      chkId = pe->checkpointInfo.latestId;
76✔
2568
    }
2569
  }
2570

2571
  *pExistedTasks = num;
76✔
2572
  if (num < numOfTasks) { // not all task send info to mnode through hbMsg, no valid checkpoint Id
76!
UNCOV
2573
    return -1;
×
2574
  }
2575

2576
  return chkId;
76✔
2577
}
2578

2579
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) {
2,681✔
2580
  SRpcMsg rsp = {.code = code, .info = *pInfo, .contLen = msgSize};
2,681✔
2581
  rsp.pCont = rpcMallocCont(rsp.contLen);
2,681✔
2582
  if (rsp.pCont != NULL) {
2,681!
2583
    SMsgHead *pHead = rsp.pCont;
2,681✔
2584
    pHead->vgId = htonl(vgId);
2,681✔
2585

2586
    tmsgSendRsp(&rsp);
2,681✔
2587
    pInfo->handle = NULL;  // disable auto rsp
2,681✔
2588
  }
2589
}
2,681✔
2590

2591
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
8,418✔
2592
  SMnode *pMnode = pMsg->info.node;
8,418✔
2593
  int64_t now = taosGetTimestampMs();
8,418✔
2594
  SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
8,418✔
2595
  if (pStreamList == NULL) {
8,418!
UNCOV
2596
    return terrno;
×
2597
  }
2598

2599
  mDebug("start to process consensus-checkpointId in tmr");
8,418✔
2600

2601
  bool    allReady = true;
8,418✔
2602
  SArray *pNodeSnapshot = NULL;
8,418✔
2603

2604
  int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
8,418✔
2605
  taosArrayDestroy(pNodeSnapshot);
8,418✔
2606
  if (code) {
8,418!
UNCOV
2607
    mError("failed to get the vgroup snapshot, ignore it and continue");
×
2608
  }
2609

2610
  if (!allReady) {
8,418✔
2611
    mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
297!
2612
    taosArrayDestroy(pStreamList);
297✔
2613
    return 0;
297✔
2614
  }
2615

2616
  streamMutexLock(&execInfo.lock);
8,121✔
2617

2618
  void *pIter = NULL;
8,121✔
2619
  while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
8,137✔
2620
    SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;
16✔
2621

2622
    int64_t streamId = -1;
16✔
2623
    int32_t num = taosArrayGetSize(pInfo->pTaskList);
16✔
2624
    SArray *pList = taosArrayInit(4, sizeof(int32_t));
16✔
2625
    if (pList == NULL) {
16!
UNCOV
2626
      continue;
×
2627
    }
2628

2629
    SStreamObj *pStream = NULL;
16✔
2630
    code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
16✔
2631
    if (pStream == NULL || code != 0) {  // stream has been dropped already
16!
2632
      mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
×
2633
      taosArrayDestroy(pList);
×
UNCOV
2634
      continue;
×
2635
    }
2636

2637
    for (int32_t j = 0; j < num; ++j) {
92✔
2638
      SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
76✔
2639
      if (pe == NULL) {
76!
UNCOV
2640
        continue;
×
2641
      }
2642

2643
      streamId = pe->req.streamId;
76✔
2644

2645
      int32_t existed = 0;
76✔
2646
      bool    allSame = true;
76✔
2647
      int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame);
76✔
2648
      if (chkId == -1) {
76!
UNCOV
2649
        mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed,
×
2650
               pInfo->numOfTasks, pe->req.taskId);
UNCOV
2651
        break;
×
2652
      }
2653

2654
      if (((now - pe->ts) >= 10 * 1000) || allSame) {
76!
2655
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
76✔
2656
               pe->req.startTs, (now - pe->ts) / 1000.0);
2657
        if (chkId > pe->req.checkpointId) {
76!
2658
          streamMutexUnlock(&execInfo.lock);
×
2659
          taosArrayDestroy(pStreamList);
×
UNCOV
2660
          mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
×
2661
                 pe->req.checkpointId, chkId);
UNCOV
2662
          return TSDB_CODE_FAILED;
×
2663
        }
2664
        code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
76✔
2665
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
76!
UNCOV
2666
          mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
×
2667
        }
2668

2669
        void* p = taosArrayPush(pList, &pe->req.taskId);
76✔
2670
        if (p == NULL) {
76!
UNCOV
2671
          mError("failed to put into task list, taskId:0x%x", pe->req.taskId);
×
2672
        }
2673
        streamId = pe->req.streamId;
76✔
2674
      } else {
UNCOV
2675
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
×
2676
               pe->req.startTs, (now - pe->ts) / 1000.0);
2677
      }
2678
    }
2679

2680
    mndReleaseStream(pMnode, pStream);
16✔
2681

2682
    if (taosArrayGetSize(pList) > 0) {
16!
2683
      for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
92✔
2684
        int32_t *taskId = taosArrayGet(pList, i);
76✔
2685
        if (taskId == NULL) {
76!
UNCOV
2686
          continue;
×
2687
        }
2688

2689
        for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) {
76!
2690
          SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, k);
76✔
2691
          if ((pe != NULL) && (pe->req.taskId == *taskId)) {
76!
2692
            taosArrayRemove(pInfo->pTaskList, k);
76✔
2693
            break;
76✔
2694
          }
2695
        }
2696
      }
2697
    }
2698

2699
    taosArrayDestroy(pList);
16✔
2700

2701
    if (taosArrayGetSize(pInfo->pTaskList) == 0) {
16!
2702
      mndClearConsensusRspEntry(pInfo);
16✔
2703
      if (streamId == -1) {
16!
2704
        streamMutexUnlock(&execInfo.lock);
×
2705
        taosArrayDestroy(pStreamList);
×
2706
        mError("streamId is -1, streamId:%" PRIx64, pInfo->streamId);
×
UNCOV
2707
        return TSDB_CODE_FAILED;
×
2708
      }
2709
      void* p = taosArrayPush(pStreamList, &streamId);
16✔
2710
      if (p == NULL) {
16!
UNCOV
2711
        mError("failed to put into stream list, stream:0x%" PRIx64, streamId);
×
2712
      }
2713
    }
2714
  }
2715

2716
  for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
8,137✔
2717
    int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i);
16✔
2718
    if (pStreamId == NULL) {
16!
UNCOV
2719
      continue;
×
2720
    }
2721

2722
    code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId);
16✔
2723
  }
2724

2725
  streamMutexUnlock(&execInfo.lock);
8,121✔
2726

2727
  taosArrayDestroy(pStreamList);
8,121✔
2728
  mDebug("end to process consensus-checkpointId in tmr");
8,121✔
2729
  return code;
8,121✔
2730
}
2731

2732
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
×
2733
  int32_t code = mndProcessCreateStreamReq(pReq);
×
2734
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
2735
    pReq->info.rsp = rpcMallocCont(1);
×
2736
    if (pReq->info.rsp == NULL) {
×
UNCOV
2737
      return terrno;
×
2738
    }
2739

2740
    pReq->info.rspLen = 1;
×
2741
    pReq->info.noResp = false;
×
UNCOV
2742
    pReq->code = code;
×
2743
  }
UNCOV
2744
  return code;
×
2745
}
2746

2747
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
×
2748
  int32_t code = mndProcessDropStreamReq(pReq);
×
2749
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
2750
    pReq->info.rsp = rpcMallocCont(1);
×
2751
    if (pReq->info.rsp == NULL) {
×
UNCOV
2752
      return terrno;
×
2753
    }
2754

2755
    pReq->info.rspLen = 1;
×
2756
    pReq->info.noResp = false;
×
UNCOV
2757
    pReq->code = code;
×
2758
  }
UNCOV
2759
  return code;
×
2760
}
2761

2762
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
16,667✔
2763
  if (pExecInfo->initTaskList || pMnode == NULL) {
16,667!
2764
    return;
16,598✔
2765
  }
2766

2767
  addAllStreamTasksIntoBuf(pMnode, pExecInfo);
69✔
2768
  pExecInfo->initTaskList = true;
69✔
2769
}
2770

2771
void mndStreamResetInitTaskListLoadFlag() {
620✔
2772
  mInfo("reset task list buffer init flag for leader");
620!
2773
  execInfo.initTaskList = false;
620✔
2774
}
620✔
2775

2776
void mndUpdateStreamExecInfoRole(SMnode* pMnode, int32_t role) {
724✔
2777
  execInfo.switchFromFollower = false;
724✔
2778

2779
  if (execInfo.role == NODE_ROLE_UNINIT) {
724✔
2780
    execInfo.role = role;
666✔
2781
    if (role == NODE_ROLE_LEADER) {
666✔
2782
      mInfo("init mnode is set to leader");
596!
2783
    } else {
2784
      mInfo("init mnode is set to follower");
70!
2785
    }
2786
  } else {
2787
    if (role == NODE_ROLE_LEADER) {
58✔
2788
      if (execInfo.role == NODE_ROLE_FOLLOWER) {
24!
2789
        execInfo.role = role;
24✔
2790
        execInfo.switchFromFollower = true;
24✔
2791
        mInfo("mnode switch to be leader from follower");
24!
2792
      } else {
UNCOV
2793
        mInfo("mnode remain to be leader, do nothing");
×
2794
      }
2795
    } else {  // follower's
2796
      if (execInfo.role == NODE_ROLE_LEADER) {
34!
2797
        execInfo.role = role;
×
UNCOV
2798
        mInfo("mnode switch to be follower from leader");
×
2799
      } else {
2800
        mInfo("mnode remain to be follower, do nothing");
34!
2801
      }
2802
    }
2803
  }
2804
}
724✔
2805

2806
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
69✔
2807
  SSdb       *pSdb = pMnode->pSdb;
69✔
2808
  SStreamObj *pStream = NULL;
69✔
2809
  void       *pIter = NULL;
69✔
2810

2811
  while (1) {
2812
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
187✔
2813
    if (pIter == NULL) {
187✔
2814
      break;
69✔
2815
    }
2816

2817
    saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo);
118✔
2818
    sdbRelease(pSdb, pStream);
118✔
2819
  }
2820
}
69✔
2821

2822
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList) {
637✔
2823
  STrans *pTrans = NULL;
637✔
2824
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME,
637✔
2825
                                 "update checkpoint-info", &pTrans);
2826
  if (pTrans == NULL || code) {
637!
2827
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
2828
    return code;
×
2829
  }
2830

2831
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid);
637✔
2832
  if (code){
637!
2833
    sdbRelease(pMnode->pSdb, pStream);
×
2834
    mndTransDrop(pTrans);
×
UNCOV
2835
    return code;
×
2836
  }
2837

2838
  code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream);
637✔
2839
  if (code) {
637!
2840
    sdbRelease(pMnode->pSdb, pStream);
×
2841
    mndTransDrop(pTrans);
×
UNCOV
2842
    return code;
×
2843
  }
2844

2845
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
637✔
2846
  if (code) {
637!
2847
    sdbRelease(pMnode->pSdb, pStream);
×
2848
    mndTransDrop(pTrans);
×
UNCOV
2849
    return code;
×
2850
  }
2851

2852
  code = mndTransPrepare(pMnode, pTrans);
637✔
2853
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
637!
2854
    mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, tstrerror(code));
×
2855
    sdbRelease(pMnode->pSdb, pStream);
×
2856
    mndTransDrop(pTrans);
×
UNCOV
2857
    return code;
×
2858
  }
2859

2860
  sdbRelease(pMnode->pSdb, pStream);
637✔
2861
  mndTransDrop(pTrans);
637✔
2862

2863
  return TSDB_CODE_ACTION_IN_PROGRESS;
637✔
2864
}
2865

2866
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
3✔
2867
  SMnode      *pMnode = pReq->info.node;
3✔
2868
  int32_t      code = 0;
3✔
2869
  SOrphanTask *pTask = NULL;
3✔
2870
  int32_t      i = 0;
3✔
2871
  STrans      *pTrans = NULL;
3✔
2872
  int32_t      numOfTasks = 0;
3✔
2873

2874
  SMStreamDropOrphanMsg msg = {0};
3✔
2875
  code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg);
3✔
2876
  if (code) {
3!
UNCOV
2877
    return code;
×
2878
  }
2879

2880
  numOfTasks = taosArrayGetSize(msg.pList);
3✔
2881
  if (numOfTasks == 0) {
3!
2882
    mDebug("no orphan tasks to drop, no need to create trans");
×
UNCOV
2883
    goto _err;
×
2884
  }
2885

2886
  mDebug("create trans to drop %d orphan tasks", numOfTasks);
3!
2887

2888
  i = 0;
3✔
2889
  while (i < numOfTasks && ((pTask = taosArrayGet(msg.pList, i)) == NULL)) {
3!
UNCOV
2890
    i += 1;
×
2891
  }
2892

2893
  if (pTask == NULL) {
3!
2894
    mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task");
×
UNCOV
2895
    goto _err;
×
2896
  }
2897

2898
  // check if it is conflict with other trans in both sourceDb and targetDb.
2899
  code = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
3✔
2900
  if (code) {
3!
UNCOV
2901
    goto _err;
×
2902
  }
2903

2904
  SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};
3✔
2905

2906
  code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
3✔
2907
  if (pTrans == NULL || code != 0) {
3!
2908
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
×
UNCOV
2909
    goto _err;
×
2910
  }
2911

2912
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
3✔
2913
  if (code) {
3!
UNCOV
2914
    goto _err;
×
2915
  }
2916

2917
  // drop all tasks
2918
  if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) {
3!
2919
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
×
UNCOV
2920
    goto _err;
×
2921
  }
2922

2923
  // drop stream
2924
  if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) {
3!
UNCOV
2925
    goto _err;
×
2926
  }
2927

2928
  code = mndTransPrepare(pMnode, pTrans);
3✔
2929
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
3!
2930
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
×
UNCOV
2931
    goto _err;
×
2932
  }
2933

2934
_err:
3✔
2935
  tDestroyDropOrphanTaskMsg(&msg);
3✔
2936
  mndTransDrop(pTrans);
3✔
2937

2938
  if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
3!
2939
    mDebug("create drop %d orphan tasks trans succ", numOfTasks);
3!
2940
  }
2941
  return code;
3✔
2942
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc