• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3720

26 Mar 2025 06:20AM UTC coverage: 30.242% (-31.7%) from 61.936%
#3720

push

travis-ci

web-flow
feat(taosBenchmark): supports decimal data type (#30456)

* feat: taosBenchmark supports decimal data type

* build: decimal script not use pytest.sh

* fix: fix typo for decimal script

* test: insertBasic.py debug

71234 of 313946 branches covered (22.69%)

Branch coverage included in aggregate %.

38 of 423 new or added lines in 8 files covered. (8.98%)

120240 existing lines in 447 files now uncovered.

118188 of 312400 relevant lines covered (37.83%)

1450220.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.13
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "audit.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndScheduler.h"
21
#include "mndShow.h"
22
#include "mndStb.h"
23
#include "mndTrans.h"
24
#include "osMemory.h"
25
#include "parser.h"
26
#include "taoserror.h"
27
#include "tmisce.h"
28
#include "tname.h"
29

30
#define MND_STREAM_MAX_NUM 60
31

32
typedef struct {
33
  int8_t placeHolder;  // placeholder member, defined to fix a Windows compile error
34
} SMStreamNodeCheckMsg;
35

36
static int32_t  mndNodeCheckSentinel = 0;
37
SStreamExecInfo execInfo;
38

39
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
41
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
42
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
43
static int32_t mndProcessFailedStreamReq(SRpcMsg *pReq);
44
static int32_t mndProcessCheckStreamStatusReq(SRpcMsg *pReq);
45
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
46

47
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
48
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
49

50
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq);
51
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
52
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
53
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
54
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
55
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
56
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
57
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq);
58
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
59
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
60
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
61
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
62
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList);
63
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
64
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
65
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
66
static void    doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
67
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq);
68
static void    saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
69

70
static void     addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
71
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
72

73
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
74
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
75
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
76
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
77
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
78

79
/**
 * Initialize the stream module of the mnode.
 *
 * Registers every stream related message handler and the show/retrieve
 * handlers, initializes the global execution info, and finally installs the
 * SDB tables for SDB_STREAM and SDB_STREAM_SEQ.
 *
 * @param pMnode mnode instance the handlers and tables are attached to
 * @return 0 on success, otherwise the first non-zero code returned by
 *         mndInitExecInfo() or sdbSetTable()
 */
int32_t mndInitStream(SMnode *pMnode) {
  typedef int32_t (*StreamMsgFp)(SRpcMsg *pMsg);
  typedef struct {
    int32_t     msgType;
    StreamMsgFp handler;
  } StreamMsgHandle;

  // message type -> handler, listed in registration order
  static const StreamMsgHandle msgHandles[] = {
      {TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq},
      {TDMT_MND_FAILED_STREAM, mndProcessFailedStreamReq},
      {TDMT_MND_CHECK_STREAM_TIMER, mndProcessCheckStreamStatusReq},
      {TDMT_MND_DROP_STREAM, mndProcessDropStreamReq},
      {TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck},

      {TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp},
      {TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp},
      {TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp},
      {TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp},
      {TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp},
      {TDMT_STREAM_TASK_START_RSP, mndTransProcessRsp},
      {TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp},
      {TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp},
      {TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp},
      {TDMT_STREAM_CONSEN_CHKPT_RSP, mndTransProcessRsp},

      // for msgs inside mnode
      // TODO change the name
      {TDMT_STREAM_CREATE, mndProcessCreateStreamReqFromMNode},
      {TDMT_STREAM_CREATE_RSP, mndTransProcessRsp},
      {TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode},
      {TDMT_STREAM_DROP_RSP, mndTransProcessRsp},

      {TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp},
      {TDMT_VND_STREAM_ALL_STOP_RSP, mndTransProcessRsp},
      {TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint},
      {TDMT_MND_STREAM_DROP_ORPHANTASKS, mndProcessDropOrphanTaskReq},
      {TDMT_MND_STREAM_TASK_RESET, mndProcessResetStatusReq},
      {TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint},
      {TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport},
      {TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo},
      {TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp},
      {TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb},
      {TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq},
      {TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr},

      // NOTE(review): stop/start are routed to the pause handler exactly as before - confirm intended
      {TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq},
      {TDMT_MND_STOP_STREAM, mndProcessPauseStreamReq},
      {TDMT_MND_START_STREAM, mndProcessPauseStreamReq},
      {TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq},
      {TDMT_MND_RESET_STREAM, mndProcessResetStreamReq},
  };

  SSdbTable streamTable = {
      .sdbType = SDB_STREAM,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
  };
  SSdbTable seqTable = {
      .sdbType = SDB_STREAM_SEQ,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamSeqActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamSeqActionDecode,
      .insertFp = (SdbInsertFp)mndStreamSeqActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamSeqActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamSeqActionDelete,
  };

  int32_t numOfHandles = (int32_t)(sizeof(msgHandles) / sizeof(msgHandles[0]));
  for (int32_t k = 0; k < numOfHandles; ++k) {
    mndSetMsgHandle(pMnode, msgHandles[k].msgType, msgHandles[k].handler);
  }

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);

  // each step only runs when all previous steps succeeded
  int32_t code = mndInitExecInfo();
  if (code == 0) {
    code = sdbSetTable(pMnode->pSdb, streamTable);
  }
  if (code == 0) {
    code = sdbSetTable(pMnode->pSdb, seqTable);
  }
  return code;
}
160

161
/**
 * Tear down the global stream execution info: destroys its arrays and hash
 * tables and then its mutex.
 *
 * @param pMnode mnode instance (not referenced by the cleanup itself)
 */
void mndCleanupStream(SMnode *pMnode) {
  SStreamExecInfo *pInfo = &execInfo;

  // array members
  taosArrayDestroy(pInfo->pTaskList);
  taosArrayDestroy(pInfo->pNodeList);
  taosArrayDestroy(pInfo->pKilledChkptTrans);

  // hash table members
  taosHashCleanup(pInfo->pTaskMap);
  taosHashCleanup(pInfo->transMgmt.pDBTrans);
  taosHashCleanup(pInfo->pTransferStateStreams);
  taosHashCleanup(pInfo->pChkptStreams);
  taosHashCleanup(pInfo->pStreamConsensus);

  // the destroy result is deliberately ignored
  (void)taosThreadMutexDestroy(&pInfo->lock);

  mDebug("mnd stream exec info cleanup");
}
21✔
173

UNCOV
174
/**
 * Decode a raw SDB record into a newly allocated stream row.
 *
 * The raw record holds a 32-bit payload length followed by the encoded
 * SStreamObj. The payload is copied into a temporary buffer and decoded with
 * tDecodeSStreamObj() using the soft version stored in the raw record.
 *
 * @param pRaw raw SDB record to decode
 * @return the decoded row on success (terrno is reset to 0); NULL on failure,
 *         in which case terrno carries the error code and the row is freed
 */
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSdbRow    *pRow = NULL;
  SStreamObj *pStream = NULL;
  void       *buf = NULL;
  int8_t      sver = 0;
  int32_t     tlen = 0;
  int32_t     dataPos = 0;

  code = sdbGetRawSoftVer(pRaw, &sver);
  TSDB_CHECK_CODE(code, lino, _over);

  if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
    // An error code must be set here: leaving code == 0 sends control to the
    // success branch below, which dereferences the still-NULL pStream and
    // reports "success" with a NULL row.
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    lino = __LINE__;
    goto _over;
  }

  pRow = sdbAllocRow(sizeof(SStreamObj));
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);

  pStream = sdbGetRowObj(pRow);
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);

  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);

  // one extra byte beyond the payload, matching the size handed to the decoder
  buf = taosMemoryMalloc(tlen + 1);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);

  SDecoder decoder;
  tDecoderInit(&decoder, buf, tlen + 1);
  code = tDecodeSStreamObj(&decoder, pStream, sver);
  tDecoderClear(&decoder);

  if (code < 0) {
    // release whatever the partial decode attached to the stream object
    tFreeStreamObj(pStream);
  }

_over:
  taosMemoryFreeClear(buf);

  if (code != TSDB_CODE_SUCCESS) {
    char *p = (pStream == NULL) ? "null" : pStream->name;
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
    taosMemoryFreeClear(pRow);

    terrno = code;
    return NULL;
  } else {
    mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
           pStream->checkpointId);

    terrno = 0;
    return pRow;
  }
}
232

UNCOV
233
/**
 * SDB insert callback for stream objects; it only traces the stream name.
 *
 * @param pSdb    SDB instance (unused)
 * @param pStream stream object being inserted
 * @return always 0
 */
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
  const char *streamName = pStream->name;

  mTrace("stream:%s, perform insert action", streamName);
  return 0;
}
237

UNCOV
238
/**
 * SDB delete callback for stream objects: frees the contents of the stream
 * object while holding its write latch.
 *
 * @param pSdb    SDB instance (unused)
 * @param pStream stream object being deleted
 * @return always 0
 */
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
  mInfo("stream:%s, perform delete action", pStream->name);

  // the latch is a member of the object itself and is released after the free call
  taosWLockLatch(&pStream->lock);
  tFreeStreamObj(pStream);
  taosWUnLockLatch(&pStream->lock);

  return 0;
}
245

UNCOV
246
/**
 * SDB update callback for stream objects.
 *
 * Copies version, status, update time and checkpoint settings from the new
 * object into the old one. Task lists are moved over from the new object only
 * when the old object does not have them yet; the moved pointers are cleared
 * in the new object.
 *
 * @param pSdb       SDB instance (unused)
 * @param pOldStream object currently stored, updated in place
 * @param pNewStream object carrying the new values
 * @return always 0
 */
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
  mTrace("stream:%s, perform update action", pOldStream->name);

  // the version is swapped atomically, outside of the latch
  (void)atomic_exchange_32(&pOldStream->version, pNewStream->version);

  taosWLockLatch(&pOldStream->lock);

  pOldStream->status = pNewStream->status;
  pOldStream->updateTime = pNewStream->updateTime;
  pOldStream->checkpointId = pNewStream->checkpointId;
  pOldStream->checkpointFreq = pNewStream->checkpointFreq;

  // take over the task list only if the stored object has none
  if (NULL == pOldStream->pTaskList) {
    pOldStream->pTaskList = pNewStream->pTaskList;
    pNewStream->pTaskList = NULL;
  }

  // same rule for the second (pHTaskList) task list
  if (NULL == pOldStream->pHTaskList) {
    pOldStream->pHTaskList = pNewStream->pHTaskList;
    pNewStream->pHTaskList = NULL;
  }

  taosWUnLockLatch(&pOldStream->lock);
  return 0;
}
267

UNCOV
268
/**
 * Look up a stream by name in the SDB.
 *
 * @param pMnode     mnode instance owning the SDB
 * @param streamName key of the stream to acquire
 * @param pStream    out: the acquired stream, or NULL when the lookup failed
 * @return TSDB_CODE_MND_STREAM_NOT_EXIST when the lookup failed with
 *         TSDB_CODE_SDB_OBJ_NOT_THERE, otherwise 0. Note that 0 is also
 *         returned when the lookup failed for another reason, so callers
 *         must check *pStream as well.
 */
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
  *pStream = sdbAcquire(pMnode->pSdb, SDB_STREAM, streamName);

  if (*pStream != NULL) {
    return 0;
  }

  return (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) ? TSDB_CODE_MND_STREAM_NOT_EXIST : 0;
}
277

UNCOV
278
/**
 * Release a stream object back to the SDB of the given mnode.
 *
 * @param pMnode  mnode instance owning the SDB
 * @param pStream stream object to release
 */
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
  sdbRelease(pMnode->pSdb, pStream);
}
×
282

283
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
284
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
UNCOV
285
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
UNCOV
286
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
UNCOV
287
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
288

289
/**
 * Validate the mandatory fields of a create-stream request.
 *
 * The stream name, the sql text, the source database and the full name of the
 * target super table must all be non-empty.
 *
 * @param pCreate request to validate
 * @return TSDB_CODE_SUCCESS when all fields are present,
 *         TSDB_CODE_MND_INVALID_STREAM_OPTION otherwise
 */
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
  if (pCreate->name[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  // the sql pointer is checked before its first character is read
  if (pCreate->sql == NULL || pCreate->sql[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  if (pCreate->sourceDB[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  if (pCreate->targetStbFullName[0] == 0) {
    return TSDB_CODE_MND_INVALID_STREAM_OPTION;
  }

  return TSDB_CODE_SUCCESS;
}
296

UNCOV
297
/**
 * Build a schema wrapper from an array of SField entries.
 *
 * One SSchema is allocated per field; column ids are assigned sequentially
 * starting at 1. A field of type TSDB_DATA_TYPE_NULL is stored as a VARCHAR
 * column of VARSTR_HEADER_SIZE bytes.
 *
 * @param pFields  array of SField
 * @param pWrapper out: receives the column count and the allocated schema array
 * @return TSDB_CODE_SUCCESS, or terrno when the allocation or an element
 *         lookup fails (the schema array already stored in pWrapper is not
 *         released here in the latter case)
 */
static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) {
  pWrapper->nCols = taosArrayGetSize(pFields);
  pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema));
  if (pWrapper->pSchema == NULL) {
    return terrno;
  }

  for (int32_t col = 0; col < pWrapper->nCols; ++col) {
    SField *pSrc = (SField *)taosArrayGet(pFields, col);
    if (NULL == pSrc) {
      return terrno;
    }

    SSchema *pDst = &pWrapper->pSchema[col];
    bool     isNullType = (pSrc->type == TSDB_DATA_TYPE_NULL);

    // NULL-typed fields are mapped onto a VARCHAR column holding only the var-string header
    pDst->type = isNullType ? TSDB_DATA_TYPE_VARCHAR : pSrc->type;
    pDst->bytes = isNullType ? VARSTR_HEADER_SIZE : pSrc->bytes;

    pDst->colId = col + 1;
    tstrncpy(pDst->name, pSrc->name, sizeof(pDst->name));
    pDst->flags = pSrc->flags;
  }

  return TSDB_CODE_SUCCESS;
}
326

UNCOV
327
/**
 * Tell whether any column other than the first one carries the COL_IS_KEY flag.
 *
 * @param pWrapper schema to inspect
 * @return true if a column at index >= 1 has COL_IS_KEY set, false otherwise
 *         (including schemas with fewer than two columns)
 */
static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) {
  bool found = false;

  if (pWrapper->nCols >= 2) {
    // column 0 is never inspected
    int32_t col = 1;
    while (col < pWrapper->nCols && !found) {
      found = (pWrapper->pSchema[col].flags & COL_IS_KEY) ? true : false;
      ++col;
    }
  }

  return found;
}
338

UNCOV
339
/**
 * Fill a stream object from a create-stream request.
 *
 * Steps: copy the basic options, resolve source/target databases, take over
 * the sql/ast strings from the request, build the output schema (inserting
 * the "fill null" columns when requested), create the physical plan from the
 * ast and store it as a string, and finally copy the tag schema.
 *
 * Ownership: pCreate->sql and pCreate->ast are moved into pObj (the request
 * fields are set to NULL). Everything stored in pObj stays there on failure;
 * this function does not release it.
 *
 * @param pMnode  mnode instance used for db lookups
 * @param pObj    stream object to fill
 * @param pCreate create-stream request
 * @return 0 on success, otherwise an error code
 */
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
  SNode      *pAst = NULL;
  SQueryPlan *pPlan = NULL;
  int32_t     code = 0;

  mInfo("stream:%s to create", pCreate->name);
  memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
  pObj->createTime = taosGetTimestampMs();
  pObj->updateTime = pObj->createTime;
  pObj->version = 1;

  if (pCreate->smaId > 0) {
    pObj->subTableWithoutMd5 = 1;
  }

  pObj->smaId = pCreate->smaId;
  pObj->indexForMultiAggBalance = -1;

  pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));

  // The fill-history uid is derived from "<stream name>_fillhistory". The name
  // used to be formatted into a buffer that was then never used.
  char hTaskName[TSDB_STREAM_FNAME_LEN + 32] = {0};
  snprintf(hTaskName, tListLen(hTaskName), "%s_%s", pObj->name, "fillhistory");
  pObj->hTaskUid = mndGenerateUid(hTaskName, strlen(hTaskName));
  pObj->status = STREAM_STATUS__NORMAL;

  pObj->conf.igExpired = pCreate->igExpired;
  pObj->conf.trigger = pCreate->triggerType;
  pObj->conf.triggerParam = pCreate->maxDelay;
  pObj->conf.watermark = pCreate->watermark;
  pObj->conf.fillHistory = pCreate->fillHistory;
  pObj->deleteMark = pCreate->deleteMark;
  pObj->igCheckUpdate = pCreate->igUpdate;

  memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
  SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
  if (pSourceDb == NULL) {
    code = terrno;
    mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb,
          tstrerror(code));
    goto _ERR;
  }

  pObj->sourceDbUid = pSourceDb->uid;
  mndReleaseDb(pMnode, pSourceDb);

  memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);

  SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
  if (pTargetDb == NULL) {
    code = terrno;
    // pObj->targetDb is not filled in yet at this point, so report the stable name
    mError("stream:%s failed to create, target db of stable:%s not exist since %s", pCreate->name,
           pObj->targetSTbName, tstrerror(code));
    goto _ERR;
  }

  tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);

  if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
    pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
  } else {
    pObj->targetStbUid = pCreate->targetStbUid;
  }
  pObj->targetDbUid = pTargetDb->uid;
  mndReleaseDb(pMnode, pTargetDb);

  // the stream object takes over the sql and ast strings of the request
  pObj->sql = pCreate->sql;
  pObj->ast = pCreate->ast;

  pCreate->sql = NULL;
  pCreate->ast = NULL;

  // deserialize ast
  if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) {
    goto _ERR;
  }

  // create output schema
  if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) {
    goto _ERR;
  }

  int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
  if (numOfNULL > 0) {
    // The column count of the stream object is only updated once the widened
    // array exists, so a failed allocation leaves count and array consistent.
    int32_t  totalCols = pObj->outputSchema.nCols + numOfNULL;
    SSchema *pFullSchema = taosMemoryCalloc(totalCols, sizeof(SSchema));
    if (!pFullSchema) {
      code = terrno;
      goto _ERR;
    }

    int32_t nullIndex = 0;
    int32_t dataIndex = 0;
    for (int32_t i = 0; i < totalCols; i++) {
      SColLocation *pos = NULL;
      if (nullIndex < numOfNULL) {
        pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
        if (pos == NULL) {
          mError("invalid null column index, %d", nullIndex);
          continue;
        }
      }

      if (pos == NULL || i < pos->slotId) {
        // regular data column: either all null columns are placed already, or
        // the next null column belongs to a later slot
        SSchema *pSrc = &pObj->outputSchema.pSchema[dataIndex];

        pFullSchema[i].bytes = pSrc->bytes;
        pFullSchema[i].colId = i + 1;  // pObj->outputSchema.pSchema[dataIndex].colId;
        pFullSchema[i].flags = pSrc->flags;
        tstrncpy(pFullSchema[i].name, pSrc->name, sizeof(pFullSchema[i].name));
        pFullSchema[i].type = pSrc->type;
        dataIndex++;
      } else {
        // null column placed at its slot
        pFullSchema[i].bytes = 0;
        pFullSchema[i].colId = pos->colId;
        pFullSchema[i].flags = COL_SET_NULL;
        memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
        pFullSchema[i].type = pos->type;
        nullIndex++;
      }
    }

    taosMemoryFree(pObj->outputSchema.pSchema);
    pObj->outputSchema.pSchema = pFullSchema;
    pObj->outputSchema.nCols = totalCols;
  }

  SPlanContext cxt = {
      .pAstRoot = pAst,
      .topicQuery = false,
      .streamQuery = true,
      .triggerType =
          (pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
      .watermark = pObj->conf.watermark,
      .igExpired = pObj->conf.igExpired,
      .deleteMark = pObj->deleteMark,
      .igCheckUpdate = pObj->igCheckUpdate,
      .destHasPrimaryKey = hasDestPrimaryKey(&pObj->outputSchema),
      .recalculateInterval = pCreate->recalculateInterval,
  };

  // the planner receives the part of the full name after the first '.'
  char *pTargetFStable = strchr(pCreate->targetStbFullName, '.');
  if (pTargetFStable == NULL) {
    // without a '.' there is nothing to copy; a NULL source must not reach tstrncpy
    code = TSDB_CODE_MND_INVALID_STREAM_OPTION;
    mError("stream:%s failed to create, invalid target stable name:%s", pCreate->name, pCreate->targetStbFullName);
    goto _ERR;
  }
  pTargetFStable = pTargetFStable + 1;

  tstrncpy(cxt.pStbFullName, pTargetFStable, TSDB_TABLE_FNAME_LEN);
  tstrncpy(cxt.pWstartName, pCreate->pWstartName, TSDB_COL_NAME_LEN);
  tstrncpy(cxt.pWendName, pCreate->pWendName, TSDB_COL_NAME_LEN);
  tstrncpy(cxt.pGroupIdName, pCreate->pGroupIdName, TSDB_COL_NAME_LEN);
  tstrncpy(cxt.pIsWindowFilledName, pCreate->pIsWindowFilledName, TSDB_COL_NAME_LEN);

  // using ast and param to build physical plan
  if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) {
    goto _ERR;
  }

  // save physical plan
  if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) {
    goto _ERR;
  }

  pObj->tagSchema.nCols = pCreate->numOfTags;
  if (pCreate->numOfTags) {
    pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
    if (pObj->tagSchema.pSchema == NULL) {
      code = terrno;
      goto _ERR;
    }
  }

  /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
  for (int32_t i = 0; i < pCreate->numOfTags; i++) {
    SField *pField = taosArrayGet(pCreate->pTags, i);
    if (pField == NULL) {
      continue;
    }

    // tag column ids continue after the output columns
    pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
    pObj->tagSchema.pSchema[i].bytes = pField->bytes;
    pObj->tagSchema.pSchema[i].flags = pField->flags;
    pObj->tagSchema.pSchema[i].type = pField->type;
    memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN);
  }

_ERR:
  // shared exit for success and failure: the ast and the plan are temporaries
  if (pAst != NULL) nodesDestroyNode(pAst);
  if (pPlan != NULL) qDestroyQueryPlan(pPlan);
  return code;
}
534

UNCOV
535
/**
 * Encode a stream task into a deploy message and append it to a transaction
 * as a TDMT_STREAM_TASK_DEPLOY action.
 *
 * The task is encoded twice: first with an empty encoder to learn the encoded
 * size, then into a buffer of SMsgHead + that size.
 *
 * @param pTrans transaction receiving the action
 * @param pTask  task to deploy; its ver is raised to SSTREAM_TASK_VER when it
 *               is older than SSTREAM_TASK_SUBTABLE_CHANGED_VER
 * @return 0 on success; TSDB_CODE_INVALID_MSG when the sizing pass fails;
 *         terrno when the buffer cannot be allocated; otherwise the code of
 *         the failed encode or setTransAction() call. The buffer is freed
 *         here whenever the action could not be added.
 */
int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
  SEncoder encoder;
  tEncoderInit(&encoder, NULL, 0);

  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  // pass 1: size only. Any negative result is a failure, not just -1.
  int32_t code = tEncodeStreamTask(&encoder, pTask);
  if (code < 0) {
    tEncoderClear(&encoder);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t size = encoder.pos;
  int32_t tlen = sizeof(SMsgHead) + size;
  tEncoderClear(&encoder);

  void *buf = taosMemoryCalloc(1, tlen);
  if (buf == NULL) {
    return terrno;
  }

  // message head carries the node id of the task in network byte order
  ((SMsgHead *)buf)->vgId = htonl(pTask->info.nodeId);

  // pass 2: encode the task right behind the message head
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, size);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  if (code != 0) {
    mError("failed to encode stream task, code:%s", tstrerror(code));
    taosMemoryFree(buf);
    return code;
  }

  code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code) {
    // the buffer is only released here when the action was not added
    taosMemoryFree(buf);
  }

  return code;
}
579

UNCOV
580
/**
 * Append a deploy action to the transaction for every task of the stream.
 *
 * All tasks reachable through the task iterator are handled first. When the
 * stream uses fill-history, or its trigger is
 * STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE, every task in pHTaskList is
 * persisted as well.
 *
 * @param pTrans  transaction receiving the actions
 * @param pStream stream whose tasks are persisted
 * @return 0 on success, otherwise the first non-zero code encountered
 */
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pTaskIter = NULL;

  int32_t code = createStreamTaskIter(pStream, &pTaskIter);
  if (code) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return code;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCurrent = NULL;

    code = streamTaskIterGetCurrent(pTaskIter, &pCurrent);
    if (code == 0) {
      code = mndPersistTaskDeployReq(pTrans, pCurrent);
    }

    if (code) {
      break;
    }
  }

  // the iterator is destroyed on both the normal and the failure path
  destroyStreamTaskIter(pTaskIter);
  if (code) {
    return code;
  }

  // persistent stream task for already stored ts data
  bool needHistoryTasks =
      pStream->conf.fillHistory || (pStream->conf.trigger == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE);
  if (!needHistoryTasks) {
    return code;
  }

  int32_t numOfLevels = taosArrayGetSize(pStream->pHTaskList);
  for (int32_t lv = 0; lv < numOfLevels; ++lv) {
    SArray *pLevelTasks = taosArrayGetP(pStream->pHTaskList, lv);
    int32_t numOfLevelTasks = taosArrayGetSize(pLevelTasks);

    for (int32_t t = 0; t < numOfLevelTasks; ++t) {
      SStreamTask *pHistoryTask = taosArrayGetP(pLevelTasks, t);

      code = mndPersistTaskDeployReq(pTrans, pHistoryTask);
      if (code) {
        return code;
      }
    }
  }

  return code;
}
625

626
/**
 * Persist a stream: first add the deploy actions of all its tasks to the
 * transaction, then write the stream itself to the transaction log with
 * status SDB_STATUS_READY.
 *
 * @param pTrans  transaction to fill
 * @param pStream stream to persist
 * @return the negative code of mndPersistStreamTasks() if it failed,
 *         otherwise the result of mndPersistTransLog()
 */
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
  int32_t taskCode = mndPersistStreamTasks(pTrans, pStream);

  // only a negative result aborts here
  if (taskCode < 0) {
    return taskCode;
  }

  return mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
}
634

UNCOV
635
/**
 * Add the creation of the stream's target super table to a transaction.
 *
 * The columns are taken from the stream's output schema. The tags are taken
 * from the stream's tag schema, or a single UBIGINT "group_id" tag is used
 * when the stream defines no tags. The super table gets the uid stored in
 * pStream->targetStbUid.
 *
 * @param pMnode  mnode instance
 * @param pTrans  transaction receiving the create-stable actions
 * @param pStream stream describing the target table
 * @param user    requesting user (currently unused)
 * @return 0 on success; TSDB_CODE_MND_STB_ALREADY_EXIST when the table
 *         exists; TSDB_CODE_MND_DB_NOT_SELECTED when its db cannot be found;
 *         TSDB_CODE_MND_SINGLE_STB_MODE_DB when a single-stable db already
 *         holds a table; otherwise the code of the failed step
 */
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
  SStbObj *pStb = NULL;
  SDbObj  *pDb = NULL;
  int32_t  code = 0;
  int32_t  lino = 0;
  SStbObj  stbObj = {0};
  bool     stbBuilt = false;  // true once stbObj holds data that must be freed

  SMCreateStbReq createReq = {0};
  tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
  createReq.numOfColumns = pStream->outputSchema.nCols;
  createReq.numOfTags = 1;  // group id
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);

  // build fields
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    pField->flags = pStream->outputSchema.pSchema[i].flags;
    pField->type = pStream->outputSchema.pSchema[i].type;
    pField->bytes = pStream->outputSchema.pSchema[i].bytes;
    pField->compress = createDefaultColCmprByType(pField->type);
    if (IS_DECIMAL_TYPE(pField->type)) {
      // precision and scale are extracted from the bytes field and stored as the type modifier
      uint8_t prec = 0, scale = 0;
      extractDecimalTypeInfoFromBytes(&pField->bytes, &prec, &scale);
      pField->typeMod = decimalCalcTypeMod(prec, scale);
    }
  }

  if (pStream->tagSchema.nCols == 0) {
    // no tags defined: keep the single default tag (numOfTags is already 1)
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    // build tags
    SField *pField = taosArrayGet(createReq.pTags, 0);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, "group_id", sizeof(pField->name));
    pField->type = TSDB_DATA_TYPE_UBIGINT;
    pField->flags = 0;
    pField->bytes = 8;
  } else {
    createReq.numOfTags = pStream->tagSchema.nCols;
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    for (int32_t i = 0; i < createReq.numOfTags; i++) {
      SField *pField = taosArrayGet(createReq.pTags, i);
      if (pField == NULL) {
        continue;
      }

      pField->bytes = pStream->tagSchema.pSchema[i].bytes;
      pField->flags = pStream->tagSchema.pSchema[i].flags;
      pField->type = pStream->tagSchema.pSchema[i].type;
      tstrncpy(pField->name, pStream->tagSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
    }
  }

  code = mndCheckCreateStbReq(&createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
    lino = __LINE__;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    lino = __LINE__;
    goto _OVER;
  }

  // The results of the next three calls must be stored in code: jumping to
  // _OVER with code == 0 would report a failed creation as success.
  int32_t numOfStbs = -1;
  code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
  if (code != 0) {
    lino = __LINE__;
    goto _OVER;
  }

  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    lino = __LINE__;
    goto _OVER;
  }

  code = mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb);
  if (code != 0) {
    lino = __LINE__;
    goto _OVER;
  }
  stbBuilt = true;

  stbObj.uid = pStream->targetStbUid;

  code = mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj);
  if (code < 0) {
    lino = __LINE__;
    goto _OVER;
  }

_OVER:
  // shared cleanup for the success and the failure path
  tFreeSMCreateStbReq(&createReq);
  if (stbBuilt) {
    mndFreeStb(&stbObj);
  }
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);

  if (code == 0) {
    mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName,
           pStream->outputSchema.nCols);
  } else {
    mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->targetSTbName, lino,
           tstrerror(code));
  }
  return code;
}
751

752
// 1. stream number check
753
// 2. target stable can not be target table of other existed streams.
UNCOV
754
static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
×
UNCOV
755
  int32_t     numOfStream = 0;
×
UNCOV
756
  SStreamObj *pStream = NULL;
×
UNCOV
757
  void       *pIter = NULL;
×
758

UNCOV
759
  while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
×
UNCOV
760
    if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
×
UNCOV
761
      ++numOfStream;
×
762
    }
763

764

765
    if (numOfStream > MND_STREAM_MAX_NUM) {
×
766
      mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
×
767
             pStreamObj->name);
UNCOV
768
      sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
769
      sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
770
      return TSDB_CODE_MND_TOO_MANY_STREAMS;
×
771
    }
772

UNCOV
773
    if (pStream->targetStbUid == pStreamObj->targetStbUid) {
×
UNCOV
774
      mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
×
775
             pStreamObj->name);
UNCOV
776
      sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
777
      sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
778
      return TSDB_CODE_MND_INVALID_TARGET_TABLE;
×
779
    }
UNCOV
780
    sdbRelease(pMnode->pSdb, pStream);
×
781
  }
782

783
  return TSDB_CODE_SUCCESS;
×
784
}
785

UNCOV
786
// Element-copy callback handed to taosArrayDup(): duplicates one notify address string.
static void *notifyAddrDup(void *p) {
  char *url = (char *)p;
  return taosStrdup(url);
}
×
787

788
static int32_t addStreamTaskNotifyInfo(const SCMCreateStreamReq *createReq, const SStreamObj *pStream,
×
789
                                       SStreamTask *pTask) {
790
  int32_t code = TSDB_CODE_SUCCESS;
×
791
  int32_t lino = 0;
×
792

793
  TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
794
  TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
795

796
  pTask->notifyInfo.pNotifyAddrUrls = taosArrayDup(createReq->pNotifyAddrUrls, notifyAddrDup);
×
797
  TSDB_CHECK_NULL(pTask->notifyInfo.pNotifyAddrUrls, code, lino, _end, terrno);
×
798
  pTask->notifyInfo.notifyEventTypes = createReq->notifyEventTypes;
×
799
  pTask->notifyInfo.notifyErrorHandle = createReq->notifyErrorHandle;
×
800
  pTask->notifyInfo.streamName = taosStrdup(mndGetDbStr(createReq->name));
×
801
  TSDB_CHECK_NULL(pTask->notifyInfo.streamName, code, lino, _end, terrno);
×
802
  pTask->notifyInfo.stbFullName = taosStrdup(createReq->targetStbFullName);
×
UNCOV
803
  TSDB_CHECK_NULL(pTask->notifyInfo.stbFullName, code, lino, _end, terrno);
×
804
  pTask->notifyInfo.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
×
805
  TSDB_CHECK_NULL(pTask->notifyInfo.pSchemaWrapper, code, lino, _end, terrno);
×
806

UNCOV
807
_end:
×
808
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
809
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
810
  }
UNCOV
811
  return code;
×
812
}
813

UNCOV
814
static int32_t addStreamNotifyInfo(SCMCreateStreamReq *createReq, SStreamObj *pStream) {
×
UNCOV
815
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
816
  int32_t lino = 0;
×
UNCOV
817
  int32_t level = 0;
×
UNCOV
818
  int32_t nTasks = 0;
×
UNCOV
819
  SArray *pLevel = NULL;
×
820

UNCOV
821
  TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
UNCOV
822
  TSDB_CHECK_NULL(pStream, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
823

UNCOV
824
  if (taosArrayGetSize(createReq->pNotifyAddrUrls) == 0) {
×
825
    goto _end;
×
826
  }
827

828
  level = taosArrayGetSize(pStream->pTaskList);
×
829
  for (int32_t i = 0; i < level; ++i) {
×
830
    pLevel = taosArrayGetP(pStream->pTaskList, i);
×
831
    nTasks = taosArrayGetSize(pLevel);
×
UNCOV
832
    for (int32_t j = 0; j < nTasks; ++j) {
×
UNCOV
833
      code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
×
UNCOV
834
      TSDB_CHECK_CODE(code, lino, _end);
×
835
    }
836
  }
837

838
  if (pStream->conf.fillHistory && createReq->notifyHistory) {
×
839
    level = taosArrayGetSize(pStream->pHTaskList);
×
840
    for (int32_t i = 0; i < level; ++i) {
×
841
      pLevel = taosArrayGetP(pStream->pHTaskList, i);
×
842
      nTasks = taosArrayGetSize(pLevel);
×
UNCOV
843
      for (int32_t j = 0; j < nTasks; ++j) {
×
UNCOV
844
        code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
×
UNCOV
845
        TSDB_CHECK_CODE(code, lino, _end);
×
846
      }
847
    }
848
  }
849

UNCOV
850
_end:
×
UNCOV
851
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
852
    mError("%s for stream %s failed at line %d since %s", __func__, pStream->name, lino, tstrerror(code));
×
853
  }
UNCOV
854
  return code;
×
855
}
856

UNCOV
857
// Walk all streams and mark as failed every stream that is still in STREAM_STATUS__INIT although
// more than tsStreamFailedTimeout has elapsed since its creation time, or whose creation time lies
// in the future relative to the current timestamp. Always returns 0.
static int32_t mndProcessCheckStreamStatusReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    taosWLockLatch(&pStream->lock);
    if (pStream->status == STREAM_STATUS__INIT) {
      bool expired = (taosGetTimestampMs() - pStream->createTime > tsStreamFailedTimeout);
      // only consulted when the stream has not expired, i.e. the elapsed time may be negative
      bool createdInFuture = !expired && (taosGetTimestampMs() - pStream->createTime < 0);

      if (expired || createdInFuture) {
        pStream->status = STREAM_STATUS__FAILED;
        tstrncpy(pStream->reserve, "timeout", sizeof(pStream->reserve));
        mInfo("stream:%s, set status to failed success because of timeout", pStream->name);
      }
    }
    taosWUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  return 0;
}
876

877
/**
 * Handle a "stream failed" message: mark the named stream as failed and record the reason.
 *
 * The message body is read as a 4-byte error code followed by the stream name. The name is copied
 * into a zero-initialised buffer and truncated to TSDB_STREAM_FNAME_LEN - 1 bytes.
 *
 * @param pReq  rpc message; pReq->pCont / pReq->contLen hold the payload described above
 * @return TSDB_CODE_INVALID_MSG when the payload is missing or shorter than the error code,
 *         TSDB_CODE_MND_INVALID_PLATFORM on Windows builds, otherwise the result of
 *         mndAcquireStream().
 */
static int32_t mndProcessFailedStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     errCode = 0;
  char        streamName[TSDB_STREAM_FNAME_LEN] = {0};

  // reject payloads that cannot even hold the error code; otherwise the reads below go out of
  // bounds and the name length computation underflows
  if (pReq->pCont == NULL || pReq->contLen < (int32_t)INT_BYTES) {
    mError("invalid failed stream msg recv, contLen:%d, discarded", pReq->contLen);
    return TSDB_CODE_INVALID_MSG;
  }

  // memcpy instead of a cast: the payload is not guaranteed to be int32 aligned
  memcpy(&errCode, pReq->pCont, sizeof(errCode));

  int32_t nameLen = TMIN(pReq->contLen - (int32_t)INT_BYTES, TSDB_STREAM_FNAME_LEN - 1);
  memcpy(streamName, POINTER_SHIFT(pReq->pCont, INT_BYTES), nameLen);

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  return code;
#endif

  mInfo("stream:%s, start to set stream failed", streamName);

  code = mndAcquireStream(pMnode, streamName, &pStream);
  if (pStream == NULL) {
    mError("stream:%s, failed to get stream when failed stream since %s", streamName, tstrerror(code));
    return code;
  }

  taosWLockLatch(&pStream->lock);
  pStream->status = STREAM_STATUS__FAILED;
  tstrncpy(pStream->reserve, tstrerror(errCode), sizeof(pStream->reserve));
  taosWUnLockLatch(&pStream->lock);
  mndReleaseStream(pMnode, pStream);

  mInfo("stream:%s, end to set stream failed success", streamName);

  return code;
}
908

UNCOV
909
/**
 * Handle a CREATE STREAM request.
 *
 * Steps, in order: deserialize and validate the request, check for an existing stream of the same
 * name, check the grant, the node-update state, the source database and the user's privileges,
 * build the stream object, schedule its tasks (unless an "empty" stream is built for a pending
 * fill-history), create the transaction, optionally create the target super table, persist the
 * stream, prepare the transaction and finally write an audit record.
 *
 * @param pReq  rpc message carrying a serialized SCMCreateStreamReq
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the transaction was prepared, the code returned by
 *         mndAcquireStream() when the stream already exists and "ignore exists" is set, otherwise
 *         the error code of the step that failed.
 */
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SStreamObj  streamObj = {0};
  char       *sql = NULL;
  int32_t     sqlLen = 0;
  const char *pMsg = "create stream tasks on dnodes";
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STrans     *pTrans = NULL;

  SCMCreateStreamReq createReq = {0};
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq);
  TSDB_CHECK_CODE(code, lino, _OVER);

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  goto _OVER;
#endif

  mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
  if ((code = mndCheckCreateStreamReq(&createReq)) != 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  code = mndAcquireStream(pMnode, createReq.name, &pStream);
  if (pStream != NULL && code == 0) {
    // a stream object without a task list is not treated as an existing stream here
    if (pStream->pTaskList != NULL) {
      if (createReq.igExists) {
        mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
        mndReleaseStream(pMnode, pStream);
        tFreeSCMCreateStreamReq(&createReq);
        return code;
      } else {
        code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
        goto _OVER;
      }
    }
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    goto _OVER;
  }

  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
    goto _OVER;
  }

  if (createReq.sql != NULL) {
    sql = taosStrdup(createReq.sql);
    TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
    // the length is needed by the audit record below; without it the sql text is never recorded
    sqlLen = (int32_t)strlen(sql);
  }

  // check for the taskEp update trans
  if (isNodeUpdateTransActive()) {
    mError("stream:%s failed to create stream, node update trans is active", createReq.name);
    code = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
    goto _OVER;
  }

  SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB);
  if (pSourceDb == NULL) {
    code = terrno;
    mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB,
          tstrerror(code));
    goto _OVER;
  }

  code = mndCheckForSnode(pMnode, pSourceDb);
  mndReleaseDb(pMnode, pSourceDb);
  if (code != 0) {
    goto _OVER;
  }

  // build stream obj from request
  if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) {
    mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  bool buildEmptyStream = false;
  if (createReq.lastTs == 0 && createReq.fillHistory != STREAM_FILL_HISTORY_OFF) {
    streamObj.status = STREAM_STATUS__INIT;
    buildEmptyStream = true;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb)) != 0) {
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb)) != 0) {
    goto _OVER;
  }

  code = doStreamCheck(pMnode, &streamObj);
  TSDB_CHECK_CODE(code, lino, _OVER);

  // schedule stream task for stream obj
  if (!buildEmptyStream) {
    code = mndScheduleStream(pMnode, &streamObj, &createReq);
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
      mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code));
      goto _OVER;  // no trans exists yet; _OVER performs the (NULL) drop
    }

    // add notify info into all stream tasks
    code = addStreamNotifyInfo(&createReq, &streamObj);
    if (code != TSDB_CODE_SUCCESS) {
      mError("stream:%s failed to add stream notify info since %s", createReq.name, tstrerror(code));
      goto _OVER;
    }

    // add into buffer firstly
    // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
    streamMutexLock(&execInfo.lock);
    mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
    saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
    streamMutexUnlock(&execInfo.lock);
  }

  code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans);
  if (pTrans == NULL || code) {
    goto _OVER;
  }

  // create stb for stream
  if (createReq.createStb == STREAM_CREATE_STABLE_TRUE && !buildEmptyStream) {
    if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) {
      mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code));
      goto _OVER;
    }
  } else {
    mDebug("stream:%s no need create stable", createReq.name);
  }

  // add stream to trans
  code = mndPersistStream(pTrans, &streamObj);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code));
    goto _OVER;
  }

  // execute creation
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    goto _OVER;
  }

  // Name parsing only feeds the audit record. A failure is logged with its own code and does not
  // change the result of the request.
  SName   dbname = {0};
  int32_t nameCode = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (nameCode != 0) {
    mError("invalid source dbname:%s in create stream, code:%s", createReq.sourceDB, tstrerror(nameCode));
  }

  SName name = {0};
  nameCode = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE);
  if (nameCode != 0) {
    mError("invalid stream name:%s in create strem, code:%s", createReq.name, tstrerror(nameCode));
  }

  // reuse this function for stream
  if (sql != NULL && sqlLen > 0) {
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen);
  } else {
    char detail[1000] = {0};
    snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
    auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
  }

_OVER:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stream:%s, failed to create at line:%d since %s", createReq.name, lino, tstrerror(code));
  } else {
    mDebug("stream:%s create stream completed", createReq.name);
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  mndTransDrop(pTrans);
  mndReleaseStream(pMnode, pStream);
  tFreeSCMCreateStreamReq(&createReq);
  tFreeStreamObj(&streamObj);

  if (sql != NULL) {
    taosMemoryFreeClear(sql);
  }

  return code;
}
1097

1098
// Handle a stop-stream request: look up the stream, check the caller's write privilege on the
// target db, make sure no conflicting trans or node update is pending, then build, register and
// prepare a "stop" transaction for the stream.
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the trans has been prepared, 0 when the stream does not
// exist and igNotExists is set, otherwise the error code of the failing step.
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SStreamObj      *pStream = NULL;
  STrans          *pTrans = NULL;
  bool             dropTrans = false;  // set once the trans has been created successfully
  int32_t          code = 0;
  SMPauseStreamReq pauseReq = {0};

  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (!pauseReq.igNotExists) {
      mError("stream:%s not exist, failed to restart stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }

    mInfo("stream:%s, not exist, not restart stream", pauseReq.name);
    return 0;
  }

  mInfo("stream:%s,%" PRId64 " start to restart stream", pauseReq.name, pStream->uid);

  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    goto _OVER;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_STOP_NAME, true);
  if (code) {
    goto _OVER;
  }

  if (mndStreamNodeIsUpdated(pMnode)) {
    mError("tasks are not ready for restart, node update detected");
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_STOP_NAME, "stop the stream",
                       &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s failed to stop stream since %s", pauseReq.name, tstrerror(code));
    goto _OVER;
  }

  // from here on every exit path drops the trans
  dropTrans = true;

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_STOP_NAME, pStream->uid);
  if (code) {
    goto _OVER;
  }

  // if nodeUpdate happened, not send pause trans
  code = mndStreamSetStopAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s, failed to restart task since %s", pauseReq.name, tstrerror(code));
    goto _OVER;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare restart stream trans since %s", pTrans->id, tstrerror(code));
    goto _OVER;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  sdbRelease(pMnode->pSdb, pStream);
  if (dropTrans) {
    mndTransDrop(pTrans);
  }
  return code;
}
1177

UNCOV
1178
// Generate the next checkpoint id: one more than the largest checkpoint id found in the stream
// objects stored in sdb and in the non-failed task status entries of execInfo.
// When `lock` is true, execInfo.lock is taken while the task entries are scanned; otherwise the
// scan runs without taking it.
int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int64_t     maxChkptId = 0;
  int64_t     maxTaskChkptId = -1;

  // 1. largest checkpoint id recorded in the stream objects
  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    maxChkptId = TMAX(maxChkptId, pStream->checkpointId);
    mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64, pStream, pStream->name, pStream->uid,
           pStream->checkpointId);
    sdbRelease(pSdb, pStream);
  }

  // 2. largest checkpoint id reported by the task status entries
  if (lock) {
    streamMutexLock(&execInfo.lock);
  }

  for (int32_t idx = 0; idx < taosArrayGetSize(execInfo.pTaskList); ++idx) {
    STaskId          *pId = taosArrayGet(execInfo.pTaskList, idx);
    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));

    // skip unknown tasks and tasks whose checkpoint is flagged as failed
    if (pId == NULL || pEntry == NULL || pEntry->checkpointInfo.failed) {
      continue;
    }

    if (pEntry->checkpointInfo.latestId > maxTaskChkptId) {
      maxTaskChkptId = pEntry->checkpointInfo.latestId;
    }
  }

  if (lock) {
    streamMutexUnlock(&execInfo.lock);
  }

  if (maxTaskChkptId > maxChkptId) {
    mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId,
           maxTaskChkptId);
    maxChkptId = maxTaskChkptId;
  }

  mDebug("generate new checkpointId:%" PRId64, maxChkptId + 1);
  return maxChkptId + 1;
}
1230

UNCOV
1231
// Create and prepare the checkpoint transaction of one stream.
//   - mndTrigger == 1: nothing is done (success is returned) while less than
//     tsStreamCheckpointInterval seconds have passed since pStream->checkpointFreq.
//   - A checkpoint action is added for every task of each level whose first task is a source task.
//   - On success the stream's checkpointId, checkpointFreq and version are updated under the
//     stream latch and the stream is persisted with the trans.
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the trans was prepared, otherwise the failing code.
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
                                               int8_t mndTrigger, bool lock) {
  int32_t code = TSDB_CODE_SUCCESS;
  STrans *pTrans = NULL;
  int64_t ts = taosGetTimestampMs();

  bool tooSoon = (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000);
  if (mndTrigger == 1 && tooSoon) {
    return code;
  }

  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
  if (code) {
    mWarn("checkpoint conflict with other trans in %s, code:%s ignore the checkpoint for stream:%s %" PRIx64,
          pStream->sourceDb, tstrerror(code), pStream->name, pStream->uid);
    goto _ERR;
  }

  code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME,
                       "gen checkpoint for stream", &pTrans);
  if (code) {
    mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
           tstrerror(code));
    goto _ERR;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
  if (code) {
    mError("failed to register checkpoint trans for stream:%s, checkpointId:%" PRId64, pStream->name, checkpointId);
    goto _ERR;
  }

  mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64, pStream->name, checkpointId);

  taosWLockLatch(&pStream->lock);
  pStream->currentTick = 1;

  // 1. redo action: broadcast checkpoint source msg for all source vg
  int32_t numOfLevels = taosArrayGetSize(pStream->pTaskList);
  for (int32_t lv = 0; lv < numOfLevels; lv++) {
    SArray      *pLevel = taosArrayGetP(pStream->pTaskList, lv);
    SStreamTask *pFirst = taosArrayGetP(pLevel, 0);

    // the level of the first task decides whether the whole level is handled
    if (pFirst->info.taskLevel != TASK_LEVEL__SOURCE) {
      continue;
    }

    int32_t numOfTasks = taosArrayGetSize(pLevel);
    for (int32_t t = 0; t < numOfTasks; t++) {
      SStreamTask *pTask = taosArrayGetP(pLevel, t);

      code = mndStreamSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);
      if (code != TSDB_CODE_SUCCESS) {
        taosWUnLockLatch(&pStream->lock);
        goto _ERR;
      }
    }
  }

  // 2. reset tick
  pStream->checkpointId = checkpointId;
  pStream->checkpointFreq = taosGetTimestampMs();
  pStream->currentTick = 0;

  // 3. commit log: stream checkpoint info
  pStream->version = pStream->version + 1;
  taosWUnLockLatch(&pStream->lock);

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS) {
    goto _ERR;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  } else {
    mError("failed to prepare checkpoint trans since %s", tstrerror(code));
  }

_ERR:
  mndTransDrop(pTrans);
  return code;
}
1312

1313
// Make sure execInfo.pNodeList is populated: when it is empty it is refreshed from the existing
// streams. Returns the refresh error code on failure, otherwise the number of entries in the list.
int32_t extractStreamNodeList(SMnode *pMnode) {
  if (taosArrayGetSize(execInfo.pNodeList) != 0) {
    return taosArrayGetSize(execInfo.pNodeList);
  }

  int32_t code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
  if (code) {
    mError("Failed to extract node list from stream, code:%s", tstrerror(code));
    return code;
  }

  return taosArrayGetSize(execInfo.pNodeList);
}
1324

1325
// Sanity check before handling checkpoints.
// Returns TSDB_CODE_STREAM_TASK_IVLD_STATUS when a node update has been detected, or when the node
// list is empty while the task list is not; returns 0 otherwise.
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
  if (mndStreamNodeIsUpdated(pMnode)) {
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  int32_t status = 0;

  streamMutexLock(&execInfo.lock);

  bool noNodes = (taosArrayGetSize(execInfo.pNodeList) == 0);
  if (noNodes) {
    mDebug("stream task node change checking done, no vgroups exist, do nothing");

    bool hasTasks = (taosArrayGetSize(execInfo.pTaskList) != 0);
    if (hasTasks) {
      mError("stream task node change checking done, no vgroups exist, but task list is not empty");
      status = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
    }
  }

  streamMutexUnlock(&execInfo.lock);
  return status;
}
1343

UNCOV
1344
// Scan the status entries of all tasks in pTaskList that belong to `streamId`.
// Returns -1 as soon as one of them still has a related fill-history task or is not in
// TASK_STATUS__READY; otherwise returns the largest startTime among them (-1 when no entry of
// the stream is found).
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
  int64_t latestStartTs = -1;
  int32_t latestTaskId = -1;

  for (int32_t idx = 0; idx < taosArrayGetSize(pTaskList); ++idx) {
    STaskId          *pId = taosArrayGet(pTaskList, idx);
    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));

    bool belongsToStream = (pId != NULL) && (pEntry != NULL) && (pEntry->id.streamId == streamId);
    if (!belongsToStream) {
      continue;
    }

    // -1 denote not ready now or never ready till now
    if (pEntry->hTaskId != 0) {
      mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
            " exists, checkpoint not issued",
            pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
            pEntry->hTaskId);
      return -1;
    }

    if (pEntry->status != TASK_STATUS__READY) {
      mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, not ready for checkpoint", pEntry->id.streamId,
            (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
      return -1;
    }

    if (pEntry->startTime > latestStartTs) {
      latestStartTs = pEntry->startTime;
      latestTaskId = pEntry->id.taskId;
    }
  }

  mDebug("stream:0x%" PRIx64 " last ready ts:%" PRId64 " s-task:0x%x", streamId, latestStartTs, latestTaskId);
  return latestStartTs;
}
1379

1380
// One candidate stream for a new checkpoint.
typedef struct {
  int64_t streamId;  // uid of the stream
  int64_t duration;  // value the candidates are ordered by (larger sorts first)
} SCheckpointInterval;

// Sort comparator for SCheckpointInterval: orders entries by duration in descending order.
// Returns -1 when p1 has the larger duration, 1 when p2 has, 0 when they are equal.
static int32_t streamWaitComparFn(const void *p1, const void *p2) {
  const SCheckpointInterval *lhs = (const SCheckpointInterval *)p1;
  const SCheckpointInterval *rhs = (const SCheckpointInterval *)p2;

  if (lhs->duration > rhs->duration) {
    return -1;
  }

  if (lhs->duration < rhs->duration) {
    return 1;
  }

  return 0;
}
1394

1395
// all tasks of this stream should be ready, otherwise do nothing
UNCOV
1396
static bool isStreamReadyHelp(int64_t now, SStreamObj *pStream) {
×
UNCOV
1397
  bool ready = false;
×
1398

UNCOV
1399
  streamMutexLock(&execInfo.lock);
×
1400

UNCOV
1401
  int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
×
UNCOV
1402
  if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) {
×
UNCOV
1403
    if (lastReadyTs != -1) {
×
UNCOV
1404
      mInfo("not start checkpoint, stream:0x%" PRIx64 " readyTs:%" PRId64 " ready duration:%.2fs less than threshold",
×
1405
            pStream->uid, lastReadyTs, (now - lastReadyTs) / 1000.0);
1406
    }
1407

UNCOV
1408
    ready = false;
×
1409
  } else {
UNCOV
1410
    ready = true;
×
1411
  }
1412

UNCOV
1413
  streamMutexUnlock(&execInfo.lock);
×
UNCOV
1414
  return ready;
×
1415
}
1416

1417
// Timer-driven checkpoint handler.
//   1. Bail out when the task/node status check fails.
//   2. Clear finished trans; when long-running checkpoint trans are reported, kill them, reset the
//      tasks and return without starting anything new.
//   3. Collect every stream whose last checkpoint is older than tsStreamCheckpointInterval seconds
//      and whose tasks are ready, ordered by waiting time (longest first).
//   4. Start checkpoint trans for them until tsMaxConcurrentCheckpoint concurrent trans is reached.
// Returns the code of the last step executed.
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  void       *pIter = NULL;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;
  int32_t     numOfCheckpointTrans = 0;
  int64_t     now = taosGetTimestampMs();

  code = mndCheckTaskAndNodeStatus(pMnode);
  if (code != 0) {
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));
  if (pList == NULL) {
    mError("failed to init chkptInterval info, not handle stream checkpoint, code:%s", tstrerror(terrno));
    return terrno;
  }

  SArray *pLongChkpts = taosArrayInit(4, sizeof(SStreamTransInfo));
  if (pLongChkpts == NULL) {
    mError("failed to init long checkpoint list, not handle stream checkpoint, code:%s", tstrerror(terrno));
    taosArrayDestroy(pList);
    return terrno;
  }

  // check if ongoing checkpoint trans or long chkpt trans exist.
  code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans, pLongChkpts);
  if (code) {
    mError("failed to clear finish trans, code:%s", tstrerror(code));

    taosArrayDestroy(pList);
    taosArrayDestroy(pLongChkpts);
    return code;
  }

  // kill long exec checkpoint and set task status
  bool hasLongChkpt = (taosArrayGetSize(pLongChkpts) > 0);
  if (hasLongChkpt) {
    killChkptAndResetStreamTask(pMnode, pLongChkpts);

    taosArrayDestroy(pList);
    taosArrayDestroy(pLongChkpts);
    return TSDB_CODE_SUCCESS;
  }

  taosArrayDestroy(pLongChkpts);

  // collect the streams that are due for a checkpoint and whose tasks are all ready
  for (;;) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    int64_t duration = now - pStream->checkpointFreq;
    bool    due = !(duration < tsStreamCheckpointInterval * 1000);

    // readiness is only evaluated for streams that are due
    if (due && isStreamReadyHelp(now, pStream)) {
      SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
      if (taosArrayPush(pList, &in) != NULL) {
        int32_t currentSize = taosArrayGetSize(pList);
        mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chkpt interval threshold: %ds(%" PRId64
               "s), concurrently launch threshold:%d",
               pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
               tsMaxConcurrentCheckpoint);
      } else {
        mError("failed to record the checkpoint interval info, stream:0x%" PRIx64, pStream->uid);
      }
    }

    sdbRelease(pSdb, pStream);
  }

  int32_t numOfQual = taosArrayGetSize(pList);
  if (numOfQual == 0) {
    taosArrayDestroy(pList);
    return code;
  }

  taosArraySort(pList, streamWaitComparFn);

  if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) {
    mDebug(
        "%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new "
        "checkpoint trans are not allowed, wait for 30s",
        numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint);
    taosArrayDestroy(pList);
    return code;
  }

  int32_t capacity = tsMaxConcurrentCheckpoint - numOfCheckpointTrans;
  mDebug(
      "%d stream(s) checkpoint interval longer than %ds, %d ongoing checkpoint trans, %d new checkpoint trans allowed, "
      "concurrent trans threshold:%d",
      numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, capacity, tsMaxConcurrentCheckpoint);

  int32_t started = 0;
  int64_t checkpointId = mndStreamGenChkptId(pMnode, true);

  for (int32_t idx = 0; idx < numOfQual; ++idx) {
    SCheckpointInterval *pInfo = taosArrayGet(pList, idx);
    if (pInfo == NULL) {
      continue;
    }

    SStreamObj *pTarget = NULL;
    code = mndGetStreamObj(pMnode, pInfo->streamId, &pTarget);
    if (pTarget == NULL || code != 0) {
      continue;
    }

    code = mndProcessStreamCheckpointTrans(pMnode, pTarget, checkpointId, 1, true);
    sdbRelease(pSdb, pTarget);

    if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
      mError("failed to start checkpoint trans, code:%s", tstrerror(code));
      continue;
    }

    started += 1;
    if (started >= capacity) {
      mDebug("already start %d new checkpoint trans, current active checkpoint trans:%d", started,
             (started + numOfCheckpointTrans));
      break;
    }
  }

  taosArrayDestroy(pList);
  return code;
}
1549

UNCOV
1550
// Handle a "drop stream" request.
//
// Steps: deserialize the request, look the stream up by name, refuse to drop a stream that is
// still bound to an SMA, check the write privilege on the target db, make sure no conflicting
// stream trans is running, then build and prepare the drop transaction. After the trans has been
// prepared, the related active trans (if any) is killed, the tasks of the stream are removed from
// execInfo and an audit record is written.
//
// Returns 0 if the stream does not exist and igNotExists is set, TSDB_CODE_ACTION_IN_PROGRESS
// when the trans is prepared and the stream name is parsed for the audit record, otherwise an
// error code.
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  SMDropStreamReq dropReq = {0};
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
    mError("invalid drop stream msg recv, discarded");
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  mDebug("recv drop stream:%s msg", dropReq.name);

  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (dropReq.igNotExists) {
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      tFreeMDropStreamReq(&dropReq);
      return 0;
    } else {
      mError("stream:%s not exist failed to drop it", dropReq.name);
      tFreeMDropStreamReq(&dropReq);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  if (pStream->smaId != 0) {
    mDebug("stream:%s, uid:0x%" PRIx64 " try to drop sma related stream", dropReq.name, pStream->uid);

    void    *pIter = NULL;
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    while (pIter) {
      if (pSma && pSma->uid == pStream->smaId) {
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;

        // log while pStream and dropReq are still valid, and report the code that is returned
        mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma",
               dropReq.name, pStream->uid, tstrerror(code));

        sdbRelease(pMnode->pSdb, pSma);
        sdbRelease(pMnode->pSdb, pStream);

        sdbCancelFetch(pMnode->pSdb, pIter);
        tFreeMDropStreamReq(&dropReq);
        TAOS_RETURN(code);
      }

      if (pSma) {
        sdbRelease(pMnode->pSdb, pSma);
      }

      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    }
  }

  // propagate the privilege check result instead of a bare -1, same as the pause handler does
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    return code;
  }

  STrans *pTrans = NULL;
  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
  if (code) {
    mError("failed to register drop stream trans, code:%s", tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // drop all tasks
  code = mndStreamSetDropAction(pMnode, pTrans, pStream);
  if (code) {
    mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // drop stream
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // kill the related checkpoint trans
  int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
  if (transId != 0) {
    mDebug("drop active transId:%d due to stream:%s uid:0x%" PRIx64 " dropped", transId, pStream->name, pStream->uid);
    mndKillTransImpl(pMnode, transId, pStream->sourceDb);
  }

  mDebug("stream:%s uid:0x%" PRIx64 " transId:%d start to drop related task when dropping stream", dropReq.name,
         pStream->uid, transId);

  removeStreamTasksInBuf(pStream, &execInfo);

  // NOTE(review): the result of the name parsing below replaces the trans result in `code`, so a
  // parse failure is what gets returned to the caller -- confirm this is intended.
  SName name = {0};
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  auditRecord(pReq, pMnode->clusterId, "dropStream", "", name.dbname, dropReq.sql, dropReq.sqlLen);

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  tFreeMDropStreamReq(&dropReq);

  if (code == 0) {
    return TSDB_CODE_ACTION_IN_PROGRESS;
  } else {
    TAOS_RETURN(code);
  }
}
1692

1693
// Append the "dropped" log of every stream that belongs to pDb into pTrans.
//
// A stream is handled when its source db or target db is pDb. If such a stream has different
// source and target dbs, the function fails with TSDB_CODE_MND_STREAM_MUST_BE_DELETED. Otherwise
// the related active trans (if any) is killed, the tasks of the stream are removed from execInfo
// and the stream is persisted into pTrans with SDB_STATUS_DROPPED.
//
// Returns 0 on success, or the first error code met.
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
      if (pStream->sourceDbUid != pStream->targetDbUid) {
        // log before the stream object is released, its fields are read by the message
        mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
               pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(TSDB_CODE_MND_STREAM_MUST_BE_DELETED);
      } else {
        // kill the related checkpoint trans
        int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
        if (transId != 0) {
          mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
          mndKillTransImpl(pMnode, transId, pStream->sourceDb);
        }

        // drop the stream obj in execInfo
        removeStreamTasksInBuf(pStream, &execInfo);

        code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          sdbRelease(pSdb, pStream);
          sdbCancelFetch(pSdb, pIter);
          return code;
        }
      }
    }

    sdbRelease(pSdb, pStream);
  }

  return 0;
}
1735

UNCOV
1736
// Fill pBlock with one row per stream, resuming from pShow->pIter, until `rows` rows are written
// or no more streams are fetched. Streams whose attributes cannot be set are skipped.
// Returns the number of rows written by this call; pShow->numOfRows is increased by that amount.
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pObj = NULL;
  int32_t     filled = 0;
  int32_t     ret = 0;

  for (;;) {
    if (filled >= rows) {
      break;
    }

    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) {
      break;
    }

    // only count the row when all attributes are written
    ret = setStreamAttrInResBlock(pObj, pBlock, filled);
    filled += (ret == 0) ? 1 : 0;

    sdbRelease(pSdb, pObj);
  }

  pShow->numOfRows += filled;
  return filled;
}
1757

1758
// Cancel an unfinished iteration over the SDB_STREAM table.
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1762

UNCOV
1763
// Fill pBlock with one row per stream task, resuming from pShow->pIter.
//
// For every fetched stream, the stream latch is read-locked, the block is enlarged when the tasks
// of the stream do not fit into rowsCapacity, and each task is written as a row by using the time
// precision of the source db (millisecond if the source db cannot be acquired). A stream is
// skipped if the block cannot be enlarged or the task iterator cannot be created.
//
// Returns the number of rows written by this call; pShow->numOfRows is increased by that amount.
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  int32_t     numOfRows = 0;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) {
      break;
    }

    // lock
    taosRLockLatch(&pStream->lock);

    int32_t count = mndGetNumOfStreamTasks(pStream);
    if (numOfRows + count > rowsCapacity) {
      code = blockDataEnsureCapacity(pBlock, numOfRows + count);
      if (code) {
        mError("failed to prepare the result block buffer, quit return value");
        taosRUnLockLatch(&pStream->lock);
        sdbRelease(pSdb, pStream);
        continue;
      }
    }

    int32_t precision = TSDB_TIME_PRECISION_MILLI;
    SDbObj *pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
    if (pSourceDb != NULL) {
      precision = pSourceDb->cfg.precision;
      mndReleaseDb(pMnode, pSourceDb);
    }

    // add row for each task
    SStreamTaskIter *pIter = NULL;
    code = createStreamTaskIter(pStream, &pIter);
    if (code) {
      // log before the stream object is released, the name is read by the message
      mError("failed to create task iter for stream:%s", pStream->name);
      taosRUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pIter, &pTask);
      if (code) {
        // the iterator is destroyed once after the loop, do not destroy it here as well
        break;
      }

      code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows, precision);
      if (code == TSDB_CODE_SUCCESS) {
        numOfRows++;
      }
    }

    pBlock->info.rows = numOfRows;

    destroyStreamTaskIter(pIter);
    taosRUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1836

1837
// Cancel an unfinished stream-task retrieval; the iteration runs over the SDB_STREAM table.
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
1841

UNCOV
1842
// Handle a "pause stream" request.
//
// The request is rejected when the stream does not exist (unless igNotExists is set), the user
// has no write privilege on the target db, a conflicting stream trans exists, a node update is
// detected, or the tasks of the stream have not reported a status that allows pausing. Otherwise
// a pause transaction is created, registered, filled with the pause actions, persisted and
// prepared.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the trans is prepared, 0 if the stream does not exist
// and igNotExists is set, otherwise an error code.
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  STrans     *pTrans = NULL;
  int32_t     code = 0;

  SMPauseStreamReq pauseReq = {0};
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (code != 0 || pStream == NULL) {
    if (!pauseReq.igNotExists) {
      mError("stream:%s not exist, failed to pause stream", pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }

    mInfo("stream:%s, not exist, not pause stream", pauseReq.name);
    return 0;
  }

  mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid);

  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(code);
  }

  if (mndStreamNodeIsUpdated(pMnode)) {
    mError("tasks are not ready for pause, node update detected");
    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  // scan the reported task status under the execInfo lock: at least one task of this stream must
  // be found, and none of them may be in uninit/checkpoint status
  bool reported = false;
  bool pausable = true;

  streamMutexLock(&execInfo.lock);

  for (int32_t idx = 0; idx < taosArrayGetSize(execInfo.pTaskList); ++idx) {
    STaskId *pId = taosArrayGet(execInfo.pTaskList, idx);
    if (pId == NULL) {
      continue;
    }

    STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
    if (pEntry == NULL || pEntry->id.streamId != pStream->uid) {
      continue;
    }

    reported = true;

    if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) {
      mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name,
             pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status));
      pausable = false;
    }
  }

  streamMutexUnlock(&execInfo.lock);

  if (!reported || !pausable) {
    if (!reported) {
      mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
    } else {
      mError("stream:%s task not ready for pause yet", pauseReq.name);
    }

    sdbRelease(pMnode->pSdb, pStream);
    TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
  }

  code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans);
  if (code != 0 || pTrans == NULL) {
    mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // from here on every exit releases the stream and drops the trans, see _done
  code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
  if (code != 0) {
    goto _done;
  }

  // if nodeUpdate happened, not send pause trans
  code = mndStreamSetPauseAction(pMnode, pTrans, pStream);
  if (code != 0) {
    mError("stream:%s, failed to pause task since %s", pauseReq.name, tstrerror(code));
    goto _done;
  }

  // pause stream
  taosWLockLatch(&pStream->lock);
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  taosWUnLockLatch(&pStream->lock);
  if (code != 0) {
    goto _done;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
    goto _done;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_done:
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
1976

UNCOV
1977
// Handle a "resume stream" request.
//
// After the grant check, the request is deserialized and the stream is looked up by name. When
// the user has the write privilege on the target db and no conflicting stream trans exists, a
// resume transaction is created, registered, filled with the resume actions, persisted (with the
// stream status set to STREAM_STATUS__NORMAL) and prepared.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the trans is prepared, 0 if the stream does not exist
// and igNotExists is set, otherwise an error code.
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResumeStreamReq resumeReq = {0};
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resumeReq.igNotExists) {
      mInfo("stream:%s not exist, not resume stream", resumeReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      return 0;
    } else {
      mError("stream:%s not exist, failed to resume stream", resumeReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);

  // propagate the privilege check result instead of a bare -1, same as the pause handler does
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb);
  if (code != 0) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  STrans *pTrans = NULL;
  code =
      doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans);
  if (pTrans == NULL || code) {
    mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // set the resume action
  code = mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated);
  if (code) {
    mError("stream:%s, failed to resume task since %s", resumeReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // resume stream
  // NOTE(review): the status is not restored if persisting the trans log fails -- confirm whether
  // a rollback of pStream->status is needed here.
  taosWLockLatch(&pStream->lock);
  pStream->status = STREAM_STATUS__NORMAL;
  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  taosWUnLockLatch(&pStream->lock);

  // keep the failure code of the persist step, otherwise the stale 0 (success) would be returned
  if (code) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare resume stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
2066

2067
// Handle a "reset stream" request.
//
// Only the validation part exists so far: grant check, request deserialization and stream lookup.
// The reset itself is not implemented yet (see the todo below).
//
// Returns 0 if the stream does not exist and igNotExists is set, an error code if a validation
// step fails, otherwise TSDB_CODE_ACTION_IN_PROGRESS.
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResetStreamReq resetReq = {0};
  if (tDeserializeSMResetStreamReq(pReq->pCont, pReq->contLen, &resetReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  mDebug("recv reset stream req, stream:%s", resetReq.name);

  code = mndAcquireStream(pMnode, resetReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resetReq.igNotExists) {
      mInfo("stream:%s, not exist, not reset stream", resetReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to reset stream", resetReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  // the stream object is only used for the existence check, release it before leaving
  sdbRelease(pMnode->pSdb, pStream);

  //todo(liao hao jun)
  // NOTE(review): no trans is created here although "in progress" is returned -- confirm that the
  // caller does not wait for a trans to finish.
  return TSDB_CODE_ACTION_IN_PROGRESS;
}
2097

UNCOV
2098
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes,
×
2099
                                      STrans **pUpdateTrans, SArray* pStreamList) {
UNCOV
2100
  SSdb   *pSdb = pMnode->pSdb;
×
UNCOV
2101
  void   *pIter = NULL;
×
UNCOV
2102
  STrans *pTrans = NULL;
×
UNCOV
2103
  int32_t code = 0;
×
UNCOV
2104
  *pUpdateTrans = NULL;
×
2105

2106
  // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool
UNCOV
2107
  while (1) {
×
UNCOV
2108
    SStreamObj *pStream = NULL;
×
UNCOV
2109
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
×
UNCOV
2110
    if (pIter == NULL) {
×
UNCOV
2111
      break;
×
2112
    }
2113

UNCOV
2114
    code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
×
2115
    sdbRelease(pSdb, pStream);
×
2116

2117
    if (code) {
×
UNCOV
2118
      mError("nodeUpdate conflict with other trans, current nodeUpdate ignored, code:%s", tstrerror(code));
×
UNCOV
2119
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
2120
      return code;
×
2121
    }
2122
  }
2123

UNCOV
2124
  while (1) {
×
UNCOV
2125
    SStreamObj *pStream = NULL;
×
UNCOV
2126
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
×
UNCOV
2127
    if (pIter == NULL) {
×
UNCOV
2128
      break;
×
2129
    }
2130

2131
    // here create only one trans
UNCOV
2132
    if (pTrans == NULL) {
×
2133
      code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME,
×
2134
                           "update task epsets", &pTrans);
2135
      if (pTrans == NULL || code) {
×
UNCOV
2136
        sdbRelease(pSdb, pStream);
×
UNCOV
2137
        sdbCancelFetch(pSdb, pIter);
×
UNCOV
2138
        return terrno = code;
×
2139
      }
2140
    }
2141

UNCOV
2142
    if (!includeAllNodes) {
×
2143
      void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
×
2144
      void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
×
2145
      if (p1 == NULL && p2 == NULL) {
×
UNCOV
2146
        mDebug("stream:0x%" PRIx64 " %s not involved in nodeUpdate, ignore", pStream->uid, pStream->name);
×
UNCOV
2147
        sdbRelease(pSdb, pStream);
×
UNCOV
2148
        continue;
×
2149
      }
2150
    }
2151

UNCOV
2152
    mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
×
2153
           pStream->name, pTrans->id);
2154

2155
    // NOTE: for each stream, we register one trans entry for task update
UNCOV
2156
    code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
×
UNCOV
2157
    if (code) {
×
UNCOV
2158
      mError("failed to register trans, transId:%d, and continue", pTrans->id);
×
2159
    }
2160

UNCOV
2161
    code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);
×
2162

2163
    // todo: not continue, drop all and retry again
2164
    if (code != TSDB_CODE_SUCCESS) {
×
2165
      mError("stream:0x%" PRIx64 " build nodeUpdate trans failed, ignore and continue, code:%s", pStream->uid,
×
2166
             tstrerror(code));
UNCOV
2167
      sdbRelease(pSdb, pStream);
×
UNCOV
2168
      continue;
×
2169
    }
2170

UNCOV
2171
    code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
×
UNCOV
2172
    if (code == 0) {
×
UNCOV
2173
      taosArrayPush(pStreamList, &pStream->uid);
×
2174
    }
2175

2176
    sdbRelease(pSdb, pStream);
×
2177

UNCOV
2178
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
2179
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
2180
      return code;
×
2181
    }
2182
  }
2183

2184
  // no need to build the trans to handle the vgroup update
UNCOV
2185
  *pUpdateTrans = pTrans;
×
UNCOV
2186
  return code;
×
2187
}
2188

2189
// Rebuild pNodeList from the tasks of all existing streams.
//
// Every task contributes one SNodeEntry (nodeId and epset of the task, hbTimestamp and lastHbMsgId
// set to -1). The entries are collected in a hash map keyed by nodeId first, then pNodeList is
// cleared and filled with the entries of the map.
//
// Returns terrno if the hash map cannot be created; otherwise the last code of the stream scan,
// or terrno of a failed push into pNodeList when that code is 0.
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int32_t     code = 0;

  mDebug("start to refresh node list by existed streams");

  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    return terrno;
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    taosWLockLatch(&pStream->lock);

    SStreamTaskIter *pTaskIter = NULL;
    code = createStreamTaskIter(pStream, &pTaskIter);
    if (code) {
      // log before the stream object is released, the name is read by the message
      mError("failed to create task iter for stream:%s", pStream->name);
      taosWUnLockLatch(&pStream->lock);
      sdbRelease(pSdb, pStream);
      continue;
    }

    while (streamTaskIterNextTask(pTaskIter)) {
      SStreamTask *pTask = NULL;
      code = streamTaskIterGetCurrent(pTaskIter, &pTask);
      if (code) {
        break;
      }

      SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
      epsetAssign(&entry.epset, &pTask->info.epSet);
      int32_t ret = taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
      if (ret != 0 && ret != TSDB_CODE_DUP_KEY) {
        // report the result of the failed put, `code` is 0 at this point
        mError("failed to put entry into hash map, nodeId:%d, code:%s", entry.nodeId, tstrerror(ret));
      }
    }

    destroyStreamTaskIter(pTaskIter);
    taosWUnLockLatch(&pStream->lock);

    sdbRelease(pSdb, pStream);
  }

  taosArrayClear(pNodeList);

  // convert to list
  pIter = NULL;
  while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
    SNodeEntry *pEntry = (SNodeEntry *)pIter;

    void *p = taosArrayPush(pNodeList, pEntry);
    if (p == NULL) {
      mError("failed to put entry into node list, nodeId:%d, code: out of memory", pEntry->nodeId);
      if (code == 0) {
        code = terrno;
      }
      continue;
    }

    char    buf[256] = {0};
    int32_t ret = epsetToStr(&pEntry->epset, buf, tListLen(buf));  // ignore this error since it is only for log file
    if (ret != 0) {                                                // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(ret));
    }

    mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
  }

  taosHashCleanup(pHash);

  mDebug("numOfvNodes:%d get after extracting nodeInfo from all streams", (int32_t)taosArrayGetSize(pNodeList));
  return code;
}
2270

2271
// Put the db name of every vgroup in the sdb into pDBMap (key only, no value).
// A failed put is ignored; only successful puts are logged.
static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
  void   *pIter = NULL;
  int32_t code = 0;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    code = taosHashPut(pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
    if (code == 0) {
      int32_t size = taosHashGetSize(pDBMap);
      mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size);
    }

    // release the vgroup only after its db name is no longer needed
    sdbRelease(pSdb, pVgroup);
  }
}
×
2290

UNCOV
2291
// Compare the node snapshot with the node list kept in execInfo and collect the changes.
//
// Expired node entries/tasks are removed from the buffer first, then the changed nodes are
// written into pChangeInfo. If this mnode is the leader and has just switched from follower,
// *pUpdateAllVgroups is set, the switch flag is cleared and all dbs are added into
// pChangeInfo->pDBMap. The checkpoint trans are killed if any node changed or all vgroups are to
// be updated.
//
// Returns the error of the first failed step, otherwise 0.
static int32_t doProcessNodeCheckHelp(SArray *pNodeSnapshot, SMnode *pMnode, SVgroupChangeInfo *pChangeInfo,
                                      bool *pUpdateAllVgroups) {
  int32_t ret = removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
  if (ret != 0) {
    mDebug("failed to remove expired node entry in buf, code:%s", tstrerror(ret));
    return ret;
  }

  ret = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, pChangeInfo);
  if (ret != 0) {
    mDebug("failed to find changed vnode(s) during vnode(s) check, code:%s", tstrerror(ret));
    return ret;
  }

  bool leaderSwitched = (execInfo.role == NODE_ROLE_LEADER) && execInfo.switchFromFollower;
  if (leaderSwitched) {
    mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
    *pUpdateAllVgroups = true;
    execInfo.switchFromFollower = false;  // the switch is handled, clear it
    addAllDbsIntoHashmap(pChangeInfo->pDBMap, pMnode->pSdb);
  }

  bool hasUpdate = (taosArrayGetSize(pChangeInfo->pUpdateNodeList) > 0) || (*pUpdateAllVgroups);
  if (!hasUpdate) {
    mDebug("no update found in vnode(s) list");
    return ret;
  }

  // kill current active checkpoint transaction, since the transaction is vnode wide.
  killAllCheckpointTrans(pMnode, pChangeInfo);
  return ret;
}
2323

2324
// this function runs by only one thread, so it is not multi-thread safe
UNCOV
2325
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
×
UNCOV
2326
  int32_t           code = 0;
×
UNCOV
2327
  bool              allReady = true;
×
UNCOV
2328
  SArray           *pNodeSnapshot = NULL;
×
UNCOV
2329
  SMnode           *pMnode = pMsg->info.node;
×
UNCOV
2330
  int64_t           tsms = taosGetTimestampMs();
×
UNCOV
2331
  int64_t           ts = tsms / 1000;
×
UNCOV
2332
  bool              updateAllVgroups = false;
×
UNCOV
2333
  SVgroupChangeInfo changeInfo = {0};
×
2334

2335
  int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
×
UNCOV
2336
  if (old != 0) {
×
UNCOV
2337
    mDebug("still in checking node change");
×
UNCOV
2338
    return 0;
×
2339
  }
2340

UNCOV
2341
  mDebug("start to do node changing check, ts:%" PRId64, tsms);
×
2342

UNCOV
2343
  streamMutexLock(&execInfo.lock);
×
UNCOV
2344
  int32_t numOfNodes = extractStreamNodeList(pMnode);
×
2345
  streamMutexUnlock(&execInfo.lock);
×
2346

2347
  if (numOfNodes == 0) {
×
2348
    mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing");
×
UNCOV
2349
    execInfo.ts = ts;
×
UNCOV
2350
    atomic_store_32(&mndNodeCheckSentinel, 0);
×
UNCOV
2351
    return 0;
×
2352
  }
2353

UNCOV
2354
  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot, NULL);
×
UNCOV
2355
  if (code) {
×
UNCOV
2356
    mError("failed to take the vgroup snapshot, ignore it and continue");
×
2357
  }
2358

UNCOV
2359
  if (!allReady) {
×
UNCOV
2360
    taosArrayDestroy(pNodeSnapshot);
×
UNCOV
2361
    atomic_store_32(&mndNodeCheckSentinel, 0);
×
UNCOV
2362
    mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
×
UNCOV
2363
    return 0;
×
2364
  }
2365

UNCOV
2366
  streamMutexLock(&execInfo.lock);
×
UNCOV
2367
  code = doProcessNodeCheckHelp(pNodeSnapshot, pMnode, &changeInfo, &updateAllVgroups);
×
2368
  streamMutexUnlock(&execInfo.lock);
×
2369

UNCOV
2370
  if (code) {
×
UNCOV
2371
    goto _end;
×
2372
  }
2373

UNCOV
2374
  if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) {
×
UNCOV
2375
    mDebug("vnode(s) change detected, build trans to update stream task epsets");
×
2376

UNCOV
2377
    STrans *pTrans = NULL;
×
UNCOV
2378
    SArray* pStreamIdList = taosArrayInit(4, sizeof(int64_t));
×
2379

UNCOV
2380
    streamMutexLock(&execInfo.lock);
×
UNCOV
2381
    code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups, &pTrans, pStreamIdList);
×
2382

2383
    // remove the consensus-checkpoint-id req of all related stream(s)
UNCOV
2384
    int32_t num = taosArrayGetSize(pStreamIdList);
×
UNCOV
2385
    if (num > 0) {
×
UNCOV
2386
      mDebug("start to clear %d related stream in consensus-checkpoint-id list due to nodeUpdate", num);
×
UNCOV
2387
      for (int32_t x = 0; x < num; ++x) {
×
2388
        int64_t uid = *(int64_t *)taosArrayGet(pStreamIdList, x);
×
UNCOV
2389
        int32_t ret = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, uid);
×
UNCOV
2390
        if (ret != 0) {
×
UNCOV
2391
          mError("failed to remove stream:0x%" PRIx64 " from consensus-checkpoint-id list, code:%s", uid,
×
2392
                 tstrerror(ret));
2393
        }
2394
      }
2395
    }
2396

UNCOV
2397
    streamMutexUnlock(&execInfo.lock);
×
UNCOV
2398
    taosArrayDestroy(pStreamIdList);
×
2399

2400
    // NOTE: sync trans out of lock
2401
    if (code == 0 && pTrans != NULL) {
×
UNCOV
2402
      code = mndTransPrepare(pMnode, pTrans);
×
UNCOV
2403
      if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
UNCOV
2404
        mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
×
2405
      }
2406

UNCOV
2407
      mndTransDrop(pTrans);
×
2408
    }
2409

2410
    // keep the new vnode snapshot if success
UNCOV
2411
    if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
×
UNCOV
2412
      streamMutexLock(&execInfo.lock);
×
2413

UNCOV
2414
      code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
×
UNCOV
2415
      int32_t num = (int)taosArrayGetSize(execInfo.pNodeList);
×
UNCOV
2416
      if (code == 0) {
×
UNCOV
2417
        execInfo.ts = ts;
×
UNCOV
2418
        mDebug("create trans successfully, update cached node list, numOfNodes:%d", num);
×
2419
      }
2420

2421
      streamMutexUnlock(&execInfo.lock);
×
2422

UNCOV
2423
      if (code) {
×
UNCOV
2424
        mError("failed to extract node list from stream, code:%s", tstrerror(code));
×
UNCOV
2425
        goto _end;
×
2426
      }
2427
    }
2428
  }
2429

UNCOV
2430
  mndDestroyVgroupChangeInfo(&changeInfo);
×
2431

UNCOV
2432
_end:
×
UNCOV
2433
  taosArrayDestroy(pNodeSnapshot);
×
2434

UNCOV
2435
  mDebug("end to do stream task node change checking, elapsed time:%" PRId64 "ms", taosGetTimestampMs() - tsms);
×
UNCOV
2436
  atomic_store_32(&mndNodeCheckSentinel, 0);
×
2437

UNCOV
2438
  return 0;
×
2439
}
2440

2441
// Entry that posts a TDMT_MND_STREAM_NODECHANGE_CHECK message into the mnode write queue.
// Does nothing (returns 0) when the sdb holds no stream object. Returns terrno when the message body
// cannot be allocated, otherwise the result of tmsgPutToQueue().
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
  SMnode *pNode = pReq->info.node;

  if (sdbGetSize(pNode->pSdb, SDB_STREAM) <= 0) {
    return 0;
  }

  int32_t               contLen = sizeof(SMStreamNodeCheckMsg);
  SMStreamNodeCheckMsg *pCont = rpcMallocCont(contLen);
  if (pCont == NULL) {
    return terrno;
  }

  SRpcMsg checkMsg = {0};
  checkMsg.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK;
  checkMsg.pCont = pCont;
  checkMsg.contLen = contLen;

  return tmsgPutToQueue(&pNode->msgCb, WRITE_QUEUE, &checkMsg);
}
2457

2458
// Posts a check message into the mnode write queue when at least one stream object exists in the sdb.
// NOTE(review): the message type is TDMT_MND_STREAM_NODECHANGE_CHECK, the same one mndProcessNodeCheck
// enqueues -- confirm this is intended for the status check.
// Returns 0 when no stream exists, terrno on allocation failure, otherwise the result of tmsgPutToQueue().
static int32_t mndProcessStatusCheck(SRpcMsg *pReq) {
  SMnode *pNode = pReq->info.node;

  if (sdbGetSize(pNode->pSdb, SDB_STREAM) <= 0) {
    return 0;
  }

  int32_t               contLen = sizeof(SMStreamNodeCheckMsg);
  SMStreamNodeCheckMsg *pCont = rpcMallocCont(contLen);
  if (pCont == NULL) {
    return terrno;
  }

  SRpcMsg checkMsg = {0};
  checkMsg.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK;
  checkMsg.pCont = pCont;
  checkMsg.contLen = contLen;

  return tmsgPutToQueue(&pNode->msgCb, WRITE_QUEUE, &checkMsg);
}
2474

UNCOV
2475
// Registers every task of pStream in the exec buffer of pExecNode:
//   - a task that is not yet in pTaskMap gets a fresh status entry in pTaskMap and its id in pTaskList
//   - the vnode hosting such a task is appended to pNodeList when no entry with the same nodeId exists
// Tasks already present in pTaskMap are left untouched. Failures are logged; nothing is returned.
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
  SStreamTaskIter *pIter = NULL;
  int32_t          code = createStreamTaskIter(pStream, &pIter);
  if (code) {
    mError("failed to create task iter for stream:%s", pStream->name);
    return;
  }

  while (streamTaskIterNextTask(pIter)) {
    SStreamTask *pTask = NULL;
    code = streamTaskIterGetCurrent(pIter, &pTask);
    if (code) {
      // do not stop silently, the remaining tasks of this stream are not registered
      mError("failed to get current task for stream:%s, code:%s", pStream->name, tstrerror(code));
      break;
    }

    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    if (taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)) != NULL) {
      continue;  // already registered
    }

    STaskStatusEntry entry = {0};
    streamTaskStatusInit(&entry, pTask);

    code = taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
    if (code == 0) {
      void   *px = taosArrayPush(pExecNode->pTaskList, &id);
      int32_t num = (int32_t)taosArrayGetSize(pExecNode->pTaskList);
      if (px) {
        mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
      } else {
        mError("s-task:0x%x failed to add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
      }
    } else {
      mError("s-task:0x%x failed to add into task map, since out of memory", (int32_t)entry.id.taskId);
    }

    // add the new vgroups if not added yet
    bool    exist = false;
    int32_t numOfNodes = (int32_t)taosArrayGetSize(pExecNode->pNodeList);
    for (int32_t j = 0; j < numOfNodes; ++j) {
      SNodeEntry *pEntry = taosArrayGet(pExecNode->pNodeList, j);
      if ((pEntry != NULL) && (pEntry->nodeId == pTask->info.nodeId)) {
        exist = true;
        break;
      }
    }

    if (exist) {
      continue;
    }

    SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
    epsetAssign(&nodeEntry.epset, &pTask->info.epSet);

    void *px = taosArrayPush(pExecNode->pNodeList, &nodeEntry);
    if (px) {
      mInfo("vgId:%d added into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList));
    } else {
      mError("vgId:%d failed to add into nodeList, total:%d", nodeEntry.nodeId,
             (int)taosArrayGetSize(pExecNode->pNodeList));
    }
  }

  destroyStreamTaskIter(pIter);
}
2536

UNCOV
2537
// Appends taskId to pList unless it is already recorded there.
//   pList      - list of int32_t task ids that have sent the checkpoint req for stream uid
//   numOfTotal - total number of tasks of the stream, only used to log how many reqs are still missing
// The debug log reports the number of reqs received including the one just added.
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
  int32_t num = taosArrayGetSize(pList);
  for (int32_t i = 0; i < num; ++i) {
    int32_t *pId = taosArrayGet(pList, i);
    if (pId == NULL) {
      continue;
    }

    if (taskId == *pId) {
      return;  // duplicated req, nothing to do
    }
  }

  void *p = taosArrayPush(pList, &taskId);
  if (p) {
    // count the element that has just been pushed, otherwise the log is off by one
    int32_t numOfRecv = num + 1;
    mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, numOfRecv, numOfTotal - numOfRecv);
  } else {
    mError("stream:0x%" PRIx64 " receive %d reqs for checkpoint, failed to added into task list, since out of memory",
           uid, num);
  }
}
2559

UNCOV
2560
// Handles the checkpoint req sent by a stream task.
// The task id is recorded in execInfo.pTransferStateStreams under the stream id; once the number of recorded
// tasks equals the number of tasks of the stream, a checkpoint trans is started and the entry is removed.
// When the stream is not in the meta-store but the task is found in execInfo.pTaskMap, the req is only
// recorded (numOfTasks is 0 in that case).
// Returns TSDB_CODE_INVALID_MSG for an undecodable msg, TSDB_CODE_MND_STREAM_NOT_EXIST when neither the stream
// nor the task is known, an error code when the req cannot be recorded, terrno when the rsp cannot be
// allocated, and 0 after the rsp has been sent.
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
  SMnode                  *pMnode = pReq->info.node;
  SStreamTaskCheckpointReq req = {0};
  SStreamObj              *pStream = NULL;
  int32_t                  code = 0;

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskCheckpointReq(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint req msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);

  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf",
          req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the create stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
  if (pReqTaskList == NULL) {
    SArray *pList = taosArrayInit(4, sizeof(int32_t));
    if (pList == NULL) {
      code = terrno;
      mError("stream:0x%" PRIx64 " failed to create the checkpoint req task list, code:%s", req.streamId,
             tstrerror(code));
      goto _err;
    }

    doAddTaskId(pList, req.taskId, req.streamId, numOfTasks);
    code = taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
    if (code) {
      mError("failed to put into transfer state stream map, code: out of memory");
      taosArrayDestroy(pList);  // the map did not take it over
      goto _err;
    }

    pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
    if (pReqTaskList == NULL) {  // never dereference a missing entry
      code = TSDB_CODE_FAILED;
      mError("stream:0x%" PRIx64 " not found in transfer state stream map after put", req.streamId);
      goto _err;
    }
  } else {
    doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks);
  }

  int32_t total = taosArrayGetSize(*pReqTaskList);
  if (total == numOfTasks) {  // all tasks have sent the reqs
    int64_t checkpointId = mndStreamGenChkptId(pMnode, false);
    mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);

    if (pStream != NULL) {  // TODO:handle error
      code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
      if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
        mError("stream:0x%" PRIx64 " failed to create checkpoint trans, checkpointId:%" PRId64 ", code:%s",
               req.streamId, checkpointId, tstrerror(code));
      }
    } else {
      // todo: wait for the create stream trans completed, and launch the checkpoint trans
    }

    // remove this entry, not overwriting the global error code
    int32_t ret = taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));
    if (ret) {
      mError("failed to remove transfer state stream, code:%s", tstrerror(ret));
    }

    int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
    mDebug("stream:0x%" PRIx64 " removed in transfer-state list, %d stream(s) not finish fill-history process",
           req.streamId, numOfStreams);
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  {
    SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)};
    rsp.pCont = rpcMallocCont(rsp.contLen);
    if (rsp.pCont == NULL) {
      return terrno;
    }

    SMsgHead *pHead = rsp.pCont;
    pHead->vgId = htonl(req.nodeId);

    tmsgSendRsp(&rsp);
    pReq->info.handle = NULL;  // disable auto rsp
  }

  return 0;

_err:
  // the req could not be recorded: give back the stream reference and the lock, report the failure
  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }
  streamMutexUnlock(&execInfo.lock);
  return code;
}
2664

2665
// valid the info according to the HbMsg
UNCOV
2666
static bool validateChkptReport(const SCheckpointReport *pReport, int64_t reportChkptId) {
×
UNCOV
2667
  STaskId           id = {.streamId = pReport->streamId, .taskId = pReport->taskId};
×
UNCOV
2668
  STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
×
UNCOV
2669
  if (pTaskEntry == NULL) {
×
UNCOV
2670
    mError("invalid checkpoint-report msg from task:0x%x, discard", pReport->taskId);
×
UNCOV
2671
    return false;
×
2672
  }
2673

2674
  if (pTaskEntry->checkpointInfo.latestId >= pReport->checkpointId) {
×
UNCOV
2675
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " saved checkpointId:%" PRId64 " discard",
×
2676
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
UNCOV
2677
    return false;
×
2678
  }
2679

2680
  // now the task in checkpoint procedure
UNCOV
2681
  if ((pTaskEntry->checkpointInfo.activeId != 0) && (pTaskEntry->checkpointInfo.activeId > pReport->checkpointId)) {
×
2682
    mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " active checkpointId:%" PRId64
×
2683
           " discard",
2684
           pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
UNCOV
2685
    return false;
×
2686
  }
2687

UNCOV
2688
  if (reportChkptId >= pReport->checkpointId) {
×
2689
    mError("s-task:0x%x expired checkpoint-report msg, checkpointId:%" PRId64 " already update checkpointId:%" PRId64
×
2690
           " discard",
2691
           pReport->taskId, pReport->checkpointId, reportChkptId);
UNCOV
2692
    return false;
×
2693
  }
2694

UNCOV
2695
  return true;
×
2696
}
2697

UNCOV
2698
// Records a checkpoint-report in pList (a list of STaskChkptInfo).
// The report is dropped when validateChkptReport() rejects it. When the task already has an item in the list,
// that item is refreshed only if the report carries a larger checkpointId; otherwise a new item is appended.
static void doAddReportStreamTask(SArray *pList, int64_t reportedChkptId, const SCheckpointReport *pReport) {
  if (!validateChkptReport(pReport, reportedChkptId)) {
    return;
  }

  // look for the item of this task first
  STaskChkptInfo *pExisted = NULL;
  int32_t         numOfItems = taosArrayGetSize(pList);
  for (int32_t idx = 0; idx < numOfItems; ++idx) {
    STaskChkptInfo *pCur = taosArrayGet(pList, idx);
    if ((pCur != NULL) && (pCur->taskId == pReport->taskId)) {
      pExisted = pCur;
      break;
    }
  }

  if (pExisted != NULL) {
    if (pExisted->checkpointId > pReport->checkpointId) {
      mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
             pReport->taskId, pExisted->checkpointId, pReport->checkpointId);
    } else if (pExisted->checkpointId < pReport->checkpointId) {
      // the recorded info is stale, overwrite it with the reported one
      mInfo("s-task:0x%x expired checkpoint-report info in checkpoint-report list update from %" PRId64 "->%" PRId64,
            pReport->taskId, pExisted->checkpointId, pReport->checkpointId);

      pExisted->checkpointId = pReport->checkpointId;
      pExisted->ts = pReport->checkpointTs;
      pExisted->version = pReport->checkpointVer;
      pExisted->transId = pReport->transId;
      pExisted->dropHTask = pReport->dropHTask;
    } else {
      mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
    }
    return;
  }

  // first report of this task
  STaskChkptInfo newInfo = {0};
  newInfo.streamId = pReport->streamId;
  newInfo.taskId = pReport->taskId;
  newInfo.transId = pReport->transId;
  newInfo.dropHTask = pReport->dropHTask;
  newInfo.version = pReport->checkpointVer;
  newInfo.ts = pReport->checkpointTs;
  newInfo.checkpointId = pReport->checkpointId;
  newInfo.nodeId = pReport->nodeId;

  if (taosArrayPush(pList, &newInfo) == NULL) {
    mError("failed to put into task list, taskId:0x%x", pReport->taskId);
    return;
  }

  int32_t size = taosArrayGetSize(pList);
  mDebug("stream:0x%" PRIx64 " taskId:0x%x checkpoint-report recv, %d tasks has send checkpoint-report",
         pReport->streamId, pReport->taskId, size);
}
2751

UNCOV
2752
// Handles the checkpoint-report sent by a stream task.
// The report is recorded in execInfo.pChkptStreams under the stream id. When the stream is not in the
// meta-store but the task is found in execInfo.pTaskMap, the report is still recorded (numOfTasks is 0 then).
// Returns TSDB_CODE_INVALID_MSG for an undecodable msg and TSDB_CODE_MND_STREAM_NOT_EXIST when neither the
// stream nor the task is known. Otherwise a quick rsp with TSDB_CODE_SUCCESS is sent to the vnode and the
// last error code met while recording the report is returned.
int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SCheckpointReport req = {0};

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamTaskChkptReport(&decoder, &req)) {
    tDecoderClear(&decoder);
    mError("invalid task checkpoint-report msg received");
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  streamMutexLock(&execInfo.lock);
  mndInitStreamExecInfo(pMnode, &execInfo);
  streamMutexUnlock(&execInfo.lock);

  mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64
         " checkpointVer:%" PRId64 " transId:%d",
         req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);

  // register to the stream task done map, if all tasks has sent these kinds of message, start the checkpoint trans.
  streamMutexLock(&execInfo.lock);

  SStreamObj *pStream = NULL;
  int32_t     code = mndGetStreamObj(pMnode, req.streamId, &pStream);
  if (pStream == NULL || code != 0) {
    mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);

    // not in meta-store yet, try to acquire the task in exec buffer
    // the checkpoint req arrives too soon before the completion of the creation of stream trans.
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    void   *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
    if (p == NULL) {
      mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
      streamMutexUnlock(&execInfo.lock);
      return TSDB_CODE_MND_STREAM_NOT_EXIST;
    } else {
      mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
             req.streamId, req.taskId);
    }
  }

  int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);

  SChkptReportInfo *pInfo =
      (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
  if (pInfo == NULL) {
    SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId};
    if (info.pTaskList == NULL) {
      code = terrno;
      mError("stream:0x%" PRIx64 " failed to create checkpoint-report task list, code:%s", req.streamId,
             tstrerror(code));
    } else {
      doAddReportStreamTask(info.pTaskList, info.reportChkpt, &req);
      code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &info, sizeof(info));
      if (code) {
        mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId);
        taosArrayDestroy(info.pTaskList);  // the map did not take it over
      } else {
        pInfo = (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
      }
    }
  } else {
    doAddReportStreamTask(pInfo->pTaskList, pInfo->reportChkpt, &req);
  }

  // pInfo stays NULL when the report could not be recorded; pStream is NULL when the stream only exists in
  // the exec buffer (numOfTasks is 0 then), so both have to be checked before they are dereferenced.
  if (pInfo != NULL && pStream != NULL) {
    int32_t total = taosArrayGetSize(pInfo->pTaskList);
    if (total == numOfTasks) {  // all tasks have sent the reqs
      mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
            " will be issued soon",
            req.streamId, pStream->name, total, req.checkpointId);
    }
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }

  streamMutexUnlock(&execInfo.lock);

  doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS);
  return code;
}
2831

UNCOV
2832
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t *pExistedTasks, bool *pAllSame) {
×
UNCOV
2833
  int32_t num = 0;
×
UNCOV
2834
  int64_t chkId = INT64_MAX;
×
UNCOV
2835
  *pExistedTasks = 0;
×
UNCOV
2836
  *pAllSame = true;
×
2837

2838
  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
×
UNCOV
2839
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
×
UNCOV
2840
    if (p == NULL) {
×
UNCOV
2841
      continue;
×
2842
    }
2843

UNCOV
2844
    if (p->streamId != streamId) {
×
UNCOV
2845
      continue;
×
2846
    }
2847

UNCOV
2848
    num += 1;
×
2849
    STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
×
2850
    if (chkId > pe->checkpointInfo.latestId) {
×
UNCOV
2851
      if (chkId != INT64_MAX) {
×
UNCOV
2852
        *pAllSame = false;
×
UNCOV
2853
        mDebug("checkpointIds not identical, prev:%" PRId64 " smaller:%" PRId64 " from task:0x%" PRIx64, chkId,
×
2854
               pe->checkpointInfo.latestId, pe->id.taskId);
2855
      }
UNCOV
2856
      chkId = pe->checkpointInfo.latestId;
×
2857
    }
2858
  }
2859

UNCOV
2860
  *pExistedTasks = num;
×
UNCOV
2861
  if (num < numOfTasks) {  // not all task send info to mnode through hbMsg, no valid checkpoint Id
×
UNCOV
2862
    return -1;
×
2863
  }
2864

UNCOV
2865
  return chkId;
×
2866
}
2867

UNCOV
2868
// Sends a rsp of msgSize bytes with the given code right away; the vgId is stored in network byte order in
// the SMsgHead at the start of the rsp body. On success pInfo->handle is set to NULL, which disables the
// auto rsp. Nothing is sent and pInfo is left untouched when the rsp body cannot be allocated.
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) {
  SRpcMsg quickRsp = {.code = code, .info = *pInfo, .contLen = msgSize};

  quickRsp.pCont = rpcMallocCont(quickRsp.contLen);
  if (quickRsp.pCont == NULL) {
    return;
  }

  ((SMsgHead *)quickRsp.pCont)->vgId = htonl(vgId);

  tmsgSendRsp(&quickRsp);
  pInfo->handle = NULL;  // disable auto rsp
}
×
2879

2880
// For every task id in pList, removes the first entry of pInfo->pTaskList whose req carries that task id.
// Returns the number of elements in pList.
static int32_t doCleanReqList(SArray *pList, SCheckpointConsensusInfo *pInfo) {
  int32_t numOfSent = taosArrayGetSize(pList);

  for (int32_t idx = 0; idx < numOfSent; ++idx) {
    int32_t *pTaskId = taosArrayGet(pList, idx);
    if (pTaskId == NULL) {
      continue;
    }

    // the scan stops right after a removal, so the size can be read once per task id
    int32_t numOfEntries = taosArrayGetSize(pInfo->pTaskList);
    int32_t pos = 0;
    while (pos < numOfEntries) {
      SCheckpointConsensusEntry *pEntry = taosArrayGet(pInfo->pTaskList, pos);
      if ((pEntry != NULL) && (pEntry->req.taskId == *pTaskId)) {
        taosArrayRemove(pInfo->pTaskList, pos);
        break;
      }
      pos += 1;
    }
  }

  return numOfSent;
}
2900

2901
// Timer-driven handler of the consensus-checkpointId procedure.
// For every stream in execInfo.pStreamConsensus whose tasks have all sent the consensus req, the consensus
// checkpointId is computed and, when every req is qualified (sent from the current term of its vgroup, and
// either waited for 10s or all tasks agree on the checkpointId), the set-consensus-checkpointId trans is
// created. At most one trans is created per invocation. Streams that are dropped or handled are removed
// from execInfo.pStreamConsensus afterwards.
// Returns terrno on allocation failure, 0 when not all vnodes are ready, TSDB_CODE_FAILED when a req carries
// a checkpointId smaller than the consensus one, otherwise the code of the last operation.
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  int64_t now = taosGetTimestampMs();
  bool    allReady = true;
  SArray *pNodeSnapshot = NULL;
  int32_t numOfTrans = 0;
  int32_t code = 0;
  void   *pIter = NULL;

  // ids of the streams that have to be removed from the consensus list at the end
  SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
  if (pStreamList == NULL) {
    return terrno;
  }

  SHashObj *pTermMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (pTermMap == NULL) {
    taosArrayDestroy(pStreamList);
    return terrno;
  }

  mDebug("start to process consensus-checkpointId in tmr");

  code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot, pTermMap);
  taosArrayDestroy(pNodeSnapshot);  // only the term map is needed here
  if (code) {
    mError("failed to get the vgroup snapshot, ignore it and continue");
  }

  if (!allReady) {
    mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
    taosArrayDestroy(pStreamList);
    taosHashCleanup(pTermMap);
    return 0;
  }

  streamMutexLock(&execInfo.lock);

  while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
    SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;

    int64_t     streamId = -1;
    int32_t     num = taosArrayGetSize(pInfo->pTaskList);
    SStreamObj *pStream = NULL;

    code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
    if (pStream == NULL || code != 0) {  // stream has been dropped already
      mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
      void *p = taosArrayPush(pStreamList, &pInfo->streamId);
      if (p == NULL) {
        mError("failed to record the missing stream id in concensus-stream list, streamId:%" PRId64
               " code:%s, continue",
               pInfo->streamId, tstrerror(terrno));
      }
      continue;
    }

    if ((num < pInfo->numOfTasks) || (pInfo->numOfTasks == 0)) {
      mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-consensus req(not all), ignore", pStream->uid,
             pStream->name, num, pInfo->numOfTasks);
      mndReleaseStream(pMnode, pStream);
      continue;
    }

    streamId = pStream->uid;

    int32_t existed = 0;
    bool    allSame = true;
    int64_t chkId = getConsensusId(pInfo->streamId, pInfo->numOfTasks, &existed, &allSame);
    if (chkId == -1) {
      mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again", existed, pInfo->numOfTasks);
      mndReleaseStream(pMnode, pStream);
      continue;
    }

    bool allQualified = true;
    for (int32_t j = 0; j < num; ++j) {
      SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
      if (pe == NULL) {
        continue;
      }

      // NOTE(review): nodeId -2 skips the term check; the meaning of this value is not visible here
      if (pe->req.nodeId != -2) {
        int32_t *pTerm = taosHashGet(pTermMap, &(pe->req.nodeId), sizeof(pe->req.nodeId));
        if (pTerm == NULL) {
          mError("stream:0x%" PRIx64 " s-task:0x%x req from vgId:%d not found in termMap", pe->req.streamId,
                 pe->req.taskId, pe->req.nodeId);
          allQualified = false;
          continue;
        } else {
          if (*pTerm != pe->req.term) {
            mWarn("stream:0x%" PRIx64 " s-task:0x%x req from vgId:%d is expired, term:%d, current term:%d",
                  pe->req.streamId, pe->req.taskId, pe->req.nodeId, pe->req.term, *pTerm);
            allQualified = false;
            continue;
          }
        }
      }

      if (((now - pe->ts) >= 10 * 1000) || allSame) {
        mDebug("s-task:0x%x vgId:%d term:%d sendTs:%" PRId64 " wait %.2fs or all tasks have same checkpointId:%" PRId64, pe->req.taskId,
               pe->req.nodeId, pe->req.term, pe->req.startTs, (now - pe->ts) / 1000.0, chkId);
        if (chkId > pe->req.checkpointId) {
          mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
                 pe->req.checkpointId, chkId);

          // pe, pStream and the iterator all belong to state guarded by execInfo.lock:
          // finish with them before the lock is released
          mndReleaseStream(pMnode, pStream);
          taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
          streamMutexUnlock(&execInfo.lock);

          taosArrayDestroy(pStreamList);
          taosHashCleanup(pTermMap);
          return TSDB_CODE_FAILED;
        }

      } else {
        mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
               pe->req.startTs, (now - pe->ts) / 1000.0);
        allQualified = false;
      }
    }

    if (allQualified) {
      code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_CONSEN_NAME, false);

      if (code == 0) {
        code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, chkId, pInfo->pTaskList);
        if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
          mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
        } else {
          numOfTrans += 1;
          mndClearConsensusRspEntry(pInfo);
          void *p = taosArrayPush(pStreamList, &streamId);
          if (p == NULL) {
            mError("failed to put into stream list, stream:0x%" PRIx64 " not remove it in consensus-chkpt list",
                   streamId);
          }
        }
      } else {
        mDebug("stream:0x%" PRIx64 "not create chktp-consensus, due to trans conflict", pStream->uid);
      }
    }

    mndReleaseStream(pMnode, pStream);

    // create one transaction each time
    if (numOfTrans > 0) {
      taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
      break;
    }
  }

  // drop the dropped/handled streams from the consensus list, still under the lock
  for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
    int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i);
    if (pStreamId == NULL) {
      continue;
    }

    code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId);
  }

  streamMutexUnlock(&execInfo.lock);

  taosArrayDestroy(pStreamList);
  taosHashCleanup(pTermMap);

  mDebug("end to process consensus-checkpointId in tmr, send consensus-checkpoint trans:%d", numOfTrans);
  return code;
}
3086

3087
// Wrapper around mndProcessCreateStreamReq() for create-stream requests handled through this entry point.
//
// When the wrapped call fails with anything other than TSDB_CODE_ACTION_IN_PROGRESS, a 1-byte response
// buffer is attached to the request, noResp is cleared and the failure code is stored in pReq->code.
//
// Returns the code of mndProcessCreateStreamReq(), or terrno when the response buffer cannot be allocated.
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t ret = mndProcessCreateStreamReq(pReq);

  // success and in-progress results need no extra response payload
  if (ret == 0 || ret == TSDB_CODE_ACTION_IN_PROGRESS) {
    return ret;
  }

  pReq->info.rsp = rpcMallocCont(1);
  if (pReq->info.rsp == NULL) {
    return terrno;
  }

  pReq->info.rspLen = 1;
  pReq->info.noResp = false;
  pReq->code = ret;
  return ret;
}
3101

UNCOV
3102
// Wrapper around mndProcessDropStreamReq() for drop-stream requests handled through this entry point.
//
// A failure other than TSDB_CODE_ACTION_IN_PROGRESS gets a 1-byte response buffer attached to the
// request, with noResp cleared and the failure code copied into pReq->code.
//
// Returns the code of mndProcessDropStreamReq(), or terrno when the response buffer cannot be allocated.
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
  int32_t status = mndProcessDropStreamReq(pReq);
  int32_t failed = (status != 0) && (status != TSDB_CODE_ACTION_IN_PROGRESS);

  if (failed) {
    pReq->info.rsp = rpcMallocCont(1);
    if (pReq->info.rsp == NULL) {
      return terrno;
    }

    pReq->code = status;
    pReq->info.noResp = false;
    pReq->info.rspLen = 1;
  }

  return status;
}
3116

UNCOV
3117
// Fill pExecInfo from all streams via addAllStreamTasksIntoBuf(), once.
//
// Nothing happens when pExecInfo->initTaskList is already set or when pMnode is NULL; otherwise the
// buffer is populated and initTaskList is set to true.
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  if (!pExecInfo->initTaskList && pMnode != NULL) {
    addAllStreamTasksIntoBuf(pMnode, pExecInfo);
    pExecInfo->initTaskList = true;
  }
}
3125

3126
void mndStreamResetInitTaskListLoadFlag() {
21✔
3127
  mInfo("reset task list buffer init flag for leader");
21!
3128
  execInfo.initTaskList = false;
21✔
3129
}
21✔
3130

3131
// Record the role of this mnode in the global execInfo and flag a follower -> leader switch.
//
// pMnode: not referenced in the body.
// role:   the new role; compared against NODE_ROLE_LEADER, any other value takes the follower path.
//
// execInfo.switchFromFollower is always reset to false first, and is set to true only when the
// stored role was NODE_ROLE_FOLLOWER and the new role is NODE_ROLE_LEADER.
void mndUpdateStreamExecInfoRole(SMnode *pMnode, int32_t role) {
  execInfo.switchFromFollower = false;

  // first assignment: just take the role as given
  if (execInfo.role == NODE_ROLE_UNINIT) {
    execInfo.role = role;
    if (role == NODE_ROLE_LEADER) {
      mInfo("init mnode is set to leader");
    } else {
      mInfo("init mnode is set to follower");
    }
    return;
  }

  if (role == NODE_ROLE_LEADER) {
    if (execInfo.role != NODE_ROLE_FOLLOWER) {
      mInfo("mnode remain to be leader, do nothing");
      return;
    }

    execInfo.role = role;
    execInfo.switchFromFollower = true;
    mInfo("mnode switch to be leader from follower");
    return;
  }

  // follower's
  if (execInfo.role != NODE_ROLE_LEADER) {
    mInfo("mnode remain to be follower, do nothing");
    return;
  }

  execInfo.role = role;
  mInfo("mnode switch to be follower from leader");
}
21✔
3160

UNCOV
3161
// Walk every SDB_STREAM object in pMnode->pSdb and pass it to saveTaskAndNodeInfoIntoBuf() together
// with pExecInfo. Each fetched stream object is released with sdbRelease() right after it is handled.
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  SSdb       *pStore = pMnode->pSdb;
  SStreamObj *pObj = NULL;

  // sdbFetch() yields NULL once the iteration is exhausted
  for (void *pCursor = sdbFetch(pStore, SDB_STREAM, NULL, (void **)&pObj); pCursor != NULL;
       pCursor = sdbFetch(pStore, SDB_STREAM, pCursor, (void **)&pObj)) {
    saveTaskAndNodeInfoIntoBuf(pObj, pExecInfo);
    sdbRelease(pStore, pObj);
  }
}
×
3176

UNCOV
3177
// Build and prepare the "update checkpoint-info" transaction for pStream.
//
// Steps: create the trans, register it under MND_STREAM_CHKPT_UPDATE_NAME, attach the update-checkpoint
// action, persist the stream with SDB_STATUS_READY, then prepare the trans.
//
// pChkptInfoList: not referenced in the body.
//
// pStream is released with sdbRelease() on every path, including failures.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the trans has been prepared, otherwise the code of the
// step that failed (or the code of doCreateTrans() when it produced no trans).
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList) {
  STrans *pTrans = NULL;
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME,
                               "update checkpoint-info", &pTrans);
  if (code != 0 || pTrans == NULL) {
    // this path only releases the stream; the trans is not dropped here
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  if ((code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid)) != 0) {
    goto _end;
  }

  if ((code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream)) != 0) {
    goto _end;
  }

  if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY)) != 0) {
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, tstrerror(code));
    goto _end;
  }

  // both TSDB_CODE_SUCCESS and TSDB_CODE_ACTION_IN_PROGRESS are reported as in-progress
  code = TSDB_CODE_ACTION_IN_PROGRESS;

_end:
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
3220

UNCOV
3221
// Handle a drop-orphan-task request: create and prepare one transaction that drops every task
// listed in the message.
//
// The message is deserialized from pReq->pCont. The stream id of the first non-NULL entry in the
// list is used for the conflict check, for registering the trans, and for the placeholder stream
// object handed to doCreateTrans()/mndPersistTransLog().
//
// Returns the deserialization error, 0 when there is nothing to drop or no usable entry was found,
// or the code of the first failing step / of mndTransPrepare().
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  int32_t      code = 0;
  SOrphanTask *pTask = NULL;
  int32_t      i = 0;
  STrans      *pTrans = NULL;
  int32_t      numOfTasks = 0;

  SMStreamDropOrphanMsg msg = {0};
  code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg);
  if (code) {
    return code;
  }

  numOfTasks = taosArrayGetSize(msg.pList);
  if (numOfTasks == 0) {
    mDebug("no orphan tasks to drop, no need to create trans");
    goto _err;
  }

  mDebug("create trans to drop %d orphan tasks", numOfTasks);

  // locate the first entry that can be fetched from the list
  i = 0;
  while (i < numOfTasks && ((pTask = taosArrayGet(msg.pList, i)) == NULL)) {
    i += 1;
  }

  if (pTask == NULL) {
    // NOTE(review): code is still 0 here, so the caller sees success although no trans is created
    mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task");
    goto _err;
  }

  // check if it is conflict with other trans in both sourceDb and targetDb.
  code = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
  if (code) {
    goto _err;
  }

  // placeholder stream object: only the uid is meaningful, the names are empty
  SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};

  code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
  if (pTrans == NULL || code != 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _err;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
  if (code) {
    goto _err;
  }

  // drop all tasks
  if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) {
    mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
    goto _err;
  }

  // drop stream
  if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) {
    goto _err;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    goto _err;
  }

_err:
  // shared exit for success and failure; pTrans may still be NULL here
  tDestroyDropOrphanTaskMsg(&msg);

  // Report success only when a trans was really created; the early exits above (empty list,
  // no usable entry) also arrive here with code == 0 and must not log a successful creation.
  if (pTrans != NULL && (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    mDebug("create drop %d orphan tasks trans succ", numOfTasks);
  }

  mndTransDrop(pTrans);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc