• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3910

23 Apr 2025 02:47AM UTC coverage: 62.362% (-0.7%) from 63.063%
#3910

push

travis-ci

web-flow
docs(datain): add missing health status types (#30828)

155061 of 317305 branches covered (48.87%)

Branch coverage included in aggregate %.

240172 of 316469 relevant lines covered (75.89%)

6269478.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.74
/source/dnode/mnode/impl/src/mndStreamTrans.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndTrans.h"
18

19
// How long (in milliseconds) a checkpoint trans may run before it is collected as a long-running
// checkpoint trans to be killed: 600 * 1000 * 3 ms = 1800 s = 30 min.
#define MAX_CHKPT_EXEC_ELAPSED (600 * 1000 * 3)  // 1800s (30min)

// Hash-key descriptor collected while iterating a hash table, so that the corresponding entries can be
// removed once the iteration has finished.
typedef struct SKeyInfo {
  void   *pKey;    // key bytes, as returned by taosHashGetKey()
  int32_t keyLen;  // key length in bytes
} SKeyInfo;
25

26
// Returns true when the NUL-terminated name pDb has exactly `len` bytes and matches the first `len`
// bytes of pParam (pParam itself does not need to be NUL-terminated). NULL names or a negative length
// never match.
static bool identicalName(const char *pDb, const char *pParam, int32_t len) {
  if (pDb == NULL || pParam == NULL || len < 0) {
    return false;
  }

  // compare as size_t explicitly instead of relying on an implicit signed/unsigned conversion
  return (strlen(pDb) == (size_t)len) && (strncmp(pDb, pParam, (size_t)len) == 0);
}
29

30
// Registers pTrans in execInfo.transMgmt.pDBTrans under the key `streamId`, recording the trans id, its
// name and the current time as the start time. Returns the result of taosHashPut().
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId) {
  SStreamTransInfo entry = {0};
  entry.transId = pTrans->id;
  entry.startTime = taosGetTimestampMs();
  entry.name = pTransName;
  entry.streamId = streamId;

  return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &entry, sizeof(entry));
}
35

36
// Scans execInfo.transMgmt.pDBTrans and removes every entry whose trans can no longer be acquired via
// mndAcquireTrans() (i.e. it has finished). Entries whose trans is still alive are kept; among them the
// checkpoint trans are counted, and those running for at least MAX_CHKPT_EXEC_ELAPSED ms are copied
// into pLongChkptTrans.
//
// pMnode            - mnode used to acquire/release the trans objects.
// pNumOfActiveChkpt - optional (may be NULL); set to 0 on entry and to the number of alive checkpoint
//                     trans on success.
// pLongChkptTrans   - optional (may be NULL) array of SStreamTransInfo receiving the long-running
//                     checkpoint trans.
// Returns 0 on success, or an error code.
// NOTE(review): the in-file callers invoke this while holding execInfo.lock — confirm for other callers.
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray *pLongChkptTrans) {
  size_t  keyLen = 0;
  void   *pIter = NULL;
  SArray *pList = taosArrayInit(4, sizeof(SKeyInfo));
  int32_t numOfChkpt = 0;
  int64_t now = taosGetTimestampMs();

  if (pNumOfActiveChkpt != NULL) {
    *pNumOfActiveChkpt = 0;
  }

  if (pList == NULL) {
    return terrno;
  }

  while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
    SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter;

    // let's clear the finished trans
    STrans *pTrans = mndAcquireTrans(pMnode, pEntry->transId);
    if (pTrans == NULL) {
      // the key is the stream id (see mndStreamRegisterTrans); removal is deferred until the
      // iteration is over
      void    *pKey = taosHashGetKey(pEntry, &keyLen);
      SKeyInfo info = {.pKey = pKey, .keyLen = (int32_t)keyLen};
      mDebug("transId:%d stream:0x%" PRIx64 " %s startTs:%" PRId64 " cleared since finished", pEntry->transId,
             pEntry->streamId, pEntry->name, pEntry->startTime);
      void *p = taosArrayPush(pList, &info);
      if (p == NULL) {
        int32_t code = terrno;
        // leaving the loop early: release the live iterator and the collected keys before returning
        taosHashCancelIterate(execInfo.transMgmt.pDBTrans, pIter);
        taosArrayDestroy(pList);
        return code;
      }
    } else {
      if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
        numOfChkpt++;

        // running for at least MAX_CHKPT_EXEC_ELAPSED (30 min): hand it over to be killed
        int64_t dur = now - pTrans->createdTime;
        if ((dur >= MAX_CHKPT_EXEC_ELAPSED) && (pLongChkptTrans != NULL)) {
          mInfo("long chkpt transId:%d, start:%" PRId64
                " exec duration:%.2fs, beyond threshold %.2f min, kill it and reset task status",
                pTrans->id, pTrans->createdTime, dur / 1000.0, MAX_CHKPT_EXEC_ELAPSED / (1000 * 60.0));
          void *p = taosArrayPush(pLongChkptTrans, pEntry);
          if (p == NULL) {
            // best effort: the trans stays registered and can be picked up again later
            mError("failed to add long checkpoint trans, transId:%d, code:%s", pEntry->transId, tstrerror(terrno));
          }
        }
      }
      mndReleaseTrans(pMnode, pTrans);
    }
  }

  int32_t size = taosArrayGetSize(pList);
  for (int32_t i = 0; i < size; ++i) {
    SKeyInfo *pKey = taosArrayGet(pList, i);
    if (pKey == NULL) {
      continue;
    }

    int32_t code = taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen);
    if (code != 0) {
      taosArrayDestroy(pList);
      return code;
    }
  }

  mDebug("clear %d finished stream-trans, active trans:%d, active checkpoint trans:%d", size,
         taosHashGetSize(execInfo.transMgmt.pDBTrans), numOfChkpt);

  taosArrayDestroy(pList);

  if (pNumOfActiveChkpt != NULL) {
    *pNumOfActiveChkpt = numOfChkpt;
  }

  return 0;
}
111

112
// Decides whether a new trans named pTransName may be created for stream `streamId`, based on the trans
// currently registered for that stream in execInfo.transMgmt.pDBTrans (finished trans are purged first).
// Returns TSDB_CODE_MND_TRANS_CONFLICT when the registered trans blocks pTransName, otherwise 0.
static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName) {
  int32_t numOfTrans = taosHashGetSize(execInfo.transMgmt.pDBTrans);
  if (numOfTrans <= 0) {
    return 0;  // nothing registered, nothing to conflict with
  }

  // purge finished trans so they do not block new ones; a failure here is logged and ignored
  int32_t ret = mndStreamClearFinishedTrans(pMnode, NULL, NULL);
  if (ret != 0) {
    mError("failed to clear finish trans, code:%s, and continue", tstrerror(ret));
  }

  SStreamTransInfo *pExist = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
  if (pExist == NULL) {
    mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId);
    return TSDB_CODE_SUCCESS;
  }

  SStreamTransInfo cur = *pExist;
  bool             conflict = false;

  if (strcmp(cur.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
    // an ongoing checkpoint only lets drop, task-reset and stop trans through
    bool passThrough = (strcmp(pTransName, MND_STREAM_DROP_NAME) == 0) ||
                       (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) == 0) ||
                       (strcmp(pTransName, MND_STREAM_STOP_NAME) == 0);
    if (passThrough) {
      mDebug("not conflict with checkpoint trans, name:%s, continue creating trans", pTransName);
    } else {
      conflict = true;
    }
  } else {
    // while one of these is ongoing, no other trans may be created for the same stream
    const char *exclusive[] = {MND_STREAM_CREATE_NAME,      MND_STREAM_DROP_NAME,         MND_STREAM_TASK_RESET_NAME,
                               MND_STREAM_TASK_UPDATE_NAME, MND_STREAM_CHKPT_CONSEN_NAME, MND_STREAM_STOP_NAME};
    for (size_t i = 0; i < sizeof(exclusive) / sizeof(exclusive[0]); ++i) {
      if (strcmp(cur.name, exclusive[i]) == 0) {
        conflict = true;
        break;
      }
    }
  }

  if (conflict) {
    mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", cur.transId, cur.streamId,
          cur.name);
    return TSDB_CODE_MND_TRANS_CONFLICT;
  }

  return TSDB_CODE_SUCCESS;
}
152

153
// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream.
154
// For a given stream:
155
// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans.
156
// 2. create/drop/reset/update/chkpt-consensus trans are conflict with any other trans.
157
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
1,195✔
158
  if (lock) {
1,195✔
159
    streamMutexLock(&execInfo.lock);
645✔
160
  }
161

162
  int32_t code = doStreamTransConflictCheck(pMnode, streamId, pTransName);
1,195✔
163

164
  if (lock) {
1,195✔
165
    streamMutexUnlock(&execInfo.lock);
645✔
166
  }
167

168
  return code;
1,195✔
169
}
170

171
// Returns the id of the checkpoint / task-update / checkpoint-update trans currently registered for
// stream `streamId`, or 0 when none is registered or the registered trans is of another kind.
// Finished trans are purged first. Takes and releases execInfo.lock internally.
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {
  streamMutexLock(&execInfo.lock);

  int32_t numOfTrans = taosHashGetSize(execInfo.transMgmt.pDBTrans);
  if (numOfTrans <= 0) {
    streamMutexUnlock(&execInfo.lock);
    return 0;
  }

  int32_t ret = mndStreamClearFinishedTrans(pMnode, NULL, NULL);
  if (ret != 0) {
    mError("failed to clear finish trans, code:%s", tstrerror(ret));
  }

  // copy the entry out while the lock is held; the name comparison below runs unlocked
  bool              found = false;
  SStreamTransInfo  snapshot = {0};
  SStreamTransInfo *pFound = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
  if (pFound != NULL) {
    snapshot = *pFound;
    found = true;
  }
  streamMutexUnlock(&execInfo.lock);

  if (!found) {
    return 0;
  }

  bool related = (strcmp(snapshot.name, MND_STREAM_CHECKPOINT_NAME) == 0) ||
                 (strcmp(snapshot.name, MND_STREAM_TASK_UPDATE_NAME) == 0) ||
                 (strcmp(snapshot.name, MND_STREAM_CHKPT_UPDATE_NAME) == 0);
  return related ? snapshot.transId : 0;
}
199

200
// Creates a trans named `name` (policy TRN_POLICY_RETRY, killable) for pStream, binds it to the stream's
// source DB and target stable name, and checks it for conflicts via mndTransCheckConflict().
// On success *pTrans1 receives the trans and 0 is returned. On failure *pTrans1 is left NULL, any
// created trans is dropped, and the error code is returned.
int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name,
                      const char *pMsg, STrans **pTrans1) {
  *pTrans1 = NULL;
  terrno = 0;

  int32_t code = 0;
  STrans *p = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
  if (p == NULL) {
    mError("failed to build trans:%s, reason: %s", name, tstrerror(terrno));
    return terrno;
  }

  mInfo("stream:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, p->id);
  p->ableToBeKilled = true;

  mndTransSetDbName(p, pStream->sourceDb, pStream->targetSTbName);
  if ((code = mndTransCheckConflict(pMnode, p)) != 0) {
    // report the code the conflict check returned; terrno may not reflect it
    mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(code));
    mndTransDrop(p);
    return code;
  }

  *pTrans1 = p;
  return code;
}
225

226
// Serializes pStream into a newly allocated SDB raw row (SDB_STREAM, MND_STREAM_VER_NUMBER) laid out as
// [int32 tlen][tlen bytes of encoded SStreamObj], followed by MND_STREAM_RESERVE_SIZE reserved bytes in the
// allocation. Returns the raw on success; on failure logs, sets terrno and returns NULL.
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *buf = NULL;
  // must be initialized here: the first TSDB_CHECK_CODE below jumps to _over before pRaw is allocated,
  // and _over passes pRaw to mError()/sdbFreeRaw()
  SSdbRaw *pRaw = NULL;

  SEncoder encoder;
  // first pass without a buffer: only used to obtain the encoded length (encoder.pos)
  tEncoderInit(&encoder, NULL, 0);
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
    tEncoderClear(&encoder);
    TSDB_CHECK_CODE(code, lino, _over);
  }

  int32_t tlen = encoder.pos;
  tEncoderClear(&encoder);

  int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
  pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
  TSDB_CHECK_NULL(pRaw, code, lino, _over, terrno);

  buf = taosMemoryMalloc(tlen);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  // second pass: encode into the buffer for real
  tEncoderInit(&encoder, buf, tlen);
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
    tEncoderClear(&encoder);
    TSDB_CHECK_CODE(code, lino, _over);
  }

  tEncoderClear(&encoder);

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, _over);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _over);
  SDB_SET_DATALEN(pRaw, dataPos, _over);

_over:
  taosMemoryFreeClear(buf);
  if (code != TSDB_CODE_SUCCESS) {
    mError("stream:%s, failed to encode to raw:%p at line:%d since %s", pStream->name, pRaw, lino, tstrerror(code));
    sdbFreeRaw(pRaw);
    terrno = code;
    return NULL;
  }

  terrno = 0;
  mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
         pStream->checkpointId);
  return pRaw;
}
275

276
// Encodes pStream into an SDB raw, marks it with `status` and appends it to pTrans as a commit log.
// On any failure pTrans is dropped (callers must not use it afterwards) and an error code is returned;
// returns 0 on success.
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status) {
  SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
  if (pCommitRaw == NULL) {
    mError("failed to encode stream since %s", terrstr());
    mndTransDrop(pTrans);
    return terrno;
  }

  // Set the status while pCommitRaw is still exclusively ours. Once mndTransAppendCommitlog() succeeds,
  // pTrans references the raw; NOTE(review): assumed to be released by mndTransDrop(), so freeing it here
  // as well after a successful append would be a double free.
  int32_t code = sdbSetRawStatus(pCommitRaw, status);
  if (code != 0) {
    mError("stream trans:%d failed to set raw status:%d since %s", pTrans->id, status, tstrerror(code));
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    // the append failed, so the raw was not attached to pTrans and is still ours to free
    mError("stream trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    return code;
  }

  return 0;
}
300

301
// Appends a redo action to pTrans that sends the message pCont (contLen bytes, type msgType) to pEpset,
// with the given retry and acceptable result codes. Returns mndTransAppendRedoAction()'s result.
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
                       int32_t retryCode, int32_t acceptCode) {
  STransAction redo = {0};
  redo.epSet = *pEpset;
  redo.pCont = pCont;
  redo.contLen = contLen;
  redo.msgType = msgType;
  redo.retryCode = retryCode;
  redo.acceptableCode = acceptCode;

  return mndTransAppendRedoAction(pTrans, &redo);
}
311

312
bool isNodeUpdateTransActive() {
1,056✔
313
  bool  exist = false;
1,056✔
314
  void *pIter = NULL;
1,056✔
315

316
  streamMutexLock(&execInfo.lock);
1,056✔
317

318
  while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
1,570✔
319
    SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter;
514✔
320
    if (strcmp(pTransInfo->name, MND_STREAM_TASK_UPDATE_NAME) == 0) {
514!
321
      mDebug("stream:0x%" PRIx64 " %s st:%" PRId64 " is in task nodeEp update, create new stream not allowed",
×
322
             pTransInfo->streamId, pTransInfo->name, pTransInfo->startTime);
323
      exist = true;
×
324
    }
325
  }
326

327
  streamMutexUnlock(&execInfo.lock);
1,056✔
328
  return exist;
1,056✔
329
}
330

331
// Kills every registered checkpoint trans whose stream has a source or target DB named exactly by the
// first `len` bytes of pDBName. Streams that cannot be acquired are skipped. Always returns
// TSDB_CODE_SUCCESS.
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
  for (void *pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, NULL); pIter != NULL;
       pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) {
    SStreamTransInfo *pInfo = (SStreamTransInfo *)pIter;
    if (strcmp(pInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
      continue;  // only checkpoint trans are of interest
    }

    SStreamObj *pStream = NULL;
    int32_t     ret = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
    if (pStream == NULL || ret != 0) {
      continue;
    }

    if (identicalName(pStream->sourceDb, pDBName, len)) {
      mndKillTransImpl(pMnode, pInfo->transId, pStream->sourceDb);
    } else if (identicalName(pStream->targetDb, pDBName, len)) {
      mndKillTransImpl(pMnode, pInfo->transId, pStream->targetDb);
    }

    mndReleaseStream(pMnode, pStream);
  }

  return TSDB_CODE_SUCCESS;
}
355

356
// kill all trans in the dst DB
357
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
4✔
358
  mDebug("start to clear checkpoints in all Dbs");
4✔
359
  char p[128] = {0};
4✔
360

361
  void *pIter = NULL;
4✔
362
  while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
8✔
363
    char *pDb = (char *)pIter;
4✔
364

365
    size_t len = 0;
4✔
366
    void  *pKey = taosHashGetKey(pDb, &len);
4✔
367
    int cpLen = (127 < len) ? 127 : len;
4✔
368
    TAOS_STRNCPY(p, pKey, cpLen);
4✔
369
    p[cpLen] = '\0';
4✔
370

371
    int32_t code = doKillCheckpointTrans(pMnode, pKey, len);
4✔
372
    if (code) {
4!
373
      mError("failed to kill trans, transId:%p", pKey);
×
374
    } else {
375
      mDebug("clear checkpoint trans in Db:%s", p);
4✔
376
    }
377
  }
378

379
  mDebug("complete clear checkpoints in all Dbs");
4✔
380
}
4✔
381

382
// For each entry of pLongChkpts (an array of SStreamTransInfo, e.g. as collected by
// mndStreamClearFinishedTrans), kill the checkpoint trans and create a reset-status trans for its stream
// at the stream's current checkpointId. Failures are logged and the remaining entries are still processed.
void killChkptAndResetStreamTask(SMnode *pMnode, SArray *pLongChkpts) {
  int64_t now = taosGetTimestampMs();
  int32_t num = taosArrayGetSize(pLongChkpts);

  mInfo("start to kill %d long checkpoint trans", num);

  for (int32_t i = 0; i < num; ++i) {
    SStreamTransInfo *pTrans = (SStreamTransInfo *)taosArrayGet(pLongChkpts, i);
    if (pTrans == NULL) {
      continue;
    }

    double el = (now - pTrans->startTime) / 1000.0;
    mInfo("stream:0x%" PRIx64 " start to kill ongoing long checkpoint transId:%d, elapsed time:%.2fs. killed",
          pTrans->streamId, pTrans->transId, el);

    SStreamObj *p = NULL;
    int32_t     code = mndGetStreamObj(pMnode, pTrans->streamId, &p);
    if (code != 0 || p == NULL) {
      // previously skipped silently; make the skipped kill visible
      mError("stream:0x%" PRIx64 " failed to acquire stream obj, skip kill checkpoint transId:%d, code:%s",
             pTrans->streamId, pTrans->transId, tstrerror(code));
      continue;
    }

    mndKillTransImpl(pMnode, pTrans->transId, p->sourceDb);

    mDebug("stream:%s 0x%" PRIx64 " transId:%d checkpointId:%" PRId64 " create reset task trans", p->name,
           pTrans->streamId, pTrans->transId, p->checkpointId);

    code = mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId);
    if (code) {
      mError("stream:%s 0x%" PRIx64 " failed to create reset stream task, code:%s", p->name, p->uid, tstrerror(code));
    }

    // release through the same helper used by doKillCheckpointTrans
    mndReleaseStream(pMnode, p);
  }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc