• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3567

26 Dec 2024 02:00AM UTC coverage: 62.733% (+0.3%) from 62.422%
#3567

push

travis-ci

web-flow
Merge pull request #29290 from taosdata/enh/TD-33262-3.0

enh: test coverage of tfs

138620 of 284159 branches covered (48.78%)

Branch coverage included in aggregate %.

20 of 25 new or added lines in 3 files covered. (80.0%)

606 existing lines in 128 files now uncovered.

216140 of 281347 relevant lines covered (76.82%)

18804943.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.71
/source/dnode/mnode/impl/src/mndStreamTrans.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndTrans.h"
18

19
typedef struct SKeyInfo {
20
  void   *pKey;
21
  int32_t keyLen;
22
} SKeyInfo;
23

24
static bool identicalName(const char *pDb, const char *pParam, int32_t len) {
×
25
  return (strlen(pDb) == len) && (strncmp(pDb, pParam, len) == 0);
×
26
}
27

28
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId) {
4,703✔
29
  SStreamTransInfo info = {
4,703✔
30
      .transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pTransName, .streamId = streamId};
4,703✔
31
  return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &info, sizeof(SStreamTransInfo));
4,703✔
32
}
33

34
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) {
5,793✔
35
  size_t  keyLen = 0;
5,793✔
36
  void   *pIter = NULL;
5,793✔
37
  SArray *pList = taosArrayInit(4, sizeof(SKeyInfo));
5,793✔
38
  int32_t numOfChkpt = 0;
5,793✔
39

40
  if (pNumOfActiveChkpt != NULL) {
5,793✔
41
    *pNumOfActiveChkpt = 0;
595✔
42
  }
43

44
  if (pList == NULL) {
5,793!
45
    return terrno;
×
46
  }
47

48
  while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
11,853✔
49
    SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter;
6,060✔
50

51
    // let's clear the finished trans
52
    STrans *pTrans = mndAcquireTrans(pMnode, pEntry->transId);
6,060✔
53
    if (pTrans == NULL) {
6,060✔
54
      void *pKey = taosHashGetKey(pEntry, &keyLen);
4,473✔
55
      // key is the name of src/dst db name
56
      SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
4,473✔
57
      mDebug("transId:%d stream:0x%" PRIx64 " %s startTs:%" PRId64 " cleared since finished", pEntry->transId,
4,473✔
58
             pEntry->streamId, pEntry->name, pEntry->startTime);
59
      void* p = taosArrayPush(pList, &info);
4,473✔
60
      if (p == NULL) {
4,473!
61
        return terrno;
×
62
      }
63
    } else {
64
      if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
1,587✔
65
        numOfChkpt++;
228✔
66
      }
67
      mndReleaseTrans(pMnode, pTrans);
1,587✔
68
    }
69
  }
70

71
  int32_t size = taosArrayGetSize(pList);
5,793✔
72
  for (int32_t i = 0; i < size; ++i) {
10,266✔
73
    SKeyInfo *pKey = taosArrayGet(pList, i);
4,473✔
74
    if (pKey == NULL) {
4,473!
75
      continue;
×
76
    }
77

78
    int32_t code = taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen);
4,473✔
79
    if (code != 0) {
4,473!
80
      taosArrayDestroy(pList);
×
81
      return code;
×
82
    }
83
  }
84

85
  mDebug("clear %d finished stream-trans, active trans:%d, active checkpoint trans:%d", size,
5,793✔
86
         taosHashGetSize(execInfo.transMgmt.pDBTrans), numOfChkpt);
87

88
  taosArrayDestroy(pList);
5,793✔
89

90
  if (pNumOfActiveChkpt != NULL) {
5,793✔
91
    *pNumOfActiveChkpt = numOfChkpt;
595✔
92
  }
93

94
  return 0;
5,793✔
95
}
96

97
static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName) {
4,633✔
98
  int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
4,633✔
99
  if (num <= 0) {
4,633✔
100
    return 0;
739✔
101
  }
102

103
  // if any task updates exist, any other stream trans are not allowed to be created
104
  int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
3,894✔
105
  if (code) {
3,894!
106
    mError("failed to clear finish trans, code:%s, and continue", tstrerror(code));
×
107
  }
108

109
  SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
3,894✔
110
  if (pEntry != NULL) {
3,894✔
111
    SStreamTransInfo tInfo = *pEntry;
42✔
112

113
    if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
42✔
114
      if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0) &&
38!
115
          (strcmp(pTransName, MND_STREAM_RESTART_NAME) != 0)) {
1!
116
        mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
1!
117
              tInfo.name);
118
        return TSDB_CODE_MND_TRANS_CONFLICT;
1✔
119
      } else {
120
        mDebug("not conflict with checkpoint trans, name:%s, continue creating trans", pTransName);
37✔
121
      }
122
    } else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) ||
4!
123
               (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0) ||
4!
124
               (strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) ||
4!
125
               strcmp(tInfo.name, MND_STREAM_RESTART_NAME) == 0) {
4!
UNCOV
126
      mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
×
127
            tInfo.name);
UNCOV
128
      return TSDB_CODE_MND_TRANS_CONFLICT;
×
129
    }
130
  } else {
131
    mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId);
3,852✔
132
  }
133

134
  return TSDB_CODE_SUCCESS;
3,893✔
135
}
136

137
// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream.
138
// For a given stream:
139
// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans.
140
// 2. create/drop/reset/update trans are conflict with any other trans.
141
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
4,633✔
142
  if (lock) {
4,633✔
143
    streamMutexLock(&execInfo.lock);
2,741✔
144
  }
145

146
  int32_t code = doStreamTransConflictCheck(pMnode, streamId, pTransName);
4,633✔
147

148
  if (lock) {
4,633✔
149
    streamMutexUnlock(&execInfo.lock);
2,741✔
150
  }
151

152
  return code;
4,633✔
153
}
154

155
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {
1,352✔
156
  streamMutexLock(&execInfo.lock);
1,352✔
157
  int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
1,352✔
158
  if (num <= 0) {
1,352✔
159
    streamMutexUnlock(&execInfo.lock);
48✔
160
    return 0;
48✔
161
  }
162

163
  int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
1,304✔
164
  if (code) {
1,304!
165
    mError("failed to clear finish trans, code:%s", tstrerror(code));
×
166
  }
167

168
  SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
1,304✔
169
  if (pEntry != NULL) {
1,304✔
170
    SStreamTransInfo tInfo = *pEntry;
1,270✔
171
    streamMutexUnlock(&execInfo.lock);
1,270✔
172

173
    if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0 || strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0 ||
1,270!
174
        strcmp(tInfo.name, MND_STREAM_CHKPT_UPDATE_NAME) == 0) {
1,270!
175
      return tInfo.transId;
×
176
    }
177
  } else {
178
    streamMutexUnlock(&execInfo.lock);
34✔
179
  }
180

181
  return 0;
1,304✔
182
}
183

184
int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name,
6,431✔
185
                      const char *pMsg, STrans **pTrans1) {
186
  *pTrans1 = NULL;
6,431✔
187
  terrno = 0;
6,431✔
188

189
  int32_t code = 0;
6,431✔
190
  STrans *p = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
6,431✔
191
  if (p == NULL) {
6,431!
192
    mError("failed to build trans:%s, reason: %s", name, tstrerror(terrno));
×
193
    return terrno;
×
194
  }
195

196
  mInfo("stream:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, p->id);
6,431!
197

198
  mndTransSetDbName(p, pStream->sourceDb, pStream->targetSTbName);
6,431✔
199
  if ((code = mndTransCheckConflict(pMnode, p)) != 0) {
6,431!
200
    mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(terrno));
×
201
    mndTransDrop(p);
×
202
    return code;
×
203
  }
204

205
  *pTrans1 = p;
6,431✔
206
  return code;
6,431✔
207
}
208

209
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
7,258✔
210
  int32_t code = 0;
7,258✔
211
  int32_t lino = 0;
7,258✔
212
  void   *buf = NULL;
7,258✔
213

214
  SEncoder encoder;
215
  tEncoderInit(&encoder, NULL, 0);
7,258✔
216
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
7,258!
217
    tEncoderClear(&encoder);
×
218
    TSDB_CHECK_CODE(code, lino, _over);
×
219
  }
220

221
  int32_t tlen = encoder.pos;
7,258✔
222
  tEncoderClear(&encoder);
7,258✔
223

224
  int32_t  size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
7,258✔
225
  SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
7,258✔
226
  TSDB_CHECK_NULL(pRaw, code, lino, _over, terrno);
7,258!
227

228
  buf = taosMemoryMalloc(tlen);
7,258!
229
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
7,258!
230

231
  tEncoderInit(&encoder, buf, tlen);
7,258✔
232
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
7,258!
233
    tEncoderClear(&encoder);
×
234
    TSDB_CHECK_CODE(code, lino, _over);
×
235
  }
236

237
  tEncoderClear(&encoder);
7,258✔
238

239
  int32_t dataPos = 0;
7,258✔
240
  SDB_SET_INT32(pRaw, dataPos, tlen, _over);
7,258!
241
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _over);
7,258!
242
  SDB_SET_DATALEN(pRaw, dataPos, _over);
7,258!
243

244
_over:
7,258✔
245
  taosMemoryFreeClear(buf);
7,258!
246
  if (code != TSDB_CODE_SUCCESS) {
7,258!
247
    mError("stream:%s, failed to encode to raw:%p at line:%d since %s", pStream->name, pRaw, lino, tstrerror(code));
×
248
    sdbFreeRaw(pRaw);
×
249
    terrno = code;
×
250
    return NULL;
×
251
  }
252

253
  terrno = 0;
7,258✔
254
  mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream,
7,258✔
255
         pStream->checkpointId);
256
  return pRaw;
7,258✔
257
}
258

259
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status) {
6,554✔
260
  SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
6,554✔
261
  if (pCommitRaw == NULL) {
6,554!
262
    mError("failed to encode stream since %s", terrstr());
×
263
    mndTransDrop(pTrans);
×
264
    return terrno;
×
265
  }
266

267
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
6,554!
268
    mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
×
269
    sdbFreeRaw(pCommitRaw);
×
270
    mndTransDrop(pTrans);
×
271
    return terrno;
×
272
  }
273

274
  if (sdbSetRawStatus(pCommitRaw, status) != 0) {
6,554!
275
    mError("stream trans:%d failed to set raw status:%d since %s", pTrans->id, status, terrstr());
×
276
    sdbFreeRaw(pCommitRaw);
×
277
    mndTransDrop(pTrans);
×
278
    return terrno;
×
279
  }
280

281
  return 0;
6,554✔
282
}
283

284
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
33,543✔
285
                       int32_t retryCode, int32_t acceptCode) {
286
  STransAction action = {.epSet = *pEpset,
33,543✔
287
                         .contLen = contLen,
288
                         .pCont = pCont,
289
                         .msgType = msgType,
290
                         .retryCode = retryCode,
291
                         .acceptableCode = acceptCode};
292
  return mndTransAppendRedoAction(pTrans, &action);
33,543✔
293
}
294

295
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
8✔
296
  void *pIter = NULL;
8✔
297

298
  while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
15✔
299
    SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter;
7✔
300
    if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
7✔
301
      continue;
6✔
302
    }
303

304
    SStreamObj *pStream = NULL;
1✔
305
    int32_t code = mndGetStreamObj(pMnode, pTransInfo->streamId, &pStream);
1✔
306
    if (pStream != NULL && code == 0) {
1!
307
      if (identicalName(pStream->sourceDb, pDBName, len)) {
×
308
        mndKillTransImpl(pMnode, pTransInfo->transId, pStream->sourceDb);
×
309
      } else if (identicalName(pStream->targetDb, pDBName, len)) {
×
310
        mndKillTransImpl(pMnode, pTransInfo->transId, pStream->targetDb);
×
311
      }
312

313
      mndReleaseStream(pMnode, pStream);
×
314
    }
315
  }
316

317
  return TSDB_CODE_SUCCESS;
8✔
318
}
319

320
// kill all trans in the dst DB
321
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
8✔
322
  mDebug("start to clear checkpoints in all Dbs");
8✔
323
  char p[128] = {0};
8✔
324

325
  void *pIter = NULL;
8✔
326
  while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
16✔
327
    char *pDb = (char *)pIter;
8✔
328

329
    size_t len = 0;
8✔
330
    void  *pKey = taosHashGetKey(pDb, &len);
8✔
331
    int cpLen = (127 < len) ? 127 : len;
8✔
332
    TAOS_STRNCPY(p, pKey, cpLen);
8✔
333
    p[cpLen] = '\0';
8✔
334

335
    int32_t code = doKillCheckpointTrans(pMnode, pKey, len);
8✔
336
    if (code) {
8!
337
      mError("failed to kill trans, transId:%p", pKey);
×
338
    } else {
339
      mDebug("clear checkpoint trans in Db:%s", p);
8✔
340
    }
341
  }
342

343
  mDebug("complete clear checkpoints in all Dbs");
8✔
344
}
8✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc