• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5033

24 Apr 2026 11:25AM UTC coverage: 73.058% (-0.02%) from 73.076%
#5033

push

travis-ci

web-flow
merge: from main to 3.0 branch #35224

merge: from main to 3.0 branch[manual-only]

1337 of 1975 new or added lines in 48 files covered. (67.7%)

14198 existing lines in 150 files now uncovered.

275895 of 377640 relevant lines covered (73.06%)

132323361.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.64
/source/dnode/mnode/impl/src/mndStreamTrans.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndTrans.h"
18

19
#define MAX_CHKPT_EXEC_ELAPSED (600*1000*3)  // = 1,800,000; NOTE(review): if the unit is milliseconds this is 1800s (30 min), not 600s - confirm
20

21
// Pointer-plus-length pair describing a key.
// NOTE(review): not referenced by any function visible in this part of the file - confirm it is still used.
typedef struct SKeyInfo {
22
  void   *pKey;    // start of the key; presumably not owned by this struct - TODO confirm
23
  int32_t keyLen;  // presumably the length of pKey in bytes - TODO confirm
24
} SKeyInfo;
25

26
// Returns true when the NUL-terminated name pDb is exactly len characters long
// and those characters match the first len characters of pParam.
static bool identicalName(const char *pDb, const char *pParam, int32_t len) {
  // A length mismatch settles the answer without comparing any characters.
  if (strlen(pDb) != len) {
    return false;
  }

  return strncmp(pDb, pParam, len) == 0;
}
29

30
/*
 * Create a retry-policy transaction for a stream and hand it back through ppTrans.
 *
 * The new transaction is marked killable, bound to the stream's database and
 * output table name, and checked for conflicts before it is returned.
 *
 * pMnode    mnode the transaction is created on
 * pStream   stream object; pStream->pCreate supplies the id, db and table names
 * pReq      request passed through to mndTransCreate
 * conflict  conflict mode passed through to mndTransCreate
 * name      transaction name, used for creation and in log messages
 * ppTrans   receives the transaction on success; left untouched on failure
 *
 * Returns 0 on success. If creation fails, returns the terrno observed right
 * after mndTransCreate. If the conflict check fails, the transaction is dropped
 * and the code from mndTransCheckConflict is returned.
 */
int32_t mndStreamCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, STrans **ppTrans) {
  int64_t streamId = pStream->pCreate->streamId;  // presumably consumed by the msts* log macros - confirm
  int32_t code = 0;

  STrans *p = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
  if (p == NULL) {
    // Capture terrno before logging so the logged and returned error are the
    // same value even if the logging path touches terrno.
    code = terrno;
    mstsError("failed to build trans:%s, reason: %s", name, tstrerror(code));
    return code;
  }

  mstsInfo("start to build trans %s, transId:%d", name, p->id);
  p->ableToBeKilled = true;

  mndTransSetDbName(p, pStream->pCreate->streamDB, pStream->pCreate->outTblName);
  if ((code = mndTransCheckConflict(pMnode, p)) != 0) {
    // Report the code that is actually returned, not whatever terrno holds.
    mstsError("failed to build trans:%s for stream, code:%s", name, tstrerror(code));
    mndTransDrop(p);
    return code;
  }

  *ppTrans = p;
  return code;
}
53

54
/*
 * Serialize a stream object into a newly allocated SDB raw record.
 *
 * The object is encoded twice: once with no buffer to learn the encoded
 * length, then into a temporary buffer of exactly that length. The raw payload
 * is written as an int32 length followed by the encoded bytes; the raw is
 * allocated with MND_STREAM_RESERVE_SIZE additional bytes.
 *
 * Returns the raw record on success. On failure returns NULL, stores the error
 * code in terrno and releases any raw that was allocated.
 */
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
  int32_t  code = 0;
  int32_t  lino = 0;
  void    *buf = NULL;
  // pRaw must be initialised before the first jump to _over: the cleanup path
  // logs it and passes it to sdbFreeRaw, so it may not be left indeterminate.
  SSdbRaw *pRaw = NULL;
  int32_t  tlen = 0;
  int32_t  dataPos = 0;
  int64_t  streamId = pStream->pCreate->streamId;  // presumably consumed by the msts* log macros - confirm

  // Pass 1: encode with no buffer to measure the encoded size.
  SEncoder encoder;
  tEncoderInit(&encoder, NULL, 0);
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
    tEncoderClear(&encoder);
    TSDB_CHECK_CODE(code, lino, _over);
  }

  tlen = encoder.pos;
  tEncoderClear(&encoder);

  int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
  pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
  TSDB_CHECK_NULL(pRaw, code, lino, _over, terrno);

  buf = taosMemoryMalloc(tlen);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  // Pass 2: encode for real into the temporary buffer.
  tEncoderInit(&encoder, buf, tlen);
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
    tEncoderClear(&encoder);
    TSDB_CHECK_CODE(code, lino, _over);
  }

  tEncoderClear(&encoder);

  // Payload layout: length prefix, then the encoded bytes.
  SDB_SET_INT32(pRaw, dataPos, tlen, _over);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _over);
  SDB_SET_DATALEN(pRaw, dataPos, _over);

_over:

  // The temporary buffer is released on both the success and failure paths.
  taosMemoryFreeClear(buf);
  if (code != TSDB_CODE_SUCCESS) {
    mstsError("failed to encode stream %s to raw:%p at line:%d since %s", pStream->pCreate->name, pRaw, lino, tstrerror(code));
    sdbFreeRaw(pRaw);
    terrno = code;
    return NULL;
  }

  mstsTrace("stream %s encoded to raw:%p", pStream->pCreate->name, pRaw);

  return pRaw;
}
104

105
/*
 * Encode the stream object and attach it to the transaction as a commit log
 * carrying the given raw status.
 *
 * Returns 0 on success. On any failure the transaction is dropped here via
 * mndTransDrop and the current terrno is returned, so the caller must not
 * drop pTrans again after a non-zero return.
 */
int32_t mndStreamTransAppend(SStreamObj *pStream, STrans *pTrans, int32_t status) {
  int64_t  streamId = pStream->pCreate->streamId;  // presumably consumed by the msts* log macros - confirm
  SSdbRaw *pRaw = mndStreamActionEncode(pStream);

  if (NULL == pRaw) {
    mstsError("failed to encode stream since %s", terrstr());
    goto _failed;
  }

  if (0 != mndTransAppendCommitlog(pTrans, pRaw)) {
    mstsError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    // The raw is freed explicitly on this path only, before the transaction is dropped.
    sdbFreeRaw(pRaw);
    goto _failed;
  }

  if (0 != sdbSetRawStatus(pRaw, status)) {
    mstsError("stream trans:%d failed to set raw status:%d since %s", pTrans->id, status, terrstr());
    goto _failed;
  }

  return 0;

_failed:
  // Shared failure tail: drop the transaction, then report terrno.
  mndTransDrop(pTrans);
  return terrno;
}
129

UNCOV
130
/*
 * Build a transaction action from the given message and endpoint set and
 * append it to pTrans as a redo action.
 *
 * The endpoint set is copied by value into the action; every field not listed
 * below is zero-initialised. Returns the result of mndTransAppendRedoAction.
 */
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
                       int32_t retryCode, int32_t acceptCode) {
  STransAction redo = {0};

  redo.epSet = *pEpset;
  redo.pCont = pCont;
  redo.contLen = contLen;
  redo.msgType = msgType;
  redo.retryCode = retryCode;
  redo.acceptableCode = acceptCode;

  return mndTransAppendRedoAction(pTrans, &redo);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc