• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3626

28 Feb 2025 03:34AM UTC coverage: 63.764% (+0.1%) from 63.633%
#3626

push

travis-ci

web-flow
Merge pull request #29961 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

149233 of 299935 branches covered (49.76%)

Branch coverage included in aggregate %.

53 of 91 new or added lines in 8 files covered. (58.24%)

3267 existing lines in 138 files now uncovered.

233601 of 300457 relevant lines covered (77.75%)

17374158.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.39
/source/dnode/mnode/impl/src/mndStreamTrans.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndTrans.h"
18

19
#define MAX_CHKPT_EXEC_ELAPSED (600*1000)  // 600s
20

21
// Reference to one key of the stream-trans hash table: the key address handed
// out by taosHashGetKey() together with its length. The key bytes are not
// copied, so the reference is only usable while the hash entry exists.
typedef struct SKeyInfo SKeyInfo;

struct SKeyInfo {
  void   *pKey;    // address of the key inside the hash table
  int32_t keyLen;  // length of the key in bytes
};
25

26
// Returns true when the NUL-terminated string pDb is exactly `len` bytes long
// and equals the first `len` bytes of pParam. At most `len` bytes of pParam
// are examined.
static bool identicalName(const char *pDb, const char *pParam, int32_t len) {
  if (strlen(pDb) != len) {
    return false;
  }

  return strncmp(pDb, pParam, len) == 0;
}
29

30
// Records pTrans as the transaction currently attached to stream `streamId`.
// The entry is stored in execInfo.transMgmt.pDBTrans keyed by the stream id;
// the transaction name is kept by pointer, not copied.
// Returns whatever taosHashPut() returns.
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId) {
  SStreamTransInfo entry = {0};

  entry.transId = pTrans->id;
  entry.startTime = taosGetTimestampMs();
  entry.name = pTransName;
  entry.streamId = streamId;

  return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &entry, sizeof(entry));
}
35

36
// Removes from execInfo.transMgmt.pDBTrans every entry whose transaction can no
// longer be acquired from the mnode (treated as finished), and counts the
// checkpoint transactions that are still alive.
//
//   pNumOfActiveChkpt  optional; reset to 0 on entry and, on success, set to the
//                      number of checkpoint transactions that could be acquired.
//   pLongChkptTrans    optional; each live checkpoint entry whose age
//                      (now - createdTime) is at least MAX_CHKPT_EXEC_ELAPSED is
//                      pushed onto this array. A failed push is only logged.
//
// Returns 0 on success, otherwise the error reported by the failing array or
// hash operation. This function does not take execInfo.lock itself.
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray*pLongChkptTrans) {
  size_t  keyLen = 0;
  void   *pIter = NULL;
  SArray *pList = taosArrayInit(4, sizeof(SKeyInfo));
  int32_t numOfChkpt = 0;
  int64_t now = taosGetTimestampMs();

  if (pNumOfActiveChkpt != NULL) {
    *pNumOfActiveChkpt = 0;
  }

  if (pList == NULL) {
    return terrno;
  }

  // Pass 1: collect the keys of finished entries. Removal is deferred to pass 2
  // so that the table is not modified while it is being iterated.
  while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
    SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter;

    // let's clear the finished trans
    STrans *pTrans = mndAcquireTrans(pMnode, pEntry->transId);
    if (pTrans == NULL) {
      void *pKey = taosHashGetKey(pEntry, &keyLen);
      // key is the name of src/dst db name
      SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
      mDebug("transId:%d stream:0x%" PRIx64 " %s startTs:%" PRId64 " cleared since finished", pEntry->transId,
             pEntry->streamId, pEntry->name, pEntry->startTime);
      void* p = taosArrayPush(pList, &info);
      if (p == NULL) {
        // Leaving the loop early: save the error first, then cancel the
        // in-flight iteration and free the key list before returning.
        int32_t code = terrno;
        taosHashCancelIterate(execInfo.transMgmt.pDBTrans, pIter);
        taosArrayDestroy(pList);
        return code;
      }
    } else {
      if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
        numOfChkpt++;

        // last for 10min, kill it
        int64_t dur = now - pTrans->createdTime;
        if ((dur >= MAX_CHKPT_EXEC_ELAPSED) && (pLongChkptTrans != NULL)) {
          mInfo("long chkpt transId:%d, start:%" PRId64
                " exec duration:%.2fs, beyond threshold %.2f min, kill it and reset task status",
                pTrans->id, pTrans->createdTime, dur / 1000.0, MAX_CHKPT_EXEC_ELAPSED/(1000*60.0));
          void* p = taosArrayPush(pLongChkptTrans, pEntry);
          if (p == NULL) {
            mError("failed to add long checkpoint trans, transId:%d, code:%s", pEntry->transId, tstrerror(terrno));
          }
        }
      }
      mndReleaseTrans(pMnode, pTrans);
    }
  }

  // Pass 2: remove the collected entries.
  int32_t size = taosArrayGetSize(pList);
  for (int32_t i = 0; i < size; ++i) {
    SKeyInfo *pKey = taosArrayGet(pList, i);
    if (pKey == NULL) {
      continue;
    }

    int32_t code = taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen);
    if (code != 0) {
      taosArrayDestroy(pList);
      return code;
    }
  }

  mDebug("clear %d finished stream-trans, active trans:%d, active checkpoint trans:%d", size,
         taosHashGetSize(execInfo.transMgmt.pDBTrans), numOfChkpt);

  taosArrayDestroy(pList);

  if (pNumOfActiveChkpt != NULL) {
    *pNumOfActiveChkpt = numOfChkpt;
  }

  return 0;
}
111

112
// Decides whether a new transaction named pTransName may be created for stream
// `streamId`, given the transaction (if any) currently registered for it.
//
// - No registered transaction: no conflict.
// - Registered checkpoint transaction: only drop, task-reset and restart
//   transactions may proceed; everything else conflicts.
// - Registered create/drop/task-reset/task-update/restart transaction: conflict.
//
// Returns TSDB_CODE_MND_TRANS_CONFLICT on conflict, TSDB_CODE_SUCCESS otherwise.
// Locking is left to the caller.
static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName) {
  int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
  if (num <= 0) {
    return 0;
  }

  // Drop finished transactions first so they are not mistaken for conflicts.
  // A failure here is logged and the check continues.
  int32_t code = mndStreamClearFinishedTrans(pMnode, NULL, NULL);
  if (code) {
    mError("failed to clear finish trans, code:%s, and continue", tstrerror(code));
  }

  SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
  if (pEntry == NULL) {
    mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId);
    return TSDB_CODE_SUCCESS;
  }

  SStreamTransInfo running = *pEntry;

  if (strcmp(running.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
    bool allowed = (strcmp(pTransName, MND_STREAM_DROP_NAME) == 0) ||
                   (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) == 0) ||
                   (strcmp(pTransName, MND_STREAM_RESTART_NAME) == 0);
    if (allowed) {
      mDebug("not conflict with checkpoint trans, name:%s, continue creating trans", pTransName);
      return TSDB_CODE_SUCCESS;
    }

    mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", running.transId, running.streamId,
          running.name);
    return TSDB_CODE_MND_TRANS_CONFLICT;
  }

  bool exclusive = (strcmp(running.name, MND_STREAM_CREATE_NAME) == 0) ||
                   (strcmp(running.name, MND_STREAM_DROP_NAME) == 0) ||
                   (strcmp(running.name, MND_STREAM_TASK_RESET_NAME) == 0) ||
                   (strcmp(running.name, MND_STREAM_TASK_UPDATE_NAME) == 0) ||
                   (strcmp(running.name, MND_STREAM_RESTART_NAME) == 0);
  if (exclusive) {
    mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", running.transId, running.streamId,
          running.name);
    return TSDB_CODE_MND_TRANS_CONFLICT;
  }

  return TSDB_CODE_SUCCESS;
}
151

152
// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream.
153
// For a given stream:
154
// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans.
155
// 2. create/drop/reset/update trans are conflict with any other trans.
156
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
4,553✔
157
  if (lock) {
4,553✔
158
    streamMutexLock(&execInfo.lock);
2,701✔
159
  }
160

161
  int32_t code = doStreamTransConflictCheck(pMnode, streamId, pTransName);
4,553✔
162

163
  if (lock) {
4,553✔
164
    streamMutexUnlock(&execInfo.lock);
2,701✔
165
  }
166

167
  return code;
4,553✔
168
}
169

170
// Returns the id of the transaction registered for stream `streamId` when that
// transaction is a checkpoint, task-update or checkpoint-update transaction;
// returns 0 in every other case (nothing registered, or a different kind).
// The lookup runs under execInfo.lock and clears finished transactions first.
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {
  bool             found = false;
  SStreamTransInfo snapshot = {0};

  streamMutexLock(&execInfo.lock);

  int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
  if (num > 0) {
    int32_t code = mndStreamClearFinishedTrans(pMnode, NULL, NULL);
    if (code) {
      mError("failed to clear finish trans, code:%s", tstrerror(code));
    }

    SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
    if (pEntry != NULL) {
      // copy the entry so it can be inspected after the lock is released
      snapshot = *pEntry;
      found = true;
    }
  }

  streamMutexUnlock(&execInfo.lock);

  if (!found) {
    return 0;
  }

  bool related = (strcmp(snapshot.name, MND_STREAM_CHECKPOINT_NAME) == 0) ||
                 (strcmp(snapshot.name, MND_STREAM_TASK_UPDATE_NAME) == 0) ||
                 (strcmp(snapshot.name, MND_STREAM_CHKPT_UPDATE_NAME) == 0);

  return related ? snapshot.transId : 0;
}
198

199
// Creates a retry-policy transaction named `name` for pStream, binds it to the
// stream's source db / target super table, and runs the mnode conflict check.
//
//   pMsg     text used only in the "start to build trans" log line.
//   pTrans1  out-param; set to NULL on entry and to the new transaction on
//            success. On failure it stays NULL and the transaction, if it was
//            created, has already been dropped.
//
// Returns 0 on success; otherwise terrno from mndTransCreate() or the code
// returned by mndTransCheckConflict().
int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name,
                      const char *pMsg, STrans **pTrans1) {
  *pTrans1 = NULL;
  terrno = 0;

  int32_t code = 0;
  STrans *p = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
  if (p == NULL) {
    // capture terrno once so the logged and the returned error cannot differ
    code = terrno;
    mError("failed to build trans:%s, reason: %s", name, tstrerror(code));
    return code;
  }

  mInfo("stream:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, p->id);

  mndTransSetDbName(p, pStream->sourceDb, pStream->targetSTbName);
  if ((code = mndTransCheckConflict(pMnode, p)) != 0) {
    // report the code that is actually returned, not whatever terrno holds
    mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(code));
    mndTransDrop(p);
    return code;
  }

  *pTrans1 = p;
  return code;
}
223

224
// Serialises pStream into a newly allocated SSdbRaw.
//
// The stream is encoded twice: first with a NULL buffer to learn the encoded
// length, then into a temporary buffer of that length, which is copied into
// the raw as an int32 length followed by the bytes.
//
// Returns the raw on success (terrno is set to 0). On failure returns NULL with
// terrno set to the error code; the raw and the temporary buffer are released.
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *buf = NULL;

  // Everything read after the _over label is declared and initialised up front,
  // so no `goto _over` can leave pRaw indeterminate.
  SSdbRaw *pRaw = NULL;
  int32_t  tlen = 0;
  int32_t  size = 0;
  int32_t  dataPos = 0;

  SEncoder encoder;
  tEncoderInit(&encoder, NULL, 0);
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
    tEncoderClear(&encoder);
    TSDB_CHECK_CODE(code, lino, _over);
  }

  tlen = encoder.pos;
  tEncoderClear(&encoder);

  size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
  pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
  TSDB_CHECK_NULL(pRaw, code, lino, _over, terrno);

  buf = taosMemoryMalloc(tlen);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  tEncoderInit(&encoder, buf, tlen);
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
    tEncoderClear(&encoder);
    TSDB_CHECK_CODE(code, lino, _over);
  }

  tEncoderClear(&encoder);

  SDB_SET_INT32(pRaw, dataPos, tlen, _over);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _over);
  SDB_SET_DATALEN(pRaw, dataPos, _over);

_over:
  taosMemoryFreeClear(buf);
  if (code != TSDB_CODE_SUCCESS) {
    mError("stream:%s, failed to encode to raw:%p at line:%d since %s", pStream->name, pRaw, lino, tstrerror(code));
    sdbFreeRaw(pRaw);
    terrno = code;
    return NULL;
  }

  terrno = 0;
  mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream,
         pStream->checkpointId);
  return pRaw;
}
273

274
// Encodes pStream and attaches the result to pTrans as a commit log whose raw
// status is `status`.
//
// Returns 0 on success. On any failure the error is logged, pTrans is dropped
// with mndTransDrop() and the terrno observed at the point of failure is
// returned, so the caller must not use pTrans afterwards.
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status) {
  SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
  if (pCommitRaw == NULL) {
    int32_t code = terrno;  // saved before cleanup calls can overwrite terrno
    mError("failed to encode stream since %s", terrstr());
    mndTransDrop(pTrans);
    return code;
  }

  // The status is set while this function is still the only holder of the raw.
  // NOTE(review): presumably the transaction keeps the raw pointer after a
  // successful append and releases it in mndTransDrop(); freeing it here after
  // the append, as before, would then free it twice. Confirm against mndTrans.
  if (sdbSetRawStatus(pCommitRaw, status) != 0) {
    int32_t code = terrno;
    mError("stream trans:%d failed to set raw status:%d since %s", pTrans->id, status, terrstr());
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    return code;
  }

  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    int32_t code = terrno;
    mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    return code;
  }

  return 0;
}
298

299
// Builds a redo action from the given message (pCont/contLen/msgType), target
// endpoint set and retry/acceptable codes, and appends it to pTrans.
// The endpoint set is copied by value; pCont is stored as a pointer.
// Returns the result of mndTransAppendRedoAction().
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
                       int32_t retryCode, int32_t acceptCode) {
  STransAction redo = {0};

  redo.epSet = *pEpset;
  redo.pCont = pCont;
  redo.contLen = contLen;
  redo.msgType = msgType;
  redo.retryCode = retryCode;
  redo.acceptableCode = acceptCode;

  return mndTransAppendRedoAction(pTrans, &redo);
}
309

310
bool isNodeUpdateTransActive() {
1,759✔
311
  bool  exist = false;
1,759✔
312
  void *pIter = NULL;
1,759✔
313

314
  streamMutexLock(&execInfo.lock);
1,759✔
315

316
  while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
2,946✔
317
    SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter;
1,187✔
318
    if (strcmp(pTransInfo->name, MND_STREAM_TASK_UPDATE_NAME) == 0) {
1,187!
319
      mDebug("stream:0x%" PRIx64 " %s st:%" PRId64 " is in task nodeEp update, create new stream not allowed",
×
320
             pTransInfo->streamId, pTransInfo->name, pTransInfo->startTime);
321
      exist = true;
×
322
    }
323
  }
324

325
  streamMutexUnlock(&execInfo.lock);
1,759✔
326
  return exist;
1,759✔
327
}
328

329
// Kills every registered checkpoint transaction whose stream reads from or
// writes to the database named by the first `len` bytes of pDBName. The source
// db is tested first; the target db only if the source db does not match.
// Always returns TSDB_CODE_SUCCESS.
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
  void *pIter = NULL;

  while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
    SStreamTransInfo *pInfo = (SStreamTransInfo *)pIter;
    if (strcmp(pInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
      continue;  // only checkpoint transactions are of interest
    }

    SStreamObj *pStream = NULL;
    int32_t     code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
    if (pStream == NULL || code != 0) {
      continue;
    }

    // pick the db name of the stream that matches the requested one, if any
    char *pMatchedDb = NULL;
    if (identicalName(pStream->sourceDb, pDBName, len)) {
      pMatchedDb = pStream->sourceDb;
    } else if (identicalName(pStream->targetDb, pDBName, len)) {
      pMatchedDb = pStream->targetDb;
    }

    if (pMatchedDb != NULL) {
      mndKillTransImpl(pMnode, pInfo->transId, pMatchedDb);
    }

    mndReleaseStream(pMnode, pStream);
  }

  return TSDB_CODE_SUCCESS;
}
353

354
// kill all trans in the dst DB
355
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
8✔
356
  mDebug("start to clear checkpoints in all Dbs");
8✔
357
  char p[128] = {0};
8✔
358

359
  void *pIter = NULL;
8✔
360
  while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
16✔
361
    char *pDb = (char *)pIter;
8✔
362

363
    size_t len = 0;
8✔
364
    void  *pKey = taosHashGetKey(pDb, &len);
8✔
365
    int cpLen = (127 < len) ? 127 : len;
8✔
366
    TAOS_STRNCPY(p, pKey, cpLen);
8✔
367
    p[cpLen] = '\0';
8✔
368

369
    int32_t code = doKillCheckpointTrans(pMnode, pKey, len);
8✔
370
    if (code) {
8!
371
      mError("failed to kill trans, transId:%p", pKey);
×
372
    } else {
373
      mDebug("clear checkpoint trans in Db:%s", p);
8✔
374
    }
375
  }
376

377
  mDebug("complete clear checkpoints in all Dbs");
8✔
378
}
8✔
379

NEW
380
// For each long-running checkpoint transaction listed in pLongChkpts
// (SStreamTransInfo elements): kills the transaction against the stream's
// source db and then creates a reset-status transaction for the stream at its
// current checkpointId. Entries whose stream cannot be fetched are skipped;
// a failure to create the reset transaction is logged only.
void killChkptAndResetStreamTask(SMnode *pMnode, SArray* pLongChkpts) {
  int64_t now = taosGetTimestampMs();
  int32_t num = taosArrayGetSize(pLongChkpts);

  mInfo("start to kill %d long checkpoint trans", num);

  for (int32_t idx = 0; idx < num; ++idx) {
    SStreamTransInfo *pInfo = (SStreamTransInfo *)taosArrayGet(pLongChkpts, idx);
    if (pInfo == NULL) {
      continue;
    }

    double elapsedSec = (now - pInfo->startTime) / 1000.0;
    mInfo("stream:0x%" PRIx64 " start to kill ongoing long checkpoint transId:%d, elapsed time:%.2fs. killed",
          pInfo->streamId, pInfo->transId, elapsedSec);

    SStreamObj *pStream = NULL;
    int32_t     code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
    if (code != 0 || pStream == NULL) {
      continue;
    }

    mndKillTransImpl(pMnode, pInfo->transId, pStream->sourceDb);

    mDebug("stream:%s 0x%" PRIx64 " transId:%d checkpointId:%" PRId64 " create reset task trans", pStream->name,
           pInfo->streamId, pInfo->transId, pStream->checkpointId);

    code = mndCreateStreamResetStatusTrans(pMnode, pStream, pStream->checkpointId);
    if (code) {
      mError("stream:%s 0x%"PRIx64" failed to create reset stream task, code:%s", pStream->name, pStream->uid,
             tstrerror(code));
    }

    sdbRelease(pMnode->pSdb, pStream);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc