• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3676

22 Mar 2025 04:46PM UTC coverage: 25.147% (-36.8%) from 61.952%
#3676

push

travis-ci

web-flow
fix: userOperTest in linux (#30363)

Co-authored-by: taos-support <it@taosdata.com>

55963 of 304767 branches covered (18.36%)

Branch coverage included in aggregate %.

96374 of 301020 relevant lines covered (32.02%)

582640.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.52
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "audit.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndScheduler.h"
21
#include "mndShow.h"
22
#include "mndStb.h"
23
#include "mndTrans.h"
24
#include "osMemory.h"
25
#include "parser.h"
26
#include "taoserror.h"
27
#include "tmisce.h"
28
#include "tname.h"
29

30
#define MND_STREAM_MAX_NUM 60
31

32
typedef struct {
33
  int8_t placeHolder;  // // to fix windows compile error, define place holder
34
} SMStreamNodeCheckMsg;
35

36
static int32_t  mndNodeCheckSentinel = 0;
37
SStreamExecInfo execInfo;
38

39
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
41
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
42
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
43
static int32_t mndProcessFailedStreamReq(SRpcMsg *pReq);
44
static int32_t mndProcessCheckStreamStatusReq(SRpcMsg *pReq);
45
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
46

47
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
48
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
49

50
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq);
51
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
52
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
53
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
54
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
55
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
56
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
57
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq);
58
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
59
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
60
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
61
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
62
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList);
63
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
64
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
65
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
66
static void    doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
67
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq);
68
static void    saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
69

70
static void     addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
71
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
72

73
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
74
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
75
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
76
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
77
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
78

79
/*
 * Stream module initialisation for the mnode.
 *
 * Registers the stream message handlers and the show-table callbacks. The handlers cover:
 *   - client requests and timers;
 *   - task responses, which are routed to mndTransProcessRsp;
 *   - mnode-internal messages;
 *   - checkpoint, heartbeat and consensus handling;
 *   - pause/resume/reset.
 * It then initialises the global exec info and registers the SDB_STREAM and SDB_STREAM_SEQ
 * sdb tables.
 *
 * Returns 0 on success, otherwise the first non-zero code from mndInitExecInfo() or sdbSetTable().
 */
int32_t mndInitStream(SMnode *pMnode) {
  // requests from clients and mnode timers
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_FAILED_STREAM, mndProcessFailedStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CHECK_STREAM_TIMER, mndProcessCheckStreamStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck);

  // replies from stream tasks, all handed to the transaction layer
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CONSEN_CHKPT_RSP, mndTransProcessRsp);

  // messages exchanged inside the mnode
  // TODO change the name
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE, mndProcessCreateStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp);

  // checkpoints, orphan tasks, heartbeats, node changes and consensus
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_STREAM_ALL_STOP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_DROP_ORPHANTASKS, mndProcessDropOrphanTaskReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_TASK_RESET, mndProcessResetStatusReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
  mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr);

  // user-level stream control
  mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RESET_STREAM, mndProcessResetStreamReq);

  // show streams / show stream tasks
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);

  int32_t code = mndInitExecInfo();
  if (code != 0) {
    return code;
  }

  SSdbTable streamTable = {
      .sdbType = SDB_STREAM,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
  };
  if ((code = sdbSetTable(pMnode->pSdb, streamTable)) != 0) {
    return code;
  }

  SSdbTable streamSeqTable = {
      .sdbType = SDB_STREAM_SEQ,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamSeqActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamSeqActionDecode,
      .insertFp = (SdbInsertFp)mndStreamSeqActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamSeqActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamSeqActionDelete,
  };
  return sdbSetTable(pMnode->pSdb, streamSeqTable);
}
157

158
/*
 * Release everything held by the global stream execution info (execInfo).
 *
 * Each destroyed container is reset to NULL afterwards, so the global struct does not keep
 * dangling pointers to freed memory once cleanup has run. The mutex is destroyed last.
 */
void mndCleanupStream(SMnode *pMnode) {
  taosArrayDestroy(execInfo.pTaskList);
  execInfo.pTaskList = NULL;
  taosArrayDestroy(execInfo.pNodeList);
  execInfo.pNodeList = NULL;
  taosArrayDestroy(execInfo.pKilledChkptTrans);
  execInfo.pKilledChkptTrans = NULL;

  taosHashCleanup(execInfo.pTaskMap);
  execInfo.pTaskMap = NULL;
  taosHashCleanup(execInfo.transMgmt.pDBTrans);
  execInfo.transMgmt.pDBTrans = NULL;
  taosHashCleanup(execInfo.pTransferStateStreams);
  execInfo.pTransferStateStreams = NULL;
  taosHashCleanup(execInfo.pChkptStreams);
  execInfo.pChkptStreams = NULL;
  taosHashCleanup(execInfo.pStreamConsensus);
  execInfo.pStreamConsensus = NULL;

  (void)taosThreadMutexDestroy(&execInfo.lock);
  mDebug("mnd stream exec info cleanup");
}
8✔
170

171
/*
 * sdb decode callback for SDB_STREAM: rebuild an SStreamObj row from its raw sdb record.
 *
 * The raw record is read as an int32 length `tlen` followed by `tlen` bytes. Those bytes are
 * decoded by tDecodeSStreamObj() using the record's soft version.
 *
 * On success, returns the new row and sets terrno to 0.
 * On failure, logs the reason, frees the row, sets terrno to the error code and returns NULL.
 */
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSdbRow    *pRow = NULL;
  SStreamObj *pStream = NULL;
  void       *buf = NULL;
  int8_t      sver = 0;
  int32_t     tlen = 0;
  int32_t     dataPos = 0;

  code = sdbGetRawSoftVer(pRaw, &sver);
  TSDB_CHECK_CODE(code, lino, _over);

  if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
    // An error code must be set before leaving. With code == 0 the exit path would take the
    // success branch: it would dereference the still-NULL pStream and return a NULL row with
    // terrno = 0.
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    lino = __LINE__;
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
    goto _over;
  }

  pRow = sdbAllocRow(sizeof(SStreamObj));
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);

  pStream = sdbGetRowObj(pRow);
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);

  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);

  buf = taosMemoryMalloc(tlen + 1);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);

  SDecoder decoder;
  tDecoderInit(&decoder, buf, tlen + 1);
  code = tDecodeSStreamObj(&decoder, pStream, sver);
  tDecoderClear(&decoder);

  if (code < 0) {
    // release whatever the partial decode attached to the object
    tFreeStreamObj(pStream);
  }

_over:
  taosMemoryFreeClear(buf);

  if (code != TSDB_CODE_SUCCESS) {
    const char *p = (pStream == NULL) ? "null" : pStream->name;
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
    taosMemoryFreeClear(pRow);

    terrno = code;
    return NULL;
  }

  mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
         pStream->checkpointId);

  terrno = 0;
  return pRow;
}
229

230
/* sdb insert callback for SDB_STREAM: only traces the event; always reports success. */
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
  (void)pSdb;  // not needed by this callback

  mTrace("stream:%s, perform insert action", pStream->name);
  return 0;
}
234

235
/*
 * sdb delete callback for SDB_STREAM. The object's owned resources are released by
 * tFreeStreamObj() while its write latch is held. Always returns 0.
 */
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
  (void)pSdb;  // not needed by this callback

  mInfo("stream:%s, perform delete action", pStream->name);

  taosWLockLatch(&pStream->lock);
  {
    tFreeStreamObj(pStream);
  }
  taosWUnLockLatch(&pStream->lock);

  return 0;
}
242

243
/*
 * sdb update callback for SDB_STREAM: copy the mutable state of pNewStream into the cached
 * pOldStream.
 *
 * `version` is swapped atomically before taking the latch. status, updateTime, checkpointId,
 * checkpointFreq and (when missing) the task lists are then updated under pOldStream's
 * write latch. Always returns 0.
 */
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
  (void)pSdb;  // not needed by this callback

  mTrace("stream:%s, perform update action", pOldStream->name);
  (void)atomic_exchange_32(&pOldStream->version, pNewStream->version);

  taosWLockLatch(&pOldStream->lock);
  {
    pOldStream->status = pNewStream->status;
    pOldStream->updateTime = pNewStream->updateTime;
    pOldStream->checkpointId = pNewStream->checkpointId;
    pOldStream->checkpointFreq = pNewStream->checkpointFreq;

    // A task list is taken over from pNewStream only when pOldStream has none yet. The pointer
    // in pNewStream is cleared afterwards, presumably so the list has a single owner.
    if (NULL == pOldStream->pTaskList) {
      pOldStream->pTaskList = pNewStream->pTaskList;
      pNewStream->pTaskList = NULL;
    }

    if (NULL == pOldStream->pHTaskList) {
      pOldStream->pHTaskList = pNewStream->pHTaskList;
      pNewStream->pHTaskList = NULL;
    }
  }
  taosWUnLockLatch(&pOldStream->lock);

  return 0;
}
264

265
/*
 * Look up the stream `streamName` in the sdb and take a reference on it.
 *
 * On success, returns 0 with *pStream set. The reference must be dropped with mndReleaseStream().
 * On failure, *pStream is NULL and a non-zero code is returned:
 *   - TSDB_CODE_MND_STREAM_NOT_EXIST when the sdb reports the object absent;
 *   - otherwise the sdb error found in terrno.
 * A zero return therefore always implies a valid *pStream.
 */
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
  SSdb *pSdb = pMnode->pSdb;

  (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
  if ((*pStream) != NULL) {
    return 0;
  }

  // Any failure other than "not there" (e.g. the object is being created or dropped) must be
  // reported too. Returning 0 with a NULL stream lets code-only checks continue with NULL.
  int32_t code = terrno;
  if (code == TSDB_CODE_SDB_OBJ_NOT_THERE || code == 0) {
    code = TSDB_CODE_MND_STREAM_NOT_EXIST;
  }
  return code;
}
274

275
/* Drop a reference taken by mndAcquireStream() by handing the object back to the sdb. */
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
  sdbRelease(pMnode->pSdb, pStream);
}
×
279

280
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
281
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
282
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
283
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
284
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
285

286
/*
 * Reject a CREATE STREAM request that is missing any mandatory part: stream name, SQL text,
 * source database or target super table full name. Returns TSDB_CODE_MND_INVALID_STREAM_OPTION
 * if any is missing, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
  if (pCreate->name[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }
  if (pCreate->sql == NULL || pCreate->sql[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }
  if (pCreate->sourceDB[0] == 0 || pCreate->targetStbFullName[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }
  return TSDB_CODE_SUCCESS;
}
293

294
/*
 * Build pWrapper (allocating pWrapper->pSchema) from an array of SField.
 *
 * Column ids are assigned 1..n in array order. Fields of type TSDB_DATA_TYPE_NULL become
 * VARCHAR columns whose size is just VARSTR_HEADER_SIZE.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation or an element lookup fails.
 * On a lookup failure, pWrapper->pSchema stays allocated.
 */
static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) {
  pWrapper->nCols = taosArrayGetSize(pFields);
  pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema));
  if (pWrapper->pSchema == NULL) {
    return terrno;
  }

  for (int32_t col = 0; col < pWrapper->nCols; ++col) {
    SField *pSrc = (SField *)taosArrayGet(pFields, col);
    if (pSrc == NULL) {
      return terrno;
    }

    SSchema *pDst = &pWrapper->pSchema[col];
    if (pSrc->type != TSDB_DATA_TYPE_NULL) {
      pDst->type = pSrc->type;
      pDst->bytes = pSrc->bytes;
    } else {
      pDst->type = TSDB_DATA_TYPE_VARCHAR;
      pDst->bytes = VARSTR_HEADER_SIZE;
    }
    pDst->colId = col + 1;
    tstrncpy(pDst->name, pSrc->name, sizeof(pDst->name));
    pDst->flags = pSrc->flags;
  }

  return TSDB_CODE_SUCCESS;
}
323

324
/*
 * Return true when any column after the first one (index 0 is never inspected) carries the
 * COL_IS_KEY flag. Schemas with fewer than two columns yield false.
 */
static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) {
  bool found = false;
  for (int32_t col = 1; !found && col < pWrapper->nCols; ++col) {
    found = (pWrapper->pSchema[col].flags & COL_IS_KEY) != 0;
  }
  return found;
}
335

336
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
×
337
  SNode      *pAst = NULL;
×
338
  SQueryPlan *pPlan = NULL;
×
339
  int32_t     code = 0;
×
340

341
  mInfo("stream:%s to create", pCreate->name);
×
342
  memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
×
343
  pObj->createTime = taosGetTimestampMs();
×
344
  pObj->updateTime = pObj->createTime;
×
345
  pObj->version = 1;
×
346

347
  if (pCreate->smaId > 0) {
×
348
    pObj->subTableWithoutMd5 = 1;
×
349
  }
350

351
  pObj->smaId = pCreate->smaId;
×
352
  pObj->indexForMultiAggBalance = -1;
×
353

354
  pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
×
355

356
  char p[TSDB_STREAM_FNAME_LEN + 32] = {0};
×
357
  snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory");
×
358

359
  pObj->hTaskUid = mndGenerateUid(pObj->name, strlen(pObj->name));
×
360
  pObj->status = STREAM_STATUS__NORMAL;
×
361

362
  pObj->conf.igExpired = pCreate->igExpired;
×
363
  pObj->conf.trigger = pCreate->triggerType;
×
364
  pObj->conf.triggerParam = pCreate->maxDelay;
×
365
  pObj->conf.watermark = pCreate->watermark;
×
366
  pObj->conf.fillHistory = pCreate->fillHistory;
×
367
  pObj->deleteMark = pCreate->deleteMark;
×
368
  pObj->igCheckUpdate = pCreate->igUpdate;
×
369

370
  memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
×
371
  SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
×
372
  if (pSourceDb == NULL) {
×
373
    code = terrno;
×
374
    mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb,
×
375
          tstrerror(code));
376
    goto _ERR;
×
377
  }
378

379
  pObj->sourceDbUid = pSourceDb->uid;
×
380
  mndReleaseDb(pMnode, pSourceDb);
×
381

382
  memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
×
383

384
  SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
×
385
  if (pTargetDb == NULL) {
×
386
    code = terrno;
×
387
    mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb,
×
388
           tstrerror(code));
389
    goto _ERR;
×
390
  }
391

392
  tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
×
393

394
  if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
×
395
    pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
×
396
  } else {
397
    pObj->targetStbUid = pCreate->targetStbUid;
×
398
  }
399
  pObj->targetDbUid = pTargetDb->uid;
×
400
  mndReleaseDb(pMnode, pTargetDb);
×
401

402
  pObj->sql = pCreate->sql;
×
403
  pObj->ast = pCreate->ast;
×
404

405
  pCreate->sql = NULL;
×
406
  pCreate->ast = NULL;
×
407

408
  // deserialize ast
409
  if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) {
×
410
    goto _ERR;
×
411
  }
412

413
  // create output schema
414
  if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) {
×
415
    goto _ERR;
×
416
  }
417

418
  int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
×
419
  if (numOfNULL > 0) {
×
420
    pObj->outputSchema.nCols += numOfNULL;
×
421
    SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
×
422
    if (!pFullSchema) {
×
423
      code = terrno;
×
424
      goto _ERR;
×
425
    }
426

427
    int32_t nullIndex = 0;
×
428
    int32_t dataIndex = 0;
×
429
    for (int32_t i = 0; i < pObj->outputSchema.nCols; i++) {
×
430
      if (nullIndex >= numOfNULL) {
×
431
        pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
×
432
        pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
×
433
        pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
×
434
        tstrncpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name, sizeof(pFullSchema[i].name));
×
435
        pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
×
436
        dataIndex++;
×
437
      } else {
438
        SColLocation *pos = NULL;
×
439
        if (nullIndex < taosArrayGetSize(pCreate->fillNullCols)) {
×
440
          pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
×
441
        }
442

443
        if (pos == NULL) {
×
444
          mError("invalid null column index, %d", nullIndex);
×
445
          continue;
×
446
        }
447

448
        if (i < pos->slotId) {
×
449
          pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
×
450
          pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
×
451
          pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
×
452
          tstrncpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name, sizeof(pFullSchema[i].name));
×
453
          pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
×
454
          dataIndex++;
×
455
        } else {
456
          pFullSchema[i].bytes = 0;
×
457
          pFullSchema[i].colId = pos->colId;
×
458
          pFullSchema[i].flags = COL_SET_NULL;
×
459
          memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
×
460
          pFullSchema[i].type = pos->type;
×
461
          nullIndex++;
×
462
        }
463
      }
464
    }
465

466
    taosMemoryFree(pObj->outputSchema.pSchema);
×
467
    pObj->outputSchema.pSchema = pFullSchema;
×
468
  }
469

470
  SPlanContext cxt = {
×
471
      .pAstRoot = pAst,
472
      .topicQuery = false,
473
      .streamQuery = true,
474
      .triggerType =
475
          (pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
×
476
      .watermark = pObj->conf.watermark,
×
477
      .igExpired = pObj->conf.igExpired,
×
478
      .deleteMark = pObj->deleteMark,
×
479
      .igCheckUpdate = pObj->igCheckUpdate,
×
480
      .destHasPrimaryKey = hasDestPrimaryKey(&pObj->outputSchema),
×
481
      .recalculateInterval = pCreate->recalculateInterval,
×
482
  };
483
  char *pTargetFStable = strchr(pCreate->targetStbFullName, '.');
×
484
  if (pTargetFStable != NULL) {
×
485
    pTargetFStable = pTargetFStable + 1;
×
486
  }
487
  tstrncpy(cxt.pStbFullName, pTargetFStable, TSDB_TABLE_FNAME_LEN);
×
488
  tstrncpy(cxt.pWstartName, pCreate->pWstartName, TSDB_COL_NAME_LEN);
×
489
  tstrncpy(cxt.pWendName, pCreate->pWendName, TSDB_COL_NAME_LEN);
×
490
  tstrncpy(cxt.pGroupIdName, pCreate->pGroupIdName, TSDB_COL_NAME_LEN);
×
491
  tstrncpy(cxt.pIsWindowFilledName, pCreate->pIsWindowFilledName, TSDB_COL_NAME_LEN);
×
492

493
  // using ast and param to build physical plan
494
  if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) {
×
495
    goto _ERR;
×
496
  }
497

498
  // save physcial plan
499
  if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) {
×
500
    goto _ERR;
×
501
  }
502

503
  pObj->tagSchema.nCols = pCreate->numOfTags;
×
504
  if (pCreate->numOfTags) {
×
505
    pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
×
506
    if (pObj->tagSchema.pSchema == NULL) {
×
507
      code = terrno;
×
508
      goto _ERR;
×
509
    }
510
  }
511

512
  /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
513
  for (int32_t i = 0; i < pCreate->numOfTags; i++) {
×
514
    SField *pField = taosArrayGet(pCreate->pTags, i);
×
515
    if (pField == NULL) {
×
516
      continue;
×
517
    }
518

519
    pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
×
520
    pObj->tagSchema.pSchema[i].bytes = pField->bytes;
×
521
    pObj->tagSchema.pSchema[i].flags = pField->flags;
×
522
    pObj->tagSchema.pSchema[i].type = pField->type;
×
523
    memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN);
×
524
  }
525

526
_ERR:
×
527
  if (pAst != NULL) nodesDestroyNode(pAst);
×
528
  if (pPlan != NULL) qDestroyQueryPlan(pPlan);
×
529
  return code;
×
530
}
531

532
/*
 * Serialize pTask and append a TDMT_STREAM_TASK_DEPLOY action to pTrans.
 *
 * The action is addressed to pTask->info.epSet, and the message head carries
 * vgId = pTask->info.nodeId in network byte order. Tasks whose ver is below
 * SSTREAM_TASK_SUBTABLE_CHANGED_VER are bumped to SSTREAM_TASK_VER before encoding.
 *
 * Returns 0 on success or an error code. The message buffer is freed here on every failure
 * path; on success it is handed to setTransAction().
 */
int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
  SEncoder encoder;
  tEncoderInit(&encoder, NULL, 0);

  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  // Sizing pass: encode without a buffer to learn the payload length from encoder.pos.
  int32_t code = tEncodeStreamTask(&encoder, pTask);
  if (code < 0) {
    // Every negative result is a failure, not just -1; otherwise a partially computed size
    // would be used below. A bare -1 keeps being reported as TSDB_CODE_INVALID_MSG.
    tEncoderClear(&encoder);
    return (code == -1) ? TSDB_CODE_INVALID_MSG : code;
  }

  int32_t size = encoder.pos;
  int32_t tlen = sizeof(SMsgHead) + size;
  tEncoderClear(&encoder);

  void *buf = taosMemoryCalloc(1, tlen);
  if (buf == NULL) {
    return terrno;
  }

  ((SMsgHead *)buf)->vgId = htonl(pTask->info.nodeId);

  // Second pass: encode the task right after the message head.
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, size);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  if (code != 0) {
    mError("failed to encode stream task, code:%s", tstrerror(code));
    taosMemoryFree(buf);
    return code;
  }

  code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code) {
    taosMemoryFree(buf);
  }

  return code;
}
576

577
/*
 * Add a deploy action to pTrans for every task of pStream.
 *
 * Tasks reachable through the stream task iterator are always deployed. The per-level arrays
 * in pStream->pHTaskList are additionally deployed when fill-history is enabled or the trigger
 * is STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE. Stops at and returns the first error code;
 * returns 0 otherwise.
 */
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pIter = NULL;
  int32_t          code = createStreamTaskIter(pStream, &pIter);
  if (code) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return code;
  }

  // regular tasks; the loop stops as soon as any step reports an error
  while (code == 0 && streamTaskIterNextTask(pIter)) {
    SStreamTask *pTask = NULL;
    code = streamTaskIterGetCurrent(pIter, &pTask);
    if (code == 0) {
      code = mndPersistTaskDeployReq(pTrans, pTask);
    }
  }
  destroyStreamTaskIter(pIter);

  if (code != 0) {
    return code;
  }

  bool needHistoryTasks =
      pStream->conf.fillHistory || (pStream->conf.trigger == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE);
  if (!needHistoryTasks) {
    return code;
  }

  // persistent stream task for already stored ts data
  int32_t numOfLevels = taosArrayGetSize(pStream->pHTaskList);
  for (int32_t lvl = 0; lvl < numOfLevels; ++lvl) {
    SArray *pLevel = taosArrayGetP(pStream->pHTaskList, lvl);
    int32_t numOfTasks = taosArrayGetSize(pLevel);

    for (int32_t t = 0; t < numOfTasks; ++t) {
      code = mndPersistTaskDeployReq(pTrans, taosArrayGetP(pLevel, t));
      if (code) {
        return code;
      }
    }
  }

  return code;
}
622

623
/*
 * Add everything needed to persist pStream to pTrans: the task deploy actions first, then the
 * stream's transaction log entry with status SDB_STATUS_READY. A negative code from the task
 * step aborts and is returned as-is.
 */
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
  int32_t code = mndPersistStreamTasks(pTrans, pStream);
  if (code < 0) {
    return code;
  }

  return mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
}
631

632
/*
 * Add the creation of pStream's destination super table (pStream->targetSTbName) to pTrans.
 *
 * Columns come from pStream->outputSchema. Tags come from pStream->tagSchema; when the stream
 * has no tags, a single UBIGINT tag named "group_id" is used. The table uid is forced to
 * pStream->targetStbUid.
 *
 * Returns 0 on success. Otherwise it returns:
 *   - TSDB_CODE_MND_STB_ALREADY_EXIST if the table exists;
 *   - TSDB_CODE_MND_DB_NOT_SELECTED if its database cannot be acquired;
 *   - TSDB_CODE_MND_SINGLE_STB_MODE_DB if the database is configured for one super table and
 *     already has one;
 *   - or the code of the failing helper.
 * Every failure is now reported; previously several helper failures returned 0.
 * `user` is currently unused.
 */
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
  SStbObj       *pStb = NULL;
  SDbObj        *pDb = NULL;
  int32_t        code = 0;
  int32_t        lino = 0;
  int32_t        numOfStbs = -1;
  SStbObj        stbObj = {0};  // zeroed so the shared cleanup can always call mndFreeStb()
  SMCreateStbReq createReq = {0};

  (void)user;

  tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
  createReq.numOfColumns = pStream->outputSchema.nCols;
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);

  // build fields: one per output column, with the type's default compression
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    pField->flags = pStream->outputSchema.pSchema[i].flags;
    pField->type = pStream->outputSchema.pSchema[i].type;
    pField->bytes = pStream->outputSchema.pSchema[i].bytes;
    pField->compress = createDefaultColCmprByType(pField->type);
    if (IS_DECIMAL_TYPE(pField->type)) {
      // precision/scale are extracted from `bytes` and re-encoded into typeMod
      uint8_t prec = 0, scale = 0;
      extractDecimalTypeInfoFromBytes(&pField->bytes, &prec, &scale);
      pField->typeMod = decimalCalcTypeMod(prec, scale);
    }
  }

  if (pStream->tagSchema.nCols == 0) {
    createReq.numOfTags = 1;
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    // build tags
    SField *pField = taosArrayGet(createReq.pTags, 0);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, "group_id", sizeof(pField->name));
    pField->type = TSDB_DATA_TYPE_UBIGINT;
    pField->flags = 0;
    pField->bytes = 8;
  } else {
    createReq.numOfTags = pStream->tagSchema.nCols;
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    for (int32_t i = 0; i < createReq.numOfTags; i++) {
      SField *pField = taosArrayGet(createReq.pTags, i);
      if (pField == NULL) {
        continue;
      }

      pField->bytes = pStream->tagSchema.pSchema[i].bytes;
      pField->flags = pStream->tagSchema.pSchema[i].flags;
      pField->type = pStream->tagSchema.pSchema[i].type;
      tstrncpy(pField->name, pStream->tagSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    }
  }

  code = mndCheckCreateStbReq(&createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
    lino = __LINE__;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    lino = __LINE__;
    goto _OVER;
  }

  // The helper results below are stored in `code`. Jumping to _OVER with code == 0 would
  // report success without having created the table.
  code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
  TSDB_CHECK_CODE(code, lino, _OVER);

  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    lino = __LINE__;
    goto _OVER;
  }

  code = mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb);
  TSDB_CHECK_CODE(code, lino, _OVER);

  stbObj.uid = pStream->targetStbUid;

  int32_t ret = mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj);
  if (ret < 0) {
    code = ret;
    lino = __LINE__;
    goto _OVER;
  }

  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols);

_OVER:
  if (code != TSDB_CODE_SUCCESS) {
    mError("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->targetSTbName, lino,
           tstrerror(code));
  }

  mndFreeStb(&stbObj);
  tFreeSMCreateStbReq(&createReq);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);
  return code;
}
748

749
// 1. stream number check
750
// 2. target stable can not be target table of other existed streams.
751
static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
×
752
  int32_t     numOfStream = 0;
×
753
  SStreamObj *pStream = NULL;
×
754
  void       *pIter = NULL;
×
755

756
  while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
×
757
    if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
×
758
      ++numOfStream;
×
759
    }
760

761

762
    if (numOfStream > MND_STREAM_MAX_NUM) {
×
763
      mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
×
764
             pStreamObj->name);
765
      sdbRelease(pMnode->pSdb, pStream);
×
766
      sdbCancelFetch(pMnode->pSdb, pIter);
×
767
      return TSDB_CODE_MND_TOO_MANY_STREAMS;
×
768
    }
769

770
    if (pStream->targetStbUid == pStreamObj->targetStbUid) {
×
771
      mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
×
772
             pStreamObj->name);
773
      sdbRelease(pMnode->pSdb, pStream);
×
774
      sdbCancelFetch(pMnode->pSdb, pIter);
×
775
      return TSDB_CODE_MND_INVALID_TARGET_TABLE;
×
776
    }
777
    sdbRelease(pMnode->pSdb, pStream);
×
778
  }
779

780
  return TSDB_CODE_SUCCESS;
×
781
}
782

783
static void *notifyAddrDup(void *p) { return taosStrdup((char *)p); }
×
784

785
static int32_t addStreamTaskNotifyInfo(const SCMCreateStreamReq *createReq, const SStreamObj *pStream,
×
786
                                       SStreamTask *pTask) {
787
  int32_t code = TSDB_CODE_SUCCESS;
×
788
  int32_t lino = 0;
×
789

790
  TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
791
  TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
792

793
  pTask->notifyInfo.pNotifyAddrUrls = taosArrayDup(createReq->pNotifyAddrUrls, notifyAddrDup);
×
794
  TSDB_CHECK_NULL(pTask->notifyInfo.pNotifyAddrUrls, code, lino, _end, terrno);
×
795
  pTask->notifyInfo.notifyEventTypes = createReq->notifyEventTypes;
×
796
  pTask->notifyInfo.notifyErrorHandle = createReq->notifyErrorHandle;
×
797
  pTask->notifyInfo.streamName = taosStrdup(mndGetDbStr(createReq->name));
×
798
  TSDB_CHECK_NULL(pTask->notifyInfo.streamName, code, lino, _end, terrno);
×
799
  pTask->notifyInfo.stbFullName = taosStrdup(createReq->targetStbFullName);
×
800
  TSDB_CHECK_NULL(pTask->notifyInfo.stbFullName, code, lino, _end, terrno);
×
801
  pTask->notifyInfo.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
×
802
  TSDB_CHECK_NULL(pTask->notifyInfo.pSchemaWrapper, code, lino, _end, terrno);
×
803

804
_end:
×
805
  if (code != TSDB_CODE_SUCCESS) {
×
806
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
807
  }
808
  return code;
×
809
}
810

811
static int32_t addStreamNotifyInfo(SCMCreateStreamReq *createReq, SStreamObj *pStream) {
×
812
  int32_t code = TSDB_CODE_SUCCESS;
×
813
  int32_t lino = 0;
×
814
  int32_t level = 0;
×
815
  int32_t nTasks = 0;
×
816
  SArray *pLevel = NULL;
×
817

818
  TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
819
  TSDB_CHECK_NULL(pStream, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
820

821
  if (taosArrayGetSize(createReq->pNotifyAddrUrls) == 0) {
×
822
    goto _end;
×
823
  }
824

825
  level = taosArrayGetSize(pStream->pTaskList);
×
826
  for (int32_t i = 0; i < level; ++i) {
×
827
    pLevel = taosArrayGetP(pStream->pTaskList, i);
×
828
    nTasks = taosArrayGetSize(pLevel);
×
829
    for (int32_t j = 0; j < nTasks; ++j) {
×
830
      code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
×
831
      TSDB_CHECK_CODE(code, lino, _end);
×
832
    }
833
  }
834

835
  if (pStream->conf.fillHistory && createReq->notifyHistory) {
×
836
    level = taosArrayGetSize(pStream->pHTaskList);
×
837
    for (int32_t i = 0; i < level; ++i) {
×
838
      pLevel = taosArrayGetP(pStream->pHTaskList, i);
×
839
      nTasks = taosArrayGetSize(pLevel);
×
840
      for (int32_t j = 0; j < nTasks; ++j) {
×
841
        code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
×
842
        TSDB_CHECK_CODE(code, lino, _end);
×
843
      }
844
    }
845
  }
846

847
_end:
×
848
  if (code != TSDB_CODE_SUCCESS) {
×
849
    mError("%s for stream %s failed at line %d since %s", __func__, pStream->name, lino, tstrerror(code));
×
850
  }
851
  return code;
×
852
}
853

854
static int32_t mndProcessCheckStreamStatusReq(SRpcMsg *pReq) {
×
855
  SMnode     *pMnode = pReq->info.node;
×
856
  SStreamObj *pStream = NULL;
×
857
  void       *pIter = NULL;
×
858

859
  while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
×
860
    taosWLockLatch(&pStream->lock);
×
861
    if (pStream->status == STREAM_STATUS__INIT && (taosGetTimestampMs() - pStream->createTime > tsStreamFailedTimeout ||
×
862
                                                   taosGetTimestampMs() - pStream->createTime < 0)){
×
863
      pStream->status = STREAM_STATUS__FAILED;
×
864
      tstrncpy(pStream->reserve, "timeout", sizeof(pStream->reserve));
×
865
      mInfo("stream:%s, set status to failed success because of timeout", pStream->name);
×
866
    }
867
    taosWUnLockLatch(&pStream->lock);
×
868
    sdbRelease(pMnode->pSdb, pStream);
×
869
  }
870

871
  return 0;
×
872
}
873

874
static int32_t mndProcessFailedStreamReq(SRpcMsg *pReq) {
×
875
  SMnode     *pMnode = pReq->info.node;
×
876
  SStreamObj *pStream = NULL;
×
877
  int32_t     code = TSDB_CODE_SUCCESS;
×
878
  int32_t     errCode = *(int32_t*)pReq->pCont;
×
879
  char streamName[TSDB_STREAM_FNAME_LEN] = {0};
×
880
  memcpy(streamName, POINTER_SHIFT(pReq->pCont,INT_BYTES), TMIN(pReq->contLen - INT_BYTES, TSDB_STREAM_FNAME_LEN - 1));
×
881

882
#ifdef WINDOWS
883
  code = TSDB_CODE_MND_INVALID_PLATFORM;
884
  return code;
885
#endif
886

887
  mInfo("stream:%s, start to set stream failed", streamName);
×
888

889
  code = mndAcquireStream(pMnode, streamName, &pStream);
×
890
  if (pStream == NULL) {
×
891
    mError("stream:%s, failed to get stream when failed stream since %s", streamName, tstrerror(code));
×
892
    return code;
×
893
  }
894

895
  taosWLockLatch(&pStream->lock);
×
896
  pStream->status = STREAM_STATUS__FAILED;
×
897
  tstrncpy(pStream->reserve, tstrerror(errCode), sizeof(pStream->reserve));
×
898
  taosWUnLockLatch(&pStream->lock);
×
899
  mndReleaseStream(pMnode, pStream);
×
900

901
  mInfo("stream:%s, end to set stream failed success", streamName);
×
902

903
  return code;
×
904
}
905

906
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
×
907
  SMnode     *pMnode = pReq->info.node;
×
908
  SStreamObj *pStream = NULL;
×
909
  SStreamObj  streamObj = {0};
×
910
  char       *sql = NULL;
×
911
  int32_t     sqlLen = 0;
×
912
  const char *pMsg = "create stream tasks on dnodes";
×
913
  int32_t     code = TSDB_CODE_SUCCESS;
×
914
  int32_t     lino = 0;
×
915
  STrans     *pTrans = NULL;
×
916

917
  SCMCreateStreamReq createReq = {0};
×
918
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq);
×
919
  TSDB_CHECK_CODE(code, lino, _OVER);
×
920

921
#ifdef WINDOWS
922
  code = TSDB_CODE_MND_INVALID_PLATFORM;
923
  goto _OVER;
924
#endif
925

926
  mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
×
927
  if ((code = mndCheckCreateStreamReq(&createReq)) != 0) {
×
928
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
×
929
    goto _OVER;
×
930
  }
931

932
  code = mndAcquireStream(pMnode, createReq.name, &pStream);
×
933
  if (pStream != NULL && code == 0) {
×
934
    if (pStream->pTaskList != NULL){
×
935
      if (createReq.igExists) {
×
936
        mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
×
937
        mndReleaseStream(pMnode, pStream);
×
938
        tFreeSCMCreateStreamReq(&createReq);
×
939
        return code;
×
940
      } else {
941
        code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
×
942
        goto _OVER;
×
943
      }
944
    }
945
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
×
946
    goto _OVER;
×
947
  }
948

949
  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
×
950
    goto _OVER;
×
951
  }
952

953
  if (createReq.sql != NULL) {
×
954
    sql = taosStrdup(createReq.sql);
×
955
    TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
×
956
  }
957

958
  // check for the taskEp update trans
959
  if (isNodeUpdateTransActive()) {
×
960
    mError("stream:%s failed to create stream, node update trans is active", createReq.name);
×
961
    code = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
×
962
    goto _OVER;
×
963
  }
964

965
  SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB);
×
966
  if (pSourceDb == NULL) {
×
967
    code = terrno;
×
968
    mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB,
×
969
          tstrerror(code));
970
    goto _OVER;
×
971
  }
972

973
  code = mndCheckForSnode(pMnode, pSourceDb);
×
974
  mndReleaseDb(pMnode, pSourceDb);
×
975
  if (code != 0) {
×
976
    goto _OVER;
×
977
  }
978

979
  // build stream obj from request
980
  if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) {
×
981
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
×
982
    goto _OVER;
×
983
  }
984

985
  bool buildEmptyStream = false;
×
986
  if (createReq.lastTs == 0 && createReq.fillHistory != STREAM_FILL_HISTORY_OFF){
×
987
    streamObj.status = STREAM_STATUS__INIT;
×
988
    buildEmptyStream = true;
×
989
  }
990

991
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb)) != 0) {
×
992
    goto _OVER;
×
993
  }
994

995
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb)) != 0) {
×
996
    goto _OVER;
×
997
  }
998

999
  code = doStreamCheck(pMnode, &streamObj);
×
1000
  TSDB_CHECK_CODE(code, lino, _OVER);
×
1001

1002
  // schedule stream task for stream obj
1003
  if (!buildEmptyStream) {
×
1004
    code = mndScheduleStream(pMnode, &streamObj, &createReq);
×
1005
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
1006
      mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code));
×
1007
      mndTransDrop(pTrans);
×
1008
      goto _OVER;
×
1009
    }
1010

1011
    // add notify info into all stream tasks
1012
    code = addStreamNotifyInfo(&createReq, &streamObj);
×
1013
    if (code != TSDB_CODE_SUCCESS) {
×
1014
      mError("stream:%s failed to add stream notify info since %s", createReq.name, tstrerror(code));
×
1015
      mndTransDrop(pTrans);
×
1016
      goto _OVER;
×
1017
    }
1018

1019
    // add into buffer firstly
1020
    // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
1021
    streamMutexLock(&execInfo.lock);
×
1022
    mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
×
1023
    saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
×
1024
    streamMutexUnlock(&execInfo.lock);
×
1025
  }
1026

1027
  code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans);
×
1028
  if (pTrans == NULL || code) {
×
1029
    goto _OVER;
×
1030
  }
1031

1032
  // create stb for stream
1033
  if (createReq.createStb == STREAM_CREATE_STABLE_TRUE && !buildEmptyStream) {
×
1034
    if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) {
×
1035
      mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code));
×
1036
      goto _OVER;
×
1037
    }
1038
  } else {
1039
    mDebug("stream:%s no need create stable", createReq.name);
×
1040
  }
1041

1042
  // add stream to trans
1043
  code = mndPersistStream(pTrans, &streamObj);
×
1044
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
1045
    mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code));
×
1046
    goto _OVER;
×
1047
  }
1048

1049
  // execute creation
1050
  code = mndTransPrepare(pMnode, pTrans);
×
1051
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
1052
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
×
1053
    goto _OVER;
×
1054
  }
1055

1056
  SName dbname = {0};
×
1057
  if (tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE) != 0) {
×
1058
    mError("invalid source dbname:%s in create stream, code:%s", createReq.sourceDB, tstrerror(code));
×
1059
  }
1060

1061
  SName name = {0};
×
1062
  if (tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE) != 0) {
×
1063
    mError("invalid stream name:%s in create strem, code:%s", createReq.name, tstrerror(code));
×
1064
  }
1065

1066
  // reuse this function for stream
1067
  if (sql != NULL && sqlLen > 0) {
×
1068
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen);
×
1069
  } else {
1070
    char detail[1000] = {0};
×
1071
    snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
×
1072
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
×
1073
  }
1074

1075
_OVER:
×
1076
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
1077
    mError("stream:%s, failed to create at line:%d since %s", createReq.name, lino, tstrerror(code));
×
1078
  } else {
1079
    mDebug("stream:%s create stream completed", createReq.name);
×
1080
    code = TSDB_CODE_ACTION_IN_PROGRESS;
×
1081
  }
1082

1083
  mndTransDrop(pTrans);
×
1084
  mndReleaseStream(pMnode, pStream);
×
1085
  tFreeSCMCreateStreamReq(&createReq);
×
1086
  tFreeStreamObj(&streamObj);
×
1087

1088
  if (sql != NULL) {
×
1089
    taosMemoryFreeClear(sql);
×
1090
  }
1091

1092
  return code;
×
1093
}
1094

1095
static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) {
×
1096
  SMnode          *pMnode = pReq->info.node;
×
1097
  SStreamObj      *pStream = NULL;
×
1098
  int32_t          code = 0;
×
1099
  SMPauseStreamReq pauseReq = {0};
×
1100

1101
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
×
1102
    return TSDB_CODE_INVALID_MSG;
×
1103
  }
1104

1105
  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
×
1106
  if (pStream == NULL || code != 0) {
×
1107
    if (pauseReq.igNotExists) {
×
1108
      mInfo("stream:%s, not exist, not restart stream", pauseReq.name);
×
1109
      return 0;
×
1110
    } else {
1111
      mError("stream:%s not exist, failed to restart stream", pauseReq.name);
×
1112
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
1113
    }
1114
  }
1115

1116
  mInfo("stream:%s,%" PRId64 " start to restart stream", pauseReq.name, pStream->uid);
×
1117
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
×
1118
    sdbRelease(pMnode->pSdb, pStream);
×
1119
    return code;
×
1120
  }
1121

1122
  // check if it is conflict with other trans in both sourceDb and targetDb.
1123
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESTART_NAME, true);
×
1124
  if (code) {
×
1125
    sdbRelease(pMnode->pSdb, pStream);
×
1126
    return code;
×
1127
  }
1128

1129
  bool updated = mndStreamNodeIsUpdated(pMnode);
×
1130
  if (updated) {
×
1131
    mError("tasks are not ready for restart, node update detected");
×
1132
    sdbRelease(pMnode->pSdb, pStream);
×
1133
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
×
1134
  }
1135

1136
  STrans *pTrans = NULL;
×
1137
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream",
×
1138
                       &pTrans);
1139
  if (pTrans == NULL || code) {
×
1140
    mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
×
1141
    sdbRelease(pMnode->pSdb, pStream);
×
1142
    return code;
×
1143
  }
1144

1145
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESTART_NAME, pStream->uid);
×
1146
  if (code) {
×
1147
    sdbRelease(pMnode->pSdb, pStream);
×
1148
    mndTransDrop(pTrans);
×
1149
    return code;
×
1150
  }
1151

1152
  // if nodeUpdate happened, not send pause trans
1153
  code = mndStreamSetRestartAction(pMnode, pTrans, pStream);
×
1154
  if (code) {
×
1155
    mError("stream:%s, failed to restart task since %s", pauseReq.name, tstrerror(code));
×
1156
    sdbRelease(pMnode->pSdb, pStream);
×
1157
    mndTransDrop(pTrans);
×
1158
    return code;
×
1159
  }
1160

1161
  code = mndTransPrepare(pMnode, pTrans);
×
1162
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
1163
    mError("trans:%d, failed to prepare restart stream trans since %s", pTrans->id, tstrerror(code));
×
1164
    sdbRelease(pMnode->pSdb, pStream);
×
1165
    mndTransDrop(pTrans);
×
1166
    return code;
×
1167
  }
1168

1169
  sdbRelease(pMnode->pSdb, pStream);
×
1170
  mndTransDrop(pTrans);
×
1171

1172
  return TSDB_CODE_ACTION_IN_PROGRESS;
×
1173
}
1174

1175
int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
×
1176
  SStreamObj *pStream = NULL;
×
1177
  void       *pIter = NULL;
×
1178
  SSdb       *pSdb = pMnode->pSdb;
×
1179
  int64_t     maxChkptId = 0;
×
1180

1181
  while (1) {
1182
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
×
1183
    if (pIter == NULL) break;
×
1184

1185
    maxChkptId = TMAX(maxChkptId, pStream->checkpointId);
×
1186
    mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64, pStream, pStream->name, pStream->uid,
×
1187
           pStream->checkpointId);
1188
    sdbRelease(pSdb, pStream);
×
1189
  }
1190

1191
  {  // check the max checkpoint id from all vnodes.
1192
    int64_t maxCheckpointId = -1;
×
1193
    if (lock) {
×
1194
      streamMutexLock(&execInfo.lock);
×
1195
    }
1196

1197
    for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
×
1198
      STaskId          *p = taosArrayGet(execInfo.pTaskList, i);
×
1199
      STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
×
1200
      if (p == NULL || pEntry == NULL) {
×
1201
        continue;
×
1202
      }
1203

1204
      if (pEntry->checkpointInfo.failed) {
×
1205
        continue;
×
1206
      }
1207

1208
      if (maxCheckpointId < pEntry->checkpointInfo.latestId) {
×
1209
        maxCheckpointId = pEntry->checkpointInfo.latestId;
×
1210
      }
1211
    }
1212

1213
    if (lock) {
×
1214
      streamMutexUnlock(&execInfo.lock);
×
1215
    }
1216

1217
    if (maxCheckpointId > maxChkptId) {
×
1218
      mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId,
×
1219
             maxCheckpointId);
1220
      maxChkptId = maxCheckpointId;
×
1221
    }
1222
  }
1223

1224
  mDebug("generate new checkpointId:%" PRId64, maxChkptId + 1);
×
1225
  return maxChkptId + 1;
×
1226
}
1227

1228
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
×
1229
                                               int8_t mndTrigger, bool lock) {
1230
  int32_t code = TSDB_CODE_SUCCESS;
×
1231
  bool    conflict = false;
×
1232
  int64_t ts = taosGetTimestampMs();
×
1233
  STrans *pTrans = NULL;
×
1234

1235
  if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
×
1236
    return code;
×
1237
  }
1238

1239
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
×
1240
  if (code) {
×
1241
    mWarn("checkpoint conflict with other trans in %s, code:%s ignore the checkpoint for stream:%s %" PRIx64,
×
1242
          pStream->sourceDb, tstrerror(code), pStream->name, pStream->uid);
1243
    goto _ERR;
×
1244
  }
1245

1246
  code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME,
×
1247
                       "gen checkpoint for stream", &pTrans);
1248
  if (code) {
×
1249
    mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
×
1250
           tstrerror(code));
1251
    goto _ERR;
×
1252
  }
1253

1254
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
×
1255
  if (code) {
×
1256
    mError("failed to register checkpoint trans for stream:%s, checkpointId:%" PRId64, pStream->name, checkpointId);
×
1257
    goto _ERR;
×
1258
  }
1259

1260
  mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64, pStream->name, checkpointId);
×
1261

1262
  taosWLockLatch(&pStream->lock);
×
1263
  pStream->currentTick = 1;
×
1264

1265
  // 1. redo action: broadcast checkpoint source msg for all source vg
1266
  int32_t totalLevel = taosArrayGetSize(pStream->pTaskList);
×
1267
  for (int32_t i = 0; i < totalLevel; i++) {
×
1268
    SArray      *pLevel = taosArrayGetP(pStream->pTaskList, i);
×
1269
    SStreamTask *p = taosArrayGetP(pLevel, 0);
×
1270

1271
    if (p->info.taskLevel == TASK_LEVEL__SOURCE) {
×
1272
      int32_t sz = taosArrayGetSize(pLevel);
×
1273
      for (int32_t j = 0; j < sz; j++) {
×
1274
        SStreamTask *pTask = taosArrayGetP(pLevel, j);
×
1275
        code = mndStreamSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);
×
1276

1277
        if (code != TSDB_CODE_SUCCESS) {
×
1278
          taosWUnLockLatch(&pStream->lock);
×
1279
          goto _ERR;
×
1280
        }
1281
      }
1282
    }
1283
  }
1284

1285
  // 2. reset tick
1286
  pStream->checkpointId = checkpointId;
×
1287
  pStream->checkpointFreq = taosGetTimestampMs();
×
1288
  pStream->currentTick = 0;
×
1289

1290
  // 3. commit log: stream checkpoint info
1291
  pStream->version = pStream->version + 1;
×
1292
  taosWUnLockLatch(&pStream->lock);
×
1293

1294
  if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY)) != TSDB_CODE_SUCCESS) {
×
1295
    goto _ERR;
×
1296
  }
1297

1298
  code = mndTransPrepare(pMnode, pTrans);
×
1299
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
1300
    mError("failed to prepare checkpoint trans since %s", tstrerror(code));
×
1301
  } else {
1302
    code = TSDB_CODE_ACTION_IN_PROGRESS;
×
1303
  }
1304

1305
_ERR:
×
1306
  mndTransDrop(pTrans);
×
1307
  return code;
×
1308
}
1309

1310
int32_t extractStreamNodeList(SMnode *pMnode) {
×
1311
  if (taosArrayGetSize(execInfo.pNodeList) == 0) {
×
1312
    int32_t code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
×
1313
    if (code) {
×
1314
      mError("Failed to extract node list from stream, code:%s", tstrerror(code));
×
1315
      return code;
×
1316
    }
1317
  }
1318

1319
  return taosArrayGetSize(execInfo.pNodeList);
×
1320
}
1321

1322
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
×
1323
  int32_t code = 0;
×
1324
  if (mndStreamNodeIsUpdated(pMnode)) {
×
1325
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
×
1326
  }
1327

1328
  streamMutexLock(&execInfo.lock);
×
1329
  if (taosArrayGetSize(execInfo.pNodeList) == 0) {
×
1330
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
×
1331
    if (taosArrayGetSize(execInfo.pTaskList) != 0) {
×
1332
      mError("stream task node change checking done, no vgroups exist, but task list is not empty");
×
1333
      code = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
×
1334
    }
1335
  }
1336

1337
  streamMutexUnlock(&execInfo.lock);
×
1338
  return code;
×
1339
}
1340

1341
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
×
1342
  int64_t ts = -1;
×
1343
  int32_t taskId = -1;
×
1344

1345
  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
×
1346
    STaskId          *p = taosArrayGet(pTaskList, i);
×
1347
    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
×
1348
    if (p == NULL || pEntry == NULL || pEntry->id.streamId != streamId) {
×
1349
      continue;
×
1350
    }
1351

1352
    // -1 denote not ready now or never ready till now
1353
    if (pEntry->hTaskId != 0) {
×
1354
      mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
×
1355
            " exists, checkpoint not issued",
1356
            pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
1357
            pEntry->hTaskId);
1358
      return -1;
×
1359
    }
1360

1361
    if (pEntry->status != TASK_STATUS__READY) {
×
1362
      mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, not ready for checkpoint", pEntry->id.streamId,
×
1363
            (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
1364
      return -1;
×
1365
    }
1366

1367
    if (ts < pEntry->startTime) {
×
1368
      ts = pEntry->startTime;
×
1369
      taskId = pEntry->id.taskId;
×
1370
    }
1371
  }
1372

1373
  mDebug("stream:0x%" PRIx64 " last ready ts:%" PRId64 " s-task:0x%x", streamId, ts, taskId);
×
1374
  return ts;
×
1375
}
1376

1377
typedef struct {
1378
  int64_t streamId;
1379
  int64_t duration;
1380
} SCheckpointInterval;
1381

1382
static int32_t streamWaitComparFn(const void *p1, const void *p2) {
×
1383
  const SCheckpointInterval *pInt1 = p1;
×
1384
  const SCheckpointInterval *pInt2 = p2;
×
1385
  if (pInt1->duration == pInt2->duration) {
×
1386
    return 0;
×
1387
  }
1388

1389
  return pInt1->duration > pInt2->duration ? -1 : 1;
×
1390
}
1391

1392
// all tasks of this stream should be ready, otherwise do nothing
1393
static bool isStreamReadyHelp(int64_t now, SStreamObj *pStream) {
×
1394
  bool ready = false;
×
1395

1396
  streamMutexLock(&execInfo.lock);
×
1397

1398
  int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
×
1399
  if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) {
×
1400
    if (lastReadyTs != -1) {
×
1401
      mInfo("not start checkpoint, stream:0x%" PRIx64 " readyTs:%" PRId64 " ready duration:%.2fs less than threshold",
×
1402
            pStream->uid, lastReadyTs, (now - lastReadyTs) / 1000.0);
1403
    }
1404

1405
    ready = false;
×
1406
  } else {
1407
    ready = true;
×
1408
  }
1409

1410
  streamMutexUnlock(&execInfo.lock);
×
1411
  return ready;
×
1412
}
1413

1414
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
×
1415
  SMnode     *pMnode = pReq->info.node;
×
1416
  SSdb       *pSdb = pMnode->pSdb;
×
1417
  void       *pIter = NULL;
×
1418
  SStreamObj *pStream = NULL;
×
1419
  int32_t     code = 0;
×
1420
  int32_t     numOfCheckpointTrans = 0;
×
1421
  SArray     *pLongChkpts = NULL;
×
1422
  SArray     *pList = NULL;
×
1423
  int64_t     now = taosGetTimestampMs();
×
1424

1425
  if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
×
1426
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
×
1427
  }
1428

1429
  pList = taosArrayInit(4, sizeof(SCheckpointInterval));
×
1430
  if (pList == NULL) {
×
1431
    mError("failed to init chkptInterval info, not handle stream checkpoint, code:%s", tstrerror(terrno));
×
1432
    return terrno;
×
1433
  }
1434

1435
  pLongChkpts = taosArrayInit(4, sizeof(SStreamTransInfo));
×
1436
  if (pLongChkpts == NULL) {
×
1437
    mError("failed to init long checkpoint list, not handle stream checkpoint, code:%s", tstrerror(terrno));
×
1438
    taosArrayDestroy(pList);
×
1439
    return terrno;
×
1440
  }
1441

1442
  // check if ongong checkpoint trans or long chkpt trans exist.
1443
  code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans, pLongChkpts);
×
1444
  if (code) {
×
1445
    mError("failed to clear finish trans, code:%s", tstrerror(code));
×
1446

1447
    taosArrayDestroy(pList);
×
1448
    taosArrayDestroy(pLongChkpts);
×
1449
    return code;
×
1450
  }
1451

1452
  // kill long exec checkpoint and set task status
1453
  if (taosArrayGetSize(pLongChkpts) > 0) {
×
1454
    killChkptAndResetStreamTask(pMnode, pLongChkpts);
×
1455

1456
    taosArrayDestroy(pList);
×
1457
    taosArrayDestroy(pLongChkpts);
×
1458
    return TSDB_CODE_SUCCESS;
×
1459
  }
1460

1461
  taosArrayDestroy(pLongChkpts);
×
1462

1463
  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
×
1464
    int64_t duration = now - pStream->checkpointFreq;
×
1465
    if (duration < tsStreamCheckpointInterval * 1000) {
×
1466
      sdbRelease(pSdb, pStream);
×
1467
      continue;
×
1468
    }
1469

1470
    bool ready = isStreamReadyHelp(now, pStream);
×
1471
    if (!ready) {
×
1472
      sdbRelease(pSdb, pStream);
×
1473
      continue;
×
1474
    }
1475

1476
    SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
×
1477
    void               *p = taosArrayPush(pList, &in);
×
1478
    if (p) {
×
1479
      int32_t currentSize = taosArrayGetSize(pList);
×
1480
      mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chkpt interval threshold: %ds(%" PRId64
×
1481
             "s), concurrently launch threshold:%d",
1482
             pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
1483
             tsMaxConcurrentCheckpoint);
1484
    } else {
1485
      mError("failed to record the checkpoint interval info, stream:0x%" PRIx64, pStream->uid);
×
1486
    }
1487
    sdbRelease(pSdb, pStream);
×
1488
  }
1489

1490
  int32_t size = taosArrayGetSize(pList);
×
1491
  if (size == 0) {
×
1492
    taosArrayDestroy(pList);
×
1493
    return code;
×
1494
  }
1495

1496
  taosArraySort(pList, streamWaitComparFn);
×
1497

1498
  int32_t numOfQual = taosArrayGetSize(pList);
×
1499
  if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) {
×
1500
    mDebug(
×
1501
        "%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new "
1502
        "checkpoint trans are not allowed, wait for 30s",
1503
        numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint);
1504
    taosArrayDestroy(pList);
×
1505
    return code;
×
1506
  }
1507

1508
  int32_t capacity = tsMaxConcurrentCheckpoint - numOfCheckpointTrans;
×
1509
  mDebug(
×
1510
      "%d stream(s) checkpoint interval longer than %ds, %d ongoing checkpoint trans, %d new checkpoint trans allowed, "
1511
      "concurrent trans threshold:%d",
1512
      numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, capacity, tsMaxConcurrentCheckpoint);
1513

1514
  int32_t started = 0;
×
1515
  int64_t checkpointId = mndStreamGenChkptId(pMnode, true);
×
1516

1517
  for (int32_t i = 0; i < numOfQual; ++i) {
×
1518
    SCheckpointInterval *pCheckpointInfo = taosArrayGet(pList, i);
×
1519
    if (pCheckpointInfo == NULL) {
×
1520
      continue;
×
1521
    }
1522

1523
    SStreamObj *p = NULL;
×
1524
    code = mndGetStreamObj(pMnode, pCheckpointInfo->streamId, &p);
×
1525
    if (p != NULL && code == 0) {
×
1526
      code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
×
1527
      sdbRelease(pSdb, p);
×
1528

1529
      if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
×
1530
        started += 1;
×
1531

1532
        if (started >= capacity) {
×
1533
          mDebug("already start %d new checkpoint trans, current active checkpoint trans:%d", started,
×
1534
                 (started + numOfCheckpointTrans));
1535
          break;
×
1536
        }
1537
      } else {
1538
        mError("failed to start checkpoint trans, code:%s", tstrerror(code));
×
1539
      }
1540
    }
1541
  }
1542

1543
  taosArrayDestroy(pList);
×
1544
  return code;
×
1545
}
1546

1547
/*
 * Handle a DROP STREAM request.
 *
 * Steps:
 *   1. Deserialize the request and acquire the stream.
 *   2. Refuse to drop a stream whose smaId matches an existing SDB_SMA object (TSDB_CODE_TSMA_MUST_BE_DROPPED).
 *   3. Check the WRITE privilege on the stream's target db and check for conflicting stream trans.
 *   4. Build a "drop stream" trans that drops all tasks and logs the stream obj as SDB_STATUS_DROPPED.
 *   5. Kill the related checkpoint trans, if any, call removeStreamTasksInBuf() on execInfo, and write the audit
 *      record.
 *
 * Returns:
 *   - 0 when the stream does not exist and igNotExists is set;
 *   - TSDB_CODE_ACTION_IN_PROGRESS once the drop trans has been prepared;
 *   - an error code otherwise.
 */
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  SMDropStreamReq dropReq = {0};
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
    mError("invalid drop stream msg recv, discarded");
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  mDebug("recv drop stream:%s msg", dropReq.name);

  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (dropReq.igNotExists) {
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      tFreeMDropStreamReq(&dropReq);
      return 0;
    } else {
      mError("stream:%s not exist failed to drop it", dropReq.name);
      tFreeMDropStreamReq(&dropReq);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  if (pStream->smaId != 0) {
    mDebug("stream:%s, uid:0x%" PRIx64 " try to drop sma related stream", dropReq.name, pStream->uid);

    void    *pIter = NULL;
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    while (pIter) {
      if (pSma && pSma->uid == pStream->smaId) {
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;
        // Log first: pStream must not be dereferenced after sdbRelease, and dropReq is freed below.
        mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma",
               dropReq.name, pStream->uid, tstrerror(code));

        sdbRelease(pMnode->pSdb, pSma);
        sdbRelease(pMnode->pSdb, pStream);
        sdbCancelFetch(pMnode->pSdb, pIter);
        tFreeMDropStreamReq(&dropReq);
        TAOS_RETURN(code);
      }

      if (pSma) {
        sdbRelease(pMnode->pSdb, pSma);
      }

      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    }
  }

  // Propagate the real privilege-check error instead of a bare -1.
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    mError("stream:%s failed to drop, privilege check on db:%s failed, code:%s", dropReq.name, pStream->targetDb,
           tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    return code;
  }

  STrans *pTrans = NULL;
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
  if (code) {
    mError("failed to register drop stream trans, code:%s", tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // drop all tasks
  code = mndStreamSetDropAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // drop stream
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // kill the related checkpoint trans
  int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
  if (transId != 0) {
    mDebug("drop active transId:%d due to stream:%s uid:0x%" PRIx64 " dropped", transId, pStream->name, pStream->uid);
    mndKillTransImpl(pMnode, transId, pStream->sourceDb);
  }

  mDebug("stream:%s uid:0x%" PRIx64 " transId:%d start to drop related task when dropping stream", dropReq.name,
         pStream->uid, transId);

  removeStreamTasksInBuf(pStream, &execInfo);

  // The drop trans is already prepared. A failure to parse the name (used only for the audit record) is logged,
  // but it must not turn the request into an error.
  SName   name = {0};
  int32_t ret = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (ret != 0) {
    mError("stream:%s failed to parse stream name for audit, code:%s", dropReq.name, tstrerror(ret));
  }
  auditRecord(pReq, pMnode->clusterId, "dropStream", "", name.dbname, dropReq.sql, dropReq.sqlLen);

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  tFreeMDropStreamReq(&dropReq);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1689

1690
/*
 * Add the drop of every stream whose source or target db is pDb to pTrans.
 * Presumably this is called while dropping db pDb; confirm against the caller.
 *
 * A matching stream whose source and target dbs differ is not dropped implicitly: the scan stops and
 * TSDB_CODE_MND_STREAM_MUST_BE_DELETED is returned.
 *
 * For every other matching stream:
 *   - its related checkpoint trans (if any) is killed;
 *   - removeStreamTasksInBuf() is called on execInfo;
 *   - the obj is logged into pTrans as SDB_STATUS_DROPPED.
 *
 * Returns 0 on success, or the first error code from mndPersistTransLog (other than ACTION_IN_PROGRESS).
 */
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
      if (pStream->sourceDbUid != pStream->targetDbUid) {
        // Log before releasing: pStream must not be dereferenced after sdbRelease.
        mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
               pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(TSDB_CODE_MND_STREAM_MUST_BE_DELETED);
      } else {
        // kill the related checkpoint trans
        int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
        if (transId != 0) {
          mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
          mndKillTransImpl(pMnode, transId, pStream->sourceDb);
        }

        // drop the stream obj in execInfo
        removeStreamTasksInBuf(pStream, &execInfo);

        code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          sdbRelease(pSdb, pStream);
          sdbCancelFetch(pSdb, pIter);
          return code;
        }
      }
    }

    sdbRelease(pSdb, pStream);
  }

  return 0;
}
1732

1733
/*
 * Fill pBlock with at most `rows` stream rows, resuming the SDB_STREAM scan saved in pShow->pIter.
 * This presumably backs the stream "show" command.
 *
 * A stream whose attributes cannot be written into the block is skipped and does not count as a row.
 * Returns the number of rows written; pShow->numOfRows is advanced by the same amount.
 */
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t filled = 0;

  for (;;) {
    if (filled >= rows) {
      break;
    }

    SStreamObj *pObj = NULL;
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) {
      break;  // all streams have been visited
    }

    if (setStreamAttrInResBlock(pObj, pBlock, filled) == 0) {
      filled += 1;
    }
    sdbRelease(pSdb, pObj);
  }

  pShow->numOfRows += filled;
  return filled;
}
1754

1755
/*
 * Abandon an unfinished SDB_STREAM iteration: pIter is handed to sdbCancelFetchByType() so the iterator held
 * by the scan is released.
 */
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
1759

1760
/*
 * Fill pBlock with one row per task of every stream, resuming the SDB_STREAM scan saved in pShow->pIter.
 *
 * Before scanning, mndInitStreamExecInfo() is called on execInfo under execInfo.lock.
 *
 * All tasks of one stream are emitted together: when they do not fit, the block capacity is grown beyond
 * rowsCapacity, so the returned row count may exceed rowsCapacity. Timestamps use the source db precision,
 * falling back to milliseconds when the db cannot be acquired.
 *
 * A stream is skipped when the block cannot be grown or when its task iterator cannot be created.
 * Returns the number of rows written; pShow->numOfRows is advanced by the same amount.
 */
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  int32_t     numOfRows = 0;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) {
      break;
    }

    // hold the stream's read latch while walking its tasks
    taosRLockLatch(&pStream->lock);

    // grow the block so that every task of this stream fits, even past rowsCapacity
    int32_t count = mndGetNumOfStreamTasks(pStream);
    if (numOfRows + count > rowsCapacity) {
      code = blockDataEnsureCapacity(pBlock, numOfRows + count);
      if (code) {
        mError("failed to prepare the result block buffer, quit return value");
        taosRUnLockLatch(&pStream->lock);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    int32_t precision = TSDB_TIME_PRECISION_MILLI;
    SDbObj *pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
    if (pSourceDb != NULL) {
      precision = pSourceDb->cfg.precision;
      mndReleaseDb(pMnode, pSourceDb);
    }

    // add row for each task
    SStreamTaskIter *pIter = NULL;
    code = createStreamTaskIter(pStream, &pIter);
    if (code) {
      // Log before releasing: pStream must not be dereferenced after sdbRelease.
      mError("failed to create task iter for stream:%s", pStream->name);
      taosRUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pIter, &pTask);
      if (code) {
        // Do not destroy pIter here: it is destroyed exactly once after the loop.
        break;
      }

      code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows, precision);
      if (code == TSDB_CODE_SUCCESS) {
        numOfRows++;
      }
    }

    pBlock->info.rows = numOfRows;

    destroyStreamTaskIter(pIter);
    taosRUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1833

1834
/*
 * Abandon an unfinished stream-task retrieval. The task rows are produced while walking SDB_STREAM objects,
 * so the pending iterator is cancelled as an SDB_STREAM fetch.
 */
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
1838

1839
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
×
1840
  SMnode     *pMnode = pReq->info.node;
×
1841
  SStreamObj *pStream = NULL;
×
1842
  int32_t     code = 0;
×
1843

1844
  SMPauseStreamReq pauseReq = {0};
×
1845
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
×
1846
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
1847
  }
1848

1849
  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
×
1850
  if (pStream == NULL || code != 0) {
×
1851
    if (pauseReq.igNotExists) {
×
1852
      mInfo("stream:%s, not exist, not pause stream", pauseReq.name);
×
1853
      return 0;
×
1854
    } else {
1855
      mError("stream:%s not exist, failed to pause stream", pauseReq.name);
×
1856
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
1857
    }
1858
  }
1859

1860
  mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid);
×
1861

1862
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
×
1863
    sdbRelease(pMnode->pSdb, pStream);
×
1864
    return code;
×
1865
  }
1866

1867
  // check if it is conflict with other trans in both sourceDb and targetDb.
1868
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
×
1869
  if (code) {
×
1870
    sdbRelease(pMnode->pSdb, pStream);
×
1871
    TAOS_RETURN(code);
×
1872
  }
1873

1874
  bool updated = mndStreamNodeIsUpdated(pMnode);
×
1875
  if (updated) {
×
1876
    mError("tasks are not ready for pause, node update detected");
×
1877
    sdbRelease(pMnode->pSdb, pStream);
×
1878
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
×
1879
  }
1880

1881
  {  // check for tasks, if tasks are not ready, not allowed to pause
1882
    bool found = false;
×
1883
    bool readyToPause = true;
×
1884
    streamMutexLock(&execInfo.lock);
×
1885

1886
    for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
×
1887
      STaskId *p = taosArrayGet(execInfo.pTaskList, i);
×
1888
      if (p == NULL) {
×
1889
        continue;
×
1890
      }
1891

1892
      STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
×
1893
      if (pEntry == NULL) {
×
1894
        continue;
×
1895
      }
1896

1897
      if (pEntry->id.streamId != pStream->uid) {
×
1898
        continue;
×
1899
      }
1900

1901
      if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) {
×
1902
        mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name,
×
1903
               pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status));
1904
        readyToPause = false;
×
1905
      }
1906

1907
      found = true;
×
1908
    }
1909

1910
    streamMutexUnlock(&execInfo.lock);
×
1911
    if (!found) {
×
1912
      mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
×
1913
      sdbRelease(pMnode->pSdb, pStream);
×
1914
      TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
×
1915
    }
1916

1917
    if (!readyToPause) {
×
1918
      mError("stream:%s task not ready for pause yet", pauseReq.name);
×
1919
      sdbRelease(pMnode->pSdb, pStream);
×
1920
      TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
×
1921
    }
1922
  }
1923

1924
  STrans *pTrans = NULL;
×
1925
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans);
×
1926
  if (pTrans == NULL || code) {
×
1927
    mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
×
1928
    sdbRelease(pMnode->pSdb, pStream);
×
1929
    return code;
×
1930
  }
1931

1932
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
×
1933
  if (code) {
×
1934
    sdbRelease(pMnode->pSdb, pStream);
×
1935
    mndTransDrop(pTrans);
×
1936
    return code;
×
1937
  }
1938

1939
  // if nodeUpdate happened, not send pause trans
1940
  code = mndStreamSetPauseAction(pMnode, pTrans, pStream);
×
1941
  if (code) {
×
1942
    mError("stream:%s, failed to pause task since %s", pauseReq.name, tstrerror(code));
×
1943
    sdbRelease(pMnode->pSdb, pStream);
×
1944
    mndTransDrop(pTrans);
×
1945
    return code;
×
1946
  }
1947

1948
  // pause stream
1949
  taosWLockLatch(&pStream->lock);
×
1950
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
×
1951
  if (code) {
×
1952
    taosWUnLockLatch(&pStream->lock);
×
1953
    sdbRelease(pMnode->pSdb, pStream);
×
1954
    mndTransDrop(pTrans);
×
1955
    return code;
×
1956
  }
1957

1958
  taosWUnLockLatch(&pStream->lock);
×
1959

1960
  code = mndTransPrepare(pMnode, pTrans);
×
1961
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
1962
    mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
×
1963
    sdbRelease(pMnode->pSdb, pStream);
×
1964
    mndTransDrop(pTrans);
×
1965
    return code;
×
1966
  }
1967

1968
  sdbRelease(pMnode->pSdb, pStream);
×
1969
  mndTransDrop(pTrans);
×
1970

1971
  return TSDB_CODE_ACTION_IN_PROGRESS;
×
1972
}
1973

1974
/*
 * Handle a RESUME STREAM request.
 *
 * Steps:
 *   1. Check the stream grant, then acquire the stream.
 *   2. Check the WRITE privilege on the target db and check for conflicting stream trans.
 *   3. Build a "resume the stream" trans with the resume actions (honouring igUntreated).
 *   4. Set the stream status to STREAM_STATUS__NORMAL and log the obj as SDB_STATUS_READY.
 *   5. Prepare the trans.
 *
 * Returns:
 *   - 0 when the stream does not exist and igNotExists is set;
 *   - TSDB_CODE_ACTION_IN_PROGRESS once the trans has been prepared;
 *   - an error code otherwise.
 */
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResumeStreamReq resumeReq = {0};
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resumeReq.igNotExists) {
      mInfo("stream:%s not exist, not resume stream", resumeReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      return 0;
    } else {
      mError("stream:%s not exist, failed to resume stream", resumeReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);

  // Propagate the real privilege-check error instead of a bare -1.
  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  STrans *pTrans = NULL;
  code =
      doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // set the resume action
  code = mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated);
  if (code) {
    mError("stream:%s, failed to set resume action since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // resume stream
  taosWLockLatch(&pStream->lock);
  pStream->status = STREAM_STATUS__NORMAL;
  // Keep the error: previously the stale `code` (0) was returned here, reporting success for a dropped trans.
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code != 0) {
    taosWUnLockLatch(&pStream->lock);
    mError("stream:%s, failed to persist resume trans log since %s", resumeReq.name, tstrerror(code));

    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  taosWUnLockLatch(&pStream->lock);
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare resume stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
2063

2064
/*
 * Handle a RESET STREAM request (not implemented yet).
 *
 * The handler checks the stream grant, deserializes the request and looks up the stream.
 *
 * Returns:
 *   - 0 when the stream does not exist and igNotExists is set;
 *   - TSDB_CODE_MND_STREAM_NOT_EXIST when it does not exist otherwise;
 *   - TSDB_CODE_ACTION_IN_PROGRESS when it exists.
 */
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResetStreamReq resetReq = {0};
  if (tDeserializeSMResetStreamReq(pReq->pCont, pReq->contLen, &resetReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  mDebug("recv reset stream req, stream:%s", resetReq.name);

  code = mndAcquireStream(pMnode, resetReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resetReq.igNotExists) {
      mInfo("stream:%s, not exist, not reset stream", resetReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to reset stream", resetReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  // todo(liao hao jun): build and prepare the reset trans here.
  // NOTE(review): no trans is created yet, but ACTION_IN_PROGRESS is returned. Confirm how the client
  // receives its response in that case.

  // Release the reference taken by mndAcquireStream; it was previously leaked.
  sdbRelease(pMnode->pSdb, pStream);
  return TSDB_CODE_ACTION_IN_PROGRESS;
}
2094

2095
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes,
×
2096
                                      STrans **pUpdateTrans, SArray* pStreamList) {
2097
  SSdb   *pSdb = pMnode->pSdb;
×
2098
  void   *pIter = NULL;
×
2099
  STrans *pTrans = NULL;
×
2100
  int32_t code = 0;
×
2101
  *pUpdateTrans = NULL;
×
2102

2103
  // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool
2104
  while (1) {
×
2105
    SStreamObj *pStream = NULL;
×
2106
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
×
2107
    if (pIter == NULL) {
×
2108
      break;
×
2109
    }
2110

2111
    code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
×
2112
    sdbRelease(pSdb, pStream);
×
2113

2114
    if (code) {
×
2115
      mError("nodeUpdate conflict with other trans, current nodeUpdate ignored, code:%s", tstrerror(code));
×
2116
      sdbCancelFetch(pSdb, pIter);
×
2117
      return code;
×
2118
    }
2119
  }
2120

2121
  while (1) {
×
2122
    SStreamObj *pStream = NULL;
×
2123
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
×
2124
    if (pIter == NULL) {
×
2125
      break;
×
2126
    }
2127

2128
    // here create only one trans
2129
    if (pTrans == NULL) {
×
2130
      code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME,
×
2131
                           "update task epsets", &pTrans);
2132
      if (pTrans == NULL || code) {
×
2133
        sdbRelease(pSdb, pStream);
×
2134
        sdbCancelFetch(pSdb, pIter);
×
2135
        return terrno = code;
×
2136
      }
2137
    }
2138

2139
    if (!includeAllNodes) {
×
2140
      void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
×
2141
      void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
×
2142
      if (p1 == NULL && p2 == NULL) {
×
2143
        mDebug("stream:0x%" PRIx64 " %s not involved in nodeUpdate, ignore", pStream->uid, pStream->name);
×
2144
        sdbRelease(pSdb, pStream);
×
2145
        continue;
×
2146
      }
2147
    }
2148

2149
    mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
×
2150
           pStream->name, pTrans->id);
2151

2152
    // NOTE: for each stream, we register one trans entry for task update
2153
    code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
×
2154
    if (code) {
×
2155
      mError("failed to register trans, transId:%d, and continue", pTrans->id);
×
2156
    }
2157

2158
    code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);
×
2159

2160
    // todo: not continue, drop all and retry again
2161
    if (code != TSDB_CODE_SUCCESS) {
×
2162
      mError("stream:0x%" PRIx64 " build nodeUpdate trans failed, ignore and continue, code:%s", pStream->uid,
×
2163
             tstrerror(code));
2164
      sdbRelease(pSdb, pStream);
×
2165
      continue;
×
2166
    }
2167

2168
    code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
×
2169
    if (code == 0) {
×
2170
      taosArrayPush(pStreamList, &pStream->uid);
×
2171
    }
2172

2173
    sdbRelease(pSdb, pStream);
×
2174

2175
    if (code != TSDB_CODE_SUCCESS) {
×
2176
      sdbCancelFetch(pSdb, pIter);
×
2177
      return code;
×
2178
    }
2179
  }
2180

2181
  // no need to build the trans to handle the vgroup update
2182
  *pUpdateTrans = pTrans;
×
2183
  return code;
×
2184
}
2185

2186
/*
 * Rebuild pNodeList from the tasks of all existing streams.
 *
 * Each task contributes one SNodeEntry (nodeId, epset, hbTimestamp = -1, lastHbMsgId = -1), keyed by nodeId
 * in a temporary hash. The hash is created without update, and TSDB_CODE_DUP_KEY from taosHashPut is
 * tolerated. pNodeList is cleared and then refilled from the hash.
 *
 * A stream whose task iterator cannot be created is skipped.
 *
 * Returns 0, or a non-zero code from the task iteration of the last stream visited (an earlier stream's failure
 * is overwritten by a later success), or from appending to pNodeList.
 */
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int32_t     code = 0;

  mDebug("start to refresh node list by existed streams");

  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    return terrno;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    taosWLockLatch(&pStream->lock);

    SStreamTaskIter *pTaskIter = NULL;
    code = createStreamTaskIter(pStream, &pTaskIter);
    if (code) {
      // Log before releasing: pStream must not be dereferenced after sdbRelease.
      mError("failed to create task iter for stream:%s", pStream->name);
      taosWUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pTaskIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pTaskIter, &pTask);
      if (code) {
        break;
      }

      SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
      epsetAssign(&entry.epset, &pTask->info.epSet);
      int32_t ret = taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
      if (ret != 0 && ret != TSDB_CODE_DUP_KEY) {
        // report the taosHashPut failure itself, not the unrelated `code`
        mError("failed to put entry into hash map, nodeId:%d, code:%s", entry.nodeId, tstrerror(ret));
      }
    }

    destroyStreamTaskIter(pTaskIter);
    taosWUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  taosArrayClear(pNodeList);

  // convert to list
  pIter = NULL;
  while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
    SNodeEntry *pEntry = (SNodeEntry *)pIter;

    void *p = taosArrayPush(pNodeList, pEntry);
    if (p == NULL) {
      mError("failed to put entry into node list, nodeId:%d, code: out of memory", pEntry->nodeId);
      if (code == 0) {
        code = terrno;
      }
      continue;
    }

    char    buf[256] = {0};
    int32_t ret = epsetToStr(&pEntry->epset, buf, tListLen(buf));  // ignore this error since it is only for log file
    if (ret != 0) {                                                // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(ret));
    }

    mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
  }

  taosHashCleanup(pHash);

  mDebug("numOfvNodes:%d get after extracting nodeInfo from all streams", (int32_t)taosArrayGetSize(pNodeList));
  return code;
}
2267

2268
/*
 * Add the db name of every vgroup in the sdb to pDBMap (key only, no value).
 *
 * Duplicate names are expected, since a db usually has several vgroups, so TSDB_CODE_DUP_KEY is not reported.
 * Any other insertion failure is logged and the scan continues.
 */
static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
  void   *pIter = NULL;
  int32_t code = 0;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    code = taosHashPut(pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
    if (code == 0) {
      int32_t size = taosHashGetSize(pDBMap);
      mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size);
    } else if (code != TSDB_CODE_DUP_KEY) {
      mError("failed to add Db:%s into Dbs list for kill checkpoint trans, code:%s", pVgroup->dbName, tstrerror(code));
    }

    // Release only after the last access to pVgroup->dbName.
    sdbRelease(pSdb, pVgroup);
  }
}
2287

2288
/*
 * Compare the fresh vnode snapshot with the cached node list. mndProcessNodeCheckReq calls this with
 * execInfo.lock held.
 *
 * Steps:
 *   1. Remove expired node entries and their tasks from execInfo.
 *   2. Collect the changed vnodes into pChangeInfo.
 *   3. If this mnode has just switched from follower to leader, set *pUpdateAllVgroups, clear the switch flag and
 *      add every db to pChangeInfo->pDBMap.
 *   4. If anything needs updating, kill all active checkpoint trans.
 *
 * Returns 0, or the error of the first failing step.
 */
static int32_t doProcessNodeCheckHelp(SArray *pNodeSnapshot, SMnode *pMnode, SVgroupChangeInfo *pChangeInfo,
                                      bool *pUpdateAllVgroups) {
  int32_t ret = removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
  if (ret != 0) {
    mDebug("failed to remove expired node entry in buf, code:%s", tstrerror(ret));
    return ret;
  }

  ret = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, pChangeInfo);
  if (ret != 0) {
    mDebug("failed to find changed vnode(s) during vnode(s) check, code:%s", tstrerror(ret));
    return ret;
  }

  bool switchedToLeader = (execInfo.role == NODE_ROLE_LEADER) && execInfo.switchFromFollower;
  if (switchedToLeader) {
    mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
    *pUpdateAllVgroups = true;
    execInfo.switchFromFollower = false;  // consume the one-shot switch flag
    addAllDbsIntoHashmap(pChangeInfo->pDBMap, pMnode->pSdb);
  }

  bool needUpdate = (taosArrayGetSize(pChangeInfo->pUpdateNodeList) > 0) || (*pUpdateAllVgroups);
  if (!needUpdate) {
    mDebug("no update found in vnode(s) list");
    return ret;
  }

  // checkpoint trans are vnode wide, so every active one is killed before the nodeUpdate
  killAllCheckpointTrans(pMnode, pChangeInfo);
  return ret;
}
2320

2321
// this function runs by only one thread, so it is not multi-thread safe
2322
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
×
2323
  int32_t           code = 0;
×
2324
  bool              allReady = true;
×
2325
  SArray           *pNodeSnapshot = NULL;
×
2326
  SMnode           *pMnode = pMsg->info.node;
×
2327
  int64_t           tsms = taosGetTimestampMs();
×
2328
  int64_t           ts = tsms / 1000;
×
2329
  bool              updateAllVgroups = false;
×
2330
  SVgroupChangeInfo changeInfo = {0};
×
2331

2332
  int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
×
2333
  if (old != 0) {
×
2334
    mDebug("still in checking node change");
×
2335
    return 0;
×
2336
  }
2337

2338
  mDebug("start to do node changing check, ts:%" PRId64, tsms);
×
2339

2340
  streamMutexLock(&execInfo.lock);
×
2341
  int32_t numOfNodes = extractStreamNodeList(pMnode);
×
2342
  streamMutexUnlock(&execInfo.lock);
×
2343

2344
  if (numOfNodes == 0) {
×
2345
    mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing");
×
2346
    execInfo.ts = ts;
×
2347
    atomic_store_32(&mndNodeCheckSentinel, 0);
×
2348
    return 0;
×
2349
  }
2350

2351
  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot, NULL);
×
2352
  if (code) {
×
2353
    mError("failed to take the vgroup snapshot, ignore it and continue");
×
2354
  }
2355

2356
  if (!allReady) {
×
2357
    taosArrayDestroy(pNodeSnapshot);
×
2358
    atomic_store_32(&mndNodeCheckSentinel, 0);
×
2359
    mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
×
2360
    return 0;
×
2361
  }
2362

2363
  streamMutexLock(&execInfo.lock);
×
2364
  code = doProcessNodeCheckHelp(pNodeSnapshot, pMnode, &changeInfo, &updateAllVgroups);
×
2365
  streamMutexUnlock(&execInfo.lock);
×
2366

2367
  if (code) {
×
2368
    goto _end;
×
2369
  }
2370

2371
  if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) {
×
2372
    mDebug("vnode(s) change detected, build trans to update stream task epsets");
×
2373

2374
    STrans *pTrans = NULL;
×
2375
    SArray* pStreamIdList = taosArrayInit(4, sizeof(int64_t));
×
2376

2377
    streamMutexLock(&execInfo.lock);
×
2378
    code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups, &pTrans, pStreamIdList);
×
2379

2380
    // remove the consensus-checkpoint-id req of all related stream(s)
2381
    int32_t num = taosArrayGetSize(pStreamIdList);
×
2382
    if (num > 0) {
×
2383
      mDebug("start to clear %d related stream in consensus-checkpoint-id list due to nodeUpdate", num);
×
2384
      for (int32_t x = 0; x < num; ++x) {
×
2385
        int64_t uid = *(int64_t *)taosArrayGet(pStreamIdList, x);
×
2386
        int32_t ret = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, uid);
×
2387
        if (ret != 0) {
×
2388
          mError("failed to remove stream:0x%" PRIx64 " from consensus-checkpoint-id list, code:%s", uid,
×
2389
                 tstrerror(ret));
2390
        }
2391
      }
2392
    }
2393

2394
    streamMutexUnlock(&execInfo.lock);
×
2395
    taosArrayDestroy(pStreamIdList);
×
2396

2397
    // NOTE: sync trans out of lock
2398
    if (code == 0 && pTrans != NULL) {
×
2399
      code = mndTransPrepare(pMnode, pTrans);
×
2400
      if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
2401
        mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
×
2402
      }
2403

2404
      mndTransDrop(pTrans);
×
2405
    }
2406

2407
    // keep the new vnode snapshot if success
2408
    if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
×
2409
      streamMutexLock(&execInfo.lock);
×
2410

2411
      code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
×
2412
      int32_t num = (int)taosArrayGetSize(execInfo.pNodeList);
×
2413
      if (code == 0) {
×
2414
        execInfo.ts = ts;
×
2415
        mDebug("create trans successfully, update cached node list, numOfNodes:%d", num);
×
2416
      }
2417

2418
      streamMutexUnlock(&execInfo.lock);
×
2419

2420
      if (code) {
×
2421
        mError("failed to extract node list from stream, code:%s", tstrerror(code));
×
2422
        goto _end;
×
2423
      }
2424
    }
2425
  }
2426

2427
  mndDestroyVgroupChangeInfo(&changeInfo);
×
2428

2429
_end:
×
2430
  taosArrayDestroy(pNodeSnapshot);
×
2431

2432
  mDebug("end to do stream task node change checking, elapsed time:%" PRId64 "ms", taosGetTimestampMs() - tsms);
×
2433
  atomic_store_32(&mndNodeCheckSentinel, 0);
×
2434

2435
  return 0;
×
2436
}
2437

2438
/*
 * Enqueue a TDMT_MND_STREAM_NODECHANGE_CHECK message onto the mnode write queue,
 * but only when at least one stream object exists in the sdb.
 *
 * Returns 0 when there is no stream, terrno when the message body cannot be
 * allocated, otherwise the result of tmsgPutToQueue().
 */
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;

  // no stream at all, nothing to check
  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) <= 0) {
    return 0;
  }

  int32_t               msgLen = sizeof(SMStreamNodeCheckMsg);
  SMStreamNodeCheckMsg *pBody = rpcMallocCont(msgLen);
  if (pBody == NULL) {
    return terrno;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pBody, .contLen = msgLen};
  return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
2454

2455
/*
 * Enqueue a check message onto the mnode write queue when the sdb holds at least
 * one stream object.
 *
 * NOTE(review): this posts TDMT_MND_STREAM_NODECHANGE_CHECK, exactly like
 * mndProcessNodeCheck; confirm that a dedicated status-check message type is not
 * intended here.
 *
 * Returns 0 when there is no stream, terrno when the message body cannot be
 * allocated, otherwise the result of tmsgPutToQueue().
 */
static int32_t mndProcessStatusCheck(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;

  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) <= 0) {
    return 0;
  }

  int32_t               msgLen = sizeof(SMStreamNodeCheckMsg);
  SMStreamNodeCheckMsg *pBody = rpcMallocCont(msgLen);
  if (pBody == NULL) {
    return terrno;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pBody, .contLen = msgLen};
  return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
2471

2472
/*
 * Register every task of pStream in the exec buffer pExecNode.
 *
 * For each task not yet present in pExecNode->pTaskMap:
 *   - a fresh STaskStatusEntry is put into pTaskMap and the task id is appended to pTaskList;
 *   - the task's node (vgId) is appended to pNodeList if no entry with that nodeId exists yet.
 * Tasks that are already buffered are left untouched. Failures are logged, not returned.
 */
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
  SStreamTaskIter *pIter = NULL;
  int32_t          code = createStreamTaskIter(pStream, &pIter);
  if (code) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return;
  }

  while (streamTaskIterNextTask(pIter)) {
    SStreamTask *pTask = NULL;
    code = streamTaskIterGetCurrent(pIter, &pTask);
    if (code) {
      break;
    }

    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    if (taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)) != NULL) {
      continue;  // already in the buffer
    }

    STaskStatusEntry entry = {0};
    streamTaskStatusInit(&entry, pTask);

    code = taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
    if (code == 0) {
      void   *px = taosArrayPush(pExecNode->pTaskList, &id);
      int32_t num = (int32_t)taosArrayGetSize(pExecNode->pTaskList);
      if (px) {
        mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
      } else {
        mError("s-task:0x%x failed to add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
      }
    } else {
      mError("s-task:0x%x failed to add into task map, since out of memory", (int32_t)entry.id.taskId);
    }

    // add the new vgroups if not added yet
    bool exist = false;
    for (int32_t j = 0; j < taosArrayGetSize(pExecNode->pNodeList); ++j) {
      SNodeEntry *pEntry = taosArrayGet(pExecNode->pNodeList, j);
      if ((pEntry != NULL) && (pEntry->nodeId == pTask->info.nodeId)) {
        exist = true;
        break;
      }
    }

    if (!exist) {
      SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
      epsetAssign(&nodeEntry.epset, &pTask->info.epSet);

      void *px = taosArrayPush(pExecNode->pNodeList, &nodeEntry);
      if (px) {
        mInfo("vgId:%d added into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList));
      } else {
        mError("vgId:%d failed to add into nodeList, total:%d", nodeEntry.nodeId,
               (int)taosArrayGetSize(pExecNode->pNodeList));
      }
    }
  }

  destroyStreamTaskIter(pIter);
}
2533

2534
/*
 * Append taskId to pList unless it is already there.
 *
 * uid and numOfTotal are only used for logging. numOfTotal is the expected
 * number of tasks of the stream; the log reports how many were received before
 * this one and how many are still outstanding.
 */
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
  int32_t numOfRecv = taosArrayGetSize(pList);

  // duplicated req from the same task: ignore it
  for (int32_t idx = 0; idx < numOfRecv; ++idx) {
    int32_t *pExist = taosArrayGet(pList, idx);
    if ((pExist != NULL) && (*pExist == taskId)) {
      return;
    }
  }

  if (taosArrayPush(pList, &taskId) == NULL) {
    mError("stream:0x%" PRIx64 " receive %d reqs for checkpoint, failed to added into task list, since out of memory",
           uid, numOfRecv);
  } else {
    mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, numOfRecv, numOfTotal - numOfRecv);
  }
}
2556

2557
/*
 * Handle a "request checkpoint" message from a stream task.
 *
 * The requesting task id is recorded, under execInfo.lock, in the per-stream list
 * kept in execInfo.pTransferStateStreams. Once the number of distinct requesting
 * tasks equals the stream's task count, a checkpoint trans is launched (only when
 * the stream object is found in the meta store) and the stream's entry is removed
 * from the map.
 *
 * Returns TSDB_CODE_INVALID_MSG on a decode failure, and TSDB_CODE_MND_STREAM_NOT_EXIST
 * when the stream is neither in the meta store nor in the task buffer. Otherwise
 * it sends an immediate response carrying req.nodeId and returns 0, or returns
 * terrno if the response body cannot be allocated.
 */
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
  SMnode                  *pMnode = pReq->info.node;
  SStreamTaskCheckpointReq req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskCheckpointReq(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint req msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);

  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf",
          req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the create stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
  if (pReqTaskList == NULL) {
    SArray *pList = taosArrayInit(4, sizeof(int32_t));
    if (pList == NULL) {
      mError("stream:0x%" PRIx64 " failed to init checkpoint req task list, code:%s", req.streamId,
             tstrerror(terrno));
    } else {
      doAddTaskId(pList, req.taskId, req.streamId, numOfTasks);
      code = taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
      if (code) {
        mError("failed to put into transfer state stream map, code: out of memory");
        taosArrayDestroy(pList);  // the map does not own it, release it here
      }
      pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
    }
  } else {
    doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks);
  }

  // pReqTaskList is NULL only when the list could not be created or registered; skip the completeness check then.
  if ((pReqTaskList != NULL) && ((int32_t)taosArrayGetSize(*pReqTaskList) == numOfTasks)) {
    // all tasks have sent the reqs
    int64_t checkpointId = mndStreamGenChkptId(pMnode, false);
    mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);

    if (pStream != NULL) {  // TODO:handle error
      code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
      if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
        mError("stream:0x%" PRIx64 " failed to create checkpoint trans, checkpointId:%" PRId64 ", code:%s",
               req.streamId, checkpointId, tstrerror(code));
      }
    } else {
      // todo: wait for the create stream trans completed, and launch the checkpoint trans
    }

    // remove this entry, not overwriting the global error code
    int32_t ret = taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));
    if (ret) {
      mError("failed to remove transfer state stream, code:%s", tstrerror(ret));
    }

    int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
    mDebug("stream:0x%" PRIx64 " removed in transfer-state list, %d stream(s) not finish fill-history process",
           req.streamId, numOfStreams);
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  {
    SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)};
    rsp.pCont = rpcMallocCont(rsp.contLen);
    if (rsp.pCont == NULL) {
      return terrno;
    }

    SMsgHead *pHead = rsp.pCont;
    pHead->vgId = htonl(req.nodeId);

    tmsgSendRsp(&rsp);
    pReq->info.handle = NULL;  // disable auto rsp
  }

  return 0;
}
2661

2662
// valid the info according to the HbMsg
2663
static bool validateChkptReport(const SCheckpointReport *pReport, int64_t reportChkptId) {
×
2664
  STaskId           id = {.streamId = pReport->streamId, .taskId = pReport->taskId};
×
2665
  STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
×
2666
  if (pTaskEntry == NULL) {
×
2667
    mError("invalid checkpoint-report msg from task:0x%x, discard", pReport->taskId);
×
2668
    return false;
×
2669
  }
2670

2671
  if (pTaskEntry->checkpointInfo.latestId >= pReport->checkpointId) {
×
2672
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " saved checkpointId:%" PRId64 " discard",
×
2673
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2674
    return false;
×
2675
  }
2676

2677
  // now the task in checkpoint procedure
2678
  if ((pTaskEntry->checkpointInfo.activeId != 0) && (pTaskEntry->checkpointInfo.activeId > pReport->checkpointId)) {
×
2679
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " active checkpointId:%" PRId64
×
2680
           " discard",
2681
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
2682
    return false;
×
2683
  }
2684

2685
  if (reportChkptId >= pReport->checkpointId) {
×
2686
    mError("s-task:0x%x expired checkpoint-report msg, checkpointId:%" PRId64 " already update checkpointId:%" PRId64
×
2687
           " discard",
2688
           pReport->taskId, pReport->checkpointId, reportChkptId);
2689
    return false;
×
2690
  }
2691

2692
  return true;
×
2693
}
2694

2695
/*
 * Record a validated checkpoint-report in pList (an array of STaskChkptInfo).
 *
 * Reports rejected by validateChkptReport() are dropped. If the task already has
 * an entry, that entry is overwritten when the report carries a newer
 * checkpointId, and left alone otherwise (older id: error log; same id: warning).
 * A task without an entry gets a new one appended.
 */
static void doAddReportStreamTask(SArray *pList, int64_t reportedChkptId, const SCheckpointReport *pReport) {
  if (!validateChkptReport(pReport, reportedChkptId)) {
    return;
  }

  // look up the existing entry of this task, if any
  STaskChkptInfo *pExist = NULL;
  for (int32_t idx = 0; idx < taosArrayGetSize(pList); ++idx) {
    STaskChkptInfo *pCur = taosArrayGet(pList, idx);
    if ((pCur != NULL) && (pCur->taskId == pReport->taskId)) {
      pExist = pCur;
      break;
    }
  }

  if (pExist != NULL) {
    if (pExist->checkpointId == pReport->checkpointId) {
      mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
    } else if (pExist->checkpointId > pReport->checkpointId) {
      mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
             pReport->taskId, pExist->checkpointId, pReport->checkpointId);
    } else {
      // expired checkpoint-report msg, update it
      mInfo("s-task:0x%x expired checkpoint-report info in checkpoint-report list update from %" PRId64 "->%" PRId64,
            pReport->taskId, pExist->checkpointId, pReport->checkpointId);

      pExist->checkpointId = pReport->checkpointId;
      pExist->ts = pReport->checkpointTs;
      pExist->version = pReport->checkpointVer;
      pExist->transId = pReport->transId;
      pExist->dropHTask = pReport->dropHTask;
    }
    return;
  }

  STaskChkptInfo newInfo = {
      .streamId = pReport->streamId,
      .taskId = pReport->taskId,
      .transId = pReport->transId,
      .dropHTask = pReport->dropHTask,
      .version = pReport->checkpointVer,
      .ts = pReport->checkpointTs,
      .checkpointId = pReport->checkpointId,
      .nodeId = pReport->nodeId,
  };

  if (taosArrayPush(pList, &newInfo) == NULL) {
    mError("failed to put into task list, taskId:0x%x", pReport->taskId);
    return;
  }

  int32_t numOfReported = taosArrayGetSize(pList);
  mDebug("stream:0x%" PRIx64 " taskId:0x%x checkpoint-report recv, %d tasks has send checkpoint-report",
         pReport->streamId, pReport->taskId, numOfReported);
}
2748

2749
/*
 * Handle a checkpoint-report message from a stream task.
 *
 * After lazily initializing the exec buffer, the report is recorded, under
 * execInfo.lock, in the per-stream SChkptReportInfo kept in execInfo.pChkptStreams.
 * When all tasks of a known stream have reported, this is logged.
 *
 * Returns TSDB_CODE_INVALID_MSG on a decode failure, and TSDB_CODE_MND_STREAM_NOT_EXIST
 * when the stream is neither in the meta store nor in the task buffer. Otherwise a
 * quick success response is sent and `code` is returned. `code` holds the last
 * lookup/insert status; it may be non-zero even though the response reports success.
 */
int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SCheckpointReport req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskChkptReport(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint-report msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64
         " checkpointVer:%" PRId64 " transId:%d",
         req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);

  // register to the stream task done map, if all tasks has sent these kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the creation of stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SChkptReportInfo *pInfo =
      (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
  if (pInfo == NULL) {
    SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId};
    if (info.pTaskList != NULL) {
      doAddReportStreamTask(info.pTaskList, info.reportChkpt, &req);
      code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &info, sizeof(info));
      if (code) {
        mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId);
        taosArrayDestroy(info.pTaskList);  // not owned by the map, avoid leaking it
      }

      pInfo = (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
    } else {
      code = terrno;
      mError("stream:0x%" PRIx64 " failed to init checkpoint-report task list, code:%s", req.streamId,
             tstrerror(code));
    }
  } else {
    doAddReportStreamTask(pInfo->pTaskList, pInfo->reportChkpt, &req);
  }

  // pInfo may be NULL if registration failed; pStream may be NULL if the stream is only in the task buffer.
  if (pInfo != NULL && pStream != NULL) {
    int32_t total = taosArrayGetSize(pInfo->pTaskList);
    if (total == numOfTasks) {  // all tasks have sent the reqs
      mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
            " will be issued soon",
            req.streamId, pStream->name, total, req.checkpointId);
    }
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS);
  return code;
}
2828

2829
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t *pExistedTasks, bool *pAllSame) {
×
2830
  int32_t num = 0;
×
2831
  int64_t chkId = INT64_MAX;
×
2832
  *pExistedTasks = 0;
×
2833
  *pAllSame = true;
×
2834

2835
  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
×
2836
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
×
2837
    if (p == NULL) {
×
2838
      continue;
×
2839
    }
2840

2841
    if (p->streamId != streamId) {
×
2842
      continue;
×
2843
    }
2844

2845
    num += 1;
×
2846
    STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
×
2847
    if (chkId > pe->checkpointInfo.latestId) {
×
2848
      if (chkId != INT64_MAX) {
×
2849
        *pAllSame = false;
×
2850
        mDebug("checkpointIds not identical, prev:%" PRId64 " smaller:%" PRId64 " from task:0x%" PRIx64, chkId,
×
2851
               pe->checkpointInfo.latestId, pe->id.taskId);
2852
      }
2853
      chkId = pe->checkpointInfo.latestId;
×
2854
    }
2855
  }
2856

2857
  *pExistedTasks = num;
×
2858
  if (num < numOfTasks) {  // not all task send info to mnode through hbMsg, no valid checkpoint Id
×
2859
    return -1;
×
2860
  }
2861

2862
  return chkId;
×
2863
}
2864

2865
/*
 * Send an immediate response of msgSize bytes with the given code on handle *pInfo.
 * The body starts with an SMsgHead whose vgId is set (network byte order).
 *
 * On success pInfo->handle is cleared so no automatic response follows. If the
 * body cannot be allocated, nothing is sent and *pInfo is left unchanged.
 */
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) {
  SRpcMsg rsp = {.code = code, .info = *pInfo, .contLen = msgSize};

  rsp.pCont = rpcMallocCont(rsp.contLen);
  if (rsp.pCont == NULL) {
    return;
  }

  ((SMsgHead *)rsp.pCont)->vgId = htonl(vgId);
  tmsgSendRsp(&rsp);

  pInfo->handle = NULL;  // disable auto rsp
}
×
2876

2877
/*
 * For every task id in pList (an array of int32_t), remove the first entry in
 * pInfo->pTaskList whose req.taskId matches it.
 *
 * Returns the number of task ids in pList.
 */
static int32_t doCleanReqList(SArray *pList, SCheckpointConsensusInfo *pInfo) {
  int32_t numOfIds = taosArrayGetSize(pList);

  for (int32_t i = 0; i < numOfIds; ++i) {
    int32_t *pTaskId = taosArrayGet(pList, i);
    if (pTaskId == NULL) {
      continue;
    }

    int32_t pos = 0;
    while (pos < taosArrayGetSize(pInfo->pTaskList)) {
      SCheckpointConsensusEntry *pEntry = taosArrayGet(pInfo->pTaskList, pos);
      if ((pEntry != NULL) && (pEntry->req.taskId == *pTaskId)) {
        taosArrayRemove(pInfo->pTaskList, pos);
        break;  // only the first match is removed
      }
      pos += 1;
    }
  }

  return numOfIds;
}
2897

2898
/*
 * Try to settle a consensus checkpointId for the streams in execInfo.pStreamConsensus.
 *
 * Steps:
 *   1. Take a vgroup snapshot, filling pTermMap (vgId -> current term). If not all
 *      vnodes are ready, return 0 without doing anything.
 *   2. Under execInfo.lock, iterate over the consensus entries:
 *      - streams that no longer exist are queued for removal;
 *      - streams that have not received requests from all tasks, or whose tasks have
 *        not all reported via hbMsg, are skipped;
 *      - each request must come from a vnode whose term matches the current term, and
 *        must have waited at least 10s unless all tasks share the same checkpointId;
 *      - if every request qualifies and there is no conflicting trans, a
 *        set-consensus-checkpointId trans is created and the stream is queued for removal.
 *      At most one trans is created per call.
 *   3. Remove the queued streams from execInfo.pStreamConsensus.
 *
 * Returns TSDB_CODE_FAILED if a task reports a checkpointId smaller than the computed
 * consensus id, terrno on allocation failure, otherwise the last intermediate code.
 */
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  int64_t now = taosGetTimestampMs();
  bool    allReady = true;
  SArray *pNodeSnapshot = NULL;
  int32_t numOfTrans = 0;
  int32_t code = 0;
  void   *pIter = NULL;

  SArray *pList = taosArrayInit(4, sizeof(int32_t));
  if (pList == NULL) {
    return terrno;
  }

  SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
  if (pStreamList == NULL) {
    taosArrayDestroy(pList);
    return terrno;
  }

  SHashObj *pTermMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (pTermMap == NULL) {
    taosArrayDestroy(pList);
    taosArrayDestroy(pStreamList);
    return terrno;
  }

  mDebug("start to process consensus-checkpointId in tmr");

  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot, pTermMap);
  taosArrayDestroy(pNodeSnapshot);
  if (code) {
    mError("failed to get the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
    taosArrayDestroy(pStreamList);
    taosArrayDestroy(pList);
    taosHashCleanup(pTermMap);
    return 0;
  }

  streamMutexLock(&execInfo.lock);

  while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
    SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;

    taosArrayClear(pList);

    int64_t     streamId = -1;
    int32_t     num = taosArrayGetSize(pInfo->pTaskList);
    SStreamObj *pStream = NULL;

    code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
    if (pStream == NULL || code != 0) {  // stream has been dropped already
      mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
      void *p = taosArrayPush(pStreamList, &pInfo->streamId);
      if (p == NULL) {
        mError("failed to record the missing stream id in concensus-stream list, streamId:%" PRId64
               " code:%s, continue",
               pInfo->streamId, tstrerror(terrno));
      }
      continue;
    }

    // TODO: an entry whose pStream->uid differs from pInfo->streamId is not removed here.

    if ((num < pInfo->numOfTasks) || (pInfo->numOfTasks == 0)) {
      mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-consensus req(not all), ignore", pStream->uid,
             pStream->name, num, pInfo->numOfTasks);
      mndReleaseStream(pMnode, pStream);
      continue;
    }

    streamId = pStream->uid;

    int32_t existed = 0;
    bool    allSame = true;
    int64_t chkId = getConsensusId(pInfo->streamId, pInfo->numOfTasks, &existed, &allSame);
    if (chkId == -1) {
      mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again", existed, pInfo->numOfTasks);
      mndReleaseStream(pMnode, pStream);
      continue;
    }

    bool allQualified = true;
    for (int32_t j = 0; j < num; ++j) {
      SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
      if (pe == NULL) {
        continue;
      }

      // the req must come from a vnode whose term is still the current one
      if (pe->req.nodeId != -2) {
        int32_t *pTerm = taosHashGet(pTermMap, &(pe->req.nodeId), sizeof(pe->req.nodeId));
        if (pTerm == NULL) {
          mError("stream:0x%" PRIx64 " s-task:0x%x req from vgId:%d not found in termMap", pe->req.streamId,
                 pe->req.taskId, pe->req.nodeId);
          allQualified = false;
          continue;
        } else {
          if (*pTerm != pe->req.term) {
            mWarn("stream:0x%" PRIx64 " s-task:0x%x req from vgId:%d is expired, term:%d, current term:%d",
                  pe->req.streamId, pe->req.taskId, pe->req.nodeId, pe->req.term, *pTerm);
            allQualified = false;
            continue;
          }
        }
      }

      // wait at most 10s, unless all tasks already agree on the checkpointId
      if (((now - pe->ts) >= 10 * 1000) || allSame) {
        mDebug("s-task:0x%x vgId:%d term:%d sendTs:%" PRId64 " wait %.2fs or all tasks have same checkpointId:%" PRId64,
               pe->req.taskId, pe->req.nodeId, pe->req.term, pe->req.startTs, (now - pe->ts) / 1000.0, chkId);
        if (chkId > pe->req.checkpointId) {
          // pe and pIter belong to execInfo.pStreamConsensus: use/cancel them before releasing the lock.
          mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
                 pe->req.checkpointId, chkId);

          mndReleaseStream(pMnode, pStream);
          taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
          streamMutexUnlock(&execInfo.lock);

          taosArrayDestroy(pStreamList);
          taosArrayDestroy(pList);
          taosHashCleanup(pTermMap);
          return TSDB_CODE_FAILED;
        }
      } else {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
        allQualified = false;
      }
    }

    if (allQualified) {
      code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_CONSEN_NAME, false);

      if (code == 0) {
        code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, chkId, pInfo->pTaskList);
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
        } else {
          numOfTrans += 1;
          mndClearConsensusRspEntry(pInfo);
          void *p = taosArrayPush(pStreamList, &streamId);
          if (p == NULL) {
            mError("failed to put into stream list, stream:0x%" PRIx64 " not remove it in consensus-chkpt list",
                   streamId);
          }
        }
      } else {
        mDebug("stream:0x%" PRIx64 " not create chktp-consensus, due to trans conflict", pStream->uid);
      }
    }

    mndReleaseStream(pMnode, pStream);

    // create one transaction each time
    if (numOfTrans > 0) {
      taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
      break;
    }
  }

  for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
    int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i);
    if (pStreamId == NULL) {
      continue;
    }

    code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId);
  }

  streamMutexUnlock(&execInfo.lock);

  taosArrayDestroy(pStreamList);
  taosArrayDestroy(pList);
  taosHashCleanup(pTermMap);

  mDebug("end to process consensus-checkpointId in tmr, send consensus-checkpoint trans:%d", numOfTrans);
  return code;
}
3083

3084
/*
 * Run mndProcessCreateStreamReq() for a request that came from an mnode.
 *
 * On a real failure (neither 0 nor TSDB_CODE_ACTION_IN_PROGRESS), a 1-byte response
 * body is attached, noResp is cleared and the failure code is stored in pReq->code.
 * Returns the create result, or terrno if the response body cannot be allocated.
 */
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t code = mndProcessCreateStreamReq(pReq);
  bool    failed = (code != 0) && (code != TSDB_CODE_ACTION_IN_PROGRESS);
  if (!failed) {
    return code;
  }

  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->info.rspLen = 1;
  pReq->info.noResp = false;
  pReq->code = code;
  return code;
}
3098

3099
/*
 * Run mndProcessDropStreamReq() for a request that came from an mnode.
 *
 * On a real failure (neither 0 nor TSDB_CODE_ACTION_IN_PROGRESS), a 1-byte response
 * body is attached, noResp is cleared and the failure code is stored in pReq->code.
 * Returns the drop result, or terrno if the response body cannot be allocated.
 */
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t code = mndProcessDropStreamReq(pReq);
  bool    failed = (code != 0) && (code != TSDB_CODE_ACTION_IN_PROGRESS);
  if (!failed) {
    return code;
  }

  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->info.rspLen = 1;
  pReq->info.noResp = false;
  pReq->code = code;
  return code;
}
3113

3114
/*
 * Lazily fill the exec buffer with all stream tasks stored in the sdb.
 *
 * This is a no-op when pExecInfo->initTaskList is already set or pMnode is NULL;
 * otherwise the buffer is populated and initTaskList is set.
 */
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  if (!pExecInfo->initTaskList && pMnode != NULL) {
    addAllStreamTasksIntoBuf(pMnode, pExecInfo);
    pExecInfo->initTaskList = true;
  }
}
3122

3123
void mndStreamResetInitTaskListLoadFlag() {
8✔
3124
  mInfo("reset task list buffer init flag for leader");
8!
3125
  execInfo.initTaskList = false;
8✔
3126
}
8✔
3127

3128
/*
 * Track the mnode role in execInfo.
 *
 * execInfo.switchFromFollower is always reset first. The first call (role still
 * NODE_ROLE_UNINIT) simply records the role. Afterwards, the role is updated only
 * on a follower->leader transition (which also sets switchFromFollower) or a
 * leader->follower transition; any other combination is only logged.
 * pMnode is not used.
 */
void mndUpdateStreamExecInfoRole(SMnode *pMnode, int32_t role) {
  int32_t prevRole = execInfo.role;
  execInfo.switchFromFollower = false;

  if (prevRole == NODE_ROLE_UNINIT) {
    execInfo.role = role;
    if (role == NODE_ROLE_LEADER) {
      mInfo("init mnode is set to leader");
    } else {
      mInfo("init mnode is set to follower");
    }
    return;
  }

  bool becomeLeader = (role == NODE_ROLE_LEADER);

  if (becomeLeader && (prevRole == NODE_ROLE_FOLLOWER)) {
    execInfo.role = role;
    execInfo.switchFromFollower = true;
    mInfo("mnode switch to be leader from follower");
  } else if (becomeLeader) {
    mInfo("mnode remain to be leader, do nothing");
  } else if (prevRole == NODE_ROLE_LEADER) {
    execInfo.role = role;
    mInfo("mnode switch to be follower from leader");
  } else {
    mInfo("mnode remain to be follower, do nothing");
  }
}
8✔
3157

3158
/*
 * Walk every SDB_STREAM object in the sdb and register its tasks and nodes in
 * pExecInfo via saveTaskAndNodeInfoIntoBuf(). Each fetched object is released
 * after use.
 */
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo);
    sdbRelease(pSdb, pStream);
  }
}
×
3173

3174
/*
 * Create and prepare an "update checkpoint-info" trans for pStream.
 *
 * This function always releases the caller's reference to pStream (sdbRelease).
 * pChkptInfoList is not used.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the trans has been prepared. If trans
 * creation itself yields no trans or an error, that code is returned (the trans is
 * not dropped here). Any later failure is returned after dropping the trans.
 */
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList) {
  STrans *pTrans = NULL;
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME,
                               "update checkpoint-info", &pTrans);
  if (pTrans == NULL || code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid);
  if (code) {
    goto _end;
  }

  code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream);
  if (code) {
    goto _end;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code) {
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, tstrerror(code));
    goto _end;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_end:
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
3217

3218
/*
 * Handle a drop-orphan-task request.
 *
 * Decodes the orphan-task list carried in pReq. When the list is non-empty, it builds and
 * prepares one MND_STREAM_DROP_NAME transaction. That transaction:
 *   - drops every task in the list (mndStreamSetDropActionFromList), and
 *   - persists a DROPPED log for a placeholder stream object. The placeholder's uid is the
 *     streamId of the first usable list entry; its sourceDb and targetSTbName are empty.
 * The same streamId is used for the transaction conflict check and for trans registration.
 *
 * Returns:
 *   0                                  nothing to drop (empty list);
 *   TSDB_CODE_SUCCESS /
 *   TSDB_CODE_ACTION_IN_PROGRESS       the transaction was prepared;
 *   an error code                      decoding, validation, or a trans-building step failed.
 *
 * The decoded message and the local trans handle are released on every exit path.
 */
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  int32_t      code = 0;
  SOrphanTask *pTask = NULL;
  STrans      *pTrans = NULL;
  int32_t      numOfTasks = 0;

  SMStreamDropOrphanMsg msg = {0};
  code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg);
  if (code) {
    // The decoder may have allocated part of the list before failing. msg is
    // zero-initialised, so the cleanup path is safe either way.
    goto _end;
  }

  numOfTasks = (int32_t)taosArrayGetSize(msg.pList);
  if (numOfTasks == 0) {
    mDebug("no orphan tasks to drop, no need to create trans");
    goto _end;
  }

  mDebug("create trans to drop %d orphan tasks", numOfTasks);

  // The first non-NULL entry is the representative task for the conflict check,
  // the placeholder stream object, and trans registration.
  for (int32_t i = 0; i < numOfTasks && pTask == NULL; ++i) {
    pTask = taosArrayGet(msg.pList, i);
  }

  if (pTask == NULL) {
    mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task");
    // Previously this path returned 0 (success) despite the error.
    code = TSDB_CODE_INVALID_MSG;
    goto _end;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
  if (code) {
    goto _end;
  }

  SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};

  code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code != 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _end;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
  if (code) {
    goto _end;
  }

  // drop all tasks
  if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _end;
  }

  // drop stream
  if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) {
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    goto _end;
  }

  // Logged only when a trans was actually prepared, not on the "nothing to drop" exit.
  mDebug("create drop %d orphan tasks trans succ", numOfTasks);

_end:
  tDestroyDropOrphanTaskMsg(&msg);
  mndTransDrop(pTrans);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc