• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4788

14 Oct 2025 11:21AM UTC coverage: 60.992% (-2.3%) from 63.264%
#4788

push

travis-ci

web-flow
Merge 7ca9b50f9 into 19574fe21

154868 of 324306 branches covered (47.75%)

Branch coverage included in aggregate %.

207304 of 269498 relevant lines covered (76.92%)

125773493.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.31
/source/dnode/mnode/impl/src/mndStreamTrans.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndTrans.h"
18

19
#define MAX_CHKPT_EXEC_ELAPSED (600*1000*3)  // 600s
20

21
typedef struct SKeyInfo {
22
  void   *pKey;
23
  int32_t keyLen;
24
} SKeyInfo;
25

26
static bool identicalName(const char *pDb, const char *pParam, int32_t len) {
×
27
  return (strlen(pDb) == len) && (strncmp(pDb, pParam, len) == 0);
×
28
}
29

30
int32_t mndStreamCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, STrans **ppTrans) {
225,135✔
31
  int64_t streamId = pStream->pCreate->streamId;
225,135✔
32
  int32_t code = 0;
225,135✔
33

34
  STrans *p = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
225,135✔
35
  if (p == NULL) {
225,135!
36
    mstsError("failed to build trans:%s, reason: %s", name, tstrerror(terrno));
×
37
    return terrno;
×
38
  }
39

40
  mstsInfo("start to build trans %s, transId:%d", name, p->id);
225,135!
41
  p->ableToBeKilled = true;
225,135✔
42

43
  mndTransSetDbName(p, pStream->pCreate->streamDB, pStream->pCreate->outTblName);
225,135✔
44
  if ((code = mndTransCheckConflict(pMnode, p)) != 0) {
225,135!
45
    mstsError("failed to build trans:%s for stream, code:%s", name, tstrerror(terrno));
×
46
    mndTransDrop(p);
×
47
    return code;
×
48
  }
49

50
  *ppTrans = p;
225,135✔
51
  return code;
225,135✔
52
}
53

54
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
420,747✔
55
  int32_t code = 0;
420,747✔
56
  int32_t lino = 0;
420,747✔
57
  void   *buf = NULL;
420,747✔
58
  int64_t streamId = pStream->pCreate->streamId;
420,747✔
59

60
  SEncoder encoder;
418,887✔
61
  tEncoderInit(&encoder, NULL, 0);
420,747✔
62
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
420,747!
63
    tEncoderClear(&encoder);
×
64
    TSDB_CHECK_CODE(code, lino, _over);
×
65
  }
66

67
  int32_t tlen = encoder.pos;
420,747✔
68
  tEncoderClear(&encoder);
420,747✔
69

70
  int32_t  size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
420,747✔
71
  SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
420,747✔
72
  TSDB_CHECK_NULL(pRaw, code, lino, _over, terrno);
420,747!
73

74
  buf = taosMemoryMalloc(tlen);
420,747!
75
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
420,747!
76

77
  tEncoderInit(&encoder, buf, tlen);
420,747✔
78
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
420,747!
79
    tEncoderClear(&encoder);
×
80
    TSDB_CHECK_CODE(code, lino, _over);
×
81
  }
82

83
  tEncoderClear(&encoder);
420,747✔
84

85
  int32_t dataPos = 0;
420,747✔
86
  SDB_SET_INT32(pRaw, dataPos, tlen, _over);
420,747!
87
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _over);
420,747!
88
  SDB_SET_DATALEN(pRaw, dataPos, _over);
420,747!
89

90
_over:
420,747✔
91

92
  taosMemoryFreeClear(buf);
420,747!
93
  if (code != TSDB_CODE_SUCCESS) {
420,747!
94
    mstsError("failed to encode stream %s to raw:%p at line:%d since %s", pStream->pCreate->name, pRaw, lino, tstrerror(code));
×
95
    sdbFreeRaw(pRaw);
×
96
    terrno = code;
×
97
    return NULL;
×
98
  }
99

100
  mstsTrace("stream %s encoded to raw:%p", pStream->pCreate->name, pRaw);
420,747!
101
         
102
  return pRaw;
420,747✔
103
}
104

105
int32_t mndStreamTransAppend(SStreamObj *pStream, STrans *pTrans, int32_t status) {
232,036✔
106
  int64_t streamId = pStream->pCreate->streamId;
232,036✔
107
  SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
232,036✔
108
  if (pCommitRaw == NULL) {
232,036!
109
    mstsError("failed to encode stream since %s", terrstr());
×
110
    mndTransDrop(pTrans);
×
111
    return terrno;
×
112
  }
113

114
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
232,036!
115
    mstsError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
×
116
    sdbFreeRaw(pCommitRaw);
×
117
    mndTransDrop(pTrans);
×
118
    return terrno;
×
119
  }
120

121
  if (sdbSetRawStatus(pCommitRaw, status) != 0) {
232,036!
122
    mstsError("stream trans:%d failed to set raw status:%d since %s", pTrans->id, status, terrstr());
×
123
    sdbFreeRaw(pCommitRaw);
×
124
    mndTransDrop(pTrans);
×
125
    return terrno;
×
126
  }
127

128
  return 0;
232,036✔
129
}
130

131
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
×
132
                       int32_t retryCode, int32_t acceptCode) {
133
  STransAction action = {.epSet = *pEpset,
×
134
                         .contLen = contLen,
135
                         .pCont = pCont,
136
                         .msgType = msgType,
137
                         .retryCode = retryCode,
138
                         .acceptableCode = acceptCode};
139
  return mndTransAppendRedoAction(pTrans, &action);
×
140
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc