• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3798

31 Mar 2025 10:39AM UTC coverage: 9.424% (-20.9%) from 30.372%
#3798

push

travis-ci

happyguoxy
test:add test cases

21549 of 307601 branches covered (7.01%)

Branch coverage included in aggregate %.

36084 of 303967 relevant lines covered (11.87%)

58620.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mnode/impl/src/mndStreamTrans.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndTrans.h"
18

19
// Threshold, in milliseconds, beyond which a running checkpoint trans is regarded as too long and is collected
// as a candidate to be killed: 600 * 1000 * 3 ms = 1800s = 30 min.
#define MAX_CHKPT_EXEC_ELAPSED (600 * 1000 * 3)  // 1800s (30 min)

// A key collected while iterating a hash table, so that the entry can be removed once the iteration is done.
typedef struct SKeyInfo {
  void   *pKey;    // points at the key stored inside the hash entry
  int32_t keyLen;  // key length in bytes
} SKeyInfo;
25

26
// True when pDb is exactly `len` characters long and equals the first `len` characters of pParam
// (pParam itself need not be NUL-terminated at `len`).
static bool identicalName(const char *pDb, const char *pParam, int32_t len) {
  size_t dbNameLen = strlen(pDb);
  if (dbNameLen != (size_t)len) {
    return false;
  }

  return strncmp(pDb, pParam, len) == 0;
}
29

30
/*
 * Record in execInfo.transMgmt.pDBTrans that trans pTrans (named pTransName) runs for stream streamId.
 * The entry is keyed by streamId and stamped with the current time as its start time.
 * Returns the result of taosHashPut.
 */
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId) {
  int64_t          startTs = taosGetTimestampMs();
  SStreamTransInfo entry = {.streamId = streamId, .name = pTransName, .transId = pTrans->id, .startTime = startTs};

  return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(int64_t), &entry, sizeof(entry));
}
35

36
/*
 * Remove from execInfo.transMgmt.pDBTrans the entries whose trans can no longer be acquired (finished), and
 * count the checkpoint trans that are still alive.
 *
 * pNumOfActiveChkpt: optional; set to 0 on entry and to the number of alive checkpoint trans on success.
 * pLongChkptTrans:   optional; every alive checkpoint trans created at least MAX_CHKPT_EXEC_ELAPSED ms ago is
 *                    appended to it as a copy of its SStreamTransInfo entry.
 * Returns 0 on success, otherwise an error code.
 *
 * NOTE(review): the hash table is iterated and modified without taking execInfo.lock here; callers in this
 * file invoke it with the lock held or through mndStreamTransConflictCheck(lock) — confirm for other callers.
 */
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray *pLongChkptTrans) {
  size_t  keyLen = 0;
  void   *pIter = NULL;
  SArray *pList = taosArrayInit(4, sizeof(SKeyInfo));
  int32_t numOfChkpt = 0;
  int64_t now = taosGetTimestampMs();

  if (pNumOfActiveChkpt != NULL) {
    *pNumOfActiveChkpt = 0;
  }

  if (pList == NULL) {
    return terrno;
  }

  // keys are collected first and removed after the iteration, so the table is not modified while iterating
  while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
    SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter;

    // a trans that can no longer be acquired has finished; its entry is collected for removal
    STrans *pTrans = mndAcquireTrans(pMnode, pEntry->transId);
    if (pTrans == NULL) {
      // the key is the stream id the trans was registered with (see mndStreamRegisterTrans)
      void    *pKey = taosHashGetKey(pEntry, &keyLen);
      SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
      mDebug("transId:%d stream:0x%" PRIx64 " %s startTs:%" PRId64 " cleared since finished", pEntry->transId,
             pEntry->streamId, pEntry->name, pEntry->startTime);

      if (taosArrayPush(pList, &info) == NULL) {
        int32_t code = terrno;
        // leaving the loop early: release the iterator and the collected list instead of leaking them
        taosHashCancelIterate(execInfo.transMgmt.pDBTrans, pIter);
        taosArrayDestroy(pList);
        return code;
      }
    } else {
      if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
        numOfChkpt++;

        // checkpoint trans running for at least MAX_CHKPT_EXEC_ELAPSED ms are handed to the caller to be killed
        int64_t dur = now - pTrans->createdTime;
        if ((dur >= MAX_CHKPT_EXEC_ELAPSED) && (pLongChkptTrans != NULL)) {
          mInfo("long chkpt transId:%d, start:%" PRId64
                " exec duration:%.2fs, beyond threshold %.2f min, kill it and reset task status",
                pTrans->id, pTrans->createdTime, dur / 1000.0, MAX_CHKPT_EXEC_ELAPSED / (1000 * 60.0));
          if (taosArrayPush(pLongChkptTrans, pEntry) == NULL) {
            mError("failed to add long checkpoint trans, transId:%d, code:%s", pEntry->transId, tstrerror(terrno));
          }
        }
      }
      mndReleaseTrans(pMnode, pTrans);
    }
  }

  int32_t size = taosArrayGetSize(pList);
  for (int32_t i = 0; i < size; ++i) {
    SKeyInfo *pKey = taosArrayGet(pList, i);
    if (pKey == NULL) {
      continue;
    }

    int32_t code = taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen);
    if (code != 0) {
      taosArrayDestroy(pList);
      return code;
    }
  }

  mDebug("clear %d finished stream-trans, active trans:%d, active checkpoint trans:%d", size,
         taosHashGetSize(execInfo.transMgmt.pDBTrans), numOfChkpt);

  taosArrayDestroy(pList);

  if (pNumOfActiveChkpt != NULL) {
    *pNumOfActiveChkpt = numOfChkpt;
  }

  return 0;
}
111

112
/*
 * Decide whether a new trans named pTransName may be created for stream streamId, given the trans currently
 * registered for that stream in execInfo.transMgmt.pDBTrans. Entries of finished trans are cleared first.
 * Returns TSDB_CODE_MND_TRANS_CONFLICT when the registered trans blocks the new one, otherwise
 * TSDB_CODE_SUCCESS (0).
 */
static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName) {
  int32_t numOfTrans = taosHashGetSize(execInfo.transMgmt.pDBTrans);
  if (numOfTrans <= 0) {
    return 0;
  }

  // stale entries of finished trans must not show up as conflicts; a failure here is only logged
  int32_t code = mndStreamClearFinishedTrans(pMnode, NULL, NULL);
  if (code) {
    mError("failed to clear finish trans, code:%s, and continue", tstrerror(code));
  }

  SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
  if (pEntry == NULL) {
    mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId);
    return TSDB_CODE_SUCCESS;
  }

  SStreamTransInfo tInfo = *pEntry;

  if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
    // an ongoing checkpoint trans only lets the drop, reset and stop trans through
    bool passThrough = (strcmp(pTransName, MND_STREAM_DROP_NAME) == 0) ||
                       (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) == 0) ||
                       (strcmp(pTransName, MND_STREAM_STOP_NAME) == 0);
    if (!passThrough) {
      mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
            tInfo.name);
      return TSDB_CODE_MND_TRANS_CONFLICT;
    }

    mDebug("not conflict with checkpoint trans, name:%s, continue creating trans", pTransName);
    return TSDB_CODE_SUCCESS;
  }

  // while one of these is ongoing, no other trans may be created for the same stream
  bool exclusive = (strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) ||
                   (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) ||
                   (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0) ||
                   (strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) ||
                   (strcmp(tInfo.name, MND_STREAM_CHKPT_CONSEN_NAME) == 0) ||
                   (strcmp(tInfo.name, MND_STREAM_STOP_NAME) == 0);
  if (exclusive) {
    mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
          tInfo.name);
    return TSDB_CODE_MND_TRANS_CONFLICT;
  }

  return TSDB_CODE_SUCCESS;
}
152

153
// Transactions of different streams are unrelated, so only the transactions of the given stream are checked.
// For a given stream, when a trans is already registered:
// 1. an ongoing checkpoint trans conflicts with any new trans except the drop, reset and stop trans.
// 2. an ongoing create/drop/reset/update/chkpt-consensus/stop trans conflicts with any new trans.
157
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
  // `lock` selects whether execInfo.lock is taken around the check; with false the lock is left to the caller.
  if (!lock) {
    return doStreamTransConflictCheck(pMnode, streamId, pTransName);
  }

  streamMutexLock(&execInfo.lock);
  int32_t code = doStreamTransConflictCheck(pMnode, streamId, pTransName);
  streamMutexUnlock(&execInfo.lock);

  return code;
}
170

171
/*
 * Return the id of the trans registered for streamId when it is a checkpoint, task-update or checkpoint-update
 * trans; return 0 when there is no such trans. Finished entries are cleared first. Takes execInfo.lock itself.
 */
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {
  streamMutexLock(&execInfo.lock);

  int32_t numOfTrans = taosHashGetSize(execInfo.transMgmt.pDBTrans);
  if (numOfTrans <= 0) {
    streamMutexUnlock(&execInfo.lock);
    return 0;
  }

  int32_t code = mndStreamClearFinishedTrans(pMnode, NULL, NULL);
  if (code) {
    mError("failed to clear finish trans, code:%s", tstrerror(code));
  }

  SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
  if (pEntry == NULL) {
    streamMutexUnlock(&execInfo.lock);
    return 0;
  }

  // copy the entry so it can be inspected after the lock is released
  SStreamTransInfo tInfo = *pEntry;
  streamMutexUnlock(&execInfo.lock);

  bool related = (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) ||
                 (strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) ||
                 (strcmp(tInfo.name, MND_STREAM_CHKPT_UPDATE_NAME) == 0);
  return related ? tInfo.transId : 0;
}
199

200
/*
 * Create a TRN_POLICY_RETRY trans named `name` for pStream, bind it to the stream's source db and target
 * stable, and run the mnode trans conflict check on it. pMsg is only used for logging.
 * On success *pTrans1 receives the new trans and 0 is returned; on failure *pTrans1 is left NULL and a
 * non-zero error code is returned.
 */
int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name,
                      const char *pMsg, STrans **pTrans1) {
  *pTrans1 = NULL;
  terrno = 0;

  STrans *p = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
  if (p == NULL) {
    // never report success together with a NULL trans, even if mndTransCreate did not set terrno
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to build trans:%s, reason: %s", name, tstrerror(code));
    return code;
  }

  mInfo("stream:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, p->id);

  mndTransSetDbName(p, pStream->sourceDb, pStream->targetSTbName);

  int32_t code = mndTransCheckConflict(pMnode, p);
  if (code != 0) {
    // report the code actually returned by the conflict check, not whatever terrno happens to hold
    mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(code));
    mndTransDrop(p);
    return code;
  }

  *pTrans1 = p;
  return TSDB_CODE_SUCCESS;
}
224

225
/*
 * Serialize pStream into an SSdbRaw record of type SDB_STREAM (version MND_STREAM_VER_NUMBER).
 * The object is encoded twice: first into a NULL buffer to obtain the encoded length, then into a buffer of
 * that length, which is copied into the record as <int32 length><encoded bytes>.
 * Returns the record on success; on failure returns NULL and sets terrno to the error code.
 */
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *buf = NULL;
  // must be initialized here: the first TSDB_CHECK_CODE below jumps to _over before the record is allocated,
  // and _over logs and frees pRaw, so it must never be read with an indeterminate value.
  SSdbRaw *pRaw = NULL;

  SEncoder encoder;
  tEncoderInit(&encoder, NULL, 0);
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
    tEncoderClear(&encoder);
    TSDB_CHECK_CODE(code, lino, _over);
  }

  int32_t tlen = encoder.pos;
  tEncoderClear(&encoder);

  int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
  pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
  TSDB_CHECK_NULL(pRaw, code, lino, _over, terrno);

  buf = taosMemoryMalloc(tlen);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  tEncoderInit(&encoder, buf, tlen);
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
    tEncoderClear(&encoder);
    TSDB_CHECK_CODE(code, lino, _over);
  }

  tEncoderClear(&encoder);

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, _over);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _over);
  SDB_SET_DATALEN(pRaw, dataPos, _over);

_over:
  taosMemoryFreeClear(buf);
  if (code != TSDB_CODE_SUCCESS) {
    mError("stream:%s, failed to encode to raw:%p at line:%d since %s", pStream->name, pRaw, lino, tstrerror(code));
    sdbFreeRaw(pRaw);
    terrno = code;
    return NULL;
  }

  terrno = 0;
  mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
         pStream->checkpointId);
  return pRaw;
}
274

275
/*
 * Encode pStream, mark the resulting raw record with `status`, and append it to pTrans as a commit log.
 * On any failure pTrans is dropped (the caller must not use it afterwards) and the error code is returned.
 */
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status) {
  SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
  if (pCommitRaw == NULL) {
    int32_t code = terrno;
    mError("failed to encode stream since %s", tstrerror(code));
    mndTransDrop(pTrans);
    return code;
  }

  // Set the status while this function still owns pCommitRaw. Once appended, the record belongs to the trans
  // (the success path never frees it), so freeing it here and then dropping the trans would free it twice.
  int32_t code = sdbSetRawStatus(pCommitRaw, status);
  if (code != 0) {
    mError("stream trans:%d failed to set raw status:%d since %s", pTrans->id, status, tstrerror(code));
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    // the append failed, so the record was not taken over by the trans and is still ours to free
    mError("stream trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    return code;
  }

  return 0;
}
299

300
/*
 * Append to pTrans a redo action that sends pCont (contLen bytes, message type msgType) to *pEpset.
 * retryCode and acceptCode are stored as the action's retryCode and acceptableCode.
 * Returns the result of mndTransAppendRedoAction.
 */
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
                       int32_t retryCode, int32_t acceptCode) {
  STransAction redoAction = {
      .msgType = msgType,
      .pCont = pCont,
      .contLen = contLen,
      .epSet = *pEpset,
      .retryCode = retryCode,
      .acceptableCode = acceptCode,
  };

  int32_t code = mndTransAppendRedoAction(pTrans, &redoAction);
  return code;
}
310

311
bool isNodeUpdateTransActive() {
×
312
  bool  exist = false;
×
313
  void *pIter = NULL;
×
314

315
  streamMutexLock(&execInfo.lock);
×
316

317
  while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
×
318
    SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter;
×
319
    if (strcmp(pTransInfo->name, MND_STREAM_TASK_UPDATE_NAME) == 0) {
×
320
      mDebug("stream:0x%" PRIx64 " %s st:%" PRId64 " is in task nodeEp update, create new stream not allowed",
×
321
             pTransInfo->streamId, pTransInfo->name, pTransInfo->startTime);
322
      exist = true;
×
323
    }
324
  }
325

326
  streamMutexUnlock(&execInfo.lock);
×
327
  return exist;
×
328
}
329

330
/*
 * Kill every registered checkpoint trans whose stream's source or target db equals the first `len` bytes of
 * pDBName. Streams that cannot be acquired are skipped. Always returns TSDB_CODE_SUCCESS.
 */
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
  for (void *pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, NULL); pIter != NULL;
       pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) {
    SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter;

    bool isCheckpoint = (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) == 0);
    if (!isCheckpoint) {
      continue;
    }

    SStreamObj *pStream = NULL;
    int32_t     code = mndGetStreamObj(pMnode, pTransInfo->streamId, &pStream);
    if ((pStream == NULL) || (code != 0)) {
      continue;
    }

    // the source db is checked first; the target db is only considered when the source db does not match
    if (identicalName(pStream->sourceDb, pDBName, len)) {
      mndKillTransImpl(pMnode, pTransInfo->transId, pStream->sourceDb);
    } else if (identicalName(pStream->targetDb, pDBName, len)) {
      mndKillTransImpl(pMnode, pTransInfo->transId, pStream->targetDb);
    }

    mndReleaseStream(pMnode, pStream);
  }

  return TSDB_CODE_SUCCESS;
}
354

355
// kill all trans in the dst DB
356
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
×
357
  mDebug("start to clear checkpoints in all Dbs");
×
358
  char p[128] = {0};
×
359

360
  void *pIter = NULL;
×
361
  while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
×
362
    char *pDb = (char *)pIter;
×
363

364
    size_t len = 0;
×
365
    void  *pKey = taosHashGetKey(pDb, &len);
×
366
    int cpLen = (127 < len) ? 127 : len;
×
367
    TAOS_STRNCPY(p, pKey, cpLen);
×
368
    p[cpLen] = '\0';
×
369

370
    int32_t code = doKillCheckpointTrans(pMnode, pKey, len);
×
371
    if (code) {
×
372
      mError("failed to kill trans, transId:%p", pKey);
×
373
    } else {
374
      mDebug("clear checkpoint trans in Db:%s", p);
×
375
    }
376
  }
377

378
  mDebug("complete clear checkpoints in all Dbs");
×
379
}
×
380

381
/*
 * For each SStreamTransInfo in pLongChkpts (long-running checkpoint trans), kill the trans and create a
 * reset-status trans for its stream using the stream's current checkpointId. Failures are logged and the
 * remaining entries are still processed.
 */
void killChkptAndResetStreamTask(SMnode *pMnode, SArray *pLongChkpts) {
  int64_t now = taosGetTimestampMs();
  int32_t num = taosArrayGetSize(pLongChkpts);

  mInfo("start to kill %d long checkpoint trans", num);

  for (int32_t i = 0; i < num; ++i) {
    SStreamTransInfo *pTrans = (SStreamTransInfo *)taosArrayGet(pLongChkpts, i);
    if (pTrans == NULL) {
      continue;
    }

    double el = (now - pTrans->startTime) / 1000.0;
    mInfo("stream:0x%" PRIx64 " start to kill ongoing long checkpoint transId:%d, elapsed time:%.2fs. killed",
          pTrans->streamId, pTrans->transId, el);

    SStreamObj *p = NULL;
    int32_t     code = mndGetStreamObj(pMnode, pTrans->streamId, &p);
    if (code != 0 || p == NULL) {
      // presumably the stream was dropped meanwhile; report it instead of skipping the entry silently
      mError("stream:0x%" PRIx64 " failed to acquire stream obj, long checkpoint transId:%d not killed, code:%s",
             pTrans->streamId, pTrans->transId, tstrerror(code));
      continue;
    }

    mndKillTransImpl(pMnode, pTrans->transId, p->sourceDb);

    mDebug("stream:%s 0x%" PRIx64 " transId:%d checkpointId:%" PRId64 " create reset task trans", p->name,
           pTrans->streamId, pTrans->transId, p->checkpointId);

    code = mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId);
    if (code) {
      mError("stream:%s 0x%" PRIx64 " failed to create reset stream task, code:%s", p->name, p->uid, tstrerror(code));
    }

    // same release path as doKillCheckpointTrans uses for objects obtained via mndGetStreamObj
    mndReleaseStream(pMnode, p);
  }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc