• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3929

25 Apr 2025 11:21AM UTC coverage: 62.507% (+0.1%) from 62.362%
#3929

push

travis-ci

web-flow
docs: jdbc tmq supports database subscription. [TS-6222] (#30819)

* docs: jdbc tmq supports database subscription. [TS-6222]

* Update docs/zh/07-develop/07-tmq.md

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* Update 07-tmq.md

---------

Co-authored-by: haoranchen <haoran920c@163.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

155520 of 317319 branches covered (49.01%)

Branch coverage included in aggregate %.

240651 of 316482 relevant lines covered (76.04%)

6469891.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.88
/source/dnode/mnode/impl/src/mndStreamTrans.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndTrans.h"
18

19
// Max execution time of a checkpoint trans before it is reported as long-running and killed:
// 30 min, expressed in milliseconds (1800s).
#define MAX_CHKPT_EXEC_ELAPSED (30 * 60 * 1000)
20

21
/* Key of a stream-trans hash entry that is scheduled for removal once iteration completes. */
typedef struct SKeyInfo {
  void   *pKey;    // key bytes, as returned by taosHashGetKey() for the entry
  int32_t keyLen;  // number of bytes in pKey
} SKeyInfo;
25

26
/*
 * Returns true when pDb is exactly `len` characters long and equals the first
 * `len` characters of pParam (pParam itself need not be NUL-terminated at len).
 */
static bool identicalName(const char *pDb, const char *pParam, int32_t len) {
  if (strlen(pDb) != (size_t)len) {
    return false;
  }
  return strncmp(pDb, pParam, (size_t)len) == 0;
}
29

30
/*
 * Records that trans pTrans (named pTransName) is running for stream `streamId`.
 * The record is stored in execInfo.transMgmt.pDBTrans keyed by the stream id; an existing
 * record for the same stream is handled by taosHashPut.
 * Only the name pointer is stored, so pTransName must stay valid while the record exists.
 * Returns the result of taosHashPut().
 */
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId) {
  SStreamTransInfo entry = {.streamId = streamId, .transId = pTrans->id, .name = pTransName};
  entry.startTime = taosGetTimestampMs();

  return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(int64_t), &entry, sizeof(entry));
}
35

36
/*
 * Removes the stream-trans records whose trans can no longer be acquired from the trans
 * module (i.e. mndAcquireTrans returns NULL), and counts the still-running checkpoint trans.
 *
 * @param pMnode             mnode handle used to look transactions up.
 * @param pNumOfActiveChkpt  optional; set to 0 on entry and, on success, to the number of
 *                           still-running checkpoint trans.
 * @param pLongChkptTrans    optional array of SStreamTransInfo; receives a copy of every
 *                           checkpoint trans that has run for at least MAX_CHKPT_EXEC_ELAPSED ms.
 * @return 0 on success, otherwise an error code (terrno or the taosHashRemove result).
 */
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray *pLongChkptTrans) {
  size_t  keyLen = 0;
  void   *pIter = NULL;
  SArray *pList = taosArrayInit(4, sizeof(SKeyInfo));
  int32_t numOfChkpt = 0;
  int64_t now = taosGetTimestampMs();

  if (pNumOfActiveChkpt != NULL) {
    *pNumOfActiveChkpt = 0;
  }

  if (pList == NULL) {
    return terrno;
  }

  while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
    SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter;

    STrans *pTrans = mndAcquireTrans(pMnode, pEntry->transId);
    if (pTrans == NULL) {
      // the trans is gone: remember its key, removal is deferred until the iteration is over
      void    *pKey = taosHashGetKey(pEntry, &keyLen);
      SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
      mDebug("transId:%d stream:0x%" PRIx64 " %s startTs:%" PRId64 " cleared since finished", pEntry->transId,
             pEntry->streamId, pEntry->name, pEntry->startTime);

      if (taosArrayPush(pList, &info) == NULL) {
        int32_t code = terrno;
        // leaving the loop early: release the live iterator and the pending-key list
        taosHashCancelIterate(execInfo.transMgmt.pDBTrans, pIter);
        taosArrayDestroy(pList);
        return code;
      }
      continue;
    }

    if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
      numOfChkpt++;

      // running for at least MAX_CHKPT_EXEC_ELAPSED (30 min): report it to the caller to be killed
      int64_t dur = now - pTrans->createdTime;
      if ((dur >= MAX_CHKPT_EXEC_ELAPSED) && (pLongChkptTrans != NULL)) {
        mInfo("long chkpt transId:%d, start:%" PRId64
              " exec duration:%.2fs, beyond threshold %.2f min, kill it and reset task status",
              pTrans->id, pTrans->createdTime, dur / 1000.0, MAX_CHKPT_EXEC_ELAPSED / (1000 * 60.0));
        if (taosArrayPush(pLongChkptTrans, pEntry) == NULL) {
          mError("failed to add long checkpoint trans, transId:%d, code:%s", pEntry->transId, tstrerror(terrno));
        }
      }
    }
    mndReleaseTrans(pMnode, pTrans);
  }

  int32_t size = taosArrayGetSize(pList);
  for (int32_t i = 0; i < size; ++i) {
    SKeyInfo *pKey = taosArrayGet(pList, i);
    if (pKey == NULL) {
      continue;
    }

    int32_t code = taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen);
    if (code != 0) {
      taosArrayDestroy(pList);
      return code;
    }
  }

  mDebug("clear %d finished stream-trans, active trans:%d, active checkpoint trans:%d", size,
         taosHashGetSize(execInfo.transMgmt.pDBTrans), numOfChkpt);

  taosArrayDestroy(pList);

  if (pNumOfActiveChkpt != NULL) {
    *pNumOfActiveChkpt = numOfChkpt;
  }

  return 0;
}
111

112
/*
 * Decides whether a new trans named pTransName may be created for stream `streamId`,
 * based on the trans record currently registered for that stream (finished records are
 * purged first via mndStreamClearFinishedTrans).
 * Returns TSDB_CODE_SUCCESS when allowed, TSDB_CODE_MND_TRANS_CONFLICT otherwise.
 */
static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName) {
  int32_t numOfTrans = taosHashGetSize(execInfo.transMgmt.pDBTrans);
  if (numOfTrans <= 0) {
    return 0;
  }

  // purge records of finished trans; a failure here is logged but does not stop the check
  int32_t code = mndStreamClearFinishedTrans(pMnode, NULL, NULL);
  if (code) {
    mError("failed to clear finish trans, code:%s, and continue", tstrerror(code));
  }

  SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
  if (pEntry == NULL) {
    mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId);
    return TSDB_CODE_SUCCESS;
  }

  SStreamTransInfo ongoing = *pEntry;

  if (strcmp(ongoing.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
    // only drop, task-reset and stop may be created while a checkpoint is running
    bool allowed = (strcmp(pTransName, MND_STREAM_DROP_NAME) == 0) ||
                   (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) == 0) ||
                   (strcmp(pTransName, MND_STREAM_STOP_NAME) == 0);
    if (allowed) {
      mDebug("not conflict with checkpoint trans, name:%s, continue creating trans", pTransName);
      return TSDB_CODE_SUCCESS;
    }

    mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", ongoing.transId, ongoing.streamId,
          ongoing.name);
    return TSDB_CODE_MND_TRANS_CONFLICT;
  }

  // these trans kinds block every other trans of the same stream
  bool exclusive = (strcmp(ongoing.name, MND_STREAM_CREATE_NAME) == 0) ||
                   (strcmp(ongoing.name, MND_STREAM_DROP_NAME) == 0) ||
                   (strcmp(ongoing.name, MND_STREAM_TASK_RESET_NAME) == 0) ||
                   (strcmp(ongoing.name, MND_STREAM_TASK_UPDATE_NAME) == 0) ||
                   (strcmp(ongoing.name, MND_STREAM_CHKPT_CONSEN_NAME) == 0) ||
                   (strcmp(ongoing.name, MND_STREAM_STOP_NAME) == 0);
  if (exclusive) {
    mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", ongoing.transId, ongoing.streamId,
          ongoing.name);
    return TSDB_CODE_MND_TRANS_CONFLICT;
  }

  return TSDB_CODE_SUCCESS;
}
152

153
// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream.
154
// For a given stream:
155
// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans.
156
// 2. create/drop/reset/update/chkpt-consensus trans are conflict with any other trans.
157
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
1,206✔
158
  if (lock) {
1,206✔
159
    streamMutexLock(&execInfo.lock);
645✔
160
  }
161

162
  int32_t code = doStreamTransConflictCheck(pMnode, streamId, pTransName);
1,206✔
163

164
  if (lock) {
1,206✔
165
    streamMutexUnlock(&execInfo.lock);
645✔
166
  }
167

168
  return code;
1,206✔
169
}
170

171
/*
 * Returns the id of the trans currently registered for `streamId` when it is a checkpoint,
 * task-update or checkpoint-update trans; returns 0 if there is no record or it is of another kind.
 * Finished trans records are purged first. execInfo.lock is taken internally.
 */
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {
  streamMutexLock(&execInfo.lock);

  int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
  if (num <= 0) {
    streamMutexUnlock(&execInfo.lock);
    return 0;
  }

  int32_t code = mndStreamClearFinishedTrans(pMnode, NULL, NULL);
  if (code) {
    mError("failed to clear finish trans, code:%s", tstrerror(code));
  }

  SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
  if (pEntry == NULL) {
    streamMutexUnlock(&execInfo.lock);
    return 0;
  }

  // copy the record before releasing the lock, then inspect the copy
  SStreamTransInfo snapshot = *pEntry;
  streamMutexUnlock(&execInfo.lock);

  bool related = (strcmp(snapshot.name, MND_STREAM_CHECKPOINT_NAME) == 0) ||
                 (strcmp(snapshot.name, MND_STREAM_TASK_UPDATE_NAME) == 0) ||
                 (strcmp(snapshot.name, MND_STREAM_CHKPT_UPDATE_NAME) == 0);
  if (related) {
    return snapshot.transId;
  }
  return 0;
}
199

200
/*
 * Creates a retryable, killable trans named `name` for pStream, binds it to the stream's
 * source db / target stable, and runs the trans-module conflict check.
 *
 * @param pMsg     description used only in the log line.
 * @param pTrans1  out: the new trans on success, NULL on failure.
 * @return 0 on success; otherwise an error code (the trans has already been dropped).
 */
int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name,
                      const char *pMsg, STrans **pTrans1) {
  *pTrans1 = NULL;
  terrno = 0;

  int32_t code = 0;
  STrans *p = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
  if (p == NULL) {
    // terrno was cleared above; never report success together with a NULL trans
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to build trans:%s, reason: %s", name, tstrerror(code));
    return code;
  }

  mInfo("stream:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, p->id);
  p->ableToBeKilled = true;

  mndTransSetDbName(p, pStream->sourceDb, pStream->targetSTbName);
  if ((code = mndTransCheckConflict(pMnode, p)) != 0) {
    // log the code actually returned by the conflict check, not whatever terrno holds
    mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(code));
    mndTransDrop(p);
    return code;
  }

  *pTrans1 = p;
  return code;
}
225

226
/*
 * Serializes pStream into a newly allocated SDB_STREAM raw record.
 * Record payload: an int32 length followed by the encoded SStreamObj bytes; the allocation
 * also reserves MND_STREAM_RESERVE_SIZE extra bytes.
 * Returns the raw record (terrno = 0), or NULL with terrno set to the failure code.
 */
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
  int32_t  code = 0;
  int32_t  lino = 0;
  void    *buf = NULL;
  SSdbRaw *pRaw = NULL;  // initialized here: the first error path jumps to _over before sdbAllocRaw()
  int32_t  tlen = 0;
  int32_t  size = 0;
  int32_t  dataPos = 0;
  SEncoder encoder;

  // first pass: encode without a buffer to obtain the encoded length
  tEncoderInit(&encoder, NULL, 0);
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
    tEncoderClear(&encoder);
    TSDB_CHECK_CODE(code, lino, _over);
  }

  tlen = encoder.pos;
  tEncoderClear(&encoder);

  size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
  pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
  TSDB_CHECK_NULL(pRaw, code, lino, _over, terrno);

  buf = taosMemoryMalloc(tlen);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  // second pass: encode into the real buffer
  tEncoderInit(&encoder, buf, tlen);
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
    tEncoderClear(&encoder);
    TSDB_CHECK_CODE(code, lino, _over);
  }

  tEncoderClear(&encoder);

  SDB_SET_INT32(pRaw, dataPos, tlen, _over);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _over);
  SDB_SET_DATALEN(pRaw, dataPos, _over);

_over:
  taosMemoryFreeClear(buf);
  if (code != TSDB_CODE_SUCCESS) {
    mError("stream:%s, failed to encode to raw:%p at line:%d since %s", pStream->name, pRaw, lino, tstrerror(code));
    if (pRaw != NULL) {
      sdbFreeRaw(pRaw);
    }
    terrno = code;
    return NULL;
  }

  terrno = 0;
  mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
         pStream->checkpointId);
  return pRaw;
}
275

276
/*
 * Encodes pStream, tags the raw record with `status`, and appends it to pTrans's commit log.
 * On any failure pTrans is dropped (callers must not use it afterwards) and the error code
 * is returned; returns 0 on success.
 */
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status) {
  SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
  if (pCommitRaw == NULL) {
    int32_t code = terrno;  // mndStreamActionEncode sets terrno on failure
    mError("failed to encode stream since %s", tstrerror(code));
    mndTransDrop(pTrans);
    return code;
  }

  // Set the status while this function still solely owns pCommitRaw. Once appended to the
  // commit log, the raw belongs to the trans (mndTransDrop frees it), so freeing it here as
  // well would be a double free.
  int32_t code = sdbSetRawStatus(pCommitRaw, status);
  if (code != 0) {
    mError("stream trans:%d failed to set raw status:%d since %s", pTrans->id, status, tstrerror(code));
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    // append failed, so the trans did not take ownership of the raw
    mError("stream trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    return code;
  }

  return 0;
}
300

301
/*
 * Appends a redo action to pTrans that sends the message pCont (contLen bytes, type msgType)
 * to *pEpset; retryCode and acceptCode are stored as the action's retryCode/acceptableCode.
 * Returns the result of mndTransAppendRedoAction().
 */
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
                       int32_t retryCode, int32_t acceptCode) {
  STransAction redo = {.epSet = *pEpset};
  redo.msgType = msgType;
  redo.pCont = pCont;
  redo.contLen = contLen;
  redo.retryCode = retryCode;
  redo.acceptableCode = acceptCode;

  return mndTransAppendRedoAction(pTrans, &redo);
}
311

312
bool isNodeUpdateTransActive() {
1,058✔
313
  bool  exist = false;
1,058✔
314
  void *pIter = NULL;
1,058✔
315

316
  streamMutexLock(&execInfo.lock);
1,058✔
317

318
  while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
1,568✔
319
    SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter;
510✔
320
    if (strcmp(pTransInfo->name, MND_STREAM_TASK_UPDATE_NAME) == 0) {
510!
321
      mDebug("stream:0x%" PRIx64 " %s st:%" PRId64 " is in task nodeEp update, create new stream not allowed",
×
322
             pTransInfo->streamId, pTransInfo->name, pTransInfo->startTime);
323
      exist = true;
×
324
    }
325
  }
326

327
  streamMutexUnlock(&execInfo.lock);
1,058✔
328
  return exist;
1,058✔
329
}
330

331
/*
 * Kills every registered checkpoint trans whose stream has a source db (checked first) or
 * target db equal to the first `len` bytes of pDBName.
 * Does not take execInfo.lock itself (presumably the caller does — confirm).
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
  for (void *pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, NULL); pIter != NULL;
       pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) {
    SStreamTransInfo *pInfo = (SStreamTransInfo *)pIter;
    if (strcmp(pInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
      continue;
    }

    SStreamObj *pStream = NULL;
    int32_t     code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
    if (code != 0 || pStream == NULL) {
      continue;
    }

    char *pMatchedDb = NULL;
    if (identicalName(pStream->sourceDb, pDBName, len)) {
      pMatchedDb = pStream->sourceDb;
    } else if (identicalName(pStream->targetDb, pDBName, len)) {
      pMatchedDb = pStream->targetDb;
    }

    if (pMatchedDb != NULL) {
      mndKillTransImpl(pMnode, pInfo->transId, pMatchedDb);
    }

    mndReleaseStream(pMnode, pStream);
  }

  return TSDB_CODE_SUCCESS;
}
355

356
// kill all trans in the dst DB
357
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
4✔
358
  mDebug("start to clear checkpoints in all Dbs");
4✔
359
  char p[128] = {0};
4✔
360

361
  void *pIter = NULL;
4✔
362
  while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
8✔
363
    char *pDb = (char *)pIter;
4✔
364

365
    size_t len = 0;
4✔
366
    void  *pKey = taosHashGetKey(pDb, &len);
4✔
367
    int cpLen = (127 < len) ? 127 : len;
4✔
368
    TAOS_STRNCPY(p, pKey, cpLen);
4✔
369
    p[cpLen] = '\0';
4✔
370

371
    int32_t code = doKillCheckpointTrans(pMnode, pKey, len);
4✔
372
    if (code) {
4!
373
      mError("failed to kill trans, transId:%p", pKey);
×
374
    } else {
375
      mDebug("clear checkpoint trans in Db:%s", p);
4✔
376
    }
377
  }
378

379
  mDebug("complete clear checkpoints in all Dbs");
4✔
380
}
4✔
381

382
/*
 * For each long-running checkpoint trans in pLongChkpts (elements are SStreamTransInfo),
 * kills the trans and creates a reset-task-status trans for its stream, using the stream's
 * current checkpointId. Streams that can no longer be acquired are logged and skipped.
 */
void killChkptAndResetStreamTask(SMnode *pMnode, SArray *pLongChkpts) {
  int64_t now = taosGetTimestampMs();
  int32_t num = taosArrayGetSize(pLongChkpts);

  mInfo("start to kill %d long checkpoint trans", num);

  for (int32_t i = 0; i < num; ++i) {
    SStreamTransInfo *pTrans = (SStreamTransInfo *)taosArrayGet(pLongChkpts, i);
    if (pTrans == NULL) {
      continue;
    }

    double el = (now - pTrans->startTime) / 1000.0;
    mInfo("stream:0x%" PRIx64 " start to kill ongoing long checkpoint transId:%d, elapsed time:%.2fs. killed",
          pTrans->streamId, pTrans->transId, el);

    SStreamObj *p = NULL;
    int32_t     code = mndGetStreamObj(pMnode, pTrans->streamId, &p);
    if (code != 0 || p == NULL) {
      // previously silent: make it visible that this trans was neither killed nor reset
      mWarn("stream:0x%" PRIx64 " failed to acquire stream, skip killing transId:%d, code:%s", pTrans->streamId,
            pTrans->transId, tstrerror(code));
      continue;
    }

    mndKillTransImpl(pMnode, pTrans->transId, p->sourceDb);

    mDebug("stream:%s 0x%" PRIx64 " transId:%d checkpointId:%" PRId64 " create reset task trans", p->name,
           pTrans->streamId, pTrans->transId, p->checkpointId);

    code = mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId);
    if (code) {
      mError("stream:%s 0x%" PRIx64 " failed to create reset stream task, code:%s", p->name, p->uid, tstrerror(code));
    }

    // same release helper as doKillCheckpointTrans uses for objects from mndGetStreamObj
    mndReleaseStream(pMnode, p);
  }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc