• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: separate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mnode/impl/src/mndStreamTrans.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndTrans.h"
18

19
// Key pointer/length pair collected while iterating a hash table, so that the matching entry can be removed
// once the iteration is over (see mndStreamClearFinishedTrans).
typedef struct SKeyInfo {
20
  // address of the key bytes, as returned by taosHashGetKey
  void   *pKey;
21
  // length of the key in bytes
  int32_t keyLen;
22
} SKeyInfo;
23

24
// Returns true when pDb is exactly len characters long and those characters equal the first len characters of
// pParam. Only the first len bytes of pParam are inspected.
static bool identicalName(const char *pDb, const char *pParam, int32_t len) {
  if (strlen(pDb) != len) {
    return false;
  }

  return strncmp(pDb, pParam, len) == 0;
}
27

UNCOV
28
// Registers pTrans as the transaction of the stream identified by streamId.
// The record (trans id, current timestamp in ms, trans name pointer, stream id) is put into
// execInfo.transMgmt.pDBTrans keyed by streamId. Returns the result of taosHashPut.
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId) {
  SStreamTransInfo transInfo = {0};

  transInfo.transId = pTrans->id;
  transInfo.startTime = taosGetTimestampMs();
  transInfo.name = pTransName;  // only the pointer is stored, the string itself is not copied here
  transInfo.streamId = streamId;

  return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &transInfo, sizeof(transInfo));
}
33

UNCOV
34
// Removes from execInfo.transMgmt.pDBTrans every entry whose transaction can no longer be acquired through
// mndAcquireTrans (such entries are treated as finished) and counts the remaining checkpoint transactions.
//
// pMnode:            mnode handle used to acquire/release the transactions.
// pNumOfActiveChkpt: optional output; reset to 0 on entry and set to the number of still-registered checkpoint
//                    transactions on success. May be NULL.
// Returns 0 on success, terrno when an allocation fails, or the code of a failed taosHashRemove.
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) {
  size_t  keyLen = 0;
  void   *pIter = NULL;
  SArray *pList = taosArrayInit(4, sizeof(SKeyInfo));
  int32_t numOfChkpt = 0;

  if (pNumOfActiveChkpt != NULL) {
    *pNumOfActiveChkpt = 0;
  }

  if (pList == NULL) {
    return terrno;
  }

  // pass 1: only collect the keys of finished trans; entries are removed after the iteration is over
  while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
    SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter;

    // let's clear the finished trans
    STrans *pTrans = mndAcquireTrans(pMnode, pEntry->transId);
    if (pTrans == NULL) {
      void *pKey = taosHashGetKey(pEntry, &keyLen);
      // key of the entry in pDBTrans (entries are registered with the stream id as key)
      SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
      mDebug("transId:%d stream:0x%" PRIx64 " %s startTs:%" PRId64 " cleared since finished", pEntry->transId,
             pEntry->streamId, pEntry->name, pEntry->startTime);
      void *p = taosArrayPush(pList, &info);
      if (p == NULL) {
        // save the error first, then release the in-progress iterator and the key list before leaving
        int32_t code = terrno;
        taosHashCancelIterate(execInfo.transMgmt.pDBTrans, pIter);
        taosArrayDestroy(pList);
        return code;
      }
    } else {
      if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
        numOfChkpt++;
      }
      mndReleaseTrans(pMnode, pTrans);
    }
  }

  // pass 2: remove the collected entries
  int32_t size = taosArrayGetSize(pList);
  for (int32_t i = 0; i < size; ++i) {
    SKeyInfo *pKey = taosArrayGet(pList, i);
    if (pKey == NULL) {
      continue;
    }

    int32_t code = taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen);
    if (code != 0) {
      taosArrayDestroy(pList);
      return code;
    }
  }

  mDebug("clear %d finished stream-trans, active trans:%d, active checkpoint trans:%d", size,
         taosHashGetSize(execInfo.transMgmt.pDBTrans), numOfChkpt);

  taosArrayDestroy(pList);

  if (pNumOfActiveChkpt != NULL) {
    *pNumOfActiveChkpt = numOfChkpt;
  }

  return 0;
}
96

UNCOV
97
// Decides whether a new trans named pTransName may be created for the stream streamId, given the trans (if any)
// currently registered for that stream in execInfo.transMgmt.pDBTrans. Takes no lock itself.
// Returns TSDB_CODE_MND_TRANS_CONFLICT on conflict, 0/TSDB_CODE_SUCCESS otherwise.
static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName) {
  int32_t numOfTrans = taosHashGetSize(execInfo.transMgmt.pDBTrans);
  if (numOfTrans <= 0) {
    return 0;
  }

  // purge finished trans first so that they are not reported as conflicts; a failure here is logged and ignored
  int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
  if (code) {
    mError("failed to clear finish trans, code:%s, and continue", tstrerror(code));
  }

  SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
  if (pEntry == NULL) {
    mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId);
    return TSDB_CODE_SUCCESS;
  }

  SStreamTransInfo existing = *pEntry;

  if (strcmp(existing.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
    // a registered checkpoint trans only tolerates drop, reset and restart trans
    bool tolerated = (strcmp(pTransName, MND_STREAM_DROP_NAME) == 0) ||
                     (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) == 0) ||
                     (strcmp(pTransName, MND_STREAM_RESTART_NAME) == 0);
    if (tolerated) {
      mDebug("not conflict with checkpoint trans, name:%s, continue creating trans", pTransName);
      return TSDB_CODE_SUCCESS;
    }

    mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", existing.transId, existing.streamId,
          existing.name);
    return TSDB_CODE_MND_TRANS_CONFLICT;
  }

  // a registered create/drop/reset/update/restart trans blocks every new trans
  bool exclusive = (strcmp(existing.name, MND_STREAM_CREATE_NAME) == 0) ||
                   (strcmp(existing.name, MND_STREAM_DROP_NAME) == 0) ||
                   (strcmp(existing.name, MND_STREAM_TASK_RESET_NAME) == 0) ||
                   (strcmp(existing.name, MND_STREAM_TASK_UPDATE_NAME) == 0) ||
                   (strcmp(existing.name, MND_STREAM_RESTART_NAME) == 0);
  if (exclusive) {
    mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", existing.transId, existing.streamId,
          existing.name);
    return TSDB_CODE_MND_TRANS_CONFLICT;
  }

  return TSDB_CODE_SUCCESS;
}
136

137
// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream.
138
// For a given stream:
139
// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans.
140
// 2. create/drop/reset/update trans are conflict with any other trans.
UNCOV
141
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
×
UNCOV
142
  if (lock) {
×
UNCOV
143
    streamMutexLock(&execInfo.lock);
×
144
  }
145

UNCOV
146
  int32_t code = doStreamTransConflictCheck(pMnode, streamId, pTransName);
×
147

UNCOV
148
  if (lock) {
×
UNCOV
149
    streamMutexUnlock(&execInfo.lock);
×
150
  }
151

UNCOV
152
  return code;
×
153
}
154

UNCOV
155
// Returns the transId of the trans registered for streamId when that trans is a checkpoint, task-update or
// checkpoint-update trans; returns 0 when nothing is registered or the registered trans is of another kind.
// execInfo.lock is held while the registry is read; the entry is copied so the name checks run unlocked.
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {
  streamMutexLock(&execInfo.lock);

  int32_t numOfTrans = taosHashGetSize(execInfo.transMgmt.pDBTrans);
  if (numOfTrans <= 0) {
    streamMutexUnlock(&execInfo.lock);
    return 0;
  }

  // a failure to purge finished trans is logged only; the lookup below still runs
  int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
  if (code) {
    mError("failed to clear finish trans, code:%s", tstrerror(code));
  }

  SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
  if (pEntry == NULL) {
    streamMutexUnlock(&execInfo.lock);
    return 0;
  }

  SStreamTransInfo snapshot = *pEntry;
  streamMutexUnlock(&execInfo.lock);

  bool related = (strcmp(snapshot.name, MND_STREAM_CHECKPOINT_NAME) == 0) ||
                 (strcmp(snapshot.name, MND_STREAM_TASK_UPDATE_NAME) == 0) ||
                 (strcmp(snapshot.name, MND_STREAM_CHKPT_UPDATE_NAME) == 0);

  return related ? snapshot.transId : 0;
}
183

UNCOV
184
// Creates a retry-policy trans for pStream and runs the mnode conflict check on it.
//
// pMnode/pReq: forwarded to mndTransCreate.
// conflict:    conflict policy of the new trans.
// name:        trans name handed to mndTransCreate.
// pMsg:        text used in the "start to build trans" log line only.
// pTrans1:     output; set to the new trans on success, NULL on failure.
// Returns 0 on success; terrno when mndTransCreate fails; the mndTransCheckConflict code when the check fails
// (the freshly created trans is dropped in that case).
int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name,
                      const char *pMsg, STrans **pTrans1) {
  *pTrans1 = NULL;
  terrno = 0;

  int32_t code = 0;
  STrans *p = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
  if (p == NULL) {
    code = terrno;
    mError("failed to build trans:%s, reason: %s", name, tstrerror(code));
    return code;
  }

  mInfo("stream:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, p->id);

  mndTransSetDbName(p, pStream->sourceDb, pStream->targetSTbName);
  if ((code = mndTransCheckConflict(pMnode, p)) != 0) {
    // report the code that is actually returned; terrno was reset to 0 above and may not carry the failure
    mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(code));
    mndTransDrop(p);
    return code;
  }

  *pTrans1 = p;
  return code;
}
208

UNCOV
209
// Serializes pStream into a newly allocated SSdbRaw (type SDB_STREAM, version MND_STREAM_VER_NUMBER).
// The raw payload is an int32 length followed by the encoded stream object; MND_STREAM_RESERVE_SIZE extra bytes
// are reserved in the allocation.
// Returns the raw on success with terrno set to 0; returns NULL with terrno set to the error code on failure.
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *buf = NULL;
  // Declared and initialized before the first jump to _over: the cleanup code reads pRaw, so it must not be
  // reached with the initialization skipped.
  SSdbRaw *pRaw = NULL;
  int32_t  tlen = 0;
  int32_t  size = 0;
  int32_t  dataPos = 0;

  // first pass with no buffer: only used to obtain the encoded length from encoder.pos
  SEncoder encoder;
  tEncoderInit(&encoder, NULL, 0);
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
    tEncoderClear(&encoder);
    TSDB_CHECK_CODE(code, lino, _over);
  }

  tlen = encoder.pos;
  tEncoderClear(&encoder);

  size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
  pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
  TSDB_CHECK_NULL(pRaw, code, lino, _over, terrno);

  buf = taosMemoryMalloc(tlen);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  // second pass: encode into the buffer of the measured size
  tEncoderInit(&encoder, buf, tlen);
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
    tEncoderClear(&encoder);
    TSDB_CHECK_CODE(code, lino, _over);
  }

  tEncoderClear(&encoder);

  SDB_SET_INT32(pRaw, dataPos, tlen, _over);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _over);
  SDB_SET_DATALEN(pRaw, dataPos, _over);

_over:
  taosMemoryFreeClear(buf);
  if (code != TSDB_CODE_SUCCESS) {
    mError("stream:%s, failed to encode to raw:%p at line:%d since %s", pStream->name, pRaw, lino, tstrerror(code));
    sdbFreeRaw(pRaw);
    terrno = code;
    return NULL;
  }

  terrno = 0;
  mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream,
         pStream->checkpointId);
  return pRaw;
}
258

UNCOV
259
// Encodes pStream, marks the resulting raw with status and appends it to the commit log of pTrans.
// Returns 0 on success. On any failure pTrans is passed to mndTransDrop before returning and the value of
// terrno observed right after the failing call is returned.
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status) {
  int32_t code = 0;

  SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
  if (pCommitRaw == NULL) {
    code = terrno;  // capture before logging/cleanup can touch terrno
    mError("failed to encode stream since %s", terrstr());
    mndTransDrop(pTrans);
    return code;
  }

  // The status is set BEFORE the raw is appended, so that on failure the raw is still exclusively ours to free.
  // Freeing it after a successful append and then dropping the trans risks releasing it twice (assumes the
  // trans takes ownership of the raw on append -- confirm against mndTransAppendCommitlog/mndTransDrop).
  if (sdbSetRawStatus(pCommitRaw, status) != 0) {
    code = terrno;
    mError("stream trans:%d failed to set raw status:%d since %s", pTrans->id, status, terrstr());
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    return code;
  }

  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    code = terrno;
    mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    return code;
  }

  return 0;
}
283

UNCOV
284
// Builds a redo action from the given message (pCont/contLen/msgType), target endpoint set and retry/accept
// codes, and appends it to pTrans. pEpset is dereferenced and copied by value.
// Returns the result of mndTransAppendRedoAction.
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
                       int32_t retryCode, int32_t acceptCode) {
  STransAction redoAction = {0};

  redoAction.epSet = *pEpset;
  redoAction.pCont = pCont;
  redoAction.contLen = contLen;
  redoAction.msgType = msgType;
  redoAction.retryCode = retryCode;
  redoAction.acceptableCode = acceptCode;

  return mndTransAppendRedoAction(pTrans, &redoAction);
}
294

UNCOV
295
// Walks all registered stream trans and, for every checkpoint trans whose stream has pDBName (len bytes) as its
// source or target db, calls mndKillTransImpl on that trans. The source db is checked first.
// Always returns TSDB_CODE_SUCCESS; results of the lookup/kill calls are not propagated.
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
  void *pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, NULL);

  for (; pIter != NULL; pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) {
    SStreamTransInfo *pInfo = (SStreamTransInfo *)pIter;
    if (strcmp(pInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
      continue;
    }

    SStreamObj *pStream = NULL;
    int32_t     code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
    if (pStream == NULL || code != 0) {
      continue;
    }

    // pick the db name that matches, source db taking precedence over target db
    char *pMatchedDb = NULL;
    if (identicalName(pStream->sourceDb, pDBName, len)) {
      pMatchedDb = pStream->sourceDb;
    } else if (identicalName(pStream->targetDb, pDBName, len)) {
      pMatchedDb = pStream->targetDb;
    }

    if (pMatchedDb != NULL) {
      mndKillTransImpl(pMnode, pInfo->transId, pMatchedDb);
    }

    mndReleaseStream(pMnode, pStream);
  }

  return TSDB_CODE_SUCCESS;
}
319

320
// kill all trans in the dst DB
UNCOV
321
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
×
UNCOV
322
  mDebug("start to clear checkpoints in all Dbs");
×
UNCOV
323
  char p[128] = {0};
×
324

UNCOV
325
  void *pIter = NULL;
×
UNCOV
326
  while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
×
UNCOV
327
    char *pDb = (char *)pIter;
×
328

UNCOV
329
    size_t len = 0;
×
UNCOV
330
    void  *pKey = taosHashGetKey(pDb, &len);
×
UNCOV
331
    int cpLen = (127 < len) ? 127 : len;
×
UNCOV
332
    TAOS_STRNCPY(p, pKey, cpLen);
×
UNCOV
333
    p[cpLen] = '\0';
×
334

UNCOV
335
    int32_t code = doKillCheckpointTrans(pMnode, pKey, len);
×
UNCOV
336
    if (code) {
×
337
      mError("failed to kill trans, transId:%p", pKey);
×
338
    } else {
UNCOV
339
      mDebug("clear checkpoint trans in Db:%s", p);
×
340
    }
341
  }
342

UNCOV
343
  mDebug("complete clear checkpoints in all Dbs");
×
UNCOV
344
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc