• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "audit.h"
17
#include "mndDb.h"
18
#include "mndPrivilege.h"
19
#include "mndScheduler.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndStream.h"
23
#include "mndTrans.h"
24
#include "osMemory.h"
25
#include "parser.h"
26
#include "taoserror.h"
27
#include "tmisce.h"
28
#include "tname.h"
29

30
#define MND_STREAM_MAX_NUM 60
31

32
typedef struct {
33
  int8_t placeHolder;  // placeholder member; defined only to fix a Windows compile error
34
} SMStreamNodeCheckMsg;
35

36
static int32_t  mndNodeCheckSentinel = 0;
37
SStreamExecInfo execInfo;
38

39
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
41
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
42
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
43
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
44

45
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
46
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
47

48
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq);
49
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
50
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
51
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
52
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
53
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
54
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
55
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq);
56
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
57
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
58
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
59
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
60
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList);
61
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
62
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
63
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
64
static void    doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
65
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq);
66
static void    saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
67

68
static void     addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
69
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
70

71
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
72
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
73
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
74
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
75
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
76

UNCOV
77
/*
 * Wire the stream module into the mnode.
 *
 * Registers every stream-related message handler and the show/retrieve
 * handlers, initializes the global exec info through mndInitExecInfo(), and
 * finally installs the SDB_STREAM and SDB_STREAM_SEQ table descriptors.
 *
 * Returns 0 on success, otherwise the first non-zero code reported by
 * mndInitExecInfo() or sdbSetTable().
 */
int32_t mndInitStream(SMnode *pMnode) {
  // sdb descriptor for stream objects
  SSdbTable streamTable = {
      .sdbType = SDB_STREAM,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
  };

  // sdb descriptor for stream sequence objects
  SSdbTable streamSeqTable = {
      .sdbType = SDB_STREAM_SEQ,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamSeqActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamSeqActionDecode,
      .insertFp = (SdbInsertFp)mndStreamSeqActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamSeqActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamSeqActionDelete,
  };

  // create/drop requests and the node-check timer
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck);

  // task-level responses are all routed to the transaction response handler
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CONSEN_CHKPT_RSP, mndTransProcessRsp);

  // for msgs inside mnode
  // TODO change the name
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE, mndProcessCreateStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp);

  // checkpoint, heartbeat and node-change handling
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_DROP_ORPHANTASKS, mndProcessDropOrphanTaskReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_TASK_RESET, mndProcessResetStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr);

  // pause / resume / reset requests
  mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESET_STREAM, mndProcessResetStreamReq);

  // show-table retrieval and iterator cancellation
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);

  int32_t code = mndInitExecInfo();
  if (code != 0) {
    return code;
  }

  code = sdbSetTable(pMnode->pSdb, streamTable);
  if (code != 0) {
    return code;
  }

  return sdbSetTable(pMnode->pSdb, streamSeqTable);
}
152

UNCOV
153
/*
 * Tear down the containers held by the global execInfo: the task, node and
 * killed-checkpoint-transaction arrays, the hash tables, and finally the
 * execInfo mutex. pMnode is not used.
 */
void mndCleanupStream(SMnode *pMnode) {
  // array containers
  taosArrayDestroy(execInfo.pTaskList);
  taosArrayDestroy(execInfo.pNodeList);
  taosArrayDestroy(execInfo.pKilledChkptTrans);

  // hash containers
  taosHashCleanup(execInfo.pTaskMap);
  taosHashCleanup(execInfo.transMgmt.pDBTrans);
  taosHashCleanup(execInfo.pTransferStateStreams);
  taosHashCleanup(execInfo.pChkptStreams);
  taosHashCleanup(execInfo.pStreamConsensus);

  // the result of destroying the lock is deliberately ignored
  (void)taosThreadMutexDestroy(&execInfo.lock);

  mDebug("mnd stream exec info cleanup");
}
×
165

UNCOV
166
/*
 * Decode a raw sdb record into a freshly allocated row holding an SStreamObj.
 *
 * Layout read from pRaw: an int32 payload length followed by that many bytes,
 * which are handed to tDecodeSStreamObj() together with the record's soft
 * version.
 *
 * Returns the new row on success (terrno is set to 0). On any failure the row
 * is released, terrno is set to the error code and NULL is returned.
 *
 * Fixes over the previous version:
 *  - an unsupported soft version used to jump to the exit with code == 0,
 *    which took the success branch and dereferenced a NULL stream object;
 *  - the SDB_GET_* macros are only given the exit label, so a failed read
 *    could likewise reach the exit with code == 0; the exit now treats every
 *    arrival that did not finish decoding as a failure;
 *  - a negative payload length is rejected before it is used as a size.
 */
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSdbRow    *pRow = NULL;
  SStreamObj *pStream = NULL;
  void       *buf = NULL;
  int8_t      sver = 0;
  int32_t     tlen = 0;
  int32_t     dataPos = 0;
  bool        decoded = false;  // set only once the object has been fully decoded

  code = sdbGetRawSoftVer(pRaw, &sver);
  TSDB_CHECK_CODE(code, lino, _over);

  if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    lino = __LINE__;
    goto _over;
  }

  pRow = sdbAllocRow(sizeof(SStreamObj));
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);

  pStream = sdbGetRowObj(pRow);
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);

  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);
  if (tlen < 0) {
    code = TSDB_CODE_INVALID_MSG;
    lino = __LINE__;
    goto _over;
  }

  buf = taosMemoryMalloc(tlen + 1);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);

  SDecoder decoder;
  tDecoderInit(&decoder, buf, tlen + 1);
  code = tDecodeSStreamObj(&decoder, pStream, sver);
  tDecoderClear(&decoder);

  if (code != 0) {
    // drop whatever the decoder attached to the object before it failed
    tFreeStreamObj(pStream);
  } else {
    decoded = true;
  }

_over:
  taosMemoryFreeClear(buf);

  if (code == TSDB_CODE_SUCCESS && !decoded) {
    // Reached the exit early without an explicit code (e.g. via SDB_GET_*).
    // NOTE(review): assumes the sdb read helpers leave their error in terrno - confirm.
    code = (terrno != 0) ? terrno : TSDB_CODE_INVALID_MSG;
  }

  if (code != TSDB_CODE_SUCCESS) {
    char *p = (pStream == NULL) ? "null" : pStream->name;
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
    taosMemoryFreeClear(pRow);

    terrno = code;
    return NULL;
  } else {
    mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
           pStream->checkpointId);

    terrno = 0;
    return pRow;
  }
}
224

UNCOV
225
/*
 * sdb insert hook for stream objects: only traces the event and always
 * returns 0. pSdb is not used.
 */
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
  (void)pSdb;

  mTrace("stream:%s, perform insert action", pStream->name);

  return 0;
}
229

UNCOV
230
/*
 * sdb delete hook for stream objects: releases the object's resources with
 * tFreeStreamObj() while holding the stream's write latch. Always returns 0.
 * pSdb is not used.
 */
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
  (void)pSdb;

  mTrace("stream:%s, perform delete action", pStream->name);

  taosWLockLatch(&pStream->lock);
  {
    tFreeStreamObj(pStream);
  }
  taosWUnLockLatch(&pStream->lock);

  return 0;
}
237

UNCOV
238
/*
 * sdb update hook for stream objects.
 *
 * The version is swapped in atomically first; status, update time, checkpoint
 * id and checkpoint frequency are then copied from the new object into the old
 * one under the old object's write latch. Always returns 0. pSdb is not used.
 */
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
  (void)pSdb;

  mTrace("stream:%s, perform update action", pOldStream->name);

  (void)atomic_exchange_32(&pOldStream->version, pNewStream->version);

  taosWLockLatch(&pOldStream->lock);
  {
    pOldStream->checkpointFreq = pNewStream->checkpointFreq;
    pOldStream->checkpointId = pNewStream->checkpointId;
    pOldStream->updateTime = pNewStream->updateTime;
    pOldStream->status = pNewStream->status;
  }
  taosWUnLockLatch(&pOldStream->lock);

  return 0;
}
252

UNCOV
253
/*
 * Look up a stream by name in the sdb and store the result in *pStream.
 *
 * Returns TSDB_CODE_MND_STREAM_NOT_EXIST when nothing was acquired and terrno
 * is TSDB_CODE_SDB_OBJ_NOT_THERE; returns 0 in every other case. Note that 0
 * can therefore be returned while *pStream is NULL, so callers must check the
 * pointer as well.
 */
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
  *pStream = sdbAcquire(pMnode->pSdb, SDB_STREAM, streamName);
  if (*pStream != NULL) {
    return 0;
  }

  return (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) ? TSDB_CODE_MND_STREAM_NOT_EXIST : 0;
}
262

UNCOV
263
/*
 * Hand a stream object back to the mnode's sdb via sdbRelease().
 */
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
  sdbRelease(pMnode->pSdb, pStream);
}
×
267

268
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
269
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
270
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
271
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
272
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
273

UNCOV
274
/*
 * Validate the mandatory fields of a create-stream request.
 *
 * The stream name, the SQL text, the source db and the target stable full
 * name must all be non-empty (the SQL pointer must also be non-NULL).
 * Returns TSDB_CODE_MND_INVALID_STREAM_OPTION if any is missing, otherwise
 * TSDB_CODE_SUCCESS.
 */
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
  if (pCreate->name[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  if (pCreate->sql == NULL || pCreate->sql[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  if (pCreate->sourceDB[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  if (pCreate->targetStbFullName[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  return TSDB_CODE_SUCCESS;
}
281

UNCOV
282
/*
 * Build pWrapper's column schema from an array of SField entries.
 *
 * One SSchema is allocated per field. Column ids are assigned 1..nCols in
 * array order. A field of type TSDB_DATA_TYPE_NULL is stored as a VARCHAR
 * column of VARSTR_HEADER_SIZE bytes; every other field keeps its own type
 * and byte width. Name and flags are copied as-is.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails or an array
 * element cannot be fetched (the schema array allocated so far stays attached
 * to pWrapper in that case).
 */
static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) {
  pWrapper->nCols = taosArrayGetSize(pFields);
  pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema));
  if (pWrapper->pSchema == NULL) {
    return terrno;
  }

  for (int32_t i = 0; i < pWrapper->nCols; ++i) {
    SField *pField = (SField *)taosArrayGet(pFields, i);
    if (pField == NULL) {
      return terrno;
    }

    SSchema *pCol = &pWrapper->pSchema[i];

    if (pField->type == TSDB_DATA_TYPE_NULL) {
      pCol->type = TSDB_DATA_TYPE_VARCHAR;
      pCol->bytes = VARSTR_HEADER_SIZE;
    } else {
      pCol->type = pField->type;
      pCol->bytes = pField->bytes;
    }

    pCol->colId = i + 1;
    tstrncpy(pCol->name, pField->name, sizeof(pCol->name));
    pCol->flags = pField->flags;
  }

  return TSDB_CODE_SUCCESS;
}
311

UNCOV
312
/*
 * Report whether any column other than the first one carries the COL_IS_KEY
 * flag. A schema with fewer than two columns always yields false.
 */
static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) {
  // the scan starts at index 1, so it is empty when nCols < 2
  for (int32_t col = 1; col < pWrapper->nCols; ++col) {
    if ((pWrapper->pSchema[col].flags & COL_IS_KEY) != 0) {
      return true;
    }
  }

  return false;
}
323

UNCOV
324
/*
 * Populate a stream object from a create-stream request.
 *
 * Steps, in order:
 *  1. copy the name, timestamps, uids and the trigger/watermark options;
 *  2. resolve the source db and the db owning the target stable;
 *  3. take ownership of the request's sql and ast strings (the request's
 *     pointers are set to NULL);
 *  4. build the output schema from pCreate->pCols and, when
 *     pCreate->fillNullCols is non-empty, merge the null-filled columns in;
 *  5. create the physical plan from the ast and store it as a string;
 *  6. build the tag schema from pCreate->pTags.
 *
 * Returns 0 on success or the first error code hit. The ast node and the
 * query plan are temporaries and are destroyed on every path.
 *
 * Changes over the previous version:
 *  - the fill-history uid is generated from the "<name>_fillhistory" string
 *    that was already being built but never used;
 *  - the "target db not exist" log printed pObj->targetDb before it was
 *    assigned; it now prints the target stable name;
 *  - outputSchema.nCols / tagSchema.nCols are only updated once the matching
 *    array has been allocated, so a failed allocation can no longer leave a
 *    column count larger than the array behind;
 *  - the merge loop refuses to read past the end of the original output
 *    schema when the fill-null slot ids are inconsistent.
 */
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
  SNode      *pAst = NULL;
  SQueryPlan *pPlan = NULL;
  int32_t     code = 0;

  mInfo("stream:%s to create", pCreate->name);
  memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
  pObj->createTime = taosGetTimestampMs();
  pObj->updateTime = pObj->createTime;
  pObj->version = 1;

  if (pCreate->smaId > 0) {
    pObj->subTableWithoutMd5 = 1;
  }

  pObj->smaId = pCreate->smaId;
  pObj->indexForMultiAggBalance = -1;

  pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));

  // the fill-history task gets its own uid, derived from a dedicated name
  char p[TSDB_STREAM_FNAME_LEN + 32] = {0};
  snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory");
  pObj->hTaskUid = mndGenerateUid(p, strlen(p));
  pObj->status = 0;

  pObj->conf.igExpired = pCreate->igExpired;
  pObj->conf.trigger = pCreate->triggerType;
  pObj->conf.triggerParam = pCreate->maxDelay;
  pObj->conf.watermark = pCreate->watermark;
  pObj->conf.fillHistory = pCreate->fillHistory;
  pObj->deleteMark = pCreate->deleteMark;
  pObj->igCheckUpdate = pCreate->igUpdate;

  memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
  SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
  if (pSourceDb == NULL) {
    code = terrno;
    mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb,
          tstrerror(code));
    goto FAIL;
  }

  pObj->sourceDbUid = pSourceDb->uid;
  mndReleaseDb(pMnode, pSourceDb);

  memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);

  SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
  if (pTargetDb == NULL) {
    code = terrno;
    // pObj->targetDb is not filled in yet at this point, so report the stable name
    mError("stream:%s failed to create, target db of stable %s not exist since %s", pCreate->name,
           pObj->targetSTbName, tstrerror(code));
    goto FAIL;
  }

  tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);

  if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
    pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
  } else {
    pObj->targetStbUid = pCreate->targetStbUid;
  }
  pObj->targetDbUid = pTargetDb->uid;
  mndReleaseDb(pMnode, pTargetDb);

  // ownership of both strings moves from the request to the stream object
  pObj->sql = pCreate->sql;
  pObj->ast = pCreate->ast;

  pCreate->sql = NULL;
  pCreate->ast = NULL;

  // deserialize ast
  if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) {
    goto FAIL;
  }

  // create output schema
  if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) {
    goto FAIL;
  }

  int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
  if (numOfNULL > 0) {
    int32_t  numOfData = pObj->outputSchema.nCols;
    int32_t  numOfTotal = numOfData + numOfNULL;
    SSchema *pFullSchema = taosMemoryCalloc(numOfTotal, sizeof(SSchema));
    if (!pFullSchema) {
      code = terrno;
      goto FAIL;
    }

    int32_t nullIndex = 0;
    int32_t dataIndex = 0;
    for (int32_t i = 0; i < numOfTotal; i++) {
      SColLocation *pos = NULL;
      // once every null column has been placed, the remaining slots are data columns
      bool takeData = (nullIndex >= numOfNULL);

      if (!takeData) {
        pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
        if (pos == NULL) {
          mError("invalid null column index, %d", nullIndex);
          continue;
        }
        // slots in front of the next null column's slot id are data columns
        takeData = (i < pos->slotId);
      }

      if (takeData) {
        if (dataIndex >= numOfData) {
          // inconsistent slot ids would otherwise read past the original schema
          mError("stream:%s invalid fill-null column layout, data index:%d, cols:%d", pCreate->name, dataIndex,
                 numOfData);
          taosMemoryFree(pFullSchema);
          code = TSDB_CODE_MND_INVALID_STREAM_OPTION;
          goto FAIL;
        }

        SSchema *pSrc = &pObj->outputSchema.pSchema[dataIndex];
        pFullSchema[i].bytes = pSrc->bytes;
        pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
        pFullSchema[i].flags = pSrc->flags;
        tstrncpy(pFullSchema[i].name, pSrc->name, sizeof(pFullSchema[i].name));
        pFullSchema[i].type = pSrc->type;
        dataIndex++;
      } else {
        pFullSchema[i].bytes = 0;
        pFullSchema[i].colId = pos->colId;
        pFullSchema[i].flags = COL_SET_NULL;
        memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
        pFullSchema[i].type = pos->type;
        nullIndex++;
      }
    }

    // publish the merged schema; count and array are switched together
    taosMemoryFree(pObj->outputSchema.pSchema);
    pObj->outputSchema.pSchema = pFullSchema;
    pObj->outputSchema.nCols = numOfTotal;
  }

  SPlanContext cxt = {
      .pAstRoot = pAst,
      .topicQuery = false,
      .streamQuery = true,
      .triggerType =
          (pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
      .watermark = pObj->conf.watermark,
      .igExpired = pObj->conf.igExpired,
      .deleteMark = pObj->deleteMark,
      .igCheckUpdate = pObj->igCheckUpdate,
      .destHasPrimaryKey = hasDestPrimaryKey(&pObj->outputSchema),
  };

  // using ast and param to build physical plan
  if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) {
    goto FAIL;
  }

  // save physical plan
  if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) {
    goto FAIL;
  }

  if (pCreate->numOfTags) {
    pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
    if (pObj->tagSchema.pSchema == NULL) {
      code = terrno;
      goto FAIL;
    }
  }
  pObj->tagSchema.nCols = pCreate->numOfTags;

  /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
  for (int32_t i = 0; i < pCreate->numOfTags; i++) {
    SField *pField = taosArrayGet(pCreate->pTags, i);
    if (pField == NULL) {
      continue;
    }

    // tag column ids continue after the output columns
    pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
    pObj->tagSchema.pSchema[i].bytes = pField->bytes;
    pObj->tagSchema.pSchema[i].flags = pField->flags;
    pObj->tagSchema.pSchema[i].type = pField->type;
    memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN);
  }

FAIL:
  if (pAst != NULL) nodesDestroyNode(pAst);
  if (pPlan != NULL) qDestroyQueryPlan(pPlan);
  return code;
}
509

UNCOV
510
/*
 * Serialize a stream task into a deploy message and append it to the
 * transaction as a TDMT_STREAM_TASK_DEPLOY action aimed at the task's epSet.
 *
 * The task is encoded twice: once into a NULL buffer to learn the encoded
 * size, then into a buffer of sizeof(SMsgHead) + size bytes. The SMsgHead's
 * vgId is set to the task's node id in network byte order.
 *
 * Tasks whose version is below SSTREAM_TASK_SUBTABLE_CHANGED_VER are bumped
 * to SSTREAM_TASK_VER before encoding.
 *
 * Returns 0 on success. The message buffer is freed here when encoding or
 * setTransAction() fails.
 *
 * Change over the previous version: the sizing pass only recognised -1 as a
 * failure while the second pass treats any non-zero result as one; both
 * passes now agree, so a non-zero error code from the first pass is no longer
 * ignored. A -1 result is still reported as TSDB_CODE_INVALID_MSG.
 */
int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
  SEncoder encoder;
  tEncoderInit(&encoder, NULL, 0);

  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  // pass 1: no output buffer, only the resulting encoder position is of interest
  int32_t code = tEncodeStreamTask(&encoder, pTask);
  if (code != 0) {
    tEncoderClear(&encoder);
    return (code == -1) ? TSDB_CODE_INVALID_MSG : code;
  }

  int32_t size = encoder.pos;
  int32_t tlen = sizeof(SMsgHead) + size;
  tEncoderClear(&encoder);

  void *buf = taosMemoryCalloc(1, tlen);
  if (buf == NULL) {
    return terrno;
  }

  ((SMsgHead *)buf)->vgId = htonl(pTask->info.nodeId);

  // pass 2: encode for real, right behind the message head
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, size);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  if (code != 0) {
    mError("failed to encode stream task, code:%s", tstrerror(code));
    taosMemoryFree(buf);
    return code;
  }

  code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code) {
    taosMemoryFree(buf);
  }

  return code;
}
554

UNCOV
555
/*
 * Append a deploy action to pTrans for every task of the stream.
 *
 * Tasks reachable through the stream task iterator are handled first. When
 * the stream has fillHistory enabled, every task in pStream->pHTasksList
 * (an array of per-level task arrays) is handled as well.
 *
 * Stops at, and returns, the first non-zero code; returns 0 when every task
 * was added.
 */
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pTaskIter = NULL;

  int32_t code = createStreamTaskIter(pStream, &pTaskIter);
  if (code != 0) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return code;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCurrent = NULL;

    code = streamTaskIterGetCurrent(pTaskIter, &pCurrent);
    if (code == 0) {
      code = mndPersistTaskDeployReq(pTrans, pCurrent);
    }

    if (code != 0) {
      break;
    }
  }

  // the iterator is released on the success and the failure path alike
  destroyStreamTaskIter(pTaskIter);
  if (code != 0) {
    return code;
  }

  if (!pStream->conf.fillHistory) {
    return code;
  }

  // persistent stream task for already stored ts data
  int32_t numOfLevels = taosArrayGetSize(pStream->pHTasksList);
  for (int32_t lv = 0; lv < numOfLevels; ++lv) {
    SArray *pLevelTasks = taosArrayGetP(pStream->pHTasksList, lv);
    int32_t numInLevel = taosArrayGetSize(pLevelTasks);

    for (int32_t t = 0; t < numInLevel; ++t) {
      SStreamTask *pHistoryTask = taosArrayGetP(pLevelTasks, t);

      code = mndPersistTaskDeployReq(pTrans, pHistoryTask);
      if (code != 0) {
        return code;
      }
    }
  }

  return code;
}
600

UNCOV
601
/*
 * Add the stream to a transaction: first the deploy actions for its tasks,
 * then the stream object itself via mndPersistTransLog() with status
 * SDB_STATUS_READY.
 *
 * Returns the negative code from mndPersistStreamTasks() if it fails,
 * otherwise whatever mndPersistTransLog() returns.
 */
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
  int32_t code = mndPersistStreamTasks(pTrans, pStream);
  if (code < 0) {
    return code;
  }

  return mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
}
609

UNCOV
610
/*
 * Create the destination super table of a stream inside a transaction.
 *
 * A create-stable request is assembled from the stream's output schema
 * (columns) and tag schema; when the stream has no tag schema a single
 * UBIGINT tag named "group_id" is used. The request is validated, the stable
 * must not exist yet, its db must be resolvable, and a db configured with
 * numOfStables == 1 must not already hold a stable. The stable object is then
 * built, given the stream's targetStbUid and added to pTrans.
 *
 * Returns 0 on success or an error code. `user` is currently unused.
 *
 * Change over the previous version: failures of mndGetNumOfStbs(),
 * mndBuildStbFromReq() and mndAddStbToTrans() jumped to the cleanup label
 * without recording an error, so the function reported success (0) although
 * the stable was never added to the transaction. Their results are now
 * propagated. All paths share one cleanup sequence.
 */
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
  SStbObj       *pStb = NULL;
  SDbObj        *pDb = NULL;
  int32_t        code = 0;
  int32_t        lino = 0;
  int32_t        numOfStbs = -1;
  bool           stbBuilt = false;  // true once stbObj holds resources that need mndFreeStb()
  SStbObj        stbObj = {0};
  SMCreateStbReq createReq = {0};

  tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
  createReq.numOfColumns = pStream->outputSchema.nCols;
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);

  // build fields
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    pField->flags = pStream->outputSchema.pSchema[i].flags;
    pField->type = pStream->outputSchema.pSchema[i].type;
    pField->bytes = pStream->outputSchema.pSchema[i].bytes;
    pField->compress = createDefaultColCmprByType(pField->type);
  }

  if (pStream->tagSchema.nCols == 0) {
    // no user-defined tags: fall back to the single group id tag
    createReq.numOfTags = 1;
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    // build tags
    SField *pField = taosArrayGet(createReq.pTags, 0);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, "group_id", sizeof(pField->name));
    pField->type = TSDB_DATA_TYPE_UBIGINT;
    pField->flags = 0;
    pField->bytes = 8;
  } else {
    createReq.numOfTags = pStream->tagSchema.nCols;
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    for (int32_t i = 0; i < createReq.numOfTags; i++) {
      SField *pField = taosArrayGet(createReq.pTags, i);
      if (pField == NULL) {
        continue;
      }

      pField->bytes = pStream->tagSchema.pSchema[i].bytes;
      pField->flags = pStream->tagSchema.pSchema[i].flags;
      pField->type = pStream->tagSchema.pSchema[i].type;
      tstrncpy(pField->name, pStream->tagSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    }
  }

  code = mndCheckCreateStbReq(&createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
    lino = __LINE__;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    lino = __LINE__;
    goto _OVER;
  }

  code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
  TSDB_CHECK_CODE(code, lino, _OVER);

  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    lino = __LINE__;
    goto _OVER;
  }

  code = mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb);
  TSDB_CHECK_CODE(code, lino, _OVER);
  stbBuilt = true;

  stbObj.uid = pStream->targetStbUid;

  // only a negative result counts as failure here, as before
  int32_t ret = mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj);
  if (ret < 0) {
    code = ret;
    lino = __LINE__;
    goto _OVER;
  }

_OVER:
  tFreeSMCreateStbReq(&createReq);
  if (stbBuilt) {
    mndFreeStb(&stbObj);
  }
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);

  if (code == 0) {
    mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName,
           pStream->outputSchema.nCols);
  } else {
    mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->targetSTbName, lino,
           tstrerror(code));
  }
  return code;
}
721

722
// 1. stream number check
723
// 2. target stable can not be target table of other existed streams.
UNCOV
724
static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
×
UNCOV
725
  int32_t     numOfStream = 0;
×
UNCOV
726
  SStreamObj *pStream = NULL;
×
UNCOV
727
  void       *pIter = NULL;
×
728

UNCOV
729
  while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
×
UNCOV
730
    if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
×
UNCOV
731
      ++numOfStream;
×
732
    }
733

UNCOV
734
    sdbRelease(pMnode->pSdb, pStream);
×
735

UNCOV
736
    if (numOfStream > MND_STREAM_MAX_NUM) {
×
737
      mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
×
738
             pStreamObj->name);
739
      sdbCancelFetch(pMnode->pSdb, pIter);
×
740
      return TSDB_CODE_MND_TOO_MANY_STREAMS;
×
741
    }
742

UNCOV
743
    if (pStream->targetStbUid == pStreamObj->targetStbUid) {
×
UNCOV
744
      mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
×
745
             pStreamObj->name);
UNCOV
746
      sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
747
      return TSDB_CODE_MND_INVALID_TARGET_TABLE;
×
748
    }
749
  }
750

UNCOV
751
  return TSDB_CODE_SUCCESS;
×
752
}
753

754
// Element duplication callback handed to taosArrayDup(): treats the element as a C string
// and returns the result of taosStrdup() on it.
static void *notifyAddrDup(void *p) {
  char *pAddr = (char *)p;
  return taosStrdup(pAddr);
}
×
755

756
static int32_t addStreamTaskNotifyInfo(const SCMCreateStreamReq *createReq, const SStreamObj *pStream,
×
757
                                       SStreamTask *pTask) {
758
  int32_t code = TSDB_CODE_SUCCESS;
×
759
  int32_t lino = 0;
×
760

761
  TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
762
  TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
763

764
  pTask->notifyInfo.pNotifyAddrUrls = taosArrayDup(createReq->pNotifyAddrUrls, notifyAddrDup);
×
765
  TSDB_CHECK_NULL(pTask->notifyInfo.pNotifyAddrUrls, code, lino, _end, terrno);
×
766
  pTask->notifyInfo.notifyEventTypes = createReq->notifyEventTypes;
×
767
  pTask->notifyInfo.notifyErrorHandle = createReq->notifyErrorHandle;
×
768
  pTask->notifyInfo.streamName = taosStrdup(mndGetDbStr(createReq->name));
×
769
  TSDB_CHECK_NULL(pTask->notifyInfo.streamName, code, lino, _end, terrno);
×
770
  pTask->notifyInfo.stbFullName = taosStrdup(createReq->targetStbFullName);
×
771
  TSDB_CHECK_NULL(pTask->notifyInfo.stbFullName, code, lino, _end, terrno);
×
772
  pTask->notifyInfo.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
×
773
  TSDB_CHECK_NULL(pTask->notifyInfo.pSchemaWrapper, code, lino, _end, terrno);
×
774

775
_end:
×
776
  if (code != TSDB_CODE_SUCCESS) {
×
777
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
778
  }
779
  return code;
×
780
}
781

UNCOV
782
/**
 * Attach the notify settings of a create-stream request to the tasks of a stream.
 *
 * Nothing is done when the request carries no notify address. Otherwise every task in
 * pStream->tasks is updated, and the tasks in pStream->pHTasksList are updated as well when
 * both pStream->conf.fillHistory and createReq->notifyHistory are set.
 *
 * @param createReq create-stream request.
 * @param pStream   stream object whose tasks are updated.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when an argument is NULL, or the first
 *         error returned by addStreamTaskNotifyInfo().
 */
static int32_t addStreamNotifyInfo(SCMCreateStreamReq *createReq, SStreamObj *pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t level = 0;
  int32_t nTasks = 0;
  SArray *pLevel = NULL;

  TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pStream, code, lino, _end, TSDB_CODE_INVALID_PARA);

  if (taosArrayGetSize(createReq->pNotifyAddrUrls) == 0) {
    goto _end;
  }

  level = taosArrayGetSize(pStream->tasks);
  for (int32_t i = 0; i < level; ++i) {
    pLevel = taosArrayGetP(pStream->tasks, i);
    nTasks = taosArrayGetSize(pLevel);
    for (int32_t j = 0; j < nTasks; ++j) {
      code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
      TSDB_CHECK_CODE(code, lino, _end);
    }
  }

  if (pStream->conf.fillHistory && createReq->notifyHistory) {
    level = taosArrayGetSize(pStream->pHTasksList);
    for (int32_t i = 0; i < level; ++i) {
      pLevel = taosArrayGetP(pStream->pHTasksList, i);
      nTasks = taosArrayGetSize(pLevel);
      for (int32_t j = 0; j < nTasks; ++j) {
        code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
        TSDB_CHECK_CODE(code, lino, _end);
      }
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // pStream may be NULL here: the argument check above jumps to this label
    mError("%s for stream %s failed at line %d since %s", __func__, (pStream != NULL) ? pStream->name : "(null)", lino,
           tstrerror(code));
  }
  return code;
}
824

UNCOV
825
/**
 * Handle a create-stream request.
 *
 * Steps, in order: deserialize and validate the request, reject or ignore an already
 * existing stream, check the grant, refuse while a node-update trans is active, check the
 * source db for an snode, build the stream object, run doStreamCheck(), create the trans,
 * optionally create the target stable, schedule the tasks, attach notify info, persist the
 * stream into the trans, check read/write db privileges, register the tasks in execInfo,
 * prepare the trans and finally write an audit record.
 *
 * @param pReq rpc message carrying a serialized SCMCreateStreamReq.
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the flow completed (code was success or
 *         in-progress at _OVER), 0 when the stream exists and igExists is set, otherwise the
 *         error code of the failing step.
 */
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SStreamObj  streamObj = {0};
  char       *sql = NULL;
  int32_t     sqlLen = 0;
  const char *pMsg = "create stream tasks on dnodes";
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STrans     *pTrans = NULL;

  SCMCreateStreamReq createReq = {0};
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  goto _OVER;
#endif

  mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
  if ((code = mndCheckCreateStreamReq(&createReq)) != 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = mndAcquireStream(pMnode, createReq.name, &pStream);
  if (pStream != NULL && code == 0) {
    if (createReq.igExists) {
      mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
      mndReleaseStream(pMnode, pStream);
      tFreeSCMCreateStreamReq(&createReq);
      return code;
    } else {
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
      goto _OVER;
    }
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    goto _OVER;
  }

  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
    goto _OVER;
  }

  if (createReq.sql != NULL) {
    sql = taosStrdup(createReq.sql);
    TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
    // keep the length next to the copy; without it the audit branch below that records
    // the original sql text can never be taken
    sqlLen = (int32_t)strlen(sql);
  }

  // check for the taskEp update trans
  if (isNodeUpdateTransActive()) {
    mError("stream:%s failed to create stream, node update trans is active", createReq.name);
    code = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
    goto _OVER;
  }

  SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB);
  if (pSourceDb == NULL) {
    code = terrno;
    mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB,
          tstrerror(code));
    goto _OVER;
  }

  code = mndCheckForSnode(pMnode, pSourceDb);
  mndReleaseDb(pMnode, pSourceDb);
  if (code != 0) {
    goto _OVER;
  }

  // build stream obj from request
  if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = doStreamCheck(pMnode, &streamObj);
  TSDB_CHECK_CODE(code, lino, _OVER);

  code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans);
  if (pTrans == NULL || code) {
    goto _OVER;
  }

  // create stb for stream
  if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) {
    if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) {
      mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code));
      mndTransDrop(pTrans);
      goto _OVER;
    }
  } else {
    mDebug("stream:%s no need create stable", createReq.name);
  }

  // schedule stream task for stream obj
  code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  // add notify info into all stream tasks
  code = addStreamNotifyInfo(&createReq, &streamObj);
  if (code != TSDB_CODE_SUCCESS) {
    mError("stream:%s failed to add stream notify info since %s", createReq.name, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  // add stream to trans
  code = mndPersistStream(pTrans, &streamObj);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb)) != 0) {
    mndTransDrop(pTrans);
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb)) != 0) {
    mndTransDrop(pTrans);
    goto _OVER;
  }

  // add into buffer firstly
  // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
  streamMutexLock(&execInfo.lock);
  mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
  saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  // execute creation
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    mndTransDrop(pTrans);
    goto _OVER;
  }

  mndTransDrop(pTrans);

  SName dbname = {0};
  code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code) {
    mError("invalid source dbname:%s in create stream, code:%s", createReq.sourceDB, tstrerror(code));
    goto _OVER;
  }

  SName name = {0};
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE);
  if (code) {
    mError("invalid stream name:%s in create strem, code:%s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  // reuse this function for stream
  if (sql != NULL && sqlLen > 0) {
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen);
  } else {
    char detail[1000] = {0};
    snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
  }

_OVER:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to create at line:%d since %s", createReq.name, lino, tstrerror(code));
  } else {
    mDebug("stream:%s create stream completed", createReq.name);
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  mndReleaseStream(pMnode, pStream);
  tFreeSCMCreateStreamReq(&createReq);
  tFreeStreamObj(&streamObj);

  if (sql != NULL) {
    taosMemoryFreeClear(sql);
  }

  return code;
}
1013

1014
/**
 * Handle a restart-stream request.
 *
 * The request body is deserialized as an SMPauseStreamReq. After acquiring the stream the
 * function checks write privilege on the target db, checks for conflicting stream trans,
 * refuses when a node update has been detected, then creates, registers, fills and prepares
 * a restart trans. The stream reference and the trans are released on every exit path taken
 * after they were obtained.
 *
 * @param pReq rpc message carrying the serialized request.
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the trans was prepared, 0 when the stream could
 *         not be acquired and igNotExists is set, otherwise the error code of the failing step.
 */
static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SStreamObj      *pStream = NULL;
  int32_t          code = 0;
  SMPauseStreamReq pauseReq = {0};

  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (pauseReq.igNotExists) {
      mInfo("stream:%s, not exist, not restart stream", pauseReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to restart stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to restart stream", pauseReq.name, pStream->uid);
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESTART_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  bool updated = mndStreamNodeIsUpdated(pMnode);
  if (updated) {
    mError("tasks are not ready for restart, node update detected");
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  STrans *pTrans = NULL;
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream",
                       &pTrans);
  if (pTrans == NULL || code) {
    // this is the restart path; the message previously claimed a pause failure
    mError("stream:%s failed to restart stream since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESTART_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // if nodeUpdate happened, not send pause trans
  code = mndStreamSetRestartAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s, failed to restart task since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare restart stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1093

UNCOV
1094
/**
 * Generate the next checkpoint id.
 *
 * The result is one more than the larger of:
 *   - the maximum checkpointId over all stream objects in the sdb, and
 *   - the maximum checkpointInfo.latestId over the task entries in execInfo whose
 *     checkpointInfo.failed flag is not set.
 *
 * @param pMnode mnode whose sdb is scanned.
 * @param lock   when true, execInfo.lock is taken around the scan of the task list;
 *               presumably callers passing false already hold it - confirm against callers.
 * @return the new checkpoint id.
 */
int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  SSdb       *pSdb = pMnode->pSdb;
  int64_t     maxChkptId = 0;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    maxChkptId = TMAX(maxChkptId, pStream->checkpointId);
    mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
           pStream->checkpointId);
    sdbRelease(pSdb, pStream);
  }

  {  // check the max checkpoint id from all vnodes.
    int64_t maxCheckpointId = -1;
    if (lock) {
      streamMutexLock(&execInfo.lock);
    }

    for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
      STaskId *p = taosArrayGet(execInfo.pTaskList, i);
      if (p == NULL) {  // validate the key before it is handed to the hash lookup
        continue;
      }

      STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
      if (pEntry == NULL) {
        continue;
      }

      if (pEntry->checkpointInfo.failed) {
        continue;
      }

      if (maxCheckpointId < pEntry->checkpointInfo.latestId) {
        maxCheckpointId = pEntry->checkpointInfo.latestId;
      }
    }

    if (lock) {
      streamMutexUnlock(&execInfo.lock);
    }

    if (maxCheckpointId > maxChkptId) {
      mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId,
             maxCheckpointId);
      maxChkptId = maxCheckpointId;
    }
  }

  mDebug("generate new checkpointId:%" PRId64, maxChkptId + 1);
  return maxChkptId + 1;
}
1146

UNCOV
1147
/**
 * Build and prepare a checkpoint trans for one stream.
 *
 * When mndTrigger is 1 and less than tsStreamCheckpointInterval seconds have passed since
 * pStream->checkpointFreq, nothing is done. Otherwise the function checks for conflicting
 * trans, creates and registers a checkpoint trans, adds a checkpoint action for every task
 * of each level whose first task has taskLevel TASK_LEVEL__SOURCE, updates the checkpoint
 * bookkeeping of the stream under pStream->lock, persists the stream into the trans log and
 * prepares the trans.
 *
 * @param pMnode       mnode instance.
 * @param pStream      stream to checkpoint; its checkpointId, checkpointFreq, currentTick
 *                     and version fields are modified.
 * @param checkpointId id assigned to the new checkpoint.
 * @param mndTrigger   1 enables the interval check described above; the value is also
 *                     forwarded to mndStreamSetCheckpointAction().
 * @param lock         forwarded to mndStreamTransConflictCheck().
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the trans was prepared, TSDB_CODE_SUCCESS when
 *         skipped by the interval check, otherwise the error code of the failing step.
 */
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
                                               int8_t mndTrigger, bool lock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t ts = taosGetTimestampMs();
  STrans *pTrans = NULL;

  if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
    return code;
  }

  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
  if (code) {
    mWarn("checkpoint conflict with other trans in %s, code:%s ignore the checkpoint for stream:%s %" PRIx64,
          pStream->sourceDb, tstrerror(code), pStream->name, pStream->uid);
    goto _ERR;
  }

  // NOTE(review): other callers of doCreateTrans() in this file also test pTrans == NULL;
  // confirm whether a NULL trans with a zero code is possible here.
  code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME,
                       "gen checkpoint for stream", &pTrans);
  if (code) {
    mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
           tstrerror(code));
    goto _ERR;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
  if (code) {
    mError("failed to register checkpoint trans for stream:%s, checkpointId:%" PRId64, pStream->name, checkpointId);
    goto _ERR;
  }

  mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);

  taosWLockLatch(&pStream->lock);
  pStream->currentTick = 1;

  // 1. redo action: broadcast checkpoint source msg for all source vg
  int32_t totalLevel = taosArrayGetSize(pStream->tasks);
  for (int32_t i = 0; i < totalLevel; i++) {
    SArray      *pLevel = taosArrayGetP(pStream->tasks, i);
    SStreamTask *p = taosArrayGetP(pLevel, 0);
    if (p == NULL) {  // no first task to inspect for this level, nothing to broadcast
      continue;
    }

    if (p->info.taskLevel == TASK_LEVEL__SOURCE) {
      int32_t sz = taosArrayGetSize(pLevel);
      for (int32_t j = 0; j < sz; j++) {
        SStreamTask *pTask = taosArrayGetP(pLevel, j);
        code = mndStreamSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);

        if (code != TSDB_CODE_SUCCESS) {
          taosWUnLockLatch(&pStream->lock);
          goto _ERR;
        }
      }
    }
  }

  // 2. reset tick
  pStream->checkpointId = checkpointId;
  pStream->checkpointFreq = taosGetTimestampMs();
  pStream->currentTick = 0;

  // 3. commit log: stream checkpoint info
  pStream->version = pStream->version + 1;
  taosWUnLockLatch(&pStream->lock);

  if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY)) != TSDB_CODE_SUCCESS) {
    goto _ERR;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to prepare checkpoint trans since %s", tstrerror(code));
  } else {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_ERR:
  // reached on success as well; pTrans is still NULL when the conflict check failed
  mndTransDrop(pTrans);
  return code;
}
1228

UNCOV
1229
/**
 * Make sure execInfo.pNodeList is populated and report how many entries it holds.
 *
 * When the list is empty it is refreshed through refreshNodeListFromExistedStreams().
 *
 * @param pMnode mnode passed to the refresh helper.
 * @return the error code of the refresh when it fails, otherwise the size of
 *         execInfo.pNodeList.
 */
int32_t extractStreamNodeList(SMnode *pMnode) {
  bool listIsEmpty = (taosArrayGetSize(execInfo.pNodeList) == 0);

  if (listIsEmpty) {
    int32_t ret = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
    if (ret != 0) {
      mError("Failed to extract node list from stream, code:%s", tstrerror(ret));
      return ret;
    }
  }

  return taosArrayGetSize(execInfo.pNodeList);
}
1240

UNCOV
1241
/**
 * Check whether the node/task bookkeeping in execInfo allows stream work to proceed.
 *
 * @param pMnode mnode instance.
 * @return TSDB_CODE_STREAM_TASK_IVLD_STATUS when mndStreamNodeIsUpdated() reports an update,
 *         or when the node list is empty while the task list is not; 0 otherwise.
 */
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
  int32_t result = 0;

  if (mndStreamNodeIsUpdated(pMnode)) {
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  streamMutexLock(&execInfo.lock);

  const bool noNodes = (taosArrayGetSize(execInfo.pNodeList) == 0);
  if (noNodes) {
    mDebug("stream task node change checking done, no vgroups exist, do nothing");

    // tasks without any node to run on indicate inconsistent state
    const bool hasTasks = (taosArrayGetSize(execInfo.pTaskList) != 0);
    if (hasTasks) {
      mError("stream task node change checking done, no vgroups exist, but task list is not empty");
      result = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
    }
  }

  streamMutexUnlock(&execInfo.lock);
  return result;
}
1259

UNCOV
1260
/**
 * Find the latest start time among the tasks of one stream, provided all of them can take
 * a checkpoint.
 *
 * Every id in pTaskList is looked up in execInfo.pTaskMap; entries belonging to other
 * streams are skipped. The scan stops with -1 as soon as a task of the stream still has a
 * related fill-history task (hTaskId != 0) or is not in TASK_STATUS__READY.
 *
 * @param pTaskList array of STaskId to inspect.
 * @param streamId  stream whose tasks are considered.
 * @return the largest startTime of the stream's tasks, or -1 when a task is not ready or no
 *         task of the stream was found.
 */
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
  int64_t ts = -1;
  int32_t taskId = -1;

  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
    STaskId *p = taosArrayGet(pTaskList, i);
    if (p == NULL) {  // validate the key before it is handed to the hash lookup
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
    if (pEntry == NULL || pEntry->id.streamId != streamId) {
      continue;
    }

    // -1 denote not ready now or never ready till now
    if (pEntry->hTaskId != 0) {
      mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
            " exists, checkpoint not issued",
            pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
            pEntry->hTaskId);
      return -1;
    }

    if (pEntry->status != TASK_STATUS__READY) {
      mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, not ready for checkpoint", pEntry->id.streamId,
            (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
      return -1;
    }

    if (ts < pEntry->startTime) {
      ts = pEntry->startTime;
      taskId = pEntry->id.taskId;
    }
  }

  mDebug("stream:0x%" PRIx64 " last ready ts:%" PRId64 " s-task:0x%x", streamId, ts, taskId);
  return ts;
}
1295

1296
// One candidate for a new checkpoint: the stream id together with the duration recorded
// for it by the checkpoint scheduler.
typedef struct {
  int64_t streamId;
  int64_t duration;
} SCheckpointInterval;

// Comparator over SCheckpointInterval that orders entries by duration, largest first:
// returns -1 when p1 has the longer duration, 1 when p2 has, and 0 when they are equal.
static int32_t streamWaitComparFn(const void *p1, const void *p2) {
  const int64_t lhs = ((const SCheckpointInterval *)p1)->duration;
  const int64_t rhs = ((const SCheckpointInterval *)p2)->duration;

  if (lhs > rhs) {
    return -1;
  }
  if (lhs < rhs) {
    return 1;
  }
  return 0;
}
1310

1311
// all tasks of this stream should be ready, otherwise do nothing
UNCOV
1312
static bool isStreamReadyHelp(int64_t now, SStreamObj* pStream) {
×
UNCOV
1313
  bool ready = false;
×
1314

UNCOV
1315
  streamMutexLock(&execInfo.lock);
×
1316

UNCOV
1317
  int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
×
UNCOV
1318
  if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) {
×
1319

UNCOV
1320
    if (lastReadyTs != -1) {
×
UNCOV
1321
      mInfo("not start checkpoint, stream:0x%" PRIx64 " last ready ts:%" PRId64 " ready duration:%" PRId64
×
1322
            "ms less than threshold",
1323
            pStream->uid, lastReadyTs, (now - lastReadyTs));
1324
    }
1325

UNCOV
1326
    ready = false;
×
1327
  } else {
UNCOV
1328
    ready = true;
×
1329
  }
1330

UNCOV
1331
  streamMutexUnlock(&execInfo.lock);
×
UNCOV
1332
  return ready;
×
1333
}
1334

UNCOV
1335
/**
 * Start checkpoint trans for the streams whose checkpoint interval has elapsed.
 *
 * Phase 1 collects every stream whose time since checkpointFreq reached
 * tsStreamCheckpointInterval seconds and which isStreamReadyHelp() accepts.
 * Phase 2 sorts the candidates with streamWaitComparFn and clears finished trans.
 * Phase 3 starts checkpoint trans for the candidates in sorted order until the number of
 * newly started trans reaches tsMaxConcurrentCheckpoint minus the ongoing ones.
 *
 * @param pReq rpc message; only pReq->info.node is used.
 * @return TSDB_CODE_STREAM_TASK_IVLD_STATUS when the task/node status check fails, terrno
 *         when the candidate list cannot be allocated, the error of
 *         mndStreamClearFinishedTrans() when it fails, otherwise the code left by the last
 *         step executed in the launch loop (0 when nothing had to be started).
 */
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  void       *pIter = NULL;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;
  int32_t     numOfCheckpointTrans = 0;

  code = mndCheckTaskAndNodeStatus(pMnode);
  if (code != 0) {
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));
  if (pList == NULL) {
    return terrno;
  }

  int64_t now = taosGetTimestampMs();

  // phase 1: collect the streams that are due and ready
  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    int64_t elapsed = now - pStream->checkpointFreq;
    bool    due = !(elapsed < tsStreamCheckpointInterval * 1000);

    if (due && isStreamReadyHelp(now, pStream)) {
      SCheckpointInterval item = {.streamId = pStream->uid, .duration = elapsed};
      if (taosArrayPush(pList, &item) != NULL) {
        int32_t currentSize = taosArrayGetSize(pList);
        mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chkpt interval threshold: %ds(%" PRId64
               "s), concurrently launch threshold:%d",
               pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, elapsed / 1000,
               tsMaxConcurrentCheckpoint);
      } else {
        mError("failed to record the checkpoint interval info, stream:0x%" PRIx64, pStream->uid);
      }
    }

    sdbRelease(pSdb, pStream);
  }

  int32_t numOfQual = taosArrayGetSize(pList);
  if (numOfQual == 0) {
    taosArrayDestroy(pList);
    return code;
  }

  // phase 2: order the candidates and find out how many trans are still running
  taosArraySort(pList, streamWaitComparFn);
  code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans);
  if (code) {
    mError("failed to clear finish trans, code:%s", tstrerror(code));
    taosArrayDestroy(pList);
    return code;
  }

  if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) {
    mDebug(
        "%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new "
        "checkpoint trans are not allowed, wait for 30s",
        numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint);
    taosArrayDestroy(pList);
    return code;
  }

  int32_t capacity = tsMaxConcurrentCheckpoint - numOfCheckpointTrans;
  mDebug(
      "%d stream(s) checkpoint interval longer than %ds, %d ongoing checkpoint trans, %d new checkpoint trans allowed, "
      "concurrent trans threshold:%d",
      numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, capacity, tsMaxConcurrentCheckpoint);

  // phase 3: launch new checkpoint trans, all sharing one checkpoint id
  int32_t started = 0;
  int64_t checkpointId = mndStreamGenChkptId(pMnode, true);

  for (int32_t idx = 0; idx < numOfQual; ++idx) {
    SCheckpointInterval *pCheckpointInfo = taosArrayGet(pList, idx);
    if (pCheckpointInfo == NULL) {
      continue;
    }

    SStreamObj *pTarget = NULL;
    code = mndGetStreamObj(pMnode, pCheckpointInfo->streamId, &pTarget);
    if (pTarget == NULL || code != 0) {
      continue;
    }

    code = mndProcessStreamCheckpointTrans(pMnode, pTarget, checkpointId, 1, true);
    sdbRelease(pSdb, pTarget);

    if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
      mError("failed to start checkpoint trans, code:%s", tstrerror(code));
      continue;
    }

    started += 1;
    if (started >= capacity) {
      mDebug("already start %d new checkpoint trans, current active checkpoint trans:%d", started,
             (started + numOfCheckpointTrans));
      break;
    }
  }

  taosArrayDestroy(pList);
  return code;
}
1443

UNCOV
1444
// Handle a "drop stream" request.
//
// Flow: decode the request, acquire the stream object, refuse to drop a stream that is still
// referenced by an SMA object, check the caller's write privilege on the target db, check for
// conflicting stream transactions, then build and prepare a transaction that drops all tasks and
// marks the stream object as dropped. After the transaction is prepared, the related active
// transaction (if any) is killed and the stream's tasks are removed from the global execInfo buffer.
//
// Returns 0 if the stream does not exist and igNotExists is set, TSDB_CODE_ACTION_IN_PROGRESS once
// the drop transaction has been prepared, otherwise an error code.
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  SMDropStreamReq dropReq = {0};
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
    mError("invalid drop stream msg recv, discarded");
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  mDebug("recv drop stream:%s msg", dropReq.name);

  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (dropReq.igNotExists) {
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      tFreeMDropStreamReq(&dropReq);
      return 0;
    } else {
      mError("stream:%s not exist failed to drop it", dropReq.name);
      tFreeMDropStreamReq(&dropReq);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  if (pStream->smaId != 0) {
    mDebug("stream:%s, uid:0x%" PRIx64 " try to drop sma related stream", dropReq.name, pStream->uid);

    // scan all SMA objects; if one still refers to this stream, the drop is rejected
    void    *pIter = NULL;
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    while (pIter) {
      if (pSma && pSma->uid == pStream->smaId) {
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;

        // log while pStream is still held, and report the code that is actually returned
        mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma",
               dropReq.name, pStream->uid, tstrerror(code));

        sdbRelease(pMnode->pSdb, pSma);
        sdbRelease(pMnode->pSdb, pStream);
        sdbCancelFetch(pMnode->pSdb, pIter);
        tFreeMDropStreamReq(&dropReq);
        TAOS_RETURN(code);
      }

      if (pSma) {
        sdbRelease(pMnode->pSdb, pSma);
      }

      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    }
  }

  // propagate the real privilege-check error instead of a bare -1 (same as the pause handler)
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    return code;
  }

  STrans *pTrans = NULL;
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
  if (code) {
    mError("failed to register drop stream trans, code:%s", tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // drop all tasks
  code = mndStreamSetDropAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // drop stream
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // kill the related checkpoint trans
  int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
  if (transId != 0) {
    mDebug("drop active transId:%d due to stream:%s uid:0x%" PRIx64 " dropped", transId, pStream->name, pStream->uid);
    mndKillTransImpl(pMnode, transId, pStream->sourceDb);
  }

  mDebug("stream:%s uid:0x%" PRIx64 " transId:%d start to drop related task when dropping stream", dropReq.name,
         pStream->uid, transId);

  removeStreamTasksInBuf(pStream, &execInfo);

  // NOTE: from here on `code` holds the result of the name parsing, which decides the return value
  SName name = {0};
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  auditRecord(pReq, pMnode->clusterId, "dropStream", "", name.dbname, dropReq.sql, dropReq.sqlLen);

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  tFreeMDropStreamReq(&dropReq);

  if (code == 0) {
    return TSDB_CODE_ACTION_IN_PROGRESS;
  } else {
    TAOS_RETURN(code);
  }
}
1586

UNCOV
1587
// Append "drop stream" logs to pTrans for every stream whose source or target db is pDb.
//
// A stream whose source db and target db differ cannot be dropped this way; in that case the
// iteration is cancelled and TSDB_CODE_MND_STREAM_MUST_BE_DELETED is returned. For every other
// matching stream the related active transaction (if any) is killed, its tasks are removed from the
// global execInfo buffer, and the stream object is persisted into pTrans with SDB_STATUS_DROPPED.
//
// Returns 0 on success, otherwise the first error code encountered.
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
      if (pStream->sourceDbUid != pStream->targetDbUid) {
        // log before releasing: the message reads fields of pStream
        mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
               pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(TSDB_CODE_MND_STREAM_MUST_BE_DELETED);
      } else {
        // kill the related checkpoint trans
        int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
        if (transId != 0) {
          mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
          mndKillTransImpl(pMnode, transId, pStream->sourceDb);
        }

        // drop the stream obj in execInfo
        removeStreamTasksInBuf(pStream, &execInfo);

        code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          sdbRelease(pSdb, pStream);
          sdbCancelFetch(pSdb, pIter);
          return code;
        }
      }
    }

    sdbRelease(pSdb, pStream);
  }

  return 0;
}
1629

UNCOV
1630
// Fill pBlock with one row per stream object, at most `rows` rows per call.
// The sdb cursor is kept in pShow->pIter so a later call resumes where this one stopped.
// Returns the number of rows written in this call; pShow->numOfRows accumulates the total.
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t filled = 0;

  for (;;) {
    if (filled >= rows) {
      break;
    }

    SStreamObj *pObj = NULL;
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) {
      break;
    }

    // a row only counts when the stream attributes were written successfully
    int32_t ret = setStreamAttrInResBlock(pObj, pBlock, filled);
    if (ret == 0) {
      filled += 1;
    }

    sdbRelease(pSdb, pObj);
  }

  pShow->numOfRows += filled;
  return filled;
}
1651

1652
// Cancel an in-progress iteration over SDB_STREAM objects.
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1656

UNCOV
1657
// Fill pBlock with one row per stream task.
//
// Streams are fetched through the cursor stored in pShow->pIter until at least rowsCapacity rows
// have been produced or no stream is left. When the tasks of a stream would not fit, the block
// capacity is grown so that all tasks of that stream are emitted together. The time precision is
// taken from the stream's source db when that db can be acquired, otherwise milliseconds is used.
//
// Returns the number of rows written in this call; pShow->numOfRows accumulates the total.
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  int32_t     numOfRows = 0;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) {
      break;
    }

    // lock
    taosRLockLatch(&pStream->lock);

    int32_t count = mndGetNumOfStreamTasks(pStream);
    if (numOfRows + count > rowsCapacity) {
      code = blockDataEnsureCapacity(pBlock, numOfRows + count);
      if (code) {
        mError("failed to prepare the result block buffer, quit return value");
        taosRUnLockLatch(&pStream->lock);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    int32_t precision = TSDB_TIME_PRECISION_MILLI;
    SDbObj *pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
    if (pSourceDb != NULL) {
      precision = pSourceDb->cfg.precision;
      mndReleaseDb(pMnode, pSourceDb);
    }

    // add row for each task
    SStreamTaskIter *pIter = NULL;
    code = createStreamTaskIter(pStream, &pIter);
    if (code) {
      // log before releasing: the message reads pStream->name
      mError("failed to create task iter for stream:%s", pStream->name);
      taosRUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pIter, &pTask);
      if (code) {
        // do not destroy the iterator here: it is destroyed exactly once right after the loop
        break;
      }

      code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows, precision);
      if (code == TSDB_CODE_SUCCESS) {
        numOfRows++;
      }
    }

    pBlock->info.rows = numOfRows;

    destroyStreamTaskIter(pIter);
    taosRUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1730

1731
// Cancel an in-progress stream-task listing; the underlying cursor iterates SDB_STREAM objects.
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1735

UNCOV
1736
// Handle a "pause stream" request.
//
// The request is rejected when the stream does not exist (unless igNotExists is set), the caller
// lacks write privilege on the target db, a conflicting stream transaction exists, a node update
// has been detected, or any task of the stream has not reported a status yet or is in the
// UNINIT/CK status. Otherwise a pause transaction is created, registered, filled with the pause
// actions, persisted and prepared.
//
// Returns 0 if the stream does not exist and igNotExists is set, TSDB_CODE_ACTION_IN_PROGRESS once
// the transaction has been prepared, otherwise an error code.
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  STrans     *pTrans = NULL;
  int32_t     code = 0;

  SMPauseStreamReq pauseReq = {0};
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (code != 0 || pStream == NULL) {
    if (!pauseReq.igNotExists) {
      mError("stream:%s not exist, failed to pause stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }

    mInfo("stream:%s, not exist, not pause stream", pauseReq.name);
    return 0;
  }

  mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid);

  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(code);
  }

  if (mndStreamNodeIsUpdated(pMnode)) {
    mError("tasks are not ready for pause, node update detected");
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  // check for tasks, if tasks are not ready, not allowed to pause
  bool hasEntry = false;     // at least one task of this stream has a status entry
  bool allPausable = true;   // no task of this stream is in UNINIT/CK status

  streamMutexLock(&execInfo.lock);

  for (int32_t idx = 0; idx < taosArrayGetSize(execInfo.pTaskList); ++idx) {
    STaskId *pId = taosArrayGet(execInfo.pTaskList, idx);
    if (pId == NULL) {
      continue;
    }

    STaskStatusEntry *pStatus = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
    if (pStatus == NULL || pStatus->id.streamId != pStream->uid) {
      continue;
    }

    if (pStatus->status == TASK_STATUS__UNINIT || pStatus->status == TASK_STATUS__CK) {
      mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name,
             pStream->uid, pStatus->nodeId, pStatus->id.taskId, streamTaskGetStatusStr(pStatus->status));
      allPausable = false;
    }

    hasEntry = true;
  }

  streamMutexUnlock(&execInfo.lock);

  if (!hasEntry || !allPausable) {
    if (!hasEntry) {
      mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
    } else {
      mError("stream:%s task not ready for pause yet", pauseReq.name);
    }

    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans);
  if (code || pTrans == NULL) {
    mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // attach the pause actions to the trans
  code = mndStreamSetPauseAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s, failed to pause task since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // persist the stream object under the write latch
  taosWLockLatch(&pStream->lock);
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  taosWUnLockLatch(&pStream->lock);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1870

UNCOV
1871
// Handle a "resume stream" request.
//
// After the grant check and request decoding, the stream is acquired, the caller's write privilege
// on the target db and transaction conflicts are checked, and a resume transaction is created,
// registered, filled with the resume actions, persisted (with the stream status set to
// STREAM_STATUS__NORMAL) and prepared.
//
// Returns 0 if the stream does not exist and igNotExists is set, TSDB_CODE_ACTION_IN_PROGRESS once
// the transaction has been prepared, otherwise an error code.
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResumeStreamReq resumeReq = {0};
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resumeReq.igNotExists) {
      mInfo("stream:%s not exist, not resume stream", resumeReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      return 0;
    } else {
      mError("stream:%s not exist, failed to resume stream", resumeReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);

  // propagate the real privilege-check error instead of a bare -1 (same as the pause handler)
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  STrans *pTrans = NULL;
  code =
      doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // set the resume action
  code = mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated);
  if (code) {
    mError("stream:%s, failed to resume task since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // resume stream
  taosWLockLatch(&pStream->lock);
  pStream->status = STREAM_STATUS__NORMAL;
  // keep the result: previously it was discarded and the stale (successful) code was returned
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  taosWUnLockLatch(&pStream->lock);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare resume stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1960

1961
// Handle a "reset stream" request.
//
// Only the validation part exists so far: grant check, request decoding and stream lookup. The
// actual reset is not implemented yet (see the todo below).
//
// Returns 0 if the stream does not exist and igNotExists is set, an error code on failure, and
// TSDB_CODE_ACTION_IN_PROGRESS when the stream exists.
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResetStreamReq resetReq = {0};
  if (tDeserializeSMResetStreamReq(pReq->pCont, pReq->contLen, &resetReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  mDebug("recv reset stream req, stream:%s", resetReq.name);

  code = mndAcquireStream(pMnode, resetReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resetReq.igNotExists) {
      mInfo("stream:%s, not exist, not reset stream", resetReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to reset stream", resetReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  //todo(liao hao jun)

  // the stream object is not used any further, give back the reference taken by mndAcquireStream
  sdbRelease(pMnode->pSdb, pStream);
  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1991

UNCOV
1992
// Build and prepare one "update task epsets" transaction covering the streams affected by a vgroup
// (node) change.
//
// pChangeInfo      the changed nodes; its pDBMap is used to decide whether a stream is involved
// includeAllNodes  when true every stream is included, regardless of pDBMap
//
// Pass 1 checks every stream for a conflicting transaction and gives up on the first conflict.
// Pass 2 creates the transaction lazily (once), then for every involved stream registers the
// transaction, appends the epset-update actions and persists the stream object into it. A stream
// whose actions cannot be built is skipped.
//
// Returns 0 when there is no stream at all, the result of mndTransPrepare otherwise, or the first
// fatal error code.
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  STrans     *pTrans = NULL;
  int32_t     code = 0;

  // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool
  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
    sdbRelease(pSdb, pStream);

    if (code) {
      mError("nodeUpdate conflict with other trans, current nodeUpdate ignored, code:%s", tstrerror(code));
      sdbCancelFetch(pSdb, pIter);
      return code;
    }
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    // here create only one trans
    if (pTrans == NULL) {
      code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME,
                           "update task epsets", &pTrans);
      if (pTrans == NULL || code) {
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        return terrno = code;
      }
    }

    if (!includeAllNodes) {
      void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
      void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
      if (p1 == NULL && p2 == NULL) {
        mDebug("stream:0x%" PRIx64 " %s not involved nodeUpdate, ignore", pStream->uid, pStream->name);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
           pStream->name, pTrans->id);

    // NOTE: for each stream, we register one trans entry for task update
    code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
    if (code) {
      mError("failed to register trans, transId:%d, and continue", pTrans->id);
    }

    code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);

    // todo: not continue, drop all and retry again
    if (code != TSDB_CODE_SUCCESS) {
      mError("stream:0x%" PRIx64 " build nodeUpdate trans failed, ignore and continue, code:%s", pStream->uid,
             tstrerror(code));
      sdbRelease(pSdb, pStream);
      continue;
    }

    code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
    sdbRelease(pSdb, pStream);

    if (code != TSDB_CODE_SUCCESS) {
      sdbCancelFetch(pSdb, pIter);
      mndTransDrop(pTrans);  // the trans is abandoned here, do not leak it
      return code;
    }
  }

  // no need to build the trans to handle the vgroup update
  if (pTrans == NULL) {
    return 0;
  }

  // Every stream fetched above has already been released on each path of the loop, so no stream
  // reference is held at this point and nothing is released here.
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
    mndTransDrop(pTrans);
    return code;
  }

  mndTransDrop(pTrans);
  return code;
}
2088

UNCOV
2089
// Rebuild pNodeList from the tasks of all existing streams.
//
// Every task contributes a node entry (node id + epset, heartbeat fields reset to -1); entries are
// de-duplicated by node id through a temporary hash map. pNodeList is cleared and then refilled
// from that map.
//
// Returns terrno if the temporary hash map cannot be created; otherwise the last error recorded
// while iterating tasks or pushing into the list, or 0.
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int32_t     code = 0;

  mDebug("start to refresh node list by existed streams");

  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    return terrno;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    taosWLockLatch(&pStream->lock);

    SStreamTaskIter *pTaskIter = NULL;
    code = createStreamTaskIter(pStream, &pTaskIter);
    if (code) {
      // log before releasing: the message reads pStream->name
      mError("failed to create task iter for stream:%s", pStream->name);
      taosWUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pTaskIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pTaskIter, &pTask);
      if (code) {
        break;
      }

      SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
      epsetAssign(&entry.epset, &pTask->info.epSet);
      int32_t ret = taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
      if (ret != 0 && ret != TSDB_CODE_DUP_KEY) {
        // report the hash-put result, not the unrelated iterator code
        mError("failed to put entry into hash map, nodeId:%d, code:%s", entry.nodeId, tstrerror(ret));
      }
    }

    destroyStreamTaskIter(pTaskIter);
    taosWUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  taosArrayClear(pNodeList);

  // convert to list
  pIter = NULL;
  while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
    SNodeEntry *pEntry = (SNodeEntry *)pIter;

    void *p = taosArrayPush(pNodeList, pEntry);
    if (p == NULL) {
      mError("failed to put entry into node list, nodeId:%d, code: out of memory", pEntry->nodeId);
      if (code == 0) {
        code = terrno;
      }
      continue;
    }

    char    buf[256] = {0};
    int32_t ret = epsetToStr(&pEntry->epset, buf, tListLen(buf));  // ignore this error since it is only for log file
    if (ret != 0) {                                                // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(ret));
    }

    mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
  }

  taosHashCleanup(pHash);

  mDebug("numOfNodes:%d for stream after extract nodeInfo from stream", (int32_t)taosArrayGetSize(pNodeList));
  return code;
}
2170

2171
// Insert the db name of every vgroup in the sdb into pDBMap (key only, no value).
// Failures of taosHashPut (for instance a db name that is already present) are ignored.
static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
  void   *pIter = NULL;
  int32_t code = 0;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    code = taosHashPut(pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
    if (code == 0) {
      int32_t size = taosHashGetSize(pDBMap);
      mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size);
    }

    // release only after the last read of pVgroup->dbName
    sdbRelease(pSdb, pVgroup);
  }
}
×
2190

2191
// this function is executed by one thread at a time (guarded by mndNodeCheckSentinel); it is not multi-thread safe
UNCOV
2192
// Periodic check for vnode/node changes that affect stream tasks.
//
// mndNodeCheckSentinel is used as a re-entrance guard: if a check is already running, this call
// returns immediately. Otherwise a vgroup snapshot is taken, expired node entries and tasks are
// removed from execInfo, and the cached node list is compared with the snapshot. When nodes have
// changed, or the mnode just switched from follower to leader, the active checkpoint transactions
// are killed, a nodeUpdate transaction is issued through mndProcessVgroupChange, and on success the
// cached node list is refreshed from the existing streams.
//
// Always returns 0; failures are only logged.
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
  int32_t code = 0;
  bool    allReady = true;
  SArray *pNodeSnapshot = NULL;
  SMnode *pMnode = pMsg->info.node;
  int64_t ts = taosGetTimestampSec();
  bool    updateAllVgroups = false;

  int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
  if (old != 0) {
    mDebug("still in checking node change");
    return 0;
  }

  mDebug("start to do node changing check");

  streamMutexLock(&execInfo.lock);
  int32_t numOfNodes = extractStreamNodeList(pMnode);
  streamMutexUnlock(&execInfo.lock);

  if (numOfNodes == 0) {
    mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing");
    execInfo.ts = ts;
    atomic_store_32(&mndNodeCheckSentinel, 0);
    return 0;
  }

  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
  if (code) {
    mError("failed to take the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    taosArrayDestroy(pNodeSnapshot);
    atomic_store_32(&mndNodeCheckSentinel, 0);
    mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
    return 0;
  }

  streamMutexLock(&execInfo.lock);

  code = removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
  if (code) {
    goto _end;
  }

  SVgroupChangeInfo changeInfo = {0};
  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo);
  if (code) {
    // NOTE(review): assumes mndFindChangedNodeInfo leaves nothing to free in changeInfo on failure - confirm
    goto _end;
  }

  {
    if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) {
      mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
      updateAllVgroups = true;
      execInfo.switchFromFollower = false;  // reset the flag
      addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb);
    }
  }

  if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) {
    // kill current active checkpoint transaction, since the transaction is vnode wide.
    killAllCheckpointTrans(pMnode, &changeInfo);
    code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups);

    // keep the new vnode snapshot if success
    if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
      code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
      if (code) {
        mError("failed to extract node list from stream, code:%s", tstrerror(code));
        // changeInfo has been populated at this point, release it before bailing out
        mndDestroyVgroupChangeInfo(&changeInfo);
        goto _end;
      }

      execInfo.ts = ts;
      mDebug("create trans successfully, update cached node list, numOfNodes:%d",
             (int)taosArrayGetSize(execInfo.pNodeList));
    } else {
      mError("unexpected code during create nodeUpdate trans, code:%s", tstrerror(code));
    }
  } else {
    mDebug("no update found in nodeList");
  }

  mndDestroyVgroupChangeInfo(&changeInfo);

_end:
  streamMutexUnlock(&execInfo.lock);
  taosArrayDestroy(pNodeSnapshot);

  mDebug("end to do stream task node change checking");
  atomic_store_32(&mndNodeCheckSentinel, 0);
  return 0;
}
2286

UNCOV
2287
// Enqueue a TDMT_MND_STREAM_NODECHANGE_CHECK message onto the mnode write queue.
// Returns 0 without queueing anything when sdb holds no stream objects, terrno when the
// message buffer cannot be allocated, otherwise the result of tmsgPutToQueue().
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;

  // no stream registered, nothing to check
  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) <= 0) {
    return 0;
  }

  int32_t msgLen = sizeof(SMStreamNodeCheckMsg);
  void   *pCont = rpcMallocCont(msgLen);
  if (pCont == NULL) {
    return terrno;
  }

  SRpcMsg checkMsg = {0};
  checkMsg.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK;
  checkMsg.pCont = pCont;
  checkMsg.contLen = msgLen;

  return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &checkMsg);
}
2303

UNCOV
2304
// Walk all tasks of pStream and register the ones not yet present in pExecNode->pTaskMap:
//   - a freshly initialised STaskStatusEntry is put into pTaskMap and the task id appended to pTaskList;
//   - if pNodeList has no entry with the task's nodeId, a new SNodeEntry (with the task's epset) is appended.
// Failures are logged only; the function returns nothing and keeps going with the next task.
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
  SStreamTaskIter *pIter = NULL;
  int32_t          code = createStreamTaskIter(pStream, &pIter);
  if (code) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return;
  }

  while (streamTaskIterNextTask(pIter)) {
    SStreamTask *pTask = NULL;
    code = streamTaskIterGetCurrent(pIter, &pTask);
    if (code) {
      break;
    }

    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    if (taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)) != NULL) {
      continue;  // task is tracked already
    }

    STaskStatusEntry entry = {0};
    streamTaskStatusInit(&entry, pTask);

    code = taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
    if (code == 0) {
      void   *px = taosArrayPush(pExecNode->pTaskList, &id);
      int32_t num = (int32_t)taosArrayGetSize(pExecNode->pTaskList);
      if (px) {
        mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
      } else {
        mError("s-task:0x%x failed to add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
      }
    } else {
      mError("s-task:0x%x failed to add into task map, since out of memory", (int32_t)entry.id.taskId);
    }

    // add the new vgroups if not added yet
    bool exist = false;
    for (int32_t j = 0; j < taosArrayGetSize(pExecNode->pNodeList); ++j) {
      SNodeEntry *pEntry = taosArrayGet(pExecNode->pNodeList, j);
      if ((pEntry != NULL) && (pEntry->nodeId == pTask->info.nodeId)) {
        exist = true;
        break;
      }
    }

    if (exist) {
      continue;
    }

    SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
    epsetAssign(&nodeEntry.epset, &pTask->info.epSet);

    void *px = taosArrayPush(pExecNode->pNodeList, &nodeEntry);
    if (px) {
      mInfo("vgId:%d added into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList));
    } else {
      // the original statement had no terminating ';' and relied on the log macro's expansion
      mError("vgId:%d failed to add into nodeList, total:%d", nodeEntry.nodeId,
             (int)taosArrayGetSize(pExecNode->pNodeList));
    }
  }

  destroyStreamTaskIter(pIter);
}
2365

UNCOV
2366
// Append taskId to pList unless it is already present.
//   pList      - array of int32_t task ids that already sent the checkpoint request
//   uid        - stream id, used for logging only
//   numOfTotal - total number of tasks of the stream, used for the "remain" figure in the log
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
  int32_t num = taosArrayGetSize(pList);
  for (int32_t i = 0; i < num; ++i) {
    int32_t *pId = taosArrayGet(pList, i);
    if (pId == NULL) {
      continue;
    }

    if (taskId == *pId) {
      return;  // duplicated request, nothing to add
    }
  }

  void *p = taosArrayPush(pList, &taskId);
  if (p) {
    // report the count AFTER the push, so the request just received is included in "receive"
    // and excluded from "remain"
    int32_t numOfTasks = taosArrayGetSize(pList);
    mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, numOfTasks, numOfTotal - numOfTasks);
  } else {
    // push failed, the list still holds the previous number of entries
    mError("stream:0x%" PRIx64 " receive %d reqs for checkpoint, failed to added into task list, since out of memory",
           uid, num);
  }
}
2388

UNCOV
2389
// Handle a checkpoint request sent by one stream task.
// The task id is recorded in execInfo.pTransferStateStreams (keyed by stream id); once the number of
// recorded tasks equals the number of tasks of the stream, a checkpoint trans is started and the record
// is removed. On success a SMStreamReqCheckpointRsp is sent explicitly and auto-response is disabled.
// Returns 0 on success, TSDB_CODE_INVALID_MSG for an undecodable request,
// TSDB_CODE_MND_STREAM_NOT_EXIST when the task is known neither to sdb nor to the exec buffer, or the
// failing code when the per-stream request list cannot be created/registered.
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
  SMnode                  *pMnode = pReq->info.node;
  SStreamTaskCheckpointReq req = {0};
  SStreamObj              *pStream = NULL;
  SArray                 **pReqTaskList = NULL;
  int32_t                  numOfTasks = 0;
  int32_t                  total = 0;
  int32_t                  code = 0;

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskCheckpointReq(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint req msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);

  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf",
          req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the create stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
      code = TSDB_CODE_MND_STREAM_NOT_EXIST;
      goto _fail;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
  if (pReqTaskList == NULL) {
    SArray *pList = taosArrayInit(4, sizeof(int32_t));
    if (pList == NULL) {
      code = terrno;
      mError("stream:0x%" PRIx64 " failed to create checkpoint req task list, code:%s", req.streamId, tstrerror(code));
      goto _fail;
    }

    doAddTaskId(pList, req.taskId, req.streamId, numOfTasks);
    code = taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
    if (code) {
      mError("failed to put into transfer state stream map, code: out of memory");
      taosArrayDestroy(pList);  // the map did not take the list, release it here
      goto _fail;
    }

    pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
    if (pReqTaskList == NULL) {  // must not be dereferenced below
      mError("stream:0x%" PRIx64 " checkpoint req task list missing right after insert", req.streamId);
      code = TSDB_CODE_FAILED;
      goto _fail;
    }
  } else {
    doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks);
  }

  total = taosArrayGetSize(*pReqTaskList);
  if (total == numOfTasks) {  // all tasks have sent the reqs
    int64_t checkpointId = mndStreamGenChkptId(pMnode, false);
    mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);

    if (pStream != NULL) {  // TODO:handle error
      code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
      if (code) {
        mError("failed to create checkpoint trans, code:%s", tstrerror(code));
      }
    } else {
      // todo: wait for the create stream trans completed, and launch the checkpoint trans
    }

    // remove this entry
    (void)taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));

    int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
    mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  {
    SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)};
    rsp.pCont = rpcMallocCont(rsp.contLen);
    if (rsp.pCont == NULL) {
      return terrno;
    }

    SMsgHead *pHead = rsp.pCont;
    pHead->vgId = htonl(req.nodeId);

    tmsgSendRsp(&rsp);
    pReq->info.handle = NULL;  // disable auto rsp
  }

  return 0;

_fail:
  // common exit for failures detected while execInfo.lock is held
  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }
  streamMutexUnlock(&execInfo.lock);
  return code;
}
2488

2489
// valid the info according to the HbMsg
UNCOV
2490
static bool validateChkptReport(const SCheckpointReport *pReport, int64_t reportChkptId) {
×
UNCOV
2491
  STaskId           id = {.streamId = pReport->streamId, .taskId = pReport->taskId};
×
UNCOV
2492
  STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
×
UNCOV
2493
  if (pTaskEntry == NULL) {
×
UNCOV
2494
    mError("invalid checkpoint-report msg from task:0x%x, discard", pReport->taskId);
×
UNCOV
2495
    return false;
×
2496
  }
2497

UNCOV
2498
  if (pTaskEntry->checkpointInfo.latestId >= pReport->checkpointId) {
×
2499
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " saved checkpointId:%" PRId64 " discard",
×
2500
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2501
    return false;
×
2502
  }
2503

2504
  // now the task in checkpoint procedure
UNCOV
2505
  if ((pTaskEntry->checkpointInfo.activeId != 0) && (pTaskEntry->checkpointInfo.activeId > pReport->checkpointId)) {
×
2506
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " active checkpointId:%" PRId64
×
2507
           " discard",
2508
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2509
    return false;
×
2510
  }
2511

UNCOV
2512
  if (reportChkptId >= pReport->checkpointId) {
×
2513
    mError("s-task:0x%x expired checkpoint-report msg, checkpointId:%" PRId64 " already update checkpointId:%" PRId64
×
2514
           " discard",
2515
           pReport->taskId, pReport->checkpointId, reportChkptId);
2516
    return false;
×
2517
  }
2518

UNCOV
2519
  return true;
×
2520
}
2521

UNCOV
2522
// Record a validated checkpoint-report in pList (array of STaskChkptInfo).
// Reports that fail validateChkptReport() are dropped. If the task already has an entry, the entry is
// refreshed only when the new report carries a larger checkpointId; otherwise a new entry is appended.
static void doAddReportStreamTask(SArray *pList, int64_t reportedChkptId, const SCheckpointReport *pReport) {
  if (!validateChkptReport(pReport, reportedChkptId)) {
    return;
  }

  for (int32_t idx = 0; idx < taosArrayGetSize(pList); ++idx) {
    STaskChkptInfo *pExist = taosArrayGet(pList, idx);
    if (pExist == NULL || pExist->taskId != pReport->taskId) {
      continue;
    }

    // entry of this task found, decide by comparing the checkpointId
    if (pExist->checkpointId > pReport->checkpointId) {
      mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
             pReport->taskId, pExist->checkpointId, pReport->checkpointId);
    } else if (pExist->checkpointId == pReport->checkpointId) {
      mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
    } else {  // the stored info is older than the report, overwrite it
      mInfo("s-task:0x%x expired checkpoint-report info in checkpoint-report list update from %" PRId64 "->%" PRId64,
            pReport->taskId, pExist->checkpointId, pReport->checkpointId);

      pExist->checkpointId = pReport->checkpointId;
      pExist->ts = pReport->checkpointTs;
      pExist->version = pReport->checkpointVer;
      pExist->transId = pReport->transId;
      pExist->dropHTask = pReport->dropHTask;
    }
    return;
  }

  // first report of this task
  STaskChkptInfo newInfo = {0};
  newInfo.streamId = pReport->streamId;
  newInfo.taskId = pReport->taskId;
  newInfo.transId = pReport->transId;
  newInfo.dropHTask = pReport->dropHTask;
  newInfo.version = pReport->checkpointVer;
  newInfo.ts = pReport->checkpointTs;
  newInfo.checkpointId = pReport->checkpointId;
  newInfo.nodeId = pReport->nodeId;

  if (taosArrayPush(pList, &newInfo) == NULL) {
    mError("failed to put into task list, taskId:0x%x", pReport->taskId);
    return;
  }

  int32_t numOfReports = taosArrayGetSize(pList);
  mDebug("stream:0x%" PRIx64 " taskId:0x%x checkpoint-report recv, %d tasks has send checkpoint-report",
         pReport->streamId, pReport->taskId, numOfReports);
}
2575

UNCOV
2576
// Handle a checkpoint-report message sent by one stream task.
// The report is stored in execInfo.pChkptStreams (keyed by stream id). A quick response carrying
// TSDB_CODE_SUCCESS is always sent before returning, except for the two early error returns
// (undecodable message, task unknown to both sdb and the exec buffer).
// Returns TSDB_CODE_INVALID_MSG / TSDB_CODE_MND_STREAM_NOT_EXIST for the early errors, otherwise the
// last code produced while registering the report.
int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SCheckpointReport req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskChkptReport(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint-report msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64
         " checkpointVer:%" PRId64 " transId:%d",
         req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);

  // register to the stream task done map, if all tasks has sent these kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the creation of stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SChkptReportInfo *pInfo =
      (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
  if (pInfo == NULL) {
    SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId};
    if (info.pTaskList == NULL) {
      code = terrno;
      mError("stream:0x%" PRIx64 " failed to create checkpoint-report task list, code:%s", req.streamId,
             tstrerror(code));
    } else {
      doAddReportStreamTask(info.pTaskList, info.reportChkpt, &req);
      code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &info, sizeof(info));
      if (code) {
        mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId);
        taosArrayDestroy(info.pTaskList);  // the map did not take the list, release it here
      } else {
        pInfo = (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
      }
    }
  } else {
    doAddReportStreamTask(pInfo->pTaskList, pInfo->reportChkpt, &req);
  }

  // pInfo stays NULL when the entry could not be created; pStream is NULL when the stream is only known
  // to the exec buffer. Neither may be dereferenced in those cases.
  if (pInfo != NULL && pStream != NULL) {
    int32_t total = taosArrayGetSize(pInfo->pTaskList);
    if (total == numOfTasks) {  // all tasks have sent the reqs
      mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
            " will be issued soon",
            req.streamId, pStream->name, total, req.checkpointId);
    }
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS);
  return code;
}
2655

UNCOV
2656
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t *pExistedTasks, bool *pAllSame) {
×
UNCOV
2657
  int32_t num = 0;
×
UNCOV
2658
  int64_t chkId = INT64_MAX;
×
UNCOV
2659
  *pExistedTasks = 0;
×
UNCOV
2660
  *pAllSame = true;
×
2661

UNCOV
2662
  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
×
UNCOV
2663
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
×
UNCOV
2664
    if (p == NULL) {
×
2665
      continue;
×
2666
    }
2667

UNCOV
2668
    if (p->streamId != streamId) {
×
UNCOV
2669
      continue;
×
2670
    }
2671

UNCOV
2672
    num += 1;
×
UNCOV
2673
    STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
×
UNCOV
2674
    if (chkId > pe->checkpointInfo.latestId) {
×
UNCOV
2675
      if (chkId != INT64_MAX) {
×
UNCOV
2676
        *pAllSame = false;
×
2677
      }
UNCOV
2678
      chkId = pe->checkpointInfo.latestId;
×
2679
    }
2680
  }
2681

UNCOV
2682
  *pExistedTasks = num;
×
UNCOV
2683
  if (num < numOfTasks) {  // not all task send info to mnode through hbMsg, no valid checkpoint Id
×
2684
    return -1;
×
2685
  }
2686

UNCOV
2687
  return chkId;
×
2688
}
2689

UNCOV
2690
// Send a response of msgSize bytes whose message head carries vgId (network byte order) and whose rpc
// code is `code`. After the response is sent, pInfo->handle is cleared to disable the automatic
// response. Nothing is sent, and pInfo is left untouched, when the buffer cannot be allocated.
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) {
  SRpcMsg rsp = {.code = code, .info = *pInfo, .contLen = msgSize};

  rsp.pCont = rpcMallocCont(rsp.contLen);
  if (rsp.pCont == NULL) {
    return;
  }

  ((SMsgHead *)rsp.pCont)->vgId = htonl(vgId);
  tmsgSendRsp(&rsp);

  pInfo->handle = NULL;  // disable auto rsp
}
×
2701

UNCOV
2702
// For every task id in pList (array of int32_t), remove the first entry of pInfo->pTaskList whose
// req.taskId matches. Returns the number of ids in pList.
static int32_t doCleanReqList(SArray* pList, SCheckpointConsensusInfo* pInfo) {
  int32_t numOfSent = taosArrayGetSize(pList);

  for (int32_t i = 0; i < numOfSent; ++i) {
    int32_t *pTaskId = taosArrayGet(pList, i);
    if (pTaskId == NULL) {
      continue;
    }

    int32_t k = 0;
    while (k < taosArrayGetSize(pInfo->pTaskList)) {
      SCheckpointConsensusEntry *pEntry = taosArrayGet(pInfo->pTaskList, k);
      if (pEntry != NULL && pEntry->req.taskId == *pTaskId) {
        taosArrayRemove(pInfo->pTaskList, k);
        break;  // at most one entry is removed per task id
      }
      k += 1;
    }
  }

  return numOfSent;
}
2722

UNCOV
2723
// Process the pending consensus-checkpointId requests kept in execInfo.pStreamConsensus.
// For every stream entry: for each requesting task, compute the consensus checkpointId and, when either
// all tasks agree or the request has waited at least 10s, create the set-consensus-checkpointId trans.
// Handled tasks are removed from the entry; entries of dropped streams and entries whose task list
// became empty are removed from the map afterwards. The iteration stops early once more than
// maxAllowedTrans tasks have been handled in this round.
// Returns terrno when the working arrays cannot be created, 0 when not all vnodes are ready,
// TSDB_CODE_FAILED when a computed checkpointId exceeds the one carried by a request, otherwise the
// last code produced in the round.
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  int64_t now = taosGetTimestampMs();
  bool    allReady = true;
  SArray *pNodeSnapshot = NULL;
  int32_t maxAllowedTrans = 50;
  int32_t numOfTrans = 0;
  int32_t code = 0;
  void   *pIter = NULL;

  SArray *pList = taosArrayInit(4, sizeof(int32_t));
  if (pList == NULL) {
    return terrno;
  }

  SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
  if (pStreamList == NULL) {
    code = terrno;  // read terrno before any other call may overwrite it
    taosArrayDestroy(pList);
    return code;
  }

  mDebug("start to process consensus-checkpointId in tmr");

  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
  taosArrayDestroy(pNodeSnapshot);
  if (code) {
    mError("failed to get the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
    taosArrayDestroy(pStreamList);
    taosArrayDestroy(pList);
    return 0;
  }

  streamMutexLock(&execInfo.lock);

  while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
    SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;

    taosArrayClear(pList);

    int64_t     streamId = -1;
    int64_t     infoStreamId = pInfo->streamId;
    int32_t     num = taosArrayGetSize(pInfo->pTaskList);
    SStreamObj *pStream = NULL;

    code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
    if (pStream == NULL || code != 0) {  // stream has been dropped already
      mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
      void *p = taosArrayPush(pStreamList, &pInfo->streamId);
      if (p == NULL) {
        mError("failed to record the missing stream id in concensus-stream list, streamId:%" PRId64
               " code:%s, continue",
               pInfo->streamId, tstrerror(terrno));
      }
      continue;
    }

    for (int32_t j = 0; j < num; ++j) {
      SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
      if (pe == NULL) {
        continue;
      }

      if (streamId == -1) {
        streamId = pe->req.streamId;
      }

      int32_t existed = 0;
      bool    allSame = true;
      int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame);
      if (chkId == -1) {
        mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed,
               pInfo->numOfTasks, pe->req.taskId);
        break;
      }

      if (((now - pe->ts) >= 10 * 1000) || allSame) {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
        if (chkId > pe->req.checkpointId) {
          // log and clean up while the lock is still held: pe and pIter belong to structures guarded
          // by execInfo.lock
          mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
                 pe->req.checkpointId, chkId);

          mndReleaseStream(pMnode, pStream);
          taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
          streamMutexUnlock(&execInfo.lock);

          taosArrayDestroy(pStreamList);
          taosArrayDestroy(pList);  // was leaked on this path
          return TSDB_CODE_FAILED;
        }

        code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
        }

        void *p = taosArrayPush(pList, &pe->req.taskId);
        if (p == NULL) {
          mError("failed to put into task list, taskId:0x%x", pe->req.taskId);
        }
      } else {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
      }
    }

    mndReleaseStream(pMnode, pStream);

    int32_t alreadySend = doCleanReqList(pList, pInfo);

    // clear request stream item with empty task list
    if (taosArrayGetSize(pInfo->pTaskList) == 0) {
      mndClearConsensusRspEntry(pInfo);
      if (streamId == -1) {
        mError("streamId is -1, streamId:%" PRIx64" in consensus-checkpointId hashMap, cont", infoStreamId);
        // no task entry supplied the id; fall back to the id stored in the entry so that the removal
        // below does not target stream id -1
        streamId = infoStreamId;
      }

      void *p = taosArrayPush(pStreamList, &streamId);
      if (p == NULL) {
        mError("failed to put into stream list, stream:0x%" PRIx64 " not remove it in consensus-chkpt list", streamId);
      }
    }

    numOfTrans += alreadySend;
    if (numOfTrans > maxAllowedTrans) {
      mInfo("already send consensus-checkpointId trans:%d, try next time", alreadySend);
      taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
      break;
    }
  }

  // drop the finished/dropped streams from the consensus map after the iteration is over
  for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
    int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i);
    if (pStreamId == NULL) {
      continue;
    }

    code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId);
  }

  streamMutexUnlock(&execInfo.lock);

  taosArrayDestroy(pStreamList);
  taosArrayDestroy(pList);

  mDebug("end to process consensus-checkpointId in tmr, send consensus-checkpoint trans:%d", numOfTrans);
  return code;
}
2872

UNCOV
2873
// Run mndProcessCreateStreamReq() and, when it fails with anything other than
// TSDB_CODE_ACTION_IN_PROGRESS, attach a 1-byte response body to the request and store the failure
// code in it. Returns terrno when that body cannot be allocated, otherwise the code of
// mndProcessCreateStreamReq().
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t code = mndProcessCreateStreamReq(pReq);
  if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    return code;
  }

  // failure: prepare the response that carries the error code
  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->code = code;
  pReq->info.noResp = false;
  pReq->info.rspLen = 1;
  return code;
}
2887

UNCOV
2888
// Run mndProcessDropStreamReq() and, when it fails with anything other than
// TSDB_CODE_ACTION_IN_PROGRESS, attach a 1-byte response body to the request and store the failure
// code in it. Returns terrno when that body cannot be allocated, otherwise the code of
// mndProcessDropStreamReq().
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t code = mndProcessDropStreamReq(pReq);
  if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    return code;
  }

  // failure: prepare the response that carries the error code
  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->code = code;
  pReq->info.noResp = false;
  pReq->info.rspLen = 1;
  return code;
}
2902

UNCOV
2903
// Load the tasks of all streams into pExecInfo once: does nothing when the initTaskList flag is
// already set or pMnode is NULL; otherwise fills the buffers and sets the flag.
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  if (!pExecInfo->initTaskList && pMnode != NULL) {
    addAllStreamTasksIntoBuf(pMnode, pExecInfo);
    pExecInfo->initTaskList = true;
  }
}
2911

UNCOV
2912
void mndStreamResetInitTaskListLoadFlag() {
×
UNCOV
2913
  mInfo("reset task list buffer init flag for leader");
×
UNCOV
2914
  execInfo.initTaskList = false;
×
UNCOV
2915
}
×
2916

UNCOV
2917
// Record the mnode role in execInfo.role and flag a follower->leader switch.
// execInfo.switchFromFollower is reset on every call and set to true only when the stored role was
// NODE_ROLE_FOLLOWER and the new role is NODE_ROLE_LEADER. pMnode is not used.
void mndUpdateStreamExecInfoRole(SMnode *pMnode, int32_t role) {
  bool toLeader = (role == NODE_ROLE_LEADER);

  execInfo.switchFromFollower = false;

  // first role assignment
  if (execInfo.role == NODE_ROLE_UNINIT) {
    execInfo.role = role;
    if (toLeader) {
      mInfo("init mnode is set to leader");
    } else {
      mInfo("init mnode is set to follower");
    }
    return;
  }

  if (toLeader) {
    if (execInfo.role != NODE_ROLE_FOLLOWER) {
      mInfo("mnode remain to be leader, do nothing");
      return;
    }

    execInfo.role = role;
    execInfo.switchFromFollower = true;
    mInfo("mnode switch to be leader from follower");
    return;
  }

  // follower's
  if (execInfo.role != NODE_ROLE_LEADER) {
    mInfo("mnode remain to be follower, do nothing");
    return;
  }

  execInfo.role = role;
  mInfo("mnode switch to be follower from leader");
}
×
2946

UNCOV
2947
// Iterate over every SDB_STREAM object in sdb and hand it to saveTaskAndNodeInfoIntoBuf(); each
// fetched stream object is released right after it has been processed.
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;

  for (void *pIter = sdbFetch(pSdb, SDB_STREAM, NULL, (void **)&pStream); pIter != NULL;
       pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) {
    saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo);
    sdbRelease(pSdb, pStream);
  }
}
×
2962

UNCOV
2963
// Create and prepare the "update checkpoint-info" trans for pStream.
// pStream is released through sdbRelease() on every path. pChkptInfoList is not used here.
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the trans has been prepared, otherwise the code of the
// step that failed.
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList) {
  STrans *pTrans = NULL;
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME,
                               "update checkpoint-info", &pTrans);
  if (pTrans == NULL || code) {
    goto _release;  // the trans is not dropped on this path
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid);
  if (code) {
    goto _drop;
  }

  code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream);
  if (code) {
    goto _drop;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    goto _drop;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, tstrerror(code));
    goto _drop;
  }

  // prepared: report in-progress to the caller regardless of which of the two success codes came back
  code = TSDB_CODE_ACTION_IN_PROGRESS;

_drop:
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;

_release:
  sdbRelease(pMnode->pSdb, pStream);
  return code;
}
3006

UNCOV
3007
/*
 * Handle a drop-orphan-task request.
 *
 * The request body is decoded into a SMStreamDropOrphanMsg. When its list is non-empty, one
 * "drop stream" transaction is created for the stream id of the first retrievable entry, a drop
 * action is appended for the whole list, and the transaction is submitted via mndTransPrepare().
 *
 * Returns the code of the first failing step, or the result of mndTransPrepare(). When the list
 * is empty, or no entry can be fetched from it, no transaction is created and 0 is returned.
 *
 * Every exit goes through _err so that the decoded message and the local transaction handle are
 * always released.
 */
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  int32_t      code = 0;
  SOrphanTask *pTask = NULL;
  int32_t      i = 0;
  STrans      *pTrans = NULL;
  int32_t      numOfTasks = 0;
  bool         transPrepared = false;  // set only after mndTransPrepare() has accepted the trans

  SMStreamDropOrphanMsg msg = {0};
  code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg);
  if (code) {
    // NOTE(review): the decoder may have filled msg partially before failing; go through the
    // common cleanup instead of returning directly so that nothing it allocated is left behind.
    goto _err;
  }

  numOfTasks = taosArrayGetSize(msg.pList);
  if (numOfTasks == 0) {
    mDebug("no orphan tasks to drop, no need to create trans");
    goto _err;
  }

  mDebug("create trans to drop %d orphan tasks", numOfTasks);

  // pick the first entry that can actually be fetched from the list
  while (i < numOfTasks && ((pTask = taosArrayGet(msg.pList, i)) == NULL)) {
    i += 1;
  }

  if (pTask == NULL) {
    mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task");
    goto _err;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
  if (code) {
    goto _err;
  }

  // stand-in stream object: only the uid is meaningful, the names are left empty
  SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};

  code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code != 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _err;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
  if (code) {
    goto _err;
  }

  // drop all tasks
  if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _err;
  }

  // drop stream
  if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) {
    goto _err;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    goto _err;
  }

  transPrepared = true;

_err:
  // common exit for success and failure
  tDestroyDropOrphanTaskMsg(&msg);
  mndTransDrop(pTrans);

  // report success only when a transaction was really submitted; the early exits above leave
  // code at 0 without having created anything
  if (transPrepared) {
    mDebug("create drop %d orphan tasks trans succ", numOfTasks);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc