• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3542

27 Nov 2024 02:52AM UTC coverage: 60.819% (+0.04%) from 60.776%
#3542

push

travis-ci

web-flow
Merge pull request #28931 from taosdata/enh/jdbc-demo-3.0

update jdbc demo, and version history

120305 of 252779 branches covered (47.59%)

Branch coverage included in aggregate %.

201010 of 275538 relevant lines covered (72.95%)

19989893.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.87
/source/dnode/mnode/impl/src/mndStreamTrans.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndTrans.h"
18

19
typedef struct SKeyInfo {
20
  void   *pKey;
21
  int32_t keyLen;
22
} SKeyInfo;
23

24
static bool identicalName(const char *pDb, const char *pParam, int32_t len) {
×
25
  return (strlen(pDb) == len) && (strncmp(pDb, pParam, len) == 0);
×
26
}
27

28
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId) {
4,407✔
29
  SStreamTransInfo info = {
4,407✔
30
      .transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pTransName, .streamId = streamId};
4,407✔
31
  return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &info, sizeof(SStreamTransInfo));
4,407✔
32
}
33

34
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) {
5,486✔
35
  size_t  keyLen = 0;
5,486✔
36
  void   *pIter = NULL;
5,486✔
37
  SArray *pList = taosArrayInit(4, sizeof(SKeyInfo));
5,486✔
38
  int32_t num = 0;
5,486✔
39

40
  if (pList == NULL) {
5,486!
41
    return terrno;
×
42
  }
43

44
  while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
11,318✔
45
    SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter;
5,832✔
46

47
    // let's clear the finished trans
48
    STrans *pTrans = mndAcquireTrans(pMnode, pEntry->transId);
5,832✔
49
    if (pTrans == NULL) {
5,832✔
50
      void *pKey = taosHashGetKey(pEntry, &keyLen);
4,181✔
51
      // key is the name of src/dst db name
52
      SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
4,181✔
53
      mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name,
4,181✔
54
             pEntry->startTime);
55
      void* p = taosArrayPush(pList, &info);
4,181✔
56
      if (p == NULL) {
4,181!
57
        return terrno;
×
58
      }
59
    } else {
60
      if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
1,651✔
61
        num++;
292✔
62
      }
63
      mndReleaseTrans(pMnode, pTrans);
1,651✔
64
    }
65
  }
66

67
  int32_t size = taosArrayGetSize(pList);
5,486✔
68
  for (int32_t i = 0; i < size; ++i) {
9,667✔
69
    SKeyInfo *pKey = taosArrayGet(pList, i);
4,181✔
70
    if (pKey == NULL) {
4,181!
71
      continue;
×
72
    }
73

74
    int32_t code = taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen);
4,181✔
75
    if (code != 0) {
4,181!
76
      taosArrayDestroy(pList);
×
77
      return code;
×
78
    }
79
  }
80

81
  mDebug("clear %d finished stream-trans, remained:%d, active checkpoint trans:%d", size,
5,486✔
82
         taosHashGetSize(execInfo.transMgmt.pDBTrans), num);
83

84
  taosArrayDestroy(pList);
5,486✔
85

86
  if (pNumOfActiveChkpt != NULL) {
5,486✔
87
    *pNumOfActiveChkpt = num;
569✔
88
  }
89

90
  return 0;
5,486✔
91
}
92

93
// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream.
94
// For a given stream:
95
// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans.
96
// 2. create/drop/reset/update trans are conflict with any other trans.
97
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
4,365✔
98
  if (lock) {
4,365✔
99
    streamMutexLock(&execInfo.lock);
2,453✔
100
  }
101

102
  int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
4,365✔
103
  if (num <= 0) {
4,365✔
104
    if (lock) {
729✔
105
      streamMutexUnlock(&execInfo.lock);
676✔
106
    }
107
    return 0;
729✔
108
  }
109

110
  int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
3,636✔
111
  if (code) {
3,636!
112
    mError("failed to clear finish trans, code:%s", tstrerror(code));
×
113
  }
114

115
  SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
3,636✔
116
  if (pEntry != NULL) {
3,636✔
117
    SStreamTransInfo tInfo = *pEntry;
43✔
118

119
    if (lock) {
43✔
120
      streamMutexUnlock(&execInfo.lock);
42✔
121
    }
122

123
    if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
43✔
124
      if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0) &&
40!
125
          (strcmp(pTransName, MND_STREAM_RESTART_NAME) != 0)) {
×
126
        mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
×
127
              tInfo.name);
128
        return TSDB_CODE_MND_TRANS_CONFLICT;
×
129
      } else {
130
        mDebug("not conflict with checkpoint trans, name:%s, continue creating trans", pTransName);
40✔
131
      }
132
    } else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) ||
3!
133
               (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0) ||
3!
134
               (strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) ||
3!
135
               strcmp(tInfo.name, MND_STREAM_RESTART_NAME) == 0) {
3!
136
      mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
×
137
            tInfo.name);
138
      return TSDB_CODE_MND_TRANS_CONFLICT;
×
139
    }
140
  } else {
141
    mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId);
3,593✔
142
  }
143

144
  if (lock) {
3,636✔
145
    streamMutexUnlock(&execInfo.lock);
1,777✔
146
  }
147

148
  return 0;
3,636✔
149
}
150

151
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {
1,332✔
152
  streamMutexLock(&execInfo.lock);
1,332✔
153
  int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
1,332✔
154
  if (num <= 0) {
1,332✔
155
    streamMutexUnlock(&execInfo.lock);
51✔
156
    return 0;
51✔
157
  }
158

159
  int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
1,281✔
160
  if (code) {
1,281!
161
    mError("failed to clear finish trans, code:%s", tstrerror(code));
×
162
  }
163

164
  SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
1,281✔
165
  if (pEntry != NULL) {
1,281✔
166
    SStreamTransInfo tInfo = *pEntry;
1,254✔
167
    streamMutexUnlock(&execInfo.lock);
1,254✔
168

169
    if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0 || strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0 ||
1,254!
170
        strcmp(tInfo.name, MND_STREAM_CHKPT_UPDATE_NAME) == 0) {
1,254!
171
      return tInfo.transId;
×
172
    }
173
  } else {
174
    streamMutexUnlock(&execInfo.lock);
27✔
175
  }
176

177
  return 0;
1,281✔
178
}
179

180
int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name,
6,004✔
181
                      const char *pMsg, STrans **pTrans1) {
182
  *pTrans1 = NULL;
6,004✔
183
  terrno = 0;
6,004✔
184

185
  int32_t code = 0;
6,004✔
186
  STrans *p = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
6,004✔
187
  if (p == NULL) {
6,004!
188
    mError("failed to build trans:%s, reason: %s", name, tstrerror(terrno));
×
189
    return terrno;
×
190
  }
191

192
  mInfo("stream:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, p->id);
6,004!
193

194
  mndTransSetDbName(p, pStream->sourceDb, pStream->targetSTbName);
6,004✔
195
  if ((code = mndTransCheckConflict(pMnode, p)) != 0) {
6,004!
196
    mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(terrno));
×
197
    mndTransDrop(p);
×
198
    return code;
×
199
  }
200

201
  *pTrans1 = p;
6,004✔
202
  return code;
6,004✔
203
}
204

205
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
6,705✔
206
  int32_t code = 0;
6,705✔
207
  int32_t lino = 0;
6,705✔
208
  void   *buf = NULL;
6,705✔
209

210
  SEncoder encoder;
211
  tEncoderInit(&encoder, NULL, 0);
6,705✔
212
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
6,705!
213
    tEncoderClear(&encoder);
×
214
    TSDB_CHECK_CODE(code, lino, _over);
×
215
  }
216

217
  int32_t tlen = encoder.pos;
6,705✔
218
  tEncoderClear(&encoder);
6,705✔
219

220
  int32_t  size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
6,705✔
221
  SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
6,705✔
222
  TSDB_CHECK_NULL(pRaw, code, lino, _over, terrno);
6,705!
223

224
  buf = taosMemoryMalloc(tlen);
6,705✔
225
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
6,705!
226

227
  tEncoderInit(&encoder, buf, tlen);
6,705✔
228
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
6,705!
229
    tEncoderClear(&encoder);
×
230
    TSDB_CHECK_CODE(code, lino, _over);
×
231
  }
232

233
  tEncoderClear(&encoder);
6,705✔
234

235
  int32_t dataPos = 0;
6,705✔
236
  SDB_SET_INT32(pRaw, dataPos, tlen, _over);
6,705!
237
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _over);
6,705!
238
  SDB_SET_DATALEN(pRaw, dataPos, _over);
6,705!
239

240
_over:
6,705✔
241
  taosMemoryFreeClear(buf);
6,705!
242
  if (code != TSDB_CODE_SUCCESS) {
6,705!
243
    mError("stream:%s, failed to encode to raw:%p at line:%d since %s", pStream->name, pRaw, lino, tstrerror(code));
×
244
    sdbFreeRaw(pRaw);
×
245
    terrno = code;
×
246
    return NULL;
×
247
  }
248

249
  terrno = 0;
6,705✔
250
  mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream,
6,705✔
251
         pStream->checkpointId);
252
  return pRaw;
6,705✔
253
}
254

255
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status) {
6,128✔
256
  SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
6,128✔
257
  if (pCommitRaw == NULL) {
6,128!
258
    mError("failed to encode stream since %s", terrstr());
×
259
    mndTransDrop(pTrans);
×
260
    return terrno;
×
261
  }
262

263
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
6,128!
264
    mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
×
265
    sdbFreeRaw(pCommitRaw);
×
266
    mndTransDrop(pTrans);
×
267
    return terrno;
×
268
  }
269

270
  if (sdbSetRawStatus(pCommitRaw, status) != 0) {
6,128!
271
    mError("stream trans:%d failed to set raw status:%d since %s", pTrans->id, status, terrstr());
×
272
    sdbFreeRaw(pCommitRaw);
×
273
    mndTransDrop(pTrans);
×
274
    return terrno;
×
275
  }
276

277
  return 0;
6,128✔
278
}
279

280
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
31,942✔
281
                       int32_t retryCode, int32_t acceptCode) {
282
  STransAction action = {.epSet = *pEpset,
31,942✔
283
                         .contLen = contLen,
284
                         .pCont = pCont,
285
                         .msgType = msgType,
286
                         .retryCode = retryCode,
287
                         .acceptableCode = acceptCode};
288
  return mndTransAppendRedoAction(pTrans, &action);
31,942✔
289
}
290

291
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
2✔
292
  void *pIter = NULL;
2✔
293

294
  while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
3✔
295
    SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter;
1✔
296
    if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
1!
297
      continue;
1✔
298
    }
299

300
    SStreamObj *pStream = NULL;
×
301
    int32_t code = mndGetStreamObj(pMnode, pTransInfo->streamId, &pStream);
×
302
    if (pStream != NULL && code == 0) {
×
303
      if (identicalName(pStream->sourceDb, pDBName, len)) {
×
304
        mndKillTransImpl(pMnode, pTransInfo->transId, pStream->sourceDb);
×
305
      } else if (identicalName(pStream->targetDb, pDBName, len)) {
×
306
        mndKillTransImpl(pMnode, pTransInfo->transId, pStream->targetDb);
×
307
      }
308

309
      mndReleaseStream(pMnode, pStream);
×
310
    }
311
  }
312

313
  return TSDB_CODE_SUCCESS;
2✔
314
}
315

316
// kill all trans in the dst DB
317
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
2✔
318
  mDebug("start to clear checkpoints in all Dbs");
2!
319
  char p[128] = {0};
2✔
320

321
  void *pIter = NULL;
2✔
322
  while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
4✔
323
    char *pDb = (char *)pIter;
2✔
324

325
    size_t len = 0;
2✔
326
    void  *pKey = taosHashGetKey(pDb, &len);
2✔
327
    int cpLen = (127 < len) ? 127 : len;
2✔
328
    TAOS_STRNCPY(p, pKey, cpLen);
2✔
329
    p[cpLen] = '\0';
2✔
330

331
    int32_t code = doKillCheckpointTrans(pMnode, pKey, len);
2✔
332
    if (code) {
2!
333
      mError("failed to kill trans, transId:%p", pKey);
×
334
    } else {
335
      mDebug("clear checkpoint trans in Db:%s", p);
2!
336
    }
337
  }
338

339
  mDebug("complete clear checkpoints in all Dbs");
2!
340
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc