• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3552

11 Dec 2024 06:08AM UTC coverage: 62.526% (+0.7%) from 61.798%
#3552

push

travis-ci

web-flow
Merge pull request #29092 from taosdata/fix/3.0/TD-33146

fix:[TD-33146] stmt_get_tag_fields return error code

124833 of 255773 branches covered (48.81%)

Branch coverage included in aggregate %.

209830 of 279467 relevant lines covered (75.08%)

19111707.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.54
/source/dnode/mnode/impl/src/mndStreamTrans.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndTrans.h"
18

19
typedef struct SKeyInfo {
20
  void   *pKey;
21
  int32_t keyLen;
22
} SKeyInfo;
23

24
static bool identicalName(const char *pDb, const char *pParam, int32_t len) {
×
25
  return (strlen(pDb) == len) && (strncmp(pDb, pParam, len) == 0);
×
26
}
27

28
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId) {
4,348✔
29
  SStreamTransInfo info = {
4,348✔
30
      .transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pTransName, .streamId = streamId};
4,348✔
31
  return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &info, sizeof(SStreamTransInfo));
4,348✔
32
}
33

34
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) {
5,404✔
35
  size_t  keyLen = 0;
5,404✔
36
  void   *pIter = NULL;
5,404✔
37
  SArray *pList = taosArrayInit(4, sizeof(SKeyInfo));
5,404✔
38
  int32_t numOfChkpt = 0;
5,404✔
39

40
  if (pNumOfActiveChkpt != NULL) {
5,404✔
41
    *pNumOfActiveChkpt = 0;
542✔
42
  }
43

44
  if (pList == NULL) {
5,404!
45
    return terrno;
×
46
  }
47

48
  while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
11,114✔
49
    SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter;
5,710✔
50

51
    // let's clear the finished trans
52
    STrans *pTrans = mndAcquireTrans(pMnode, pEntry->transId);
5,710✔
53
    if (pTrans == NULL) {
5,710✔
54
      void *pKey = taosHashGetKey(pEntry, &keyLen);
4,085✔
55
      // key is the name of src/dst db name
56
      SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
4,085✔
57
      mDebug("transId:%d stream:0x%" PRIx64 " %s startTs:%" PRId64 " cleared since finished", pEntry->transId,
4,085✔
58
             pEntry->streamId, pEntry->name, pEntry->startTime);
59
      void* p = taosArrayPush(pList, &info);
4,085✔
60
      if (p == NULL) {
4,085!
61
        return terrno;
×
62
      }
63
    } else {
64
      if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
1,625✔
65
        numOfChkpt++;
276✔
66
      }
67
      mndReleaseTrans(pMnode, pTrans);
1,625✔
68
    }
69
  }
70

71
  int32_t size = taosArrayGetSize(pList);
5,404✔
72
  for (int32_t i = 0; i < size; ++i) {
9,489✔
73
    SKeyInfo *pKey = taosArrayGet(pList, i);
4,085✔
74
    if (pKey == NULL) {
4,085!
75
      continue;
×
76
    }
77

78
    int32_t code = taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen);
4,085✔
79
    if (code != 0) {
4,085!
80
      taosArrayDestroy(pList);
×
81
      return code;
×
82
    }
83
  }
84

85
  mDebug("clear %d finished stream-trans, active trans:%d, active checkpoint trans:%d", size,
5,404✔
86
         taosHashGetSize(execInfo.transMgmt.pDBTrans), numOfChkpt);
87

88
  taosArrayDestroy(pList);
5,404✔
89

90
  if (pNumOfActiveChkpt != NULL) {
5,404✔
91
    *pNumOfActiveChkpt = numOfChkpt;
542✔
92
  }
93

94
  return 0;
5,404✔
95
}
96

97
static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName) {
4,257✔
98
  int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
4,257✔
99
  if (num <= 0) {
4,257✔
100
    return 0;
685✔
101
  }
102

103
  // if any task updates exist, any other stream trans are not allowed to be created
104
  int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
3,572✔
105
  if (code) {
3,572!
106
    mError("failed to clear finish trans, code:%s, and continue", tstrerror(code));
×
107
  }
108

109
  SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
3,572✔
110
  if (pEntry != NULL) {
3,572✔
111
    SStreamTransInfo tInfo = *pEntry;
52✔
112

113
    if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
52✔
114
      if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0) &&
48!
115
          (strcmp(pTransName, MND_STREAM_RESTART_NAME) != 0)) {
×
116
        mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
×
117
              tInfo.name);
118
        return TSDB_CODE_MND_TRANS_CONFLICT;
1✔
119
      } else {
120
        mDebug("not conflict with checkpoint trans, name:%s, continue creating trans", pTransName);
48✔
121
      }
122
    } else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) ||
4!
123
               (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0) ||
3!
124
               (strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) ||
3!
125
               strcmp(tInfo.name, MND_STREAM_RESTART_NAME) == 0) {
3!
126
      mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
1!
127
            tInfo.name);
128
      return TSDB_CODE_MND_TRANS_CONFLICT;
1✔
129
    }
130
  } else {
131
    mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId);
3,520✔
132
  }
133

134
  return TSDB_CODE_SUCCESS;
3,571✔
135
}
136

137
// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream.
138
// For a given stream:
139
// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans.
140
// 2. create/drop/reset/update trans are conflict with any other trans.
141
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
4,257✔
142
  if (lock) {
4,257✔
143
    streamMutexLock(&execInfo.lock);
2,418✔
144
  }
145

146
  int32_t code = doStreamTransConflictCheck(pMnode, streamId, pTransName);
4,257✔
147

148
  if (lock) {
4,257✔
149
    streamMutexUnlock(&execInfo.lock);
2,418✔
150
  }
151

152
  return code;
4,257✔
153
}
154

155
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {
1,336✔
156
  streamMutexLock(&execInfo.lock);
1,336✔
157
  int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
1,336✔
158
  if (num <= 0) {
1,336✔
159
    streamMutexUnlock(&execInfo.lock);
46✔
160
    return 0;
46✔
161
  }
162

163
  int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
1,290✔
164
  if (code) {
1,290!
165
    mError("failed to clear finish trans, code:%s", tstrerror(code));
×
166
  }
167

168
  SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
1,290✔
169
  if (pEntry != NULL) {
1,290✔
170
    SStreamTransInfo tInfo = *pEntry;
1,255✔
171
    streamMutexUnlock(&execInfo.lock);
1,255✔
172

173
    if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0 || strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0 ||
1,255!
174
        strcmp(tInfo.name, MND_STREAM_CHKPT_UPDATE_NAME) == 0) {
1,255!
175
      return tInfo.transId;
×
176
    }
177
  } else {
178
    streamMutexUnlock(&execInfo.lock);
35✔
179
  }
180

181
  return 0;
1,290✔
182
}
183

184
int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name,
5,957✔
185
                      const char *pMsg, STrans **pTrans1) {
186
  *pTrans1 = NULL;
5,957✔
187
  terrno = 0;
5,957✔
188

189
  int32_t code = 0;
5,957✔
190
  STrans *p = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
5,957✔
191
  if (p == NULL) {
5,957!
192
    mError("failed to build trans:%s, reason: %s", name, tstrerror(terrno));
×
193
    return terrno;
×
194
  }
195

196
  mInfo("stream:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, p->id);
5,957!
197

198
  mndTransSetDbName(p, pStream->sourceDb, pStream->targetSTbName);
5,957✔
199
  if ((code = mndTransCheckConflict(pMnode, p)) != 0) {
5,957!
200
    mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(terrno));
×
201
    mndTransDrop(p);
×
202
    return code;
×
203
  }
204

205
  *pTrans1 = p;
5,957✔
206
  return code;
5,957✔
207
}
208

209
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
6,667✔
210
  int32_t code = 0;
6,667✔
211
  int32_t lino = 0;
6,667✔
212
  void   *buf = NULL;
6,667✔
213

214
  SEncoder encoder;
215
  tEncoderInit(&encoder, NULL, 0);
6,667✔
216
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
6,667!
217
    tEncoderClear(&encoder);
×
218
    TSDB_CHECK_CODE(code, lino, _over);
×
219
  }
220

221
  int32_t tlen = encoder.pos;
6,667✔
222
  tEncoderClear(&encoder);
6,667✔
223

224
  int32_t  size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
6,667✔
225
  SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
6,667✔
226
  TSDB_CHECK_NULL(pRaw, code, lino, _over, terrno);
6,667!
227

228
  buf = taosMemoryMalloc(tlen);
6,667✔
229
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
6,667!
230

231
  tEncoderInit(&encoder, buf, tlen);
6,667✔
232
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
6,667!
233
    tEncoderClear(&encoder);
×
234
    TSDB_CHECK_CODE(code, lino, _over);
×
235
  }
236

237
  tEncoderClear(&encoder);
6,667✔
238

239
  int32_t dataPos = 0;
6,667✔
240
  SDB_SET_INT32(pRaw, dataPos, tlen, _over);
6,667!
241
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _over);
6,667!
242
  SDB_SET_DATALEN(pRaw, dataPos, _over);
6,667!
243

244
_over:
6,667✔
245
  taosMemoryFreeClear(buf);
6,667!
246
  if (code != TSDB_CODE_SUCCESS) {
6,667!
247
    mError("stream:%s, failed to encode to raw:%p at line:%d since %s", pStream->name, pRaw, lino, tstrerror(code));
×
248
    sdbFreeRaw(pRaw);
×
249
    terrno = code;
×
250
    return NULL;
×
251
  }
252

253
  terrno = 0;
6,667✔
254
  mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream,
6,667✔
255
         pStream->checkpointId);
256
  return pRaw;
6,667✔
257
}
258

259
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status) {
6,084✔
260
  SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
6,084✔
261
  if (pCommitRaw == NULL) {
6,084!
262
    mError("failed to encode stream since %s", terrstr());
×
263
    mndTransDrop(pTrans);
×
264
    return terrno;
×
265
  }
266

267
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
6,084!
268
    mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
×
269
    sdbFreeRaw(pCommitRaw);
×
270
    mndTransDrop(pTrans);
×
271
    return terrno;
×
272
  }
273

274
  if (sdbSetRawStatus(pCommitRaw, status) != 0) {
6,084!
275
    mError("stream trans:%d failed to set raw status:%d since %s", pTrans->id, status, terrstr());
×
276
    sdbFreeRaw(pCommitRaw);
×
277
    mndTransDrop(pTrans);
×
278
    return terrno;
×
279
  }
280

281
  return 0;
6,084✔
282
}
283

284
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
31,886✔
285
                       int32_t retryCode, int32_t acceptCode) {
286
  STransAction action = {.epSet = *pEpset,
31,886✔
287
                         .contLen = contLen,
288
                         .pCont = pCont,
289
                         .msgType = msgType,
290
                         .retryCode = retryCode,
291
                         .acceptableCode = acceptCode};
292
  return mndTransAppendRedoAction(pTrans, &action);
31,886✔
293
}
294

295
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
10✔
296
  void *pIter = NULL;
10✔
297

298
  while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
18✔
299
    SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter;
8✔
300
    if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
8✔
301
      continue;
7✔
302
    }
303

304
    SStreamObj *pStream = NULL;
1✔
305
    int32_t code = mndGetStreamObj(pMnode, pTransInfo->streamId, &pStream);
1✔
306
    if (pStream != NULL && code == 0) {
1!
307
      if (identicalName(pStream->sourceDb, pDBName, len)) {
×
308
        mndKillTransImpl(pMnode, pTransInfo->transId, pStream->sourceDb);
×
309
      } else if (identicalName(pStream->targetDb, pDBName, len)) {
×
310
        mndKillTransImpl(pMnode, pTransInfo->transId, pStream->targetDb);
×
311
      }
312

313
      mndReleaseStream(pMnode, pStream);
×
314
    }
315
  }
316

317
  return TSDB_CODE_SUCCESS;
10✔
318
}
319

320
// kill all trans in the dst DB
321
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
10✔
322
  mDebug("start to clear checkpoints in all Dbs");
10✔
323
  char p[128] = {0};
10✔
324

325
  void *pIter = NULL;
10✔
326
  while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
20✔
327
    char *pDb = (char *)pIter;
10✔
328

329
    size_t len = 0;
10✔
330
    void  *pKey = taosHashGetKey(pDb, &len);
10✔
331
    int cpLen = (127 < len) ? 127 : len;
10✔
332
    TAOS_STRNCPY(p, pKey, cpLen);
10✔
333
    p[cpLen] = '\0';
10✔
334

335
    int32_t code = doKillCheckpointTrans(pMnode, pKey, len);
10✔
336
    if (code) {
10!
337
      mError("failed to kill trans, transId:%p", pKey);
×
338
    } else {
339
      mDebug("clear checkpoint trans in Db:%s", p);
10✔
340
    }
341
  }
342

343
  mDebug("complete clear checkpoints in all Dbs");
10✔
344
}
10✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc