• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3522

07 Nov 2024 05:59AM UTC coverage: 58.216% (+1.3%) from 56.943%
#3522

push

travis-ci

web-flow
Merge pull request #28663 from taosdata/fix/3_liaohj

fix(stream): stop the underlying scan operations for stream

111884 of 248391 branches covered (45.04%)

Branch coverage included in aggregate %.

3 of 4 new or added lines in 1 file covered. (75.0%)

1164 existing lines in 134 files now uncovered.

191720 of 273118 relevant lines covered (70.2%)

13088725.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.72
/source/libs/sync/src/syncRaftEntry.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncRaftEntry.h"
18
#include "syncUtil.h"
19
#include "tref.h"
20

21
SSyncRaftEntry* syncEntryBuild(int32_t dataLen) {
11,835,225✔
22
  int32_t         bytes = sizeof(SSyncRaftEntry) + dataLen;
11,835,225✔
23
  SSyncRaftEntry* pEntry = taosMemoryCalloc(1, bytes);
11,835,225✔
24
  if (pEntry == NULL) {
11,835,226!
25
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
26
    return NULL;
×
27
  }
28

29
  pEntry->bytes = bytes;
11,835,269✔
30
  pEntry->dataLen = dataLen;
11,835,269✔
31
  pEntry->rid = -1;
11,835,269✔
32

33
  return pEntry;
11,835,269✔
34
}
35

36
/*
 * Build a raft entry from a client request message.
 *
 * The message type, original RPC type, sequence number and weak flag are
 * taken from `pMsg`; `term` and `index` are stored as given. The request's
 * `dataLen` payload bytes are copied into the entry.
 *
 * Returns NULL when syncEntryBuild fails.
 */
SSyncRaftEntry* syncEntryBuildFromClientRequest(const SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {
  SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen);

  if (pEntry != NULL) {
    // Position in the log.
    pEntry->index = index;
    pEntry->term = term;

    // Header fields carried over from the request.
    pEntry->isWeak = pMsg->isWeak;
    pEntry->seqNum = pMsg->seqNum;
    pEntry->originalRpcType = pMsg->originalRpcType;
    pEntry->msgType = pMsg->msgType;

    memcpy(pEntry->data, pMsg->data, pMsg->dataLen);
  }

  return pEntry;
}
50

51
/*
 * Build a raft entry that wraps the content of an RPC message.
 *
 * The entry is typed TDMT_SYNC_CLIENT_REQUEST and remembers the RPC's own
 * message type in `originalRpcType`. `seqNum` and `isWeak` are set to 0,
 * `term` and `index` are stored as given, and `contLen` bytes of `pCont`
 * are copied into the entry.
 *
 * Returns NULL when syncEntryBuild fails.
 */
SSyncRaftEntry* syncEntryBuildFromRpcMsg(const SRpcMsg* pMsg, SyncTerm term, SyncIndex index) {
  SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->contLen);

  if (pEntry != NULL) {
    // Position in the log.
    pEntry->index = index;
    pEntry->term = term;

    // Fixed header values for an entry created from a raw RPC.
    pEntry->isWeak = 0;
    pEntry->seqNum = 0;
    pEntry->originalRpcType = pMsg->msgType;
    pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST;

    memcpy(pEntry->data, pMsg->pCont, pMsg->contLen);
  }

  return pEntry;
}
65

66
SSyncRaftEntry* syncEntryBuildFromAppendEntries(const SyncAppendEntries* pMsg) {
2,564,541✔
67
  SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen);
2,564,541✔
68
  if (pEntry == NULL) {
2,564,543!
UNCOV
69
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
70
    return NULL;
×
71
  }
72
  memcpy(pEntry, pMsg->data, pMsg->dataLen);
2,564,543✔
73
  if (pEntry->bytes != pMsg->dataLen) {
2,564,543!
74
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
75
    return NULL;
×
76
  }
77
  return pEntry;
2,564,543✔
78
}
79

80
/*
 * Build a no-op raft entry for the given term and index.
 *
 * The entry is typed TDMT_SYNC_CLIENT_REQUEST with `originalRpcType` set to
 * TDMT_SYNC_NOOP, and `seqNum` / `isWeak` set to 0. Its payload is a single
 * SMsgHead whose `vgId` is the supplied `vgId` and whose `contLen` is
 * sizeof(SMsgHead).
 *
 * Returns NULL when syncEntryBuild fails.
 */
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) {
  SSyncRaftEntry* pNoop = syncEntryBuild(sizeof(SMsgHead));

  if (pNoop != NULL) {
    // Position in the log.
    pNoop->index = index;
    pNoop->term = term;

    // Fixed header values for a no-op entry.
    pNoop->isWeak = 0;
    pNoop->seqNum = 0;
    pNoop->originalRpcType = TDMT_SYNC_NOOP;
    pNoop->msgType = TDMT_SYNC_CLIENT_REQUEST;

    // The payload is just the message head.
    SMsgHead* pMsgHead = (SMsgHead*)pNoop->data;
    pMsgHead->contLen = sizeof(SMsgHead);
    pMsgHead->vgId = vgId;
  }

  return pNoop;
}
97

98
/*
 * Free a raft entry with taosMemoryFree, logging its address at trace level
 * first. A NULL `pEntry` is ignored.
 */
void syncEntryDestroy(SSyncRaftEntry* pEntry) {
  if (pEntry == NULL) {
    return;
  }

  sTrace("free entry:%p", pEntry);
  taosMemoryFree(pEntry);
}
104

105
/*
 * Fill `pRpcMsg` with the original RPC carried by a raft entry.
 *
 * `msgType` is set to the entry's `originalRpcType`, `contLen` to the
 * entry's `dataLen`, and `pCont` to a buffer obtained from rpcMallocCont
 * that receives a copy of the entry's payload.
 *
 * Returns 0 on success, or the current terrno when rpcMallocCont returns
 * NULL (in which case `msgType` and `contLen` have already been written and
 * `pCont` is NULL).
 */
int32_t syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
  pRpcMsg->msgType = pEntry->originalRpcType;
  pRpcMsg->contLen = (int32_t)(pEntry->dataLen);

  pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
  if (pRpcMsg->pCont != NULL) {
    memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);
    return 0;
  }

  return terrno;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc