• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4513

17 Jul 2025 02:02AM UTC coverage: 31.359% (-31.1%) from 62.446%
#4513

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

68541 of 301034 branches covered (22.77%)

Branch coverage included in aggregate %.

117356 of 291771 relevant lines covered (40.22%)

602262.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mnode/impl/src/mndStreamTrans.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndTrans.h"
18

19
#define MAX_CHKPT_EXEC_ELAPSED (600*1000*3)  // 3 * 600s = 1800s (30 min); presumably expressed in milliseconds
20

21
// Pairs an untyped key pointer with the key's length.
typedef struct SKeyInfo {
  void *pKey;      // start of the key bytes
  int32_t keyLen;  // length associated with pKey (presumably in bytes)
} SKeyInfo;
25

26
// Returns true when the NUL-terminated string pDb is exactly `len` characters
// long and its first `len` characters equal those of pParam.
//
// pDb    - NUL-terminated name to test
// pParam - candidate name; only its first `len` characters are compared
// len    - expected length of the name
//
// A NULL pointer or a negative length never matches.
static bool identicalName(const char *pDb, const char *pParam, int32_t len) {
  if (pDb == NULL || pParam == NULL || len < 0) {
    return false;
  }

  // Compare as size_t explicitly instead of relying on an implicit
  // signed-to-unsigned conversion of len.
  const size_t n = (size_t)len;
  return (strlen(pDb) == n) && (strncmp(pDb, pParam, n) == 0);
}
29

30
// Create a killable, retry-policy transaction for a stream and check it for conflicts.
//
// pMnode   - mnode the transaction is created on
// pStream  - stream object; its pCreate supplies streamId, streamDB and outTblName
// pReq     - originating request, forwarded to mndTransCreate
// conflict - conflict policy, forwarded to mndTransCreate
// name     - transaction name, used for creation and in log messages
// ppTrans  - receives the new transaction on success; left untouched on failure
//
// Returns 0 on success. If the transaction cannot be created, the terrno value
// captured at that point is returned. If the conflict check fails, the
// transaction is dropped and the conflict-check code is returned.
int32_t mndStreamCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, STrans **ppTrans) {
  // NOTE(review): streamId looks unused here; presumably the msts* log macros read it - keep.
  int64_t streamId = pStream->pCreate->streamId;
  int32_t code = 0;

  STrans *p = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
  if (p == NULL) {
    // Capture terrno once so the logged reason and the returned code cannot diverge.
    code = terrno;
    mstsError("failed to build trans:%s, reason: %s", name, tstrerror(code));
    return code;
  }

  mstsInfo("start to build trans %s, transId:%d", name, p->id);
  p->ableToBeKilled = true;

  mndTransSetDbName(p, pStream->pCreate->streamDB, pStream->pCreate->outTblName);
  if ((code = mndTransCheckConflict(pMnode, p)) != 0) {
    // Log the code that is actually returned, not whatever terrno currently holds.
    mstsError("failed to build trans:%s for stream, code:%s", name, tstrerror(code));
    mndTransDrop(p);
    return code;
  }

  *ppTrans = p;
  return code;
}
53

54
// Serialize a stream object into a newly allocated SDB raw record.
//
// The object is encoded twice: first with no output buffer to learn the
// encoded length, then into a temporary buffer of exactly that length. The raw
// record stores the length as an int32 followed by the encoded bytes.
//
// Returns the raw record on success. On failure returns NULL, frees the
// partially built raw and sets terrno to the failure code.
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
  int32_t  code = 0;
  int32_t  lino = 0;
  void    *buf = NULL;
  // pRaw must be initialized before the first `goto _over`: the cleanup path
  // logs it and passes it to sdbFreeRaw, so it must never be indeterminate.
  SSdbRaw *pRaw = NULL;
  int32_t  tlen = 0;
  int32_t  size = 0;
  int32_t  dataPos = 0;
  // NOTE(review): streamId looks unused here; presumably the msts* log macros read it - keep.
  int64_t  streamId = pStream->pCreate->streamId;
  SEncoder encoder;

  // Pass 1: no output buffer, only the resulting encoder position is used.
  tEncoderInit(&encoder, NULL, 0);
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
    tEncoderClear(&encoder);
    TSDB_CHECK_CODE(code, lino, _over);
  }

  tlen = encoder.pos;
  tEncoderClear(&encoder);

  // Room for the int32 length prefix, the payload and the reserved area.
  size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
  pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
  TSDB_CHECK_NULL(pRaw, code, lino, _over, terrno);

  buf = taosMemoryMalloc(tlen);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  // Pass 2: encode into the temporary buffer.
  tEncoderInit(&encoder, buf, tlen);
  if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
    tEncoderClear(&encoder);
    TSDB_CHECK_CODE(code, lino, _over);
  }

  tEncoderClear(&encoder);

  SDB_SET_INT32(pRaw, dataPos, tlen, _over);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _over);
  SDB_SET_DATALEN(pRaw, dataPos, _over);

_over:

  // The temporary buffer is released on both the success and the failure path.
  taosMemoryFreeClear(buf);
  if (code != TSDB_CODE_SUCCESS) {
    mstsError("failed to encode stream %s to raw:%p at line:%d since %s", pStream->pCreate->name, pRaw, lino, tstrerror(code));
    sdbFreeRaw(pRaw);
    terrno = code;
    return NULL;
  }

  mstsTrace("stream %s encoded to raw:%p", pStream->pCreate->name, pRaw);

  return pRaw;
}
104

105
// Encode the stream object and append it to the transaction as a commit log
// carrying the given raw status.
//
// pStream - stream object to encode
// pTrans  - transaction receiving the commit log
// status  - status stored on the encoded raw record
//
// Returns 0 on success. On any failure the encoded raw (if any) is freed,
// pTrans is dropped - the caller must not use it afterwards - and the failure
// code is returned.
int32_t mndStreamTransAppend(SStreamObj *pStream, STrans *pTrans, int32_t status) {
  // NOTE(review): streamId looks unused here; presumably the msts* log macros read it - keep.
  int64_t streamId = pStream->pCreate->streamId;
  int32_t code = 0;

  SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
  if (pCommitRaw == NULL) {
    // Capture terrno before logging/cleanup gets a chance to overwrite it.
    code = terrno;
    mstsError("failed to encode stream since %s", terrstr());
    mndTransDrop(pTrans);
    return code;
  }

  // The status is set BEFORE the raw is appended to the transaction.
  // NOTE(review): presumably mndTransAppendCommitlog hands ownership of the raw
  // to pTrans on success; with this ordering no failure path frees a raw that
  // the transaction may already reference and release again in mndTransDrop -
  // confirm against mndTrans.
  if ((code = sdbSetRawStatus(pCommitRaw, status)) != 0) {
    mstsError("stream trans:%d failed to set raw status:%d since %s", pTrans->id, status, terrstr());
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    return code;
  }

  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mstsError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    return code;
  }

  return 0;
}
130

131
// Build a redo action from the given message parameters and append it to pTrans.
//
// pTrans     - transaction receiving the redo action
// pCont      - message content pointer stored in the action
// contLen    - length stored alongside pCont
// msgType    - message type stored in the action
// pEpset     - endpoint set, copied by value into the action (must not be NULL)
// retryCode  - stored as the action's retryCode
// acceptCode - stored as the action's acceptableCode
//
// Returns the result of mndTransAppendRedoAction.
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
                       int32_t retryCode, int32_t acceptCode) {
  // Start from an all-zero action so every field not set below stays zero.
  STransAction redo = {0};

  redo.epSet = *pEpset;
  redo.pCont = pCont;
  redo.contLen = contLen;
  redo.msgType = msgType;
  redo.retryCode = retryCode;
  redo.acceptableCode = acceptCode;

  return mndTransAppendRedoAction(pTrans, &redo);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc