• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3599

08 Feb 2025 11:23AM UTC coverage: 1.77% (-61.6%) from 63.396%
#3599

push

travis-ci

web-flow
Merge pull request #29712 from taosdata/fix/TD-33652-3.0

fix: reduce write rows from 30w to 3w

3776 of 278949 branches covered (1.35%)

Branch coverage included in aggregate %.

6012 of 274147 relevant lines covered (2.19%)

1642.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mnode/impl/src/mndStreamTrans.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndTrans.h"
18

19
// A (pointer, length) pair describing one hash key. In this file it is filled from
// taosHashGetKey() so the key can be handed to taosHashRemove() after iteration ends.
struct SKeyInfo {
  void   *pKey;    // start of the key bytes
  int32_t keyLen;  // number of key bytes
};

typedef struct SKeyInfo SKeyInfo;
23

24
// Returns true when the NUL-terminated string pDb is exactly len bytes long and those
// bytes equal the first len bytes of pParam. pParam is only read up to len bytes.
static bool identicalName(const char *pDb, const char *pParam, int32_t len) {
  // a length mismatch can never be an exact match
  if (strlen(pDb) != len) {
    return false;
  }

  return strncmp(pDb, pParam, len) == 0;
}
27

28
// Record pTrans in execInfo.transMgmt.pDBTrans under the key streamId.
//
// The stored entry carries the trans id, the current timestamp in ms, the stream id and
// the pTransName pointer itself (the string is not duplicated here).
// Returns whatever taosHashPut() returns.
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId) {
  SStreamTransInfo transInfo = {0};

  transInfo.transId = pTrans->id;
  transInfo.startTime = taosGetTimestampMs();
  transInfo.name = pTransName;
  transInfo.streamId = streamId;

  return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &transInfo, sizeof(transInfo));
}
33

34
// Remove from execInfo.transMgmt.pDBTrans every entry whose transaction can no longer be
// acquired from the mnode, and count the remaining entries whose name equals
// MND_STREAM_CHECKPOINT_NAME.
//
// pMnode            - mnode used to acquire/release the transactions.
// pNumOfActiveChkpt - optional output (may be NULL). Reset to 0 on entry and set to the
//                     number of still-acquirable checkpoint trans on success only.
//
// Returns 0 on success, terrno if the scratch list cannot be allocated or grown, or the
// code returned by taosHashRemove(). This function takes no lock itself.
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) {
  size_t  keyLen = 0;
  void   *pIter = NULL;
  int32_t numOfChkpt = 0;

  if (pNumOfActiveChkpt != NULL) {
    *pNumOfActiveChkpt = 0;
  }

  // keys of the finished entries; removal is deferred until the iteration is over
  SArray *pList = taosArrayInit(4, sizeof(SKeyInfo));
  if (pList == NULL) {
    return terrno;
  }

  while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
    SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter;

    // let's clear the finished trans
    STrans *pTrans = mndAcquireTrans(pMnode, pEntry->transId);
    if (pTrans == NULL) {
      // the key is the stream id the entry was registered with (see mndStreamRegisterTrans)
      void    *pKey = taosHashGetKey(pEntry, &keyLen);
      SKeyInfo info = {.pKey = pKey, .keyLen = (int32_t)keyLen};
      mDebug("transId:%d stream:0x%" PRIx64 " %s startTs:%" PRId64 " cleared since finished", pEntry->transId,
             pEntry->streamId, pEntry->name, pEntry->startTime);

      if (taosArrayPush(pList, &info) == NULL) {
        // capture the error before cleanup can overwrite it, then release the iterator and
        // the scratch list; the original returned here with both still held
        int32_t code = terrno;
        taosHashCancelIterate(execInfo.transMgmt.pDBTrans, pIter);
        taosArrayDestroy(pList);
        return code;
      }
    } else {
      if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
        numOfChkpt++;
      }
      mndReleaseTrans(pMnode, pTrans);
    }
  }

  int32_t size = taosArrayGetSize(pList);
  for (int32_t i = 0; i < size; ++i) {
    SKeyInfo *pKey = taosArrayGet(pList, i);
    if (pKey == NULL) {
      continue;
    }

    int32_t code = taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen);
    if (code != 0) {
      taosArrayDestroy(pList);
      return code;
    }
  }

  mDebug("clear %d finished stream-trans, active trans:%d, active checkpoint trans:%d", size,
         taosHashGetSize(execInfo.transMgmt.pDBTrans), numOfChkpt);

  taosArrayDestroy(pList);

  if (pNumOfActiveChkpt != NULL) {
    *pNumOfActiveChkpt = numOfChkpt;
  }

  return 0;
}
96

97
// Decide whether a new trans called pTransName may be created for stream streamId, given
// the trans (if any) currently registered for that stream in execInfo.transMgmt.pDBTrans.
//
// Finished trans are purged first; a failure of that purge is logged and otherwise ignored.
// Rules applied to the registered trans:
//   - checkpoint trans: conflicts unless the new trans is drop, task-reset or restart;
//   - create / drop / task-reset / task-update / restart trans: always conflicts;
//   - any other registered name, or no registered trans: no conflict.
//
// Returns TSDB_CODE_MND_TRANS_CONFLICT on conflict, otherwise 0 / TSDB_CODE_SUCCESS.
// No lock is taken here.
static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName) {
  int32_t numOfTrans = taosHashGetSize(execInfo.transMgmt.pDBTrans);
  if (numOfTrans <= 0) {
    return 0;
  }

  int32_t clearCode = mndStreamClearFinishedTrans(pMnode, NULL);
  if (clearCode != 0) {
    mError("failed to clear finish trans, code:%s, and continue", tstrerror(clearCode));
  }

  SStreamTransInfo *pFound = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
  if (pFound == NULL) {
    mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId);
    return TSDB_CODE_SUCCESS;
  }

  // work on a copy of the registered entry
  SStreamTransInfo existing = *pFound;
  bool             conflict = false;

  if (strcmp(existing.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
    bool allowed = (strcmp(pTransName, MND_STREAM_DROP_NAME) == 0) ||
                   (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) == 0) ||
                   (strcmp(pTransName, MND_STREAM_RESTART_NAME) == 0);
    if (allowed) {
      mDebug("not conflict with checkpoint trans, name:%s, continue creating trans", pTransName);
    } else {
      conflict = true;
    }
  } else {
    conflict = (strcmp(existing.name, MND_STREAM_CREATE_NAME) == 0) ||
               (strcmp(existing.name, MND_STREAM_DROP_NAME) == 0) ||
               (strcmp(existing.name, MND_STREAM_TASK_RESET_NAME) == 0) ||
               (strcmp(existing.name, MND_STREAM_TASK_UPDATE_NAME) == 0) ||
               (strcmp(existing.name, MND_STREAM_RESTART_NAME) == 0);
  }

  if (!conflict) {
    return TSDB_CODE_SUCCESS;
  }

  mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", existing.transId, existing.streamId,
        existing.name);
  return TSDB_CODE_MND_TRANS_CONFLICT;
}
136

137
// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream.
138
// For a given stream:
139
// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans.
140
// 2. create/drop/reset/update trans are conflict with any other trans.
141
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
×
142
  if (lock) {
×
143
    streamMutexLock(&execInfo.lock);
×
144
  }
145

146
  int32_t code = doStreamTransConflictCheck(pMnode, streamId, pTransName);
×
147

148
  if (lock) {
×
149
    streamMutexUnlock(&execInfo.lock);
×
150
  }
151

152
  return code;
×
153
}
154

155
// Look up the trans currently registered for streamId and return its transId when its name
// is the checkpoint, task-update or checkpoint-update trans name; return 0 in every other
// case (nothing registered, or a trans with a different name).
//
// execInfo.lock is held while finished trans are purged and the entry is copied; the name
// comparison runs on that copy after the lock is released. A purge failure is only logged.
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {
  SStreamTransInfo snapshot = {0};
  bool             found = false;

  streamMutexLock(&execInfo.lock);

  int32_t numOfTrans = taosHashGetSize(execInfo.transMgmt.pDBTrans);
  if (numOfTrans > 0) {
    int32_t clearCode = mndStreamClearFinishedTrans(pMnode, NULL);
    if (clearCode != 0) {
      mError("failed to clear finish trans, code:%s", tstrerror(clearCode));
    }

    SStreamTransInfo *pFound = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
    if (pFound != NULL) {
      snapshot = *pFound;
      found = true;
    }
  }

  streamMutexUnlock(&execInfo.lock);

  if (!found) {
    return 0;
  }

  bool related = (strcmp(snapshot.name, MND_STREAM_CHECKPOINT_NAME) == 0) ||
                 (strcmp(snapshot.name, MND_STREAM_TASK_UPDATE_NAME) == 0) ||
                 (strcmp(snapshot.name, MND_STREAM_CHKPT_UPDATE_NAME) == 0);

  return related ? snapshot.transId : 0;
}
183

184
// Create a retry-policy trans named `name` for pStream and run the mnode conflict check
// on it.
//
// pMnode   - mnode that owns the trans.
// pStream  - stream the trans is built for; its sourceDb/targetSTbName are set on the trans.
// pReq     - request passed through to mndTransCreate().
// conflict - conflict mode passed through to mndTransCreate().
// name     - trans name.
// pMsg     - text used only in the "start to build trans" log line.
// pTrans1  - output; set to NULL on entry and to the new trans on success.
//
// Returns 0 on success. On failure returns terrno (creation failed) or the code from
// mndTransCheckConflict(); in the latter case the trans is dropped before returning.
int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name,
                      const char *pMsg, STrans **pTrans1) {
  int32_t code = 0;

  *pTrans1 = NULL;
  terrno = 0;

  STrans *p = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
  if (p == NULL) {
    // capture terrno once so the logged reason and the returned code cannot diverge
    code = terrno;
    mError("failed to build trans:%s, reason: %s", name, tstrerror(code));
    return code;
  }

  mInfo("stream:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, p->id);

  mndTransSetDbName(p, pStream->sourceDb, pStream->targetSTbName);

  code = mndTransCheckConflict(pMnode, p);
  if (code != 0) {
    // report the code that is actually returned; terrno was reset to 0 above and is not
    // known to be updated by the conflict check
    mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(code));
    mndTransDrop(p);
    return code;
  }

  *pTrans1 = p;
  return code;
}
208

209
// Serialise pStream into a newly allocated SDB raw record.
//
// The stream is encoded twice: once with no output buffer, after which encoder.pos is read
// as the encoded length, and once into a heap buffer of that length. The raw record then
// receives the length as an int32 followed by the encoded bytes.
//
// Returns the raw record with terrno set to 0, or NULL with terrno set to the failure code.
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
  int32_t  code = 0;
  int32_t  lino = 0;
  void    *buf = NULL;
  // Must be initialised before the first possible jump to _over: the cleanup path logs and
  // frees pRaw, and a goto past its declaration would leave it indeterminate.
  SSdbRaw *pRaw = NULL;
  int32_t  tlen = 0;
  int32_t  size = 0;
  int32_t  dataPos = 0;

  SEncoder encoder;
  tEncoderInit(&encoder, NULL, 0);
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
    tEncoderClear(&encoder);
    TSDB_CHECK_CODE(code, lino, _over);
  }

  tlen = encoder.pos;
  tEncoderClear(&encoder);

  size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
  pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
  TSDB_CHECK_NULL(pRaw, code, lino, _over, terrno);

  buf = taosMemoryMalloc(tlen);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  tEncoderInit(&encoder, buf, tlen);
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
    tEncoderClear(&encoder);
    TSDB_CHECK_CODE(code, lino, _over);
  }

  tEncoderClear(&encoder);

  SDB_SET_INT32(pRaw, dataPos, tlen, _over);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _over);
  SDB_SET_DATALEN(pRaw, dataPos, _over);

_over:
  taosMemoryFreeClear(buf);
  if (code != TSDB_CODE_SUCCESS) {
    mError("stream:%s, failed to encode to raw:%p at line:%d since %s", pStream->name, pRaw, lino, tstrerror(code));
    sdbFreeRaw(pRaw);
    terrno = code;
    return NULL;
  }

  terrno = 0;
  mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream,
         pStream->checkpointId);
  return pRaw;
}
258

259
// Encode pStream into a raw record, give the record the requested status and append it to
// pTrans as a commit log.
//
// On any failure the raw record (if one was created) is freed, pTrans is dropped and the
// terrno observed at the point of failure is returned; the caller must not use pTrans
// afterwards. Returns 0 on success.
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status) {
  int32_t code = 0;

  SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
  if (pCommitRaw == NULL) {
    code = terrno;  // capture before cleanup calls get a chance to overwrite terrno
    mError("failed to encode stream since %s", terrstr());
    mndTransDrop(pTrans);
    return code;
  }

  // Set the status while the raw record is still exclusively owned here. The original set
  // it after the append and, on failure, freed the record and then dropped the trans it had
  // already been appended to.
  // NOTE(review): assumes mndTransAppendCommitlog() keeps the pointer and does not itself
  // change the record's status - confirm against mndTrans.c.
  if (sdbSetRawStatus(pCommitRaw, status) != 0) {
    code = terrno;
    mError("stream trans:%d failed to set raw status:%d since %s", pTrans->id, status, terrstr());
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    return code;
  }

  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    code = terrno;
    mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    return code;
  }

  return 0;
}
283

284
// Build a redo action from the given message payload and endpoint set and append it to
// pTrans. *pEpset is copied by value into the action; pCont is stored as a pointer.
// Returns the result of mndTransAppendRedoAction().
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
                       int32_t retryCode, int32_t acceptCode) {
  STransAction redo = {0};

  redo.epSet = *pEpset;
  redo.pCont = pCont;
  redo.contLen = contLen;
  redo.msgType = msgType;
  redo.retryCode = retryCode;
  redo.acceptableCode = acceptCode;

  return mndTransAppendRedoAction(pTrans, &redo);
}
294

295
bool isNodeUpdateTransActive() {
×
296
  bool  exist = false;
×
297
  void *pIter = NULL;
×
298

299
  streamMutexLock(&execInfo.lock);
×
300

301
  while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
×
302
    SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter;
×
303
    if (strcmp(pTransInfo->name, MND_STREAM_TASK_UPDATE_NAME) == 0) {
×
304
      mDebug("stream:0x%" PRIx64 " %s st:%" PRId64 " is in task nodeEp update, create new stream not allowed",
×
305
             pTransInfo->streamId, pTransInfo->name, pTransInfo->startTime);
306
      exist = true;
×
307
    }
308
  }
309

310
  streamMutexUnlock(&execInfo.lock);
×
311
  return exist;
×
312
}
313

314
// For every registered checkpoint trans whose stream has pDBName (len bytes) as its source
// db, or otherwise as its target db, ask the mnode to kill that trans.
//
// Streams that cannot be fetched are skipped, and the result of mndKillTransImpl() is not
// inspected. Always returns TSDB_CODE_SUCCESS. No lock is taken here.
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
  for (void *pCur = taosHashIterate(execInfo.transMgmt.pDBTrans, NULL); pCur != NULL;
       pCur = taosHashIterate(execInfo.transMgmt.pDBTrans, pCur)) {
    SStreamTransInfo *pInfo = (SStreamTransInfo *)pCur;

    // only checkpoint trans are of interest
    if (strcmp(pInfo->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
      SStreamObj *pStream = NULL;
      int32_t     code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
      if (pStream == NULL || code != 0) {
        continue;
      }

      // the source db is tested first; the target db only when the source does not match
      if (identicalName(pStream->sourceDb, pDBName, len)) {
        mndKillTransImpl(pMnode, pInfo->transId, pStream->sourceDb);
      } else if (identicalName(pStream->targetDb, pDBName, len)) {
        mndKillTransImpl(pMnode, pInfo->transId, pStream->targetDb);
      }

      mndReleaseStream(pMnode, pStream);
    }
  }

  return TSDB_CODE_SUCCESS;
}
338

339
// kill all trans in the dst DB
340
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
×
341
  mDebug("start to clear checkpoints in all Dbs");
×
342
  char p[128] = {0};
×
343

344
  void *pIter = NULL;
×
345
  while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
×
346
    char *pDb = (char *)pIter;
×
347

348
    size_t len = 0;
×
349
    void  *pKey = taosHashGetKey(pDb, &len);
×
350
    int cpLen = (127 < len) ? 127 : len;
×
351
    TAOS_STRNCPY(p, pKey, cpLen);
×
352
    p[cpLen] = '\0';
×
353

354
    int32_t code = doKillCheckpointTrans(pMnode, pKey, len);
×
355
    if (code) {
×
356
      mError("failed to kill trans, transId:%p", pKey);
×
357
    } else {
358
      mDebug("clear checkpoint trans in Db:%s", p);
×
359
    }
360
  }
361

362
  mDebug("complete clear checkpoints in all Dbs");
×
363
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc