• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3653

14 Mar 2025 08:10AM UTC coverage: 22.565% (-41.0%) from 63.596%
#3653

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add basic test for alter table option.

* Fix(keep): memory leak.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stb keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

49248 of 302527 branches covered (16.28%)

Branch coverage included in aggregate %.

53 of 99 new or added lines in 12 files covered. (53.54%)

155872 existing lines in 443 files now uncovered.

87359 of 302857 relevant lines covered (28.84%)

570004.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.88
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "audit.h"
17
#include "mndDb.h"
18
#include "mndPrivilege.h"
19
#include "mndScheduler.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndStream.h"
23
#include "mndTrans.h"
24
#include "osMemory.h"
25
#include "parser.h"
26
#include "taoserror.h"
27
#include "tmisce.h"
28
#include "tname.h"
29

30
#define MND_STREAM_MAX_NUM 60
31

32
typedef struct {
33
  int8_t placeHolder;  // // to fix windows compile error, define place holder
34
} SMStreamNodeCheckMsg;
35

36
static int32_t  mndNodeCheckSentinel = 0;
37
SStreamExecInfo execInfo;
38

39
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
41
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
42
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
43
static int32_t mndProcessFailedStreamReq(SRpcMsg *pReq);
44
static int32_t mndProcessCheckStreamStatusReq(SRpcMsg *pReq);
45
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
46

47
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
48
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
49

50
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq);
51
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
52
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
53
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
54
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
55
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
56
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
57
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq);
58
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
59
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
60
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
61
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
62
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList);
63
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
64
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
65
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
66
static void    doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
67
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq);
68
static void    saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
69

70
static void     addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
71
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
72

73
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
74
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
75
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
76
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
77
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
78

79
/*
 * Register the stream module with the mnode.
 *
 * Installs the message handlers for all stream-related messages, the show handlers for the
 * streams and stream-tasks system tables, initializes the exec info, and finally registers the
 * SDB_STREAM and SDB_STREAM_SEQ sdb tables. Returns the first non-zero code from
 * mndInitExecInfo() or sdbSetTable().
 */
int32_t mndInitStream(SMnode *pMnode) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;

  SSdbTable table = {
      .sdbType = SDB_STREAM,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
  };

  SSdbTable tableSeq = {
      .sdbType = SDB_STREAM_SEQ,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamSeqActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamSeqActionDecode,
      .insertFp = (SdbInsertFp)mndStreamSeqActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamSeqActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamSeqActionDelete,
  };

  // stream DDL and periodic status / node checks
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_FAILED_STREAM, mndProcessFailedStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CHECK_STREAM_TIMER, mndProcessCheckStreamStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck);

  // task-level responses are all routed to the transaction layer
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CONSEN_CHKPT_RSP, mndTransProcessRsp);

  // messages exchanged inside the mnode
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE, mndProcessCreateStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp);

  // checkpoint, heartbeat, node-change and consensus handling
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_ALL_STOP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_DROP_ORPHANTASKS, mndProcessDropOrphanTaskReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_TASK_RESET, mndProcessResetStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr);

  // user-issued pause / resume / reset
  mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESET_STREAM, mndProcessResetStreamReq);

  // system tables
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);

  if ((code = mndInitExecInfo()) != 0) {
    return code;
  }

  if ((code = sdbSetTable(pSdb, table)) != 0) {
    return code;
  }

  return sdbSetTable(pSdb, tableSeq);
}
157

158
/*
 * Release every container held by the global execInfo and destroy its mutex.
 * pMnode is not used.
 */
void mndCleanupStream(SMnode *pMnode) {
  // hash maps
  taosHashCleanup(execInfo.pTaskMap);
  taosHashCleanup(execInfo.transMgmt.pDBTrans);
  taosHashCleanup(execInfo.pTransferStateStreams);
  taosHashCleanup(execInfo.pChkptStreams);
  taosHashCleanup(execInfo.pStreamConsensus);

  // arrays
  taosArrayDestroy(execInfo.pTaskList);
  taosArrayDestroy(execInfo.pNodeList);
  taosArrayDestroy(execInfo.pKilledChkptTrans);

  (void)taosThreadMutexDestroy(&execInfo.lock);
  mDebug("mnd stream exec info cleanup");
}
8✔
170

UNCOV
171
/*
 * sdb decode callback for SDB_STREAM rows.
 *
 * The raw record holds an int32 payload length `tlen` followed by `tlen` bytes. The payload is
 * decoded with tDecodeSStreamObj() into the SStreamObj of a freshly allocated row.
 *
 * Returns the new row and sets terrno to 0 on success. On failure returns NULL, frees the row
 * and stores the error code in terrno.
 */
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSdbRow    *pRow = NULL;
  SStreamObj *pStream = NULL;
  void       *buf = NULL;
  int8_t      sver = 0;
  int32_t     tlen = 0;
  int32_t     dataPos = 0;

  code = sdbGetRawSoftVer(pRaw, &sver);
  TSDB_CHECK_CODE(code, lino, _over);

  if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
    // This must be reported as a failure. Leaving code at 0 used to take the success branch
    // below, dereferencing the still-NULL pStream and returning a NULL row with terrno == 0.
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    lino = __LINE__;
    goto _over;
  }

  pRow = sdbAllocRow(sizeof(SStreamObj));
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);

  pStream = sdbGetRowObj(pRow);
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);

  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);
  if (tlen < 0) {
    // a corrupted negative length would make the tlen + 1 sizes below meaningless
    code = TSDB_CODE_INVALID_MSG;
    lino = __LINE__;
    goto _over;
  }

  // The decoder is handed tlen + 1 bytes but only tlen are copied in, so zero the buffer to
  // keep the trailing byte deterministic.
  buf = taosMemoryCalloc(1, tlen + 1);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);

  SDecoder decoder;
  tDecoderInit(&decoder, buf, tlen + 1);
  code = tDecodeSStreamObj(&decoder, pStream, sver);
  tDecoderClear(&decoder);

  if (code < 0) {
    tFreeStreamObj(pStream);
  }

_over:
  taosMemoryFreeClear(buf);

  if (code != TSDB_CODE_SUCCESS) {
    const char *p = (pStream == NULL) ? "null" : pStream->name;
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
    taosMemoryFreeClear(pRow);

    terrno = code;
    return NULL;
  }

  mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
         pStream->checkpointId);

  terrno = 0;
  return pRow;
}
229

UNCOV
230
/* sdb insert callback for SDB_STREAM: nothing to set up beyond a trace message. Always succeeds. */
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
  const char *name = pStream->name;
  mTrace("stream:%s, perform insert action", name);
  return TSDB_CODE_SUCCESS;
}
234

UNCOV
235
/*
 * sdb delete callback for SDB_STREAM: frees the contents of the stream object while holding
 * its write latch. Always succeeds.
 */
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
  const char *name = pStream->name;
  mInfo("stream:%s, perform delete action", name);

  taosWLockLatch(&pStream->lock);
  tFreeStreamObj(pStream);
  taosWUnLockLatch(&pStream->lock);

  return TSDB_CODE_SUCCESS;
}
242

UNCOV
243
/*
 * sdb update callback for SDB_STREAM: refresh the cached object (pOldStream) from a newly
 * decoded one (pNewStream).
 *
 * The version is swapped atomically before taking the latch. Status, update time and
 * checkpoint fields are copied under the write latch. The task lists are moved over only when
 * the cached object has none, and the source pointers are cleared so pNewStream no longer
 * references them. Always succeeds.
 */
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
  SStreamObj *pDst = pOldStream;
  SStreamObj *pSrc = pNewStream;

  mTrace("stream:%s, perform update action", pDst->name);
  (void)atomic_exchange_32(&pDst->version, pSrc->version);

  taosWLockLatch(&pDst->lock);

  pDst->status = pSrc->status;
  pDst->updateTime = pSrc->updateTime;
  pDst->checkpointId = pSrc->checkpointId;
  pDst->checkpointFreq = pSrc->checkpointFreq;

  if (NULL == pDst->tasks) {
    pDst->tasks = pSrc->tasks;
    pSrc->tasks = NULL;
  }

  if (NULL == pDst->pHTasksList) {
    pDst->pHTasksList = pSrc->pHTasksList;
    pSrc->pHTasksList = NULL;
  }

  taosWUnLockLatch(&pDst->lock);
  return TSDB_CODE_SUCCESS;
}
264

UNCOV
265
/*
 * Look up the stream named streamName and take an sdb reference on it (release it with
 * mndReleaseStream).
 *
 * On success *pStream is set and TSDB_CODE_SUCCESS is returned. On failure *pStream is NULL and
 * the result is non-zero:
 * - TSDB_CODE_MND_STREAM_NOT_EXIST when the object is absent (or no error was recorded);
 * - otherwise, the sdb error left in terrno.
 * Previously those other failures returned 0 together with a NULL stream, so a caller that
 * checked only the return code could dereference NULL.
 */
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
  SSdb *pSdb = pMnode->pSdb;

  (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
  if ((*pStream) != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = terrno;
  if (code == TSDB_CODE_SDB_OBJ_NOT_THERE || code == TSDB_CODE_SUCCESS) {
    return TSDB_CODE_MND_STREAM_NOT_EXIST;
  }
  return code;
}
274

UNCOV
275
/* Drop the sdb reference obtained through mndAcquireStream (sdbAcquire). */
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
  sdbRelease(pMnode->pSdb, pStream);
}
×
279

UNCOV
280
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
UNCOV
281
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
UNCOV
282
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
UNCOV
283
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
UNCOV
284
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
285

UNCOV
286
/*
 * Validate the mandatory fields of a create-stream request: the stream name, a non-empty SQL
 * text, the source database and the target super table name must all be present.
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_MND_INVALID_STREAM_OPTION.
 */
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
  bool hasName = (pCreate->name[0] != 0);
  bool hasSql = (pCreate->sql != NULL) && (pCreate->sql[0] != 0);
  bool hasSourceDb = (pCreate->sourceDB[0] != 0);
  bool hasTargetStb = (pCreate->targetStbFullName[0] != 0);

  if (hasName && hasSql && hasSourceDb && hasTargetStb) {
    return TSDB_CODE_SUCCESS;
  }
  return TSDB_CODE_MND_INVALID_STREAM_OPTION;
}
293

294
/*
 * Fill pWrapper with one SSchema per SField in pFields.
 *
 * Fields of type TSDB_DATA_TYPE_NULL become VARCHAR columns of VARSTR_HEADER_SIZE bytes. Column
 * ids are assigned 1..n in field order; name and flags are copied from the field.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation or an element lookup fails. A schema
 * array that was already allocated is not freed here on a failed lookup.
 */
static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) {
  pWrapper->nCols = taosArrayGetSize(pFields);

  SSchema *pCols = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema));
  pWrapper->pSchema = pCols;
  if (pCols == NULL) {
    return terrno;
  }

  for (int32_t i = 0; i < pWrapper->nCols; ++i) {
    const SField *pField = taosArrayGet(pFields, i);
    if (pField == NULL) {
      return terrno;
    }

    SSchema *pCol = &pCols[i];
    if (pField->type != TSDB_DATA_TYPE_NULL) {
      pCol->type = pField->type;
      pCol->bytes = pField->bytes;
    } else {
      pCol->type = TSDB_DATA_TYPE_VARCHAR;
      pCol->bytes = VARSTR_HEADER_SIZE;
    }
    pCol->colId = i + 1;
    pCol->flags = pField->flags;
    tstrncpy(pCol->name, pField->name, sizeof(pCol->name));
  }

  return TSDB_CODE_SUCCESS;
}
323

UNCOV
324
/*
 * Report whether any column after the first carries the COL_IS_KEY flag.
 * Index 0 is never inspected (presumably the leading timestamp column — TODO confirm).
 */
static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) {
  bool found = false;
  for (int32_t col = 1; !found && col < pWrapper->nCols; ++col) {
    found = ((pWrapper->pSchema[col].flags & COL_IS_KEY) != 0);
  }
  return found;
}
335

UNCOV
336
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
×
UNCOV
337
  SNode      *pAst = NULL;
×
UNCOV
338
  SQueryPlan *pPlan = NULL;
×
UNCOV
339
  int32_t     code = 0;
×
340

UNCOV
341
  mInfo("stream:%s to create", pCreate->name);
×
UNCOV
342
  memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
×
UNCOV
343
  pObj->createTime = taosGetTimestampMs();
×
UNCOV
344
  pObj->updateTime = pObj->createTime;
×
UNCOV
345
  pObj->version = 1;
×
346

UNCOV
347
  if (pCreate->smaId > 0) {
×
UNCOV
348
    pObj->subTableWithoutMd5 = 1;
×
349
  }
350

UNCOV
351
  pObj->smaId = pCreate->smaId;
×
UNCOV
352
  pObj->indexForMultiAggBalance = -1;
×
353

UNCOV
354
  pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
×
355

UNCOV
356
  char p[TSDB_STREAM_FNAME_LEN + 32] = {0};
×
UNCOV
357
  snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory");
×
358

UNCOV
359
  pObj->hTaskUid = mndGenerateUid(pObj->name, strlen(pObj->name));
×
UNCOV
360
  pObj->status = STREAM_STATUS__NORMAL;
×
361

362
  pObj->conf.igExpired = pCreate->igExpired;
×
363
  pObj->conf.trigger = pCreate->triggerType;
×
UNCOV
364
  pObj->conf.triggerParam = pCreate->maxDelay;
×
365
  pObj->conf.watermark = pCreate->watermark;
×
UNCOV
366
  pObj->conf.fillHistory = pCreate->fillHistory;
×
UNCOV
367
  pObj->deleteMark = pCreate->deleteMark;
×
UNCOV
368
  pObj->igCheckUpdate = pCreate->igUpdate;
×
369

UNCOV
370
  memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
×
UNCOV
371
  SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
×
UNCOV
372
  if (pSourceDb == NULL) {
×
UNCOV
373
    code = terrno;
×
UNCOV
374
    mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb,
×
375
          tstrerror(code));
376
    goto _ERR;
×
377
  }
378

UNCOV
379
  pObj->sourceDbUid = pSourceDb->uid;
×
UNCOV
380
  mndReleaseDb(pMnode, pSourceDb);
×
381

UNCOV
382
  memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
×
383

UNCOV
384
  SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
×
UNCOV
385
  if (pTargetDb == NULL) {
×
UNCOV
386
    code = terrno;
×
UNCOV
387
    mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb,
×
388
           tstrerror(code));
UNCOV
389
    goto _ERR;
×
390
  }
391

UNCOV
392
  tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
×
393

UNCOV
394
  if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
×
UNCOV
395
    pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
×
396
  } else {
UNCOV
397
    pObj->targetStbUid = pCreate->targetStbUid;
×
398
  }
399
  pObj->targetDbUid = pTargetDb->uid;
×
UNCOV
400
  mndReleaseDb(pMnode, pTargetDb);
×
401

UNCOV
402
  pObj->sql = pCreate->sql;
×
UNCOV
403
  pObj->ast = pCreate->ast;
×
404

UNCOV
405
  pCreate->sql = NULL;
×
UNCOV
406
  pCreate->ast = NULL;
×
407

408
  // deserialize ast
UNCOV
409
  if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) {
×
UNCOV
410
    goto _ERR;
×
411
  }
412

413
  // create output schema
UNCOV
414
  if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) {
×
UNCOV
415
    goto _ERR;
×
416
  }
417

UNCOV
418
  int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
×
UNCOV
419
  if (numOfNULL > 0) {
×
420
    pObj->outputSchema.nCols += numOfNULL;
×
421
    SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
×
422
    if (!pFullSchema) {
×
423
      code = terrno;
×
424
      goto _ERR;
×
425
    }
426

UNCOV
427
    int32_t nullIndex = 0;
×
UNCOV
428
    int32_t dataIndex = 0;
×
UNCOV
429
    for (int32_t i = 0; i < pObj->outputSchema.nCols; i++) {
×
UNCOV
430
      if (nullIndex >= numOfNULL) {
×
UNCOV
431
        pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
×
UNCOV
432
        pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
×
433
        pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
×
434
        tstrncpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name, sizeof(pFullSchema[i].name));
×
UNCOV
435
        pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
×
UNCOV
436
        dataIndex++;
×
437
      } else {
UNCOV
438
        SColLocation *pos = NULL;
×
UNCOV
439
        if (nullIndex < taosArrayGetSize(pCreate->fillNullCols)) {
×
UNCOV
440
          pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
×
441
        }
442

UNCOV
443
        if (pos == NULL) {
×
UNCOV
444
          mError("invalid null column index, %d", nullIndex);
×
UNCOV
445
          continue;
×
446
        }
447

UNCOV
448
        if (i < pos->slotId) {
×
UNCOV
449
          pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
×
UNCOV
450
          pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
×
UNCOV
451
          pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
×
UNCOV
452
          tstrncpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name, sizeof(pFullSchema[i].name));
×
UNCOV
453
          pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
×
UNCOV
454
          dataIndex++;
×
455
        } else {
UNCOV
456
          pFullSchema[i].bytes = 0;
×
UNCOV
457
          pFullSchema[i].colId = pos->colId;
×
UNCOV
458
          pFullSchema[i].flags = COL_SET_NULL;
×
UNCOV
459
          memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
×
UNCOV
460
          pFullSchema[i].type = pos->type;
×
UNCOV
461
          nullIndex++;
×
462
        }
463
      }
464
    }
465

UNCOV
466
    taosMemoryFree(pObj->outputSchema.pSchema);
×
UNCOV
467
    pObj->outputSchema.pSchema = pFullSchema;
×
468
  }
469

UNCOV
470
  SPlanContext cxt = {
×
471
      .pAstRoot = pAst,
472
      .topicQuery = false,
473
      .streamQuery = true,
474
      .triggerType =
UNCOV
475
          (pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
×
UNCOV
476
      .watermark = pObj->conf.watermark,
×
UNCOV
477
      .igExpired = pObj->conf.igExpired,
×
UNCOV
478
      .deleteMark = pObj->deleteMark,
×
479
      .igCheckUpdate = pObj->igCheckUpdate,
×
UNCOV
480
      .destHasPrimaryKey = hasDestPrimaryKey(&pObj->outputSchema),
×
481
  };
482

483
  // using ast and param to build physical plan
UNCOV
484
  if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) {
×
UNCOV
485
    goto _ERR;
×
486
  }
487

488
  // save physcial plan
UNCOV
489
  if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) {
×
UNCOV
490
    goto _ERR;
×
491
  }
492

UNCOV
493
  pObj->tagSchema.nCols = pCreate->numOfTags;
×
UNCOV
494
  if (pCreate->numOfTags) {
×
495
    pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
×
UNCOV
496
    if (pObj->tagSchema.pSchema == NULL) {
×
UNCOV
497
      code = terrno;
×
UNCOV
498
      goto _ERR;
×
499
    }
500
  }
501

502
  /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
UNCOV
503
  for (int32_t i = 0; i < pCreate->numOfTags; i++) {
×
UNCOV
504
    SField *pField = taosArrayGet(pCreate->pTags, i);
×
UNCOV
505
    if (pField == NULL) {
×
UNCOV
506
      continue;
×
507
    }
508

UNCOV
509
    pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
×
UNCOV
510
    pObj->tagSchema.pSchema[i].bytes = pField->bytes;
×
UNCOV
511
    pObj->tagSchema.pSchema[i].flags = pField->flags;
×
UNCOV
512
    pObj->tagSchema.pSchema[i].type = pField->type;
×
UNCOV
513
    memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN);
×
514
  }
515

516
_ERR:
×
UNCOV
517
  if (pAst != NULL) nodesDestroyNode(pAst);
×
UNCOV
518
  if (pPlan != NULL) qDestroyQueryPlan(pPlan);
×
UNCOV
519
  return code;
×
520
}
521

522
/*
 * Encode pTask into a TDMT_STREAM_TASK_DEPLOY message and append it to pTrans as a redo action
 * aimed at the task's epSet.
 *
 * The message is an SMsgHead (vgId = task node id, network order) followed by the encoded task.
 * The buffer is sized with a first encoding pass against a NULL buffer. Tasks older than
 * SSTREAM_TASK_SUBTABLE_CHANGED_VER are upgraded to SSTREAM_TASK_VER before encoding.
 *
 * Returns 0 on success; otherwise an error code, and the message buffer is freed.
 */
int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
  SEncoder encoder;
  tEncoderInit(&encoder, NULL, 0);

  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  // Sizing pass. tEncodeStreamTask returns 0 on success (see the second pass below), so any
  // non-zero value is a failure — checking only for -1 missed real error codes.
  int32_t code = tEncodeStreamTask(&encoder, pTask);
  if (code != 0) {
    tEncoderClear(&encoder);
    mError("failed to calculate encoded size of stream task, code:%s", tstrerror(code));
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t size = encoder.pos;
  int32_t tlen = sizeof(SMsgHead) + size;
  tEncoderClear(&encoder);

  void *buf = taosMemoryCalloc(1, tlen);
  if (buf == NULL) {
    return terrno;
  }

  ((SMsgHead *)buf)->vgId = htonl(pTask->info.nodeId);

  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, size);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  if (code != 0) {
    mError("failed to encode stream task, code:%s", tstrerror(code));
    taosMemoryFree(buf);
    return code;
  }

  code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code) {
    taosMemoryFree(buf);  // on success the transaction keeps buf
  }

  return code;
}
566

UNCOV
567
/*
 * Add a deploy action to pTrans for every task of pStream: first all tasks visited by the stream
 * task iterator, then, when fill-history is enabled, every task in pStream->pHTasksList (one
 * array per level).
 *
 * Returns 0 on success or the first error code encountered. A NULL fill-history task entry is
 * reported as TSDB_CODE_INVALID_PARA instead of being dereferenced.
 */
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pIter = NULL;
  int32_t          code = createStreamTaskIter(pStream, &pIter);
  if (code) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return code;
  }

  while (streamTaskIterNextTask(pIter)) {
    SStreamTask *pTask = NULL;
    code = streamTaskIterGetCurrent(pIter, &pTask);
    if (code == 0) {
      code = mndPersistTaskDeployReq(pTrans, pTask);
    }

    if (code) {
      destroyStreamTaskIter(pIter);
      return code;
    }
  }

  destroyStreamTaskIter(pIter);

  // persistent stream task for already stored ts data
  if (pStream->conf.fillHistory) {
    int32_t level = taosArrayGetSize(pStream->pHTasksList);

    for (int32_t i = 0; i < level; i++) {
      SArray *pLevel = taosArrayGetP(pStream->pHTasksList, i);

      int32_t numOfTasks = taosArrayGetSize(pLevel);
      for (int32_t j = 0; j < numOfTasks; j++) {
        SStreamTask *pTask = taosArrayGetP(pLevel, j);
        if (pTask == NULL) {
          mError("stream:%s, null fill-history task at level:%d index:%d", pStream->name, i, j);
          return TSDB_CODE_INVALID_PARA;
        }

        code = mndPersistTaskDeployReq(pTrans, pTask);
        if (code) {
          return code;
        }
      }
    }
  }

  return code;
}
612

UNCOV
613
/*
 * Add all task deploy actions of pStream to pTrans, then the stream object's commit log entry
 * (status SDB_STATUS_READY).
 *
 * Any non-zero result from task persistence is treated as failure. The old `< 0` test would
 * have accepted a positive error code, unlike the `if (code)` convention used by
 * mndPersistStreamTasks itself.
 */
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
  int32_t code = mndPersistStreamTasks(pTrans, pStream);
  if (code != 0) {
    return code;
  }

  return mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
}
621

UNCOV
622
/*
 * Add the creation of the stream's target super table (pStream->targetSTbName) to pTrans.
 *
 * Columns come from pStream->outputSchema, each with the default compression for its type. Tags
 * come from pStream->tagSchema; when the stream defines none, a single UBIGINT tag "group_id" is
 * used. The new table's uid is pStream->targetStbUid. `user` is not used.
 *
 * Returns 0 on success. Otherwise returns one of:
 * - TSDB_CODE_MND_STB_ALREADY_EXIST if the table exists;
 * - TSDB_CODE_MND_DB_NOT_SELECTED if its database cannot be acquired;
 * - TSDB_CODE_MND_SINGLE_STB_MODE_DB if the database allows a single super table and has one;
 * - the error of any failing helper.
 * Previously, failures of mndGetNumOfStbs, mndBuildStbFromReq and mndAddStbToTrans returned 0.
 */
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
  SStbObj       *pStb = NULL;
  SDbObj        *pDb = NULL;
  SStbObj        stbObj = {0};
  SMCreateStbReq createReq = {0};
  int32_t        numOfStbs = -1;
  int32_t        code = 0;
  int32_t        lino = 0;

  tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
  createReq.numOfColumns = pStream->outputSchema.nCols;
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);

  // build fields from the stream's output schema
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    pField->flags = pStream->outputSchema.pSchema[i].flags;
    pField->type = pStream->outputSchema.pSchema[i].type;
    pField->bytes = pStream->outputSchema.pSchema[i].bytes;
    pField->compress = createDefaultColCmprByType(pField->type);
  }

  if (pStream->tagSchema.nCols == 0) {
    // no tags defined by the stream: use a single UBIGINT "group_id" tag
    createReq.numOfTags = 1;
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    SField *pField = taosArrayGet(createReq.pTags, 0);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, "group_id", sizeof(pField->name));
    pField->type = TSDB_DATA_TYPE_UBIGINT;
    pField->flags = 0;
    pField->bytes = 8;
  } else {
    createReq.numOfTags = pStream->tagSchema.nCols;
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    for (int32_t i = 0; i < createReq.numOfTags; i++) {
      SField *pField = taosArrayGet(createReq.pTags, i);
      if (pField == NULL) {
        continue;
      }

      pField->bytes = pStream->tagSchema.pSchema[i].bytes;
      pField->flags = pStream->tagSchema.pSchema[i].flags;
      pField->type = pStream->tagSchema.pSchema[i].type;
      tstrncpy(pField->name, pStream->tagSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    }
  }

  code = mndCheckCreateStbReq(&createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
    lino = __LINE__;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    lino = __LINE__;
    goto _OVER;
  }

  code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
  TSDB_CHECK_CODE(code, lino, _OVER);

  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    lino = __LINE__;
    goto _OVER;
  }

  code = mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb);
  TSDB_CHECK_CODE(code, lino, _OVER);

  stbObj.uid = pStream->targetStbUid;

  code = mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj);
  mndFreeStb(&stbObj);  // released whether or not it was added to the transaction, as before
  TSDB_CHECK_CODE(code, lino, _OVER);

_OVER:
  tFreeSMCreateStbReq(&createReq);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);

  if (code == TSDB_CODE_SUCCESS) {
    mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName,
           pStream->outputSchema.nCols);
  } else {
    mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->targetSTbName, lino,
           tstrerror(code));
  }
  return code;
}
733

734
// 1. stream number check
735
// 2. target stable can not be target table of other existed streams.
UNCOV
736
static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
×
UNCOV
737
  int32_t     numOfStream = 0;
×
738
  SStreamObj *pStream = NULL;
×
UNCOV
739
  void       *pIter = NULL;
×
740

741
  while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
×
UNCOV
742
    if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
×
UNCOV
743
      ++numOfStream;
×
744
    }
745

746

UNCOV
747
    if (numOfStream > MND_STREAM_MAX_NUM) {
×
UNCOV
748
      mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
×
749
             pStreamObj->name);
UNCOV
750
      sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
751
      sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
752
      return TSDB_CODE_MND_TOO_MANY_STREAMS;
×
753
    }
754

755
    if (pStream->targetStbUid == pStreamObj->targetStbUid) {
×
UNCOV
756
      mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
×
757
             pStreamObj->name);
UNCOV
758
      sdbRelease(pMnode->pSdb, pStream);
×
759
      sdbCancelFetch(pMnode->pSdb, pIter);
×
760
      return TSDB_CODE_MND_INVALID_TARGET_TABLE;
×
761
    }
762
    sdbRelease(pMnode->pSdb, pStream);
×
763
  }
764

765
  return TSDB_CODE_SUCCESS;
×
766
}
767

768
// Element-copy callback for taosArrayDup: returns a freshly duplicated copy of one notify URL string.
static void *notifyAddrDup(void *p) {
  char *pUrl = (char *)p;
  return taosStrdup(pUrl);
}
×
769

770
static int32_t addStreamTaskNotifyInfo(const SCMCreateStreamReq *createReq, const SStreamObj *pStream,
×
771
                                       SStreamTask *pTask) {
772
  int32_t code = TSDB_CODE_SUCCESS;
×
773
  int32_t lino = 0;
×
774

UNCOV
775
  TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
776
  TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
777

778
  pTask->notifyInfo.pNotifyAddrUrls = taosArrayDup(createReq->pNotifyAddrUrls, notifyAddrDup);
×
UNCOV
779
  TSDB_CHECK_NULL(pTask->notifyInfo.pNotifyAddrUrls, code, lino, _end, terrno);
×
780
  pTask->notifyInfo.notifyEventTypes = createReq->notifyEventTypes;
×
UNCOV
781
  pTask->notifyInfo.notifyErrorHandle = createReq->notifyErrorHandle;
×
UNCOV
782
  pTask->notifyInfo.streamName = taosStrdup(mndGetDbStr(createReq->name));
×
UNCOV
783
  TSDB_CHECK_NULL(pTask->notifyInfo.streamName, code, lino, _end, terrno);
×
UNCOV
784
  pTask->notifyInfo.stbFullName = taosStrdup(createReq->targetStbFullName);
×
UNCOV
785
  TSDB_CHECK_NULL(pTask->notifyInfo.stbFullName, code, lino, _end, terrno);
×
UNCOV
786
  pTask->notifyInfo.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
×
UNCOV
787
  TSDB_CHECK_NULL(pTask->notifyInfo.pSchemaWrapper, code, lino, _end, terrno);
×
788

UNCOV
789
_end:
×
UNCOV
790
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
791
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
792
  }
UNCOV
793
  return code;
×
794
}
795

UNCOV
796
// Attach the notification settings of createReq to every task of pStream. The fill-history task list is
// included when both pStream->conf.fillHistory and createReq->notifyHistory are set. This is a no-op when
// the request carries no notify URL.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL arguments, or the first per-task failure.
static int32_t addStreamNotifyInfo(SCMCreateStreamReq *createReq, SStreamObj *pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t level = 0;
  int32_t nTasks = 0;
  SArray *pLevel = NULL;

  TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pStream, code, lino, _end, TSDB_CODE_INVALID_PARA);

  if (taosArrayGetSize(createReq->pNotifyAddrUrls) == 0) {
    goto _end;
  }

  level = taosArrayGetSize(pStream->tasks);
  for (int32_t i = 0; i < level; ++i) {
    pLevel = taosArrayGetP(pStream->tasks, i);
    nTasks = taosArrayGetSize(pLevel);
    for (int32_t j = 0; j < nTasks; ++j) {
      code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
      TSDB_CHECK_CODE(code, lino, _end);
    }
  }

  if (pStream->conf.fillHistory && createReq->notifyHistory) {
    level = taosArrayGetSize(pStream->pHTasksList);
    for (int32_t i = 0; i < level; ++i) {
      pLevel = taosArrayGetP(pStream->pHTasksList, i);
      nTasks = taosArrayGetSize(pLevel);
      for (int32_t j = 0; j < nTasks; ++j) {
        code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
        TSDB_CHECK_CODE(code, lino, _end);
      }
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // pStream may be the NULL argument that caused the failure; never dereference it unconditionally here.
    mError("%s for stream %s failed at line %d since %s", __func__, (pStream != NULL) ? pStream->name : "(null)", lino,
           tstrerror(code));
  }
  return code;
}
838

UNCOV
839
// Periodic status check. A stream still in STREAM_STATUS__INIT is switched to STREAM_STATUS__FAILED, with
// reason "timeout" stored in pStream->reserve, when either:
//  - it was created more than tsStreamFailedTimeout ago, or
//  - its createTime lies in the future (presumably a clock adjustment; NOTE(review): confirm intent).
// Each stream is inspected under its write latch. Always returns 0.
static int32_t mndProcessCheckStreamStatusReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;

  while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    taosWLockLatch(&pStream->lock);
    if (pStream->status == STREAM_STATUS__INIT) {
      // Read the clock once so both comparisons are made against the same instant.
      int64_t elapsed = taosGetTimestampMs() - pStream->createTime;
      if (elapsed > tsStreamFailedTimeout || elapsed < 0) {
        pStream->status = STREAM_STATUS__FAILED;
        tstrncpy(pStream->reserve, "timeout", sizeof(pStream->reserve));
        mInfo("stream:%s, set status to failed success because of timeout", pStream->name);
      }
    }
    taosWUnLockLatch(&pStream->lock);
    sdbRelease(pMnode->pSdb, pStream);
  }

  return 0;
}
858

UNCOV
859
// Mark a stream as failed.
// Payload layout: an int32 error code followed by the stream name (at most TSDB_STREAM_FNAME_LEN - 1 bytes
// are taken). The stream's status becomes STREAM_STATUS__FAILED and the error text is stored in
// pStream->reserve.
// Returns TSDB_CODE_INVALID_MSG for a payload too short to hold the error code, the acquire error when the
// stream cannot be found, or the acquire result otherwise.
static int32_t mndProcessFailedStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     errCode = 0;
  char        streamName[TSDB_STREAM_FNAME_LEN] = {0};

  // Without this check a short message is over-read, and `contLen - INT_BYTES` turns negative, which
  // becomes a huge size_t length in the memcpy below.
  if (pReq->pCont == NULL || pReq->contLen < INT_BYTES) {
    mError("invalid failed stream msg recv, contLen:%d, discarded", pReq->contLen);
    return TSDB_CODE_INVALID_MSG;
  }

  // memcpy instead of *(int32_t *) avoids an unaligned load from the message buffer.
  memcpy(&errCode, pReq->pCont, sizeof(errCode));
  memcpy(streamName, POINTER_SHIFT(pReq->pCont, INT_BYTES), TMIN(pReq->contLen - INT_BYTES, TSDB_STREAM_FNAME_LEN - 1));

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  return code;
#endif

  mInfo("stream:%s, start to set stream failed", streamName);

  code = mndAcquireStream(pMnode, streamName, &pStream);
  if (pStream == NULL) {
    mError("stream:%s, failed to get stream when failed stream since %s", streamName, tstrerror(code));
    return code;
  }

  taosWLockLatch(&pStream->lock);
  pStream->status = STREAM_STATUS__FAILED;
  tstrncpy(pStream->reserve, tstrerror(errCode), sizeof(pStream->reserve));
  taosWUnLockLatch(&pStream->lock);
  mndReleaseStream(pMnode, pStream);

  mInfo("stream:%s, end to set stream failed success", streamName);

  return code;
}
890

UNCOV
891
// Handle a CREATE STREAM request:
//  - deserialize and validate the request, and honour igExists for a stream that already has tasks;
//  - check the stream grant, node-update trans, snode availability and read/write db privileges;
//  - build the stream object and, unless an empty fill-history stream is requested, schedule its tasks,
//    attach notify info and register them in execInfo;
//  - create a trans that optionally creates the target stable and persists the stream, then prepare it;
//  - write an audit record (with the original SQL when available).
// Returns TSDB_CODE_ACTION_IN_PROGRESS on success, the acquire result (0) when the stream already exists
// and igExists is set, or an error code.
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SStreamObj  streamObj = {0};
  char       *sql = NULL;
  int32_t     sqlLen = 0;
  const char *pMsg = "create stream tasks on dnodes";
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STrans     *pTrans = NULL;

  SCMCreateStreamReq createReq = {0};
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  goto _OVER;
#endif

  mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
  if ((code = mndCheckCreateStreamReq(&createReq)) != 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = mndAcquireStream(pMnode, createReq.name, &pStream);
  if (pStream != NULL && code == 0) {
    if (pStream->tasks != NULL) {
      if (createReq.igExists) {
        mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
        mndReleaseStream(pMnode, pStream);
        tFreeSCMCreateStreamReq(&createReq);
        return code;
      } else {
        code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
        goto _OVER;
      }
    }
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    goto _OVER;
  }

  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
    goto _OVER;
  }

  if (createReq.sql != NULL) {
    sql = taosStrdup(createReq.sql);
    TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
    // Must be set: the audit record below only carries the SQL text when sqlLen > 0.
    sqlLen = (int32_t)strlen(sql);
  }

  // check for the taskEp update trans
  if (isNodeUpdateTransActive()) {
    mError("stream:%s failed to create stream, node update trans is active", createReq.name);
    code = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
    goto _OVER;
  }

  SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB);
  if (pSourceDb == NULL) {
    code = terrno;
    mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB,
          tstrerror(code));
    goto _OVER;
  }

  code = mndCheckForSnode(pMnode, pSourceDb);
  mndReleaseDb(pMnode, pSourceDb);
  if (code != 0) {
    goto _OVER;
  }

  // build stream obj from request
  if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  // No lastTs with fill-history requested: only an empty stream in INIT status is created here; its tasks
  // are neither scheduled nor is its target stable created in this request.
  bool buildEmptyStream = false;
  if (createReq.lastTs == 0 && createReq.fillHistory != STREAM_FILL_HISTORY_OFF) {
    streamObj.status = STREAM_STATUS__INIT;
    buildEmptyStream = true;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb)) != 0) {
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb)) != 0) {
    goto _OVER;
  }

  code = doStreamCheck(pMnode, &streamObj);
  TSDB_CHECK_CODE(code, lino, _OVER);

  // schedule stream task for stream obj
  if (!buildEmptyStream) {
    code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList);
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
      mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code));
      goto _OVER;
    }
    // add notify info into all stream tasks
    code = addStreamNotifyInfo(&createReq, &streamObj);
    if (code != TSDB_CODE_SUCCESS) {
      mError("stream:%s failed to add stream notify info since %s", createReq.name, tstrerror(code));
      goto _OVER;
    }

    // add into buffer firstly
    // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
    streamMutexLock(&execInfo.lock);
    mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
    saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
    streamMutexUnlock(&execInfo.lock);
  }

  code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans);
  if (pTrans == NULL || code) {
    goto _OVER;
  }

  // create stb for stream
  if (createReq.createStb == STREAM_CREATE_STABLE_TRUE && !buildEmptyStream) {
    if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) {
      mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code));
      goto _OVER;
    }
  } else {
    mDebug("stream:%s no need create stable", createReq.name);
  }

  // add stream to trans
  code = mndPersistStream(pTrans, &streamObj);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  // execute creation
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    goto _OVER;
  }

  // Name-parse failures are only logged, so report the parse result rather than the unrelated trans code.
  SName   dbname = {0};
  int32_t nameCode = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (nameCode != 0) {
    mError("invalid source dbname:%s in create stream, code:%s", createReq.sourceDB, tstrerror(nameCode));
  }

  SName name = {0};
  nameCode = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE);
  if (nameCode != 0) {
    mError("invalid stream name:%s in create strem, code:%s", createReq.name, tstrerror(nameCode));
  }

  // reuse this function for stream
  if (sql != NULL && sqlLen > 0) {
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen);
  } else {
    char detail[1000] = {0};
    snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
  }

_OVER:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to create at line:%d since %s", createReq.name, lino, tstrerror(code));
  } else {
    mDebug("stream:%s create stream completed", createReq.name);
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  mndTransDrop(pTrans);
  mndReleaseStream(pMnode, pStream);
  tFreeSCMCreateStreamReq(&createReq);
  tFreeStreamObj(&streamObj);

  if (sql != NULL) {
    taosMemoryFreeClear(sql);
  }

  return code;
}
1076

1077
// Handle a RESTART STREAM request; the payload is decoded as an SMPauseStreamReq.
// Steps:
//  - check the write privilege on the target db, trans conflicts and pending node updates;
//  - register a restart trans carrying the actions set by mndStreamSetRestartAction, and prepare it.
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the trans is prepared, 0 when the stream is missing and
// igNotExists is set, or an error code.
static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SStreamObj      *pStream = NULL;
  int32_t          code = 0;
  SMPauseStreamReq pauseReq = {0};

  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (pauseReq.igNotExists) {
      mInfo("stream:%s, not exist, not restart stream", pauseReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to restart stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to restart stream", pauseReq.name, pStream->uid);
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESTART_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  bool updated = mndStreamNodeIsUpdated(pMnode);
  if (updated) {
    mError("tasks are not ready for restart, node update detected");
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  STrans *pTrans = NULL;
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream",
                       &pTrans);
  if (pTrans == NULL || code) {
    // This is the restart path; the previous message wrongly said "pause".
    mError("stream:%s failed to restart stream since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESTART_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // add the restart actions for the tasks of this stream into the trans
  code = mndStreamSetRestartAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s, failed to restart task since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare restart stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1156

UNCOV
1157
// Generate a new checkpoint id.
// The result is one more than the largest checkpoint id found either in the sdb stream objects or in the
// checkpoint info of any non-failed task entry in execInfo.
// `lock` selects whether execInfo.lock is taken around the task scan. NOTE(review): callers passing false
// presumably already hold it; confirm.
int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  SSdb       *pSdb = pMnode->pSdb;
  int64_t     maxChkptId = 0;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    maxChkptId = TMAX(maxChkptId, pStream->checkpointId);
    mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
           pStream->checkpointId);
    sdbRelease(pSdb, pStream);
  }

  {  // check the max checkpoint id from all vnodes.
    int64_t maxCheckpointId = -1;
    if (lock) {
      streamMutexLock(&execInfo.lock);
    }

    for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
      STaskId *p = taosArrayGet(execInfo.pTaskList, i);
      if (p == NULL) {
        // validate the key before using it for the hash lookup
        continue;
      }

      STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
      if (pEntry == NULL) {
        continue;
      }

      if (pEntry->checkpointInfo.failed) {
        continue;
      }

      if (maxCheckpointId < pEntry->checkpointInfo.latestId) {
        maxCheckpointId = pEntry->checkpointInfo.latestId;
      }
    }

    if (lock) {
      streamMutexUnlock(&execInfo.lock);
    }

    if (maxCheckpointId > maxChkptId) {
      mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId,
             maxCheckpointId);
      maxChkptId = maxCheckpointId;
    }
  }

  mDebug("generate new checkpointId:%" PRId64, maxChkptId + 1);
  return maxChkptId + 1;
}
1209

UNCOV
1210
// Create and prepare a checkpoint transaction for pStream using checkpointId.
// When mndTrigger == 1 and the previous checkpoint (pStream->checkpointFreq) is more recent than
// tsStreamCheckpointInterval * 1000 ms, nothing is done and TSDB_CODE_SUCCESS is returned.
// Otherwise:
//  - a checkpoint action is added for every task in a source level;
//  - the stream's checkpointId, checkpointFreq, currentTick and version are updated under its write latch;
//  - the stream is persisted in the trans and the trans is prepared.
// `lock` is forwarded to mndStreamTransConflictCheck.
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the trans was prepared, or an error code.
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
                                               int8_t mndTrigger, bool lock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t ts = taosGetTimestampMs();
  STrans *pTrans = NULL;

  if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
    return code;
  }

  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
  if (code) {
    mWarn("checkpoint conflict with other trans in %s, code:%s ignore the checkpoint for stream:%s %" PRIx64,
          pStream->sourceDb, tstrerror(code), pStream->name, pStream->uid);
    goto _ERR;
  }

  code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME,
                       "gen checkpoint for stream", &pTrans);
  if (code) {
    mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
           tstrerror(code));
    goto _ERR;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
  if (code) {
    mError("failed to register checkpoint trans for stream:%s, checkpointId:%" PRId64, pStream->name, checkpointId);
    goto _ERR;
  }

  mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);

  taosWLockLatch(&pStream->lock);
  pStream->currentTick = 1;

  // 1. redo action: broadcast checkpoint source msg for all source vg
  int32_t totalLevel = taosArrayGetSize(pStream->tasks);
  for (int32_t i = 0; i < totalLevel; i++) {
    SArray      *pLevel = taosArrayGetP(pStream->tasks, i);
    SStreamTask *p = taosArrayGetP(pLevel, 0);
    if (p == NULL) {
      // an empty (or missing) level has no task whose level could be inspected; skip it instead of crashing
      continue;
    }

    if (p->info.taskLevel == TASK_LEVEL__SOURCE) {
      int32_t sz = taosArrayGetSize(pLevel);
      for (int32_t j = 0; j < sz; j++) {
        SStreamTask *pTask = taosArrayGetP(pLevel, j);
        code = mndStreamSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);

        if (code != TSDB_CODE_SUCCESS) {
          taosWUnLockLatch(&pStream->lock);
          goto _ERR;
        }
      }
    }
  }

  // 2. reset tick
  pStream->checkpointId = checkpointId;
  pStream->checkpointFreq = taosGetTimestampMs();
  pStream->currentTick = 0;

  // 3. commit log: stream checkpoint info
  pStream->version = pStream->version + 1;
  taosWUnLockLatch(&pStream->lock);

  if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY)) != TSDB_CODE_SUCCESS) {
    goto _ERR;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to prepare checkpoint trans since %s", tstrerror(code));
  } else {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_ERR:
  mndTransDrop(pTrans);
  return code;
}
1291

UNCOV
1292
// Return the number of entries in execInfo.pNodeList. An empty list is first rebuilt from the existing
// streams via refreshNodeListFromExistedStreams. If that rebuild fails, its error code is returned instead
// of the count.
int32_t extractStreamNodeList(SMnode *pMnode) {
  if (taosArrayGetSize(execInfo.pNodeList) != 0) {
    return taosArrayGetSize(execInfo.pNodeList);
  }

  int32_t refreshCode = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
  if (refreshCode != 0) {
    mError("Failed to extract node list from stream, code:%s", tstrerror(refreshCode));
    return refreshCode;
  }

  return taosArrayGetSize(execInfo.pNodeList);
}
1303

UNCOV
1304
// Pre-check before launching checkpoints. Returns TSDB_CODE_STREAM_TASK_IVLD_STATUS when either:
//  - a stream node update is detected, or
//  - execInfo has no vgroup nodes while its task list is not empty.
// Returns 0 otherwise. execInfo is inspected under execInfo.lock.
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
  if (mndStreamNodeIsUpdated(pMnode)) {
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  int32_t result = 0;
  streamMutexLock(&execInfo.lock);

  bool noVgroups = (taosArrayGetSize(execInfo.pNodeList) == 0);
  if (noVgroups) {
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
  }

  if (noVgroups && taosArrayGetSize(execInfo.pTaskList) != 0) {
    mError("stream task node change checking done, no vgroups exist, but task list is not empty");
    result = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  streamMutexUnlock(&execInfo.lock);
  return result;
}
1322

UNCOV
1323
// Return the latest startTime among the task entries of `streamId` listed in pTaskList, looked up in
// execInfo.pTaskMap. Returns -1 when:
//  - any of those tasks is not TASK_STATUS__READY, or
//  - any of those tasks still has a related fill-history task (hTaskId != 0), or
//  - no task of the stream is found.
// NOTE(review): execInfo is read without locking here; isStreamReadyHelp holds execInfo.lock around the
// call. Confirm for any other caller.
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
  int64_t ts = -1;
  int32_t taskId = -1;

  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
    STaskId *p = taosArrayGet(pTaskList, i);
    if (p == NULL) {
      // validate the key before using it for the hash lookup
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
    if (pEntry == NULL || pEntry->id.streamId != streamId) {
      continue;
    }

    // -1 denote not ready now or never ready till now
    if (pEntry->hTaskId != 0) {
      mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
            " exists, checkpoint not issued",
            pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
            pEntry->hTaskId);
      return -1;
    }

    if (pEntry->status != TASK_STATUS__READY) {
      mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, not ready for checkpoint", pEntry->id.streamId,
            (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
      return -1;
    }

    if (ts < pEntry->startTime) {
      ts = pEntry->startTime;
      taskId = pEntry->id.taskId;
    }
  }

  mDebug("stream:0x%" PRIx64 " last ready ts:%" PRId64 " s-task:0x%x", streamId, ts, taskId);
  return ts;
}
1358

1359
// Checkpoint candidate collected by mndProcessStreamCheckpoint: a stream uid and the time elapsed since
// that stream's last checkpoint.
typedef struct {
  int64_t streamId;
  int64_t duration;
} SCheckpointInterval;

// Sort comparator over SCheckpointInterval that orders candidates by descending duration.
// The stream that has waited longest comes first; equal durations compare as 0.
static int32_t streamWaitComparFn(const void *p1, const void *p2) {
  int64_t lhs = ((const SCheckpointInterval *)p1)->duration;
  int64_t rhs = ((const SCheckpointInterval *)p2)->duration;

  if (lhs < rhs) {
    return 1;
  }
  return (lhs > rhs) ? -1 : 0;
}
1373

1374
// all tasks of this stream should be ready, otherwise do nothing
UNCOV
1375
static bool isStreamReadyHelp(int64_t now, SStreamObj* pStream) {
×
1376
  bool ready = false;
×
1377

1378
  streamMutexLock(&execInfo.lock);
×
1379

1380
  int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
×
UNCOV
1381
  if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) {
×
1382

UNCOV
1383
    if (lastReadyTs != -1) {
×
UNCOV
1384
      mInfo("not start checkpoint, stream:0x%" PRIx64 " last ready ts:%" PRId64 " ready duration:%" PRId64
×
1385
            "ms less than threshold",
1386
            pStream->uid, lastReadyTs, (now - lastReadyTs));
1387
    }
1388

UNCOV
1389
    ready = false;
×
1390
  } else {
UNCOV
1391
    ready = true;
×
1392
  }
1393

UNCOV
1394
  streamMutexUnlock(&execInfo.lock);
×
UNCOV
1395
  return ready;
×
1396
}
1397

UNCOV
1398
// Timer-driven handler that launches new stream checkpoint transactions.
//  - First clears finished checkpoint trans. If long-running checkpoint trans are reported, they are
//    killed and no new checkpoint is started in this round.
//  - A stream qualifies when its last checkpoint is at least tsStreamCheckpointInterval * 1000 ms old and
//    isStreamReadyHelp accepts it.
//  - Qualified streams are handled longest-waiting first. At most
//    tsMaxConcurrentCheckpoint - numOfCheckpointTrans new trans are started.
// Returns the code of the last processing step (the last checkpoint attempt when any was made).
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  int64_t now = taosGetTimestampMs();
  int32_t numOfCheckpointTrans = 0;
  int32_t code = mndCheckTaskAndNodeStatus(pMnode);

  if (code != 0) {
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  SArray *pCandidates = taosArrayInit(4, sizeof(SCheckpointInterval));
  if (pCandidates == NULL) {
    mError("failed to init chkptInterval info, not handle stream checkpoint, code:%s", tstrerror(terrno));
    return terrno;
  }

  SArray *pLongChkpts = taosArrayInit(4, sizeof(SStreamTransInfo));
  if (pLongChkpts == NULL) {
    mError("failed to init long checkpoint list, not handle stream checkpoint, code:%s", tstrerror(terrno));
    taosArrayDestroy(pCandidates);
    return terrno;
  }

  // check if ongoing checkpoint trans or long chkpt trans exist.
  code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans, pLongChkpts);
  if (code != 0) {
    mError("failed to clear finish trans, code:%s", tstrerror(code));
    taosArrayDestroy(pCandidates);
    taosArrayDestroy(pLongChkpts);
    return code;
  }

  // kill long exec checkpoint and set task status; nothing new is launched in this round
  if (taosArrayGetSize(pLongChkpts) > 0) {
    killChkptAndResetStreamTask(pMnode, pLongChkpts);
    taosArrayDestroy(pCandidates);
    taosArrayDestroy(pLongChkpts);
    return TSDB_CODE_SUCCESS;
  }
  taosArrayDestroy(pLongChkpts);

  // collect the streams that are due for a checkpoint
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    int64_t duration = now - pStream->checkpointFreq;
    bool    qualified = (duration >= tsStreamCheckpointInterval * 1000) && isStreamReadyHelp(now, pStream);

    if (qualified) {
      SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
      if (taosArrayPush(pCandidates, &in) != NULL) {
        int32_t currentSize = taosArrayGetSize(pCandidates);
        mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chkpt interval threshold: %ds(%" PRId64
               "s), concurrently launch threshold:%d",
               pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
               tsMaxConcurrentCheckpoint);
      } else {
        mError("failed to record the checkpoint interval info, stream:0x%" PRIx64, pStream->uid);
      }
    }

    sdbRelease(pSdb, pStream);
  }

  int32_t numOfQual = taosArrayGetSize(pCandidates);
  if (numOfQual == 0) {
    taosArrayDestroy(pCandidates);
    return code;
  }

  taosArraySort(pCandidates, streamWaitComparFn);

  if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) {
    mDebug(
        "%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new "
        "checkpoint trans are not allowed, wait for 30s",
        numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint);
    taosArrayDestroy(pCandidates);
    return code;
  }

  int32_t capacity = tsMaxConcurrentCheckpoint - numOfCheckpointTrans;
  mDebug(
      "%d stream(s) checkpoint interval longer than %ds, %d ongoing checkpoint trans, %d new checkpoint trans allowed, "
      "concurrent trans threshold:%d",
      numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, capacity, tsMaxConcurrentCheckpoint);

  int32_t started = 0;
  int64_t checkpointId = mndStreamGenChkptId(pMnode, true);

  for (int32_t idx = 0; idx < numOfQual; ++idx) {
    SCheckpointInterval *pCandidate = taosArrayGet(pCandidates, idx);
    if (pCandidate == NULL) {
      continue;
    }

    SStreamObj *pTarget = NULL;
    code = mndGetStreamObj(pMnode, pCandidate->streamId, &pTarget);
    if (pTarget == NULL || code != 0) {
      continue;
    }

    code = mndProcessStreamCheckpointTrans(pMnode, pTarget, checkpointId, 1, true);
    sdbRelease(pSdb, pTarget);

    if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
      mError("failed to start checkpoint trans, code:%s", tstrerror(code));
      continue;
    }

    started += 1;
    if (started >= capacity) {
      mDebug("already start %d new checkpoint trans, current active checkpoint trans:%d", started,
             (started + numOfCheckpointTrans));
      break;
    }
  }

  taosArrayDestroy(pCandidates);
  return code;
}
1530

UNCOV
1531
/*
 * Handle a DROP STREAM request.
 *
 * Deserializes the request and acquires the stream object. Then:
 *   - rejects the drop when the stream belongs to an SMA (it must be dropped together with the SMA);
 *   - checks the caller's write privilege on the stream's target db;
 *   - checks for conflicting stream transactions.
 * It then builds, registers, populates and prepares a "drop stream" transaction. After the trans is
 * prepared, any related active checkpoint trans is killed, the stream's tasks are removed from the
 * in-memory exec info, and an audit record is written.
 *
 * Returns:
 *   TSDB_CODE_ACTION_IN_PROGRESS once the drop trans has been prepared;
 *   0 when the stream does not exist and igNotExists is set;
 *   otherwise the error code of the failing step.
 */
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  SMDropStreamReq dropReq = {0};
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
    mError("invalid drop stream msg recv, discarded");
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  mDebug("recv drop stream:%s msg", dropReq.name);

  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (dropReq.igNotExists) {
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      tFreeMDropStreamReq(&dropReq);
      return 0;
    } else {
      mError("stream:%s not exist failed to drop it", dropReq.name);
      sdbRelease(pMnode->pSdb, pStream);  // no-op when pStream is NULL; mirrors the branch above
      tFreeMDropStreamReq(&dropReq);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  if (pStream->smaId != 0) {
    mDebug("stream:%s, uid:0x%" PRIx64 " try to drop sma related stream", dropReq.name, pStream->uid);

    void    *pIter = NULL;
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    while (pIter) {
      if (pSma && pSma->uid == pStream->smaId) {
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;

        // Log while pStream is still referenced: it must not be dereferenced after sdbRelease.
        mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma",
               dropReq.name, pStream->uid, tstrerror(code));

        sdbRelease(pMnode->pSdb, pSma);
        sdbRelease(pMnode->pSdb, pStream);
        sdbCancelFetch(pMnode->pSdb, pIter);
        tFreeMDropStreamReq(&dropReq);
        TAOS_RETURN(code);
      }

      if (pSma) {
        sdbRelease(pMnode->pSdb, pSma);
      }

      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    }
  }

  // Propagate the real privilege error instead of a bare -1.
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    return code;
  }

  STrans *pTrans = NULL;
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
  if (code) {
    mError("failed to register drop stream trans, code:%s", tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // drop all tasks
  code = mndStreamSetDropAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // drop stream
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // kill the related checkpoint trans
  int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
  if (transId != 0) {
    mDebug("drop active transId:%d due to stream:%s uid:0x%" PRIx64 " dropped", transId, pStream->name, pStream->uid);
    mndKillTransImpl(pMnode, transId, pStream->sourceDb);
  }

  mDebug("stream:%s uid:0x%" PRIx64 " transId:%d start to drop related task when dropping stream", dropReq.name,
         pStream->uid, transId);

  removeStreamTasksInBuf(pStream, &execInfo);

  // The drop trans is already prepared here. A failure to parse the name only affects the audit
  // record (its db name may be empty/incomplete). It must not turn the reply into an error.
  SName   name = {0};
  int32_t ret = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (ret != 0) {
    mError("stream:%s failed to extract db name for audit record, code:%s", dropReq.name, tstrerror(ret));
  }
  auditRecord(pReq, pMnode->clusterId, "dropStream", "", name.dbname, dropReq.sql, dropReq.sqlLen);

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  tFreeMDropStreamReq(&dropReq);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1673

1674
/*
 * Add "drop" entries to pTrans for every stream whose source or target db is pDb.
 *
 * A stream that spans two different dbs cannot be dropped together with only one of them.
 * In that case the whole operation fails with TSDB_CODE_MND_STREAM_MUST_BE_DELETED.
 *
 * For each stream that is dropped:
 *   - its related active checkpoint trans (if any) is killed;
 *   - its tasks are removed from the in-memory exec info;
 *   - the stream is persisted into pTrans with SDB_STATUS_DROPPED.
 *
 * Returns 0 on success, otherwise the error code of the failing step. The sdb iteration is
 * cancelled on every early return.
 */
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
      if (pStream->sourceDbUid != pStream->targetDbUid) {
        // Log before releasing: pStream must not be dereferenced after sdbRelease.
        mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
               pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(TSDB_CODE_MND_STREAM_MUST_BE_DELETED);
      }

      // kill the related checkpoint trans
      int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
      if (transId != 0) {
        mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
        mndKillTransImpl(pMnode, transId, pStream->sourceDb);
      }

      // drop the stream obj in execInfo
      removeStreamTasksInBuf(pStream, &execInfo);

      code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
      if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        return code;
      }
    }

    sdbRelease(pSdb, pStream);
  }

  return 0;
}
1716

UNCOV
1717
/*
 * SHOW STREAMS retrieve callback.
 *
 * Continues the sdb stream iteration saved in pShow->pIter and writes one row per stream into
 * pBlock. It stops after `rows` rows have been written or when no stream remains. A stream whose
 * attributes cannot be written into the block is skipped and does not consume a row.
 *
 * Returns the number of rows written by this call. The same count is added to pShow->numOfRows.
 */
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t written = 0;

  while (written < rows) {
    SStreamObj *pObj = NULL;

    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) {
      break;  // every stream has been visited
    }

    if (setStreamAttrInResBlock(pObj, pBlock, written) == 0) {
      ++written;
    }

    sdbRelease(pSdb, pObj);
  }

  pShow->numOfRows += written;
  return written;
}
1738

UNCOV
1739
/* Abort an unfinished SHOW STREAMS sdb iteration, releasing the iterator pIter. */
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1743

UNCOV
1744
/*
 * SHOW STREAM TASKS retrieve callback.
 *
 * First (re)initializes the stream exec info under execInfo.lock. It then continues the sdb stream
 * iteration saved in pShow->pIter and, for each stream (read-locked while it is processed), writes
 * one row per task into pBlock. Timestamps use the precision of the stream's source db, falling back
 * to milliseconds when that db cannot be acquired.
 *
 * All tasks of one stream are emitted together. When they do not fit, the block is grown to hold
 * them, so the returned row count may exceed rowsCapacity. Streams whose buffer growth or task
 * iterator creation fails are skipped.
 *
 * Returns the number of rows written by this call. The same count is added to pShow->numOfRows.
 */
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  int32_t     numOfRows = 0;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) {
      break;
    }

    // lock
    taosRLockLatch(&pStream->lock);

    int32_t count = mndGetNumOfStreamTasks(pStream);
    if (numOfRows + count > rowsCapacity) {
      code = blockDataEnsureCapacity(pBlock, numOfRows + count);
      if (code) {
        mError("failed to prepare the result block buffer, quit return value");
        taosRUnLockLatch(&pStream->lock);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    int32_t precision = TSDB_TIME_PRECISION_MILLI;
    SDbObj *pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
    if (pSourceDb != NULL) {
      precision = pSourceDb->cfg.precision;
      mndReleaseDb(pMnode, pSourceDb);
    }

    // add row for each task
    SStreamTaskIter *pIter = NULL;
    code = createStreamTaskIter(pStream, &pIter);
    if (code) {
      // Log before releasing: pStream must not be dereferenced after sdbRelease.
      mError("failed to create task iter for stream:%s", pStream->name);
      taosRUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pIter, &pTask);
      if (code) {
        // Only leave the loop: pIter is destroyed exactly once below. Destroying it here as
        // well would free it twice.
        break;
      }

      code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows, precision);
      if (code == TSDB_CODE_SUCCESS) {
        numOfRows++;
      }
    }

    pBlock->info.rows = numOfRows;

    destroyStreamTaskIter(pIter);
    taosRUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1817

UNCOV
1818
/* Abort an unfinished SHOW STREAM TASKS iteration, which walks SDB_STREAM objects, releasing pIter. */
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1822

UNCOV
1823
/*
 * Handle a PAUSE STREAM request.
 *
 * The pause proceeds only when all of the following hold:
 *   - the caller has write privilege on the stream's target db;
 *   - no conflicting stream trans is active;
 *   - no node update is pending;
 *   - at least one task of the stream has reported its status, and none is in the
 *     TASK_STATUS__UNINIT or TASK_STATUS__CK state.
 * A "pause the stream" trans is then created, registered, filled with the pause actions, persisted
 * (under the stream's write latch) and prepared.
 *
 * Returns:
 *   TSDB_CODE_ACTION_IN_PROGRESS on success;
 *   0 when the stream does not exist and igNotExists is set;
 *   otherwise an error code.
 */
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  SStreamObj      *pStream = NULL;
  STrans          *pTrans = NULL;
  int32_t          code = 0;
  SMPauseStreamReq req = {0};

  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &req) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, req.name, &pStream);
  if (code != 0 || pStream == NULL) {
    if (!req.igNotExists) {
      mError("stream:%s not exist, failed to pause stream", req.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
    mInfo("stream:%s, not exist, not pause stream", req.name);
    return 0;
  }

  mInfo("stream:%s,%" PRId64 " start to pause stream", req.name, pStream->uid);

  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    sdbRelease(pSdb, pStream);
    return code;
  }

  // Refuse while another trans touching the source or target db is in flight.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
  if (code) {
    sdbRelease(pSdb, pStream);
    TAOS_RETURN(code);
  }

  if (mndStreamNodeIsUpdated(pMnode)) {
    mError("tasks are not ready for pause, node update detected");
    sdbRelease(pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  // Scan the reported task states of this stream. The pause is rejected when none was reported or
  // when any task is still uninitialized or checkpointing.
  bool anyReported = false;
  bool allReady = true;

  streamMutexLock(&execInfo.lock);
  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
    STaskId          *pId = taosArrayGet(execInfo.pTaskList, i);
    STaskStatusEntry *pEntry = (pId != NULL) ? taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)) : NULL;

    if (pEntry == NULL || pEntry->id.streamId != pStream->uid) {
      continue;
    }

    if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) {
      mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name,
             pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status));
      allReady = false;
    }

    anyReported = true;
  }
  streamMutexUnlock(&execInfo.lock);

  if (!anyReported) {
    mError("stream:%s task not report status yet, not ready for pause", req.name);
    sdbRelease(pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  if (!allReady) {
    mError("stream:%s task not ready for pause yet", req.name);
    sdbRelease(pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s failed to pause stream since %s", req.name, tstrerror(code));
    sdbRelease(pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
  if (code) {
    sdbRelease(pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndStreamSetPauseAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s, failed to pause task since %s", req.name, tstrerror(code));
    sdbRelease(pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // Persist the stream object into the trans while holding its write latch.
  taosWLockLatch(&pStream->lock);
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  taosWUnLockLatch(&pStream->lock);

  if (code) {
    sdbRelease(pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pSdb, pStream);
  mndTransDrop(pTrans);
  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1957

UNCOV
1958
/*
 * Handle a RESUME STREAM request.
 *
 * Steps:
 *   1. Check the stream grant.
 *   2. Deserialize the request and acquire the stream.
 *   3. Verify write privilege on the target db and the absence of conflicting stream trans.
 *   4. Build a "resume the stream" trans carrying the resume actions (honouring igUntreated).
 *   5. Mark the stream STREAM_STATUS__NORMAL and persist it into the trans under its write latch.
 *   6. Prepare the trans.
 *
 * Returns:
 *   TSDB_CODE_ACTION_IN_PROGRESS on success;
 *   0 when the stream does not exist and igNotExists is set;
 *   otherwise the error code of the failing step.
 */
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResumeStreamReq resumeReq = {0};
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resumeReq.igNotExists) {
      mInfo("stream:%s not exist, not resume stream", resumeReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      return 0;
    } else {
      mError("stream:%s not exist, failed to resume stream", resumeReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);

  // Propagate the real privilege error instead of a bare -1.
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(code);
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  STrans *pTrans = NULL;
  code =
      doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // set the resume action
  code = mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated);
  if (code) {
    mError("stream:%s, failed to resume task since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // resume stream
  taosWLockLatch(&pStream->lock);
  pStream->status = STREAM_STATUS__NORMAL;

  // Capture the persist result: returning the previous (successful) code here would report
  // success to the client although the trans was dropped.
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    taosWUnLockLatch(&pStream->lock);
    mError("stream:%s, failed to persist resume stream trans log since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  taosWUnLockLatch(&pStream->lock);

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare resume stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
2047

UNCOV
2048
/*
 * Handle a RESET STREAM request (not implemented yet).
 *
 * Checks the stream grant, deserializes the request and verifies that the stream exists.
 *
 * Returns:
 *   0 when the stream does not exist and igNotExists is set;
 *   TSDB_CODE_MND_STREAM_NOT_EXIST when it does not exist otherwise;
 *   TSDB_CODE_ACTION_IN_PROGRESS when it exists.
 * NOTE(review): no trans is created for the in-progress case, so presumably no response is ever
 * produced for it — confirm once the reset logic is implemented.
 */
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResetStreamReq resetReq = {0};
  if (tDeserializeSMResetStreamReq(pReq->pCont, pReq->contLen, &resetReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  mDebug("recv reset stream req, stream:%s", resetReq.name);

  code = mndAcquireStream(pMnode, resetReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resetReq.igNotExists) {
      mInfo("stream:%s, not exist, not reset stream", resetReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to reset stream", resetReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  // todo(liao hao jun): implement the reset.
  // Release the reference taken by mndAcquireStream so the stream object is not pinned forever.
  sdbRelease(pMnode->pSdb, pStream);
  return TSDB_CODE_ACTION_IN_PROGRESS;
}
2078

UNCOV
2079
/*
 * Build a single "update task epsets" trans for streams affected by a vnode change.
 *
 * Pass 1: every stream is conflict-checked against MND_STREAM_TASK_UPDATE_NAME. Any conflict aborts
 * the whole nodeUpdate with that error.
 *
 * Pass 2: one trans is created lazily. The involved streams are all streams when includeAllNodes is
 * true, otherwise the streams whose source or target db is a key of pChangeInfo->pDBMap. For each
 * involved stream:
 *   - an entry is registered for it (a registration failure is only logged);
 *   - the epset-update actions are added (a failure is logged and the stream is skipped);
 *   - the stream is persisted into the trans log.
 *
 * On success *pUpdateTrans receives the trans (NULL when no stream exists) and the caller owns it.
 * When trans creation or persisting fails, the trans is dropped here and *pUpdateTrans stays NULL.
 * NOTE(review): when the last visited stream is skipped, the returned code is that stream's error
 * even though *pUpdateTrans is set — confirm how the caller treats this case.
 */
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes, STrans** pUpdateTrans) {
  SSdb       *pSdb = pMnode->pSdb;
  void       *pIter = NULL;
  STrans     *pTrans = NULL;
  int32_t     code = 0;
  *pUpdateTrans = NULL;

  // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool
  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
    sdbRelease(pSdb, pStream);

    if (code) {
      mError("nodeUpdate conflict with other trans, current nodeUpdate ignored, code:%s", tstrerror(code));
      sdbCancelFetch(pSdb, pIter);
      return code;
    }
  }

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    // here create only one trans
    if (pTrans == NULL) {
      code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME,
                           "update task epsets", &pTrans);
      if (pTrans == NULL || code) {
        if (pTrans != NULL) {  // do not leak a trans handed back together with an error
          mndTransDrop(pTrans);
        }
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        return terrno = code;
      }
    }

    if (!includeAllNodes) {
      void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
      void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
      if (p1 == NULL && p2 == NULL) {
        mDebug("stream:0x%" PRIx64 " %s not involved in nodeUpdate, ignore", pStream->uid, pStream->name);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
           pStream->name, pTrans->id);

    // NOTE: for each stream, we register one trans entry for task update
    code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
    if (code) {
      mError("failed to register trans, transId:%d, and continue", pTrans->id);
    }

    code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);

    // todo: not continue, drop all and retry again
    if (code != TSDB_CODE_SUCCESS) {
      mError("stream:0x%" PRIx64 " build nodeUpdate trans failed, ignore and continue, code:%s", pStream->uid,
             tstrerror(code));
      sdbRelease(pSdb, pStream);
      continue;
    }

    code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
    sdbRelease(pSdb, pStream);

    if (code != TSDB_CODE_SUCCESS) {
      sdbCancelFetch(pSdb, pIter);
      // *pUpdateTrans is still NULL here, so the caller can never release this trans: drop it now.
      mndTransDrop(pTrans);
      return code;
    }
  }

  // no need to build the trans to handle the vgroup update
  *pUpdateTrans = pTrans;
  return code;
}
2164

2165
/*
 * Rebuild pNodeList from the tasks of all existing streams.
 *
 * Every task contributes one SNodeEntry with:
 *   - nodeId and epset copied from the task;
 *   - hbTimestamp = -1 and lastHbMsgId = -1.
 * Entries are deduplicated by nodeId through a temporary hash map, so the first task seen for a
 * node supplies its epset. pNodeList is cleared and refilled from that map.
 *
 * Errors while walking a single stream (task iterator creation or access) are logged and that
 * stream is skipped or cut short. Returns the last such error code, or the error of a failed
 * array push. Returns terrno when the temporary hash map cannot be created.
 */
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int32_t     code = 0;

  mDebug("start to refresh node list by existed streams");

  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    return terrno;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    taosWLockLatch(&pStream->lock);

    SStreamTaskIter *pTaskIter = NULL;
    code = createStreamTaskIter(pStream, &pTaskIter);
    if (code) {
      // Log before releasing: pStream must not be dereferenced after sdbRelease.
      mError("failed to create task iter for stream:%s", pStream->name);
      taosWUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pTaskIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pTaskIter, &pTask);
      if (code) {
        break;
      }

      SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
      epsetAssign(&entry.epset, &pTask->info.epSet);
      int32_t ret = taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
      if (ret != 0 && ret != TSDB_CODE_DUP_KEY) {
        // report the failure of taosHashPut itself, not the unrelated outer code
        mError("failed to put entry into hash map, nodeId:%d, code:%s", entry.nodeId, tstrerror(ret));
      }
    }

    destroyStreamTaskIter(pTaskIter);
    taosWUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  taosArrayClear(pNodeList);

  // convert to list
  pIter = NULL;
  while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
    SNodeEntry *pEntry = (SNodeEntry *)pIter;

    void *p = taosArrayPush(pNodeList, pEntry);
    if (p == NULL) {
      mError("failed to put entry into node list, nodeId:%d, code: out of memory", pEntry->nodeId);
      if (code == 0) {
        code = terrno;
      }
      continue;
    }

    char    buf[256] = {0};
    int32_t ret = epsetToStr(&pEntry->epset, buf, tListLen(buf));  // ignore this error since it is only for log file
    if (ret != 0) {                                                // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(ret));
    }

    mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
  }

  taosHashCleanup(pHash);

  mDebug("numOfvNodes:%d get after extracting nodeInfo from all streams", (int32_t)taosArrayGetSize(pNodeList));
  return code;
}
2246

UNCOV
2247
/*
 * Insert the db name of every vgroup in the sdb into pDBMap as a key with no value.
 *
 * A debug line is emitted for each successful insertion, carrying the current map size. Failed
 * insertions are silently skipped.
 */
static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
  void   *pIter = NULL;
  int32_t code = 0;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    code = taosHashPut(pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
    if (code == 0) {
      int32_t size = taosHashGetSize(pDBMap);
      mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size);
    }

    // Release only after the last access to pVgroup->dbName; the object must not be read afterwards.
    sdbRelease(pSdb, pVgroup);
  }
}
×
2266

UNCOV
2267
/*
 * Core of the periodic node-change check. The visible caller runs it with execInfo.lock held.
 *
 * Steps:
 *   1. Drop expired node entries and tasks from the in-memory buffer.
 *   2. Diff execInfo.pNodeList against pNodeSnapshot, filling pChangeInfo.
 *   3. If this mnode is leader and has just switched from follower, set *pUpdateAllVgroups, clear
 *      the switch flag and add every db to pChangeInfo->pDBMap.
 *   4. If any node changed (or all vgroups must be updated), kill every active checkpoint trans.
 *
 * Returns 0, or the error code of step 1 or 2.
 */
static int32_t doProcessNodeCheckHelp(SArray *pNodeSnapshot, SMnode *pMnode, SVgroupChangeInfo *pChangeInfo,
                                      bool *pUpdateAllVgroups) {
  int32_t code = removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
  if (code != 0) {
    mDebug("failed to remove expired node entry in buf, code:%s", tstrerror(code));
    return code;
  }

  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, pChangeInfo);
  if (code != 0) {
    mDebug("failed to find changed vnode(s) during vnode(s) check, code:%s", tstrerror(code));
    return code;
  }

  bool justBecameLeader = (execInfo.role == NODE_ROLE_LEADER) && execInfo.switchFromFollower;
  if (justBecameLeader) {
    mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
    *pUpdateAllVgroups = true;
    execInfo.switchFromFollower = false;  // consume the one-shot flag
    addAllDbsIntoHashmap(pChangeInfo->pDBMap, pMnode->pSdb);
  }

  bool needUpdate = (taosArrayGetSize(pChangeInfo->pUpdateNodeList) > 0) || (*pUpdateAllVgroups);
  if (!needUpdate) {
    mDebug("no update found in vnode(s) list");
    return code;
  }

  // checkpoint trans are vnode-wide, so any node change invalidates all of them
  killAllCheckpointTrans(pMnode, pChangeInfo);
  return code;
}
2299

2300
// this function runs by only one thread, so it is not multi-thread safe
2301
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
×
UNCOV
2302
  int32_t           code = 0;
×
UNCOV
2303
  bool              allReady = true;
×
UNCOV
2304
  SArray           *pNodeSnapshot = NULL;
×
UNCOV
2305
  SMnode           *pMnode = pMsg->info.node;
×
UNCOV
2306
  int64_t           tsms = taosGetTimestampMs();
×
UNCOV
2307
  int64_t           ts = tsms / 1000;
×
UNCOV
2308
  bool              updateAllVgroups = false;
×
UNCOV
2309
  SVgroupChangeInfo changeInfo = {0};
×
2310

UNCOV
2311
  int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
×
UNCOV
2312
  if (old != 0) {
×
UNCOV
2313
    mDebug("still in checking node change");
×
UNCOV
2314
    return 0;
×
2315
  }
2316

UNCOV
2317
  mDebug("start to do node changing check, ts:%" PRId64, tsms);
×
2318

UNCOV
2319
  streamMutexLock(&execInfo.lock);
×
UNCOV
2320
  int32_t numOfNodes = extractStreamNodeList(pMnode);
×
2321
  streamMutexUnlock(&execInfo.lock);
×
2322

UNCOV
2323
  if (numOfNodes == 0) {
×
UNCOV
2324
    mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing");
×
UNCOV
2325
    execInfo.ts = ts;
×
UNCOV
2326
    atomic_store_32(&mndNodeCheckSentinel, 0);
×
UNCOV
2327
    return 0;
×
2328
  }
2329

UNCOV
2330
  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
×
UNCOV
2331
  if (code) {
×
UNCOV
2332
    mError("failed to take the vgroup snapshot, ignore it and continue");
×
2333
  }
2334

UNCOV
2335
  if (!allReady) {
×
UNCOV
2336
    taosArrayDestroy(pNodeSnapshot);
×
UNCOV
2337
    atomic_store_32(&mndNodeCheckSentinel, 0);
×
UNCOV
2338
    mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
×
UNCOV
2339
    return 0;
×
2340
  }
2341

UNCOV
2342
  streamMutexLock(&execInfo.lock);
×
UNCOV
2343
  code = doProcessNodeCheckHelp(pNodeSnapshot, pMnode, &changeInfo, &updateAllVgroups);
×
UNCOV
2344
  streamMutexUnlock(&execInfo.lock);
×
2345

UNCOV
2346
  if (code) {
×
UNCOV
2347
    goto _end;
×
2348
  }
2349

UNCOV
2350
  if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) {
×
UNCOV
2351
    mDebug("vnode(s) change detected, build trans to update stream task epsets");
×
2352

UNCOV
2353
    STrans *pTrans = NULL;
×
2354

UNCOV
2355
    streamMutexLock(&execInfo.lock);
×
UNCOV
2356
    code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups, &pTrans);
×
UNCOV
2357
    streamMutexUnlock(&execInfo.lock);
×
2358

2359
    // NOTE: sync trans out of lock
2360
    if (code == 0 && pTrans != NULL) {
×
UNCOV
2361
      code = mndTransPrepare(pMnode, pTrans);
×
UNCOV
2362
      if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
UNCOV
2363
        mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
×
2364
      }
2365

UNCOV
2366
      mndTransDrop(pTrans);
×
2367
    }
2368

2369
    // keep the new vnode snapshot if success
UNCOV
2370
    if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
×
UNCOV
2371
      streamMutexLock(&execInfo.lock);
×
2372

UNCOV
2373
      code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
×
UNCOV
2374
      int32_t num = (int)taosArrayGetSize(execInfo.pNodeList);
×
UNCOV
2375
      if (code == 0) {
×
UNCOV
2376
        execInfo.ts = ts;
×
UNCOV
2377
        mDebug("create trans successfully, update cached node list, numOfNodes:%d", num);
×
2378
      }
2379

UNCOV
2380
      streamMutexUnlock(&execInfo.lock);
×
2381

UNCOV
2382
      if (code) {
×
2383
        mError("failed to extract node list from stream, code:%s", tstrerror(code));
×
UNCOV
2384
        goto _end;
×
2385
      }
2386
    }
2387
  }
2388

UNCOV
2389
  mndDestroyVgroupChangeInfo(&changeInfo);
×
2390

UNCOV
2391
_end:
×
UNCOV
2392
  taosArrayDestroy(pNodeSnapshot);
×
2393

UNCOV
2394
  mDebug("end to do stream task node change checking, elapsed time:%" PRId64 "ms", taosGetTimestampMs() - tsms);
×
UNCOV
2395
  atomic_store_32(&mndNodeCheckSentinel, 0);
×
2396

UNCOV
2397
  return 0;
×
2398
}
2399

UNCOV
2400
/*
 * Node-check timer handler: when at least one stream exists in the sdb, enqueue a
 * TDMT_MND_STREAM_NODECHANGE_CHECK message onto the mnode write queue.
 *
 * Returns 0 when there is no stream, terrno if the message body cannot be allocated,
 * otherwise the result of tmsgPutToQueue.
 */
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;

  // no stream, nothing to check
  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) <= 0) {
    return 0;
  }

  int32_t               contLen = sizeof(SMStreamNodeCheckMsg);
  SMStreamNodeCheckMsg *pCont = rpcMallocCont(contLen);
  if (pCont == NULL) {
    return terrno;
  }

  SRpcMsg msg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pCont, .contLen = contLen};
  return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &msg);
}
2416

UNCOV
2417
/*
 * Status-check timer handler: when the sdb holds at least one stream, enqueue a message of type
 * TDMT_MND_STREAM_NODECHANGE_CHECK (body sized as an SMStreamNodeCheckMsg) onto the write queue.
 *
 * NOTE(review): this does exactly what mndProcessNodeCheck does, including the message type;
 * confirm whether a dedicated status-check message type was intended.
 *
 * Returns 0 when there is no stream, terrno on allocation failure, otherwise tmsgPutToQueue's result.
 */
static int32_t mndProcessStatusCheck(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;

  bool hasStream = (sdbGetSize(pSdb, SDB_STREAM) > 0);
  if (!hasStream) {
    return 0;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .contLen = sizeof(SMStreamNodeCheckMsg)};
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  if (rpcMsg.pCont == NULL) {
    return terrno;
  }

  return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
2433

UNCOV
2434
/*
 * Register the tasks of pStream in the exec buffer.
 *
 * Each task not yet in pExecNode->pTaskMap is inserted into the map and its id is appended to pTaskList.
 * If no pNodeList entry has that task's nodeId yet, a new SNodeEntry (hbTimestamp/lastHbMsgId = -1,
 * epset copied from the task) is appended.
 *
 * Failures to insert are logged and processing continues. Iteration stops early only when the iterator
 * cannot return the current task.
 */
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
  SStreamTaskIter *pIter = NULL;
  int32_t          code = createStreamTaskIter(pStream, &pIter);
  if (code) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return;
  }

  while (streamTaskIterNextTask(pIter)) {
    SStreamTask *pTask = NULL;
    code = streamTaskIterGetCurrent(pIter, &pTask);
    if (code) {
      break;
    }

    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    void   *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
    if (p != NULL) {
      continue;  // already buffered, nothing to add
    }

    STaskStatusEntry entry = {0};
    streamTaskStatusInit(&entry, pTask);

    code = taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
    if (code == 0) {
      void   *px = taosArrayPush(pExecNode->pTaskList, &id);
      int32_t num = (int32_t)taosArrayGetSize(pExecNode->pTaskList);
      if (px) {
        mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
      } else {
        mError("s-task:0x%x failed to add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
      }
    } else {
      mError("s-task:0x%x failed to add into task map, since out of memory", (int32_t)entry.id.taskId);
    }

    // add the new vgroups if not added yet
    bool exist = false;
    for (int32_t j = 0; j < taosArrayGetSize(pExecNode->pNodeList); ++j) {
      SNodeEntry *pEntry = taosArrayGet(pExecNode->pNodeList, j);
      if ((pEntry != NULL) && (pEntry->nodeId == pTask->info.nodeId)) {
        exist = true;
        break;
      }
    }

    if (!exist) {
      SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
      epsetAssign(&nodeEntry.epset, &pTask->info.epSet);

      void *px = taosArrayPush(pExecNode->pNodeList, &nodeEntry);
      if (px) {
        mInfo("vgId:%d added into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList));
      } else {
        // terminate the statement explicitly instead of relying on mError expanding to a braced block
        mError("vgId:%d failed to add into nodeList, total:%d", nodeEntry.nodeId,
               (int)taosArrayGetSize(pExecNode->pNodeList));
      }
    }
  }

  destroyStreamTaskIter(pIter);
}
2495

UNCOV
2496
/*
 * Append taskId to pList (an array of int32_t) unless it is already present.
 * uid and numOfTotal are used only in the log message.
 */
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
  int32_t size = taosArrayGetSize(pList);

  bool found = false;
  for (int32_t idx = 0; idx < size && !found; ++idx) {
    int32_t *pExisted = taosArrayGet(pList, idx);
    found = (pExisted != NULL) && (*pExisted == taskId);
  }

  if (found) {
    return;
  }

  // size is the element count before the push below
  if (taosArrayPush(pList, &taskId) != NULL) {
    mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, size, numOfTotal - size);
  } else {
    mError("stream:0x%" PRIx64 " receive %d reqs for checkpoint, failed to added into task list, since out of memory",
           uid, size);
  }
}
2518

UNCOV
2519
/*
 * Handle a checkpoint request sent by a stream task.
 *
 * The sender's taskId is recorded in execInfo.pTransferStateStreams under its streamId. Once the number of
 * recorded tasks equals the stream's task count, a checkpointId is generated, the checkpoint trans is
 * launched (only when the stream object exists in the sdb) and the bookkeeping entry is removed.
 * A response of type SMStreamReqCheckpointRsp is always sent to the requesting vnode on the normal path.
 *
 * Returns:
 *   - TSDB_CODE_INVALID_MSG for an undecodable request;
 *   - TSDB_CODE_MND_STREAM_NOT_EXIST when the stream is neither in the sdb nor in the task buffer;
 *   - terrno when the response cannot be allocated;
 *   - 0 otherwise.
 */
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
  SMnode                  *pMnode = pReq->info.node;
  SStreamTaskCheckpointReq req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskCheckpointReq(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint req msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);

  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf",
          req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the create stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
  if (pReqTaskList == NULL) {
    SArray *pList = taosArrayInit(4, sizeof(int32_t));
    if (pList == NULL) {
      mError("stream:0x%" PRIx64 " failed to init task list for checkpoint req, code:%s", req.streamId,
             tstrerror(terrno));
    } else {
      doAddTaskId(pList, req.taskId, req.streamId, numOfTasks);
      code = taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
      if (code) {
        mError("failed to put into transfer state stream map, code: out of memory");
        taosArrayDestroy(pList);  // the map did not take ownership of the list
      } else {
        pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
      }
    }
  } else {
    doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks);
  }

  // pReqTaskList is still NULL when the bookkeeping entry could not be created; never dereference it then
  bool allReqReceived = false;
  if (pReqTaskList != NULL) {
    int32_t total = (int32_t)taosArrayGetSize(*pReqTaskList);
    allReqReceived = (total == numOfTasks);
  }

  if (allReqReceived) {  // all tasks have sent the reqs
    int64_t checkpointId = mndStreamGenChkptId(pMnode, false);
    mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);

    if (pStream != NULL) {  // TODO:handle error
      code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
      if (code) {
        mError("failed to create checkpoint trans, code:%s", tstrerror(code));
      }
    } else {
      // todo: wait for the create stream trans completed, and launch the checkpoint trans
    }

    // remove this entry
    (void)taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));

    int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
    mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  {
    SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)};
    rsp.pCont = rpcMallocCont(rsp.contLen);
    if (rsp.pCont == NULL) {
      return terrno;
    }

    SMsgHead *pHead = rsp.pCont;
    pHead->vgId = htonl(req.nodeId);

    tmsgSendRsp(&rsp);
    pReq->info.handle = NULL;  // disable auto rsp
  }

  return 0;
}
2618

2619
// valid the info according to the HbMsg
UNCOV
2620
static bool validateChkptReport(const SCheckpointReport *pReport, int64_t reportChkptId) {
×
UNCOV
2621
  STaskId           id = {.streamId = pReport->streamId, .taskId = pReport->taskId};
×
UNCOV
2622
  STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
×
UNCOV
2623
  if (pTaskEntry == NULL) {
×
UNCOV
2624
    mError("invalid checkpoint-report msg from task:0x%x, discard", pReport->taskId);
×
UNCOV
2625
    return false;
×
2626
  }
2627

UNCOV
2628
  if (pTaskEntry->checkpointInfo.latestId >= pReport->checkpointId) {
×
UNCOV
2629
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " saved checkpointId:%" PRId64 " discard",
×
2630
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
UNCOV
2631
    return false;
×
2632
  }
2633

2634
  // now the task in checkpoint procedure
2635
  if ((pTaskEntry->checkpointInfo.activeId != 0) && (pTaskEntry->checkpointInfo.activeId > pReport->checkpointId)) {
×
2636
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " active checkpointId:%" PRId64
×
2637
           " discard",
2638
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
UNCOV
2639
    return false;
×
2640
  }
2641

UNCOV
2642
  if (reportChkptId >= pReport->checkpointId) {
×
UNCOV
2643
    mError("s-task:0x%x expired checkpoint-report msg, checkpointId:%" PRId64 " already update checkpointId:%" PRId64
×
2644
           " discard",
2645
           pReport->taskId, pReport->checkpointId, reportChkptId);
UNCOV
2646
    return false;
×
2647
  }
2648

UNCOV
2649
  return true;
×
2650
}
2651

UNCOV
2652
/*
 * Record a validated checkpoint-report in pList (an array of STaskChkptInfo).
 *
 * Reports rejected by validateChkptReport are ignored. If pList already holds an entry for the task:
 *   - a report with a newer checkpointId overwrites that entry;
 *   - a report with an older checkpointId is logged and discarded;
 *   - an equal checkpointId only logs a warning.
 * Otherwise a new entry is appended.
 */
static void doAddReportStreamTask(SArray *pList, int64_t reportedChkptId, const SCheckpointReport *pReport) {
  if (!validateChkptReport(pReport, reportedChkptId)) {
    return;
  }

  // locate an existing entry for this task, if any
  STaskChkptInfo *pExisted = NULL;
  for (int32_t i = 0; (i < taosArrayGetSize(pList)) && (pExisted == NULL); ++i) {
    STaskChkptInfo *pCur = taosArrayGet(pList, i);
    if ((pCur != NULL) && (pCur->taskId == pReport->taskId)) {
      pExisted = pCur;
    }
  }

  if (pExisted != NULL) {
    if (pExisted->checkpointId > pReport->checkpointId) {
      mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
             pReport->taskId, pExisted->checkpointId, pReport->checkpointId);
    } else if (pExisted->checkpointId < pReport->checkpointId) {  // expired checkpoint-report msg, update it
      mInfo("s-task:0x%x expired checkpoint-report info in checkpoint-report list update from %" PRId64 "->%" PRId64,
            pReport->taskId, pExisted->checkpointId, pReport->checkpointId);

      pExisted->checkpointId = pReport->checkpointId;
      pExisted->ts = pReport->checkpointTs;
      pExisted->version = pReport->checkpointVer;
      pExisted->transId = pReport->transId;
      pExisted->dropHTask = pReport->dropHTask;
    } else {
      mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
    }
    return;
  }

  STaskChkptInfo info = {
      .streamId = pReport->streamId,
      .taskId = pReport->taskId,
      .transId = pReport->transId,
      .dropHTask = pReport->dropHTask,
      .version = pReport->checkpointVer,
      .ts = pReport->checkpointTs,
      .checkpointId = pReport->checkpointId,
      .nodeId = pReport->nodeId,
  };

  if (taosArrayPush(pList, &info) == NULL) {
    mError("failed to put into task list, taskId:0x%x", pReport->taskId);
    return;
  }

  int32_t size = taosArrayGetSize(pList);
  mDebug("stream:0x%" PRIx64 " taskId:0x%x checkpoint-report recv, %d tasks has send checkpoint-report",
         pReport->streamId, pReport->taskId, size);
}
2705

UNCOV
2706
/*
 * Handle a checkpoint-report sent by a stream task after it finished a checkpoint.
 *
 * Ensures the exec buffer is initialized, then records the report in the stream's SChkptReportInfo entry of
 * execInfo.pChkptStreams, creating that entry if needed. When every task of the stream has reported, an info
 * log is emitted. A quick SMStreamUpdateChkptRsp-sized success response is sent to the vnode.
 *
 * Returns:
 *   - TSDB_CODE_INVALID_MSG for an undecodable report;
 *   - TSDB_CODE_MND_STREAM_NOT_EXIST when the stream is neither in the sdb nor in the task buffer;
 *   - otherwise the last internal code, after the response has been sent.
 */
int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SCheckpointReport req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskChkptReport(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint-report msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64
         " checkpointVer:%" PRId64 " transId:%d",
         req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);

  // register to the stream task done map, if all tasks has sent these kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the creation of stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SChkptReportInfo *pInfo =
      (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
  if (pInfo == NULL) {
    SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId};
    if (info.pTaskList == NULL) {
      code = terrno;
      mError("stream:0x%" PRIx64 " failed to init checkpoint-report task list, code:%s", req.streamId,
             tstrerror(code));
    } else {
      doAddReportStreamTask(info.pTaskList, info.reportChkpt, &req);
      code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &info, sizeof(info));
      if (code) {
        mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId);
        taosArrayDestroy(info.pTaskList);  // the map did not take ownership of the list
      } else {
        pInfo = (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
      }
    }
  } else {
    doAddReportStreamTask(pInfo->pTaskList, pInfo->reportChkpt, &req);
  }

  // pInfo is NULL when the entry could not be created. pStream is NULL when the stream is only known from the
  // task buffer; numOfTasks is 0 then and an empty report list would otherwise "match" and dereference pStream.
  if ((pInfo != NULL) && (pStream != NULL)) {
    int32_t total = taosArrayGetSize(pInfo->pTaskList);
    if (total == numOfTasks) {  // all tasks have sent the reqs
      mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
            " will be issued soon",
            req.streamId, pStream->name, total, req.checkpointId);
    }
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS);
  return code;
}
2785

2786
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t *pExistedTasks, bool *pAllSame) {
×
UNCOV
2787
  int32_t num = 0;
×
UNCOV
2788
  int64_t chkId = INT64_MAX;
×
UNCOV
2789
  *pExistedTasks = 0;
×
UNCOV
2790
  *pAllSame = true;
×
2791

2792
  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
×
UNCOV
2793
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
×
UNCOV
2794
    if (p == NULL) {
×
UNCOV
2795
      continue;
×
2796
    }
2797

UNCOV
2798
    if (p->streamId != streamId) {
×
UNCOV
2799
      continue;
×
2800
    }
2801

UNCOV
2802
    num += 1;
×
UNCOV
2803
    STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
×
UNCOV
2804
    if (chkId > pe->checkpointInfo.latestId) {
×
UNCOV
2805
      if (chkId != INT64_MAX) {
×
UNCOV
2806
        *pAllSame = false;
×
2807
      }
UNCOV
2808
      chkId = pe->checkpointInfo.latestId;
×
2809
    }
2810
  }
2811

UNCOV
2812
  *pExistedTasks = num;
×
UNCOV
2813
  if (num < numOfTasks) {  // not all task send info to mnode through hbMsg, no valid checkpoint Id
×
UNCOV
2814
    return -1;
×
2815
  }
2816

UNCOV
2817
  return chkId;
×
2818
}
2819

UNCOV
2820
/*
 * Send an immediate response of msgSize bytes carrying `code`.
 *
 * The body begins with an SMsgHead whose vgId is set, in network byte order, to vgId. After sending,
 * pInfo->handle is cleared to disable the automatic response. If the body cannot be allocated, nothing is
 * sent and pInfo is left untouched.
 */
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) {
  SRpcMsg rsp = {.code = code, .info = *pInfo, .contLen = msgSize};

  rsp.pCont = rpcMallocCont(rsp.contLen);
  if (rsp.pCont == NULL) {
    return;
  }

  ((SMsgHead *)rsp.pCont)->vgId = htonl(vgId);

  tmsgSendRsp(&rsp);
  pInfo->handle = NULL;  // disable auto rsp
}
×
2831

UNCOV
2832
/*
 * For each taskId in pList (int32_t array), remove the first entry of pInfo->pTaskList whose req.taskId matches.
 * Returns the number of taskIds in pList.
 */
static int32_t doCleanReqList(SArray* pList, SCheckpointConsensusInfo* pInfo) {
  int32_t numOfSent = taosArrayGetSize(pList);

  for (int32_t i = 0; i < numOfSent; ++i) {
    int32_t *pTaskId = taosArrayGet(pList, i);
    if (pTaskId == NULL) {
      continue;
    }

    int32_t k = 0;
    while (k < taosArrayGetSize(pInfo->pTaskList)) {
      SCheckpointConsensusEntry *pEntry = taosArrayGet(pInfo->pTaskList, k);
      if ((pEntry != NULL) && (pEntry->req.taskId == *pTaskId)) {
        taosArrayRemove(pInfo->pTaskList, k);
        break;  // only the first match is removed
      }
      k += 1;
    }
  }

  return numOfSent;
}
2852

2853
/*
 * Timer handler that resolves pending consensus-checkpointId requests in execInfo.pStreamConsensus.
 *
 * Does nothing (returns 0) unless all vnodes are ready. For each stream entry:
 *   - a stream no longer in the sdb is scheduled for removal from the map;
 *   - for each pending task request, once all tasks of the stream have reported through hb
 *     (getConsensusId != -1), and either 10s have passed since the request or all tasks report the same
 *     checkpointId, a set-consensus-checkpointId trans is created and the request is dropped from the list;
 *   - an entry whose request list becomes empty is scheduled for removal.
 * Processing stops once more than 20 requests have been handled in this round.
 *
 * Returns TSDB_CODE_FAILED when the consensus checkpointId exceeds a task's requested checkpointId,
 * terrno on allocation failure, otherwise the last internal code.
 */
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  int64_t now = taosGetTimestampMs();
  bool    allReady = true;
  SArray *pNodeSnapshot = NULL;
  int32_t maxAllowedTrans = 20;
  int32_t numOfTrans = 0;
  int32_t code = 0;
  void   *pIter = NULL;

  SArray *pList = taosArrayInit(4, sizeof(int32_t));
  if (pList == NULL) {
    return terrno;
  }

  SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
  if (pStreamList == NULL) {
    taosArrayDestroy(pList);
    return terrno;
  }

  mDebug("start to process consensus-checkpointId in tmr");

  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
  taosArrayDestroy(pNodeSnapshot);
  if (code) {
    mError("failed to get the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
    taosArrayDestroy(pStreamList);
    taosArrayDestroy(pList);
    return 0;
  }

  streamMutexLock(&execInfo.lock);

  while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
    SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;

    taosArrayClear(pList);

    int64_t     streamId = -1;
    int32_t     num = taosArrayGetSize(pInfo->pTaskList);
    SStreamObj *pStream = NULL;

    code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
    if (pStream == NULL || code != 0) {  // stream has been dropped already
      mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
      void *p = taosArrayPush(pStreamList, &pInfo->streamId);
      if (p == NULL) {
        mError("failed to record the missing stream id in concensus-stream list, streamId:%" PRId64
               " code:%s, continue",
               pInfo->streamId, tstrerror(terrno));
      }
      continue;
    }

    for (int32_t j = 0; j < num; ++j) {
      SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
      if (pe == NULL) {
        continue;
      }

      if (streamId == -1) {
        streamId = pe->req.streamId;
      }

      int32_t existed = 0;
      bool    allSame = true;
      int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame);
      if (chkId == -1) {
        mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed,
               pInfo->numOfTasks, pe->req.taskId);
        break;
      }

      if (((now - pe->ts) >= 10 * 1000) || allSame) {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
        if (chkId > pe->req.checkpointId) {
          mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
                 pe->req.checkpointId, chkId);

          // pe points into the consensus map and pIter iterates it: finish with both before releasing the lock
          mndReleaseStream(pMnode, pStream);
          taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
          streamMutexUnlock(&execInfo.lock);

          taosArrayDestroy(pStreamList);
          taosArrayDestroy(pList);
          return TSDB_CODE_FAILED;
        }

        // todo: check for redundant consensus-checkpoint trans, if this kinds of trans repeatly failed.
        code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
        }

        void *p = taosArrayPush(pList, &pe->req.taskId);
        if (p == NULL) {
          mError("failed to put into task list, taskId:0x%x", pe->req.taskId);
        }
      } else {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
      }
    }

    mndReleaseStream(pMnode, pStream);

    int32_t alreadySend = doCleanReqList(pList, pInfo);

    // clear request stream item with empty task list
    if (taosArrayGetSize(pInfo->pTaskList) == 0) {
      mndClearConsensusRspEntry(pInfo);
      if (streamId == -1) {
        mError("streamId is -1, streamId:%" PRIx64 " in consensus-checkpointId hashMap, cont", pInfo->streamId);
      }

      void *p = taosArrayPush(pStreamList, &streamId);
      if (p == NULL) {
        mError("failed to put into stream list, stream:0x%" PRIx64 " not remove it in consensus-chkpt list", streamId);
      }
    }

    numOfTrans += alreadySend;
    if (numOfTrans > maxAllowedTrans) {
      mInfo("already send consensus-checkpointId trans:%d, try next time", alreadySend);
      taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
      break;
    }
  }

  for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
    int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i);
    if (pStreamId == NULL) {
      continue;
    }

    code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId);
  }

  streamMutexUnlock(&execInfo.lock);

  taosArrayDestroy(pStreamList);
  taosArrayDestroy(pList);

  mDebug("end to process consensus-checkpointId in tmr, send consensus-checkpoint trans:%d", numOfTrans);
  return code;
}
3003

UNCOV
3004
/*
 * Run mndProcessCreateStreamReq for a request forwarded from an mnode.
 *
 * On a failure other than TSDB_CODE_ACTION_IN_PROGRESS, the following are set on pReq:
 *   - a 1-byte response body;
 *   - noResp cleared;
 *   - the error code.
 * Returns terrno if that body cannot be allocated, otherwise the create result.
 */
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t code = mndProcessCreateStreamReq(pReq);
  if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    return code;
  }

  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->info.rspLen = 1;
  pReq->info.noResp = false;
  pReq->code = code;
  return code;
}
3018

UNCOV
3019
/*
 * Run the regular drop-stream handler and, when it fails outright, attach a
 * 1-byte response body with the failure code and clear noResp.
 *
 * Returns the handler's code unchanged, or terrno if the response buffer
 * cannot be allocated.
 */
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t code = mndProcessDropStreamReq(pReq);

  // success or TSDB_CODE_ACTION_IN_PROGRESS: hand the code back untouched
  if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    return code;
  }

  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  // presumably makes the RPC layer send a response carrying pReq->code — confirm against the dispatcher
  pReq->info.rspLen = 1;
  pReq->info.noResp = false;
  pReq->code = code;
  return code;
}
3033

3034
/*
 * Populate pExecInfo with the tasks of every stream stored in the mnode sdb,
 * at most once: the work is skipped when initTaskList is already set or when
 * no mnode is supplied. The flag is set after a successful load.
 */
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  if (!pExecInfo->initTaskList && pMnode != NULL) {
    addAllStreamTasksIntoBuf(pMnode, pExecInfo);
    pExecInfo->initTaskList = true;
  }
}
3042

3043
void mndStreamResetInitTaskListLoadFlag() {
8✔
3044
  mInfo("reset task list buffer init flag for leader");
8!
3045
  execInfo.initTaskList = false;
8✔
3046
}
8✔
3047

3048
/*
 * Record the mnode's new role in the global execInfo.
 *
 * switchFromFollower is reset on every call and set to true only when the
 * role changes from follower to leader. The first call (role still
 * NODE_ROLE_UNINIT) stores the role unconditionally. Any role other than
 * NODE_ROLE_LEADER is handled as a follower. pMnode is not used.
 */
void mndUpdateStreamExecInfoRole(SMnode *pMnode, int32_t role) {
  execInfo.switchFromFollower = false;

  // first assignment after start-up
  if (execInfo.role == NODE_ROLE_UNINIT) {
    execInfo.role = role;
    if (role == NODE_ROLE_LEADER) {
      mInfo("init mnode is set to leader");
    } else {
      mInfo("init mnode is set to follower");
    }
    return;
  }

  if (role == NODE_ROLE_LEADER) {
    if (execInfo.role != NODE_ROLE_FOLLOWER) {
      mInfo("mnode remain to be leader, do nothing");
      return;
    }

    execInfo.role = role;
    execInfo.switchFromFollower = true;
    mInfo("mnode switch to be leader from follower");
    return;
  }

  // non-leader role requested
  if (execInfo.role != NODE_ROLE_LEADER) {
    mInfo("mnode remain to be follower, do nothing");
    return;
  }

  execInfo.role = role;
  mInfo("mnode switch to be follower from leader");
}
8✔
3077

UNCOV
3078
/*
 * Iterate over every SDB_STREAM object in the mnode sdb and pass each one to
 * saveTaskAndNodeInfoIntoBuf(). Each fetched object is released with
 * sdbRelease() right after it has been processed.
 */
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pObj = NULL;
  void       *pCursor = sdbFetch(pSdb, SDB_STREAM, NULL, (void **)&pObj);

  while (pCursor != NULL) {
    saveTaskAndNodeInfoIntoBuf(pObj, pExecInfo);
    sdbRelease(pSdb, pObj);
    pCursor = sdbFetch(pSdb, SDB_STREAM, pCursor, (void **)&pObj);
  }
}
×
3093

3094
/*
 * Build and prepare an "update checkpoint-info" transaction for pStream.
 *
 * Ownership: this function consumes the caller's reference to pStream. It is
 * released with sdbRelease() on every path, success or failure. The local
 * transaction handle is always dropped before returning.
 *
 * pChkptInfoList is currently unused.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared,
 * otherwise the error code of the step that failed.
 */
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList) {
  STrans *pTrans = NULL;
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME,
                               "update checkpoint-info", &pTrans);
  if (pTrans == NULL || code != 0) {
    if (code == 0) {
      code = terrno;  // never report success when no transaction was created
    }
    goto _end;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid);
  if (code) {
    goto _end;
  }

  code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream);
  if (code) {
    goto _end;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, tstrerror(code));
    goto _end;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_end:
  // single cleanup point: release the stream reference and drop the local trans handle (no-op when NULL)
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
3137

UNCOV
3138
/*
 * Handle a request to drop orphan stream tasks.
 *
 * The request body is decoded into an SMStreamDropOrphanMsg. When it lists at
 * least one task, a single "drop stream" transaction is built and prepared.
 * The transaction:
 *   - is registered under the streamId of the first retrievable task;
 *   - carries drop actions for every listed task;
 *   - persists a DROPPED log for a placeholder stream object.
 *
 * Returns 0 when there is nothing to do, the transaction-prepare result
 * (0 or TSDB_CODE_ACTION_IN_PROGRESS) on success, and otherwise the error code
 * of the step that failed. The decoded message and the local transaction
 * handle are released on every path.
 */
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
  SMnode               *pMnode = pReq->info.node;
  SOrphanTask          *pTask = NULL;
  STrans               *pTrans = NULL;
  int32_t               numOfTasks = 0;
  int32_t               i = 0;
  bool                  transPrepared = false;
  SMStreamDropOrphanMsg msg = {0};

  int32_t code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg);
  if (code) {
    // Decoding may fail after msg.pList has been allocated. Go through the common
    // cleanup so partially decoded content is released. msg starts zero-initialized,
    // so destroying it is safe in either case.
    goto _end;
  }

  numOfTasks = taosArrayGetSize(msg.pList);
  if (numOfTasks == 0) {
    mDebug("no orphan tasks to drop, no need to create trans");
    goto _end;
  }

  mDebug("create trans to drop %d orphan tasks", numOfTasks);

  // the first retrievable entry supplies the streamId the transaction is keyed on
  while (i < numOfTasks && ((pTask = taosArrayGet(msg.pList, i)) == NULL)) {
    i += 1;
  }

  if (pTask == NULL) {
    mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task");
    goto _end;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
  if (code) {
    goto _end;
  }

  // placeholder stream object: only uid is set, the string fields are left empty
  SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};

  code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code != 0) {
    if (code == 0) {
      code = terrno;  // never report success when no transaction was created
    }
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _end;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
  if (code) {
    goto _end;
  }

  // drop all tasks
  if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _end;
  }

  // drop stream
  if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) {
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    goto _end;
  }

  transPrepared = true;

_end:
  tDestroyDropOrphanTaskMsg(&msg);
  mndTransDrop(pTrans);

  // log success only when a transaction was actually prepared, not on the early no-op exits
  if (transPrepared) {
    mDebug("create drop %d orphan tasks trans succ", numOfTasks);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc