• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

stacks-network / stacks-core / 26250451051-1

21 May 2026 08:11PM UTC coverage: 85.585% (-0.1%) from 85.712%
26250451051-1

Pull #7215

github

ec9d4c
web-flow
Merge 9487bf852 into af1280aac
Pull Request #7215: Chore: fix flake in non_blocking_minority_configured_to_favour_...

188844 of 220651 relevant lines covered (85.58%)

18975267.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.78
/libsigner/src/session.rs
1
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
2
// Copyright (C) 2020-2024 Stacks Open Internet Foundation
3
//
4
// This program is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8
//
9
// This program is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13
//
14
// You should have received a copy of the GNU General Public License
15
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

17
use std::net::TcpStream;
18
use std::str;
19
use std::time::Duration;
20

21
use clarity::vm::types::QualifiedContractIdentifier;
22
use libstackerdb::{
23
    stackerdb_get_chunk_path, stackerdb_get_metadata_path, stackerdb_post_chunk_path, SlotMetadata,
24
    StackerDBChunkAckData, StackerDBChunkData, SIGNERS_STACKERDB_CHUNK_SIZE,
25
    STACKERDB_MAX_CHUNK_SIZE,
26
};
27
use stacks_common::codec::StacksMessageCodec;
28

29
use crate::error::RPCError;
30
use crate::http::run_http_request;
31

32
/// Trait for connecting to and querying a signer Stacker DB replica
33
pub trait SignerSession {
34
    /// connect to the replica
35
    fn connect(
36
        &mut self,
37
        host: String,
38
        stackerdb_contract_id: QualifiedContractIdentifier,
39
    ) -> Result<(), RPCError>;
40
    /// query the replica for a list of chunks
41
    fn list_chunks(&mut self) -> Result<Vec<SlotMetadata>, RPCError>;
42
    /// query the replica for zero or more chunks
43
    fn get_chunks(
44
        &mut self,
45
        slots_and_versions: &[(u32, u32)],
46
    ) -> Result<Vec<Option<Vec<u8>>>, RPCError>;
47
    /// query the replica for zero or more latest chunks
48
    fn get_latest_chunks(&mut self, slot_ids: &[u32]) -> Result<Vec<Option<Vec<u8>>>, RPCError>;
49
    /// Upload a chunk to the stacker DB instance
50
    fn put_chunk(&mut self, chunk: &StackerDBChunkData) -> Result<StackerDBChunkAckData, RPCError>;
51

52
    /// Get a single chunk with the given version
53
    /// Returns Ok(Some(..)) if the chunk exists
54
    /// Returns Ok(None) if the chunk with the given version does not exist
55
    /// Returns Err(..) on transport error
56
    fn get_chunk(&mut self, slot_id: u32, version: u32) -> Result<Option<Vec<u8>>, RPCError> {
×
57
        let mut chunks = self.get_chunks(&[(slot_id, version)])?;
×
58
        // check if chunks is empty because [0] and remove(0) panic on out-of-bounds
59
        if chunks.is_empty() {
×
60
            return Ok(None);
×
61
        }
×
62
        // swap_remove breaks the ordering of latest_chunks, but we don't care because we
63
        //  only want the first element anyways.
64
        Ok(chunks.swap_remove(0))
×
65
    }
×
66

67
    /// Get a single latest chunk.
68
    /// Returns Ok(Some(..)) if the slot exists
69
    /// Returns Ok(None) if not
70
    /// Returns Err(..) on transport error
71
    fn get_latest_chunk(&mut self, slot_id: u32) -> Result<Option<Vec<u8>>, RPCError> {
2,762✔
72
        let mut latest_chunks = self.get_latest_chunks(&[slot_id])?;
2,762✔
73
        // check if latest_chunks is empty because [0] and remove(0) panic on out-of-bounds
74
        if latest_chunks.is_empty() {
2,762✔
75
            return Ok(None);
×
76
        }
2,762✔
77
        // swap_remove breaks the ordering of latest_chunks, but we don't care because we
78
        //  only want the first element anyways.
79
        Ok(latest_chunks.swap_remove(0))
2,762✔
80
    }
2,762✔
81

82
    /// Get a single latest chunk from the StackerDB and deserialize into `T` using the
83
    /// StacksMessageCodec.
84
    fn get_latest<T: StacksMessageCodec>(&mut self, slot_id: u32) -> Result<Option<T>, RPCError> {
2,762✔
85
        let Some(latest_bytes) = self.get_latest_chunk(slot_id)? else {
2,762✔
86
            return Ok(None);
×
87
        };
88
        Some(
89
            T::consensus_deserialize(&mut latest_bytes.as_slice()).map_err(|e| {
2,762✔
90
                let msg = format!("StacksMessageCodec::consensus_deserialize failure: {e}");
159✔
91
                RPCError::Deserialize(msg)
159✔
92
            }),
159✔
93
        )
94
        .transpose()
2,762✔
95
    }
2,762✔
96
}
97

98
/// signer session for a stackerdb instance
99
#[derive(Debug)]
100
pub struct StackerDBSession {
101
    /// host we're talking to
102
    pub host: String,
103
    /// contract we're talking to
104
    pub stackerdb_contract_id: QualifiedContractIdentifier,
105
    /// connection to the replica
106
    sock: Option<TcpStream>,
107
    /// The timeout applied to HTTP read and write operations
108
    socket_timeout: Duration,
109
}
110

111
impl StackerDBSession {
112
    /// instantiate but don't connect
113
    pub fn new(
29,131✔
114
        host: &str,
29,131✔
115
        stackerdb_contract_id: QualifiedContractIdentifier,
29,131✔
116
        socket_timeout: Duration,
29,131✔
117
    ) -> StackerDBSession {
29,131✔
118
        StackerDBSession {
29,131✔
119
            host: host.to_owned(),
29,131✔
120
            stackerdb_contract_id,
29,131✔
121
            sock: None,
29,131✔
122
            socket_timeout,
29,131✔
123
        }
29,131✔
124
    }
29,131✔
125

126
    /// connect or reconnect to the node
127
    fn connect_or_reconnect(&mut self) -> Result<(), RPCError> {
247,610✔
128
        debug!("connect to {}", &self.host);
247,610✔
129
        let sock = TcpStream::connect(&self.host)?;
247,610✔
130
        // Make sure we don't hang forever if for some reason our node does not
131
        // respond as expected such as failing to properly close the connection
132
        sock.set_read_timeout(Some(self.socket_timeout))?;
247,043✔
133
        sock.set_write_timeout(Some(self.socket_timeout))?;
247,043✔
134
        self.sock = Some(sock);
247,043✔
135
        Ok(())
247,043✔
136
    }
247,610✔
137

138
    /// Do something with the connected socket
139
    fn with_socket<F, R>(&mut self, todo: F) -> Result<R, RPCError>
247,609✔
140
    where
247,609✔
141
        F: FnOnce(&mut StackerDBSession, &mut TcpStream) -> R,
247,609✔
142
    {
143
        // TODO: fix this so we can use persistent connection
144
        // See https://github.com/stacks-network/stacks-blockchain/issues/3922
145
        //if self.sock.is_none() {
146
        self.connect_or_reconnect()?;
247,609✔
147

148
        let mut sock = if let Some(s) = self.sock.take() {
247,042✔
149
            s
247,042✔
150
        } else {
151
            return Err(RPCError::NotConnected);
×
152
        };
153

154
        let res = todo(self, &mut sock);
247,042✔
155

156
        self.sock = Some(sock);
247,042✔
157
        Ok(res)
247,042✔
158
    }
247,609✔
159

160
    /// send an HTTP RPC request and receive a reply.
161
    /// Return the HTTP reply, decoded if it was chunked
162
    fn rpc_request(
247,609✔
163
        &mut self,
247,609✔
164
        verb: &str,
247,609✔
165
        path: &str,
247,609✔
166
        content_type: Option<&str>,
247,609✔
167
        payload: &[u8],
247,609✔
168
    ) -> Result<Vec<u8>, RPCError> {
247,609✔
169
        self.with_socket(|session, sock| {
247,609✔
170
            run_http_request(sock, &session.host, verb, path, content_type, payload)
247,042✔
171
        })?
247,042✔
172
    }
247,609✔
173
}
174

175
impl SignerSession for StackerDBSession {
176
    /// connect to the replica
177
    fn connect(
×
178
        &mut self,
×
179
        host: String,
×
180
        stackerdb_contract_id: QualifiedContractIdentifier,
×
181
    ) -> Result<(), RPCError> {
×
182
        self.host = host;
×
183
        self.stackerdb_contract_id = stackerdb_contract_id;
×
184
        self.connect_or_reconnect()
×
185
    }
×
186

187
    /// query the replica for a list of chunks
188
    fn list_chunks(&mut self) -> Result<Vec<SlotMetadata>, RPCError> {
×
189
        let bytes = self.rpc_request(
×
190
            "GET",
×
191
            &stackerdb_get_metadata_path(self.stackerdb_contract_id.clone()),
×
192
            None,
×
193
            &[],
×
194
        )?;
×
195
        let metadata: Vec<SlotMetadata> = serde_json::from_slice(&bytes)
×
196
            .map_err(|e| RPCError::Deserialize(format!("{:?}", &e)))?;
×
197
        Ok(metadata)
×
198
    }
×
199

200
    /// query the replica for zero or more chunks
201
    fn get_chunks(
×
202
        &mut self,
×
203
        slots_and_versions: &[(u32, u32)],
×
204
    ) -> Result<Vec<Option<Vec<u8>>>, RPCError> {
×
205
        let mut payloads = vec![];
×
206
        for (slot_id, slot_version) in slots_and_versions.iter() {
×
207
            let path = stackerdb_get_chunk_path(
×
208
                self.stackerdb_contract_id.clone(),
×
209
                *slot_id,
×
210
                Some(*slot_version),
×
211
            );
212
            let chunk = match self.rpc_request("GET", &path, None, &[]) {
×
213
                Ok(body_bytes) => Some(body_bytes),
×
214
                Err(RPCError::HttpError(code)) => {
×
215
                    if code != 404 {
×
216
                        return Err(RPCError::HttpError(code));
×
217
                    }
×
218
                    None
×
219
                }
220
                Err(e) => {
×
221
                    return Err(e);
×
222
                }
223
            };
224
            payloads.push(chunk);
×
225
        }
226
        Ok(payloads)
×
227
    }
×
228

229
    /// query the replica for zero or more latest chunks
230
    fn get_latest_chunks(&mut self, slot_ids: &[u32]) -> Result<Vec<Option<Vec<u8>>>, RPCError> {
16,596✔
231
        let mut payloads = vec![];
16,596✔
232
        let limit = if self.stackerdb_contract_id.name.starts_with("signer") {
16,596✔
233
            SIGNERS_STACKERDB_CHUNK_SIZE
7,266✔
234
        } else {
235
            usize::try_from(STACKERDB_MAX_CHUNK_SIZE)
9,330✔
236
                .expect("infallible: StackerDB chunk size exceeds usize::MAX")
9,330✔
237
        };
238
        for slot_id in slot_ids.iter() {
44,619✔
239
            let path = stackerdb_get_chunk_path(self.stackerdb_contract_id.clone(), *slot_id, None);
44,619✔
240
            let chunk = match self.rpc_request("GET", &path, None, &[]) {
44,619✔
241
                Ok(body_bytes) => {
44,616✔
242
                    // Verify that the chunk is not too large
243
                    if body_bytes.len() > limit {
44,616✔
244
                        None
×
245
                    } else {
246
                        Some(body_bytes)
44,616✔
247
                    }
248
                }
249
                Err(RPCError::HttpError(code)) => {
3✔
250
                    if code != 404 {
3✔
251
                        return Err(RPCError::HttpError(code));
3✔
252
                    }
1✔
253
                    None
1✔
254
                }
255
                Err(e) => {
×
256
                    return Err(e);
×
257
                }
258
            };
259
            payloads.push(chunk);
44,616✔
260
        }
261
        Ok(payloads)
16,593✔
262
    }
16,596✔
263

264
    /// upload a chunk
265
    fn put_chunk(&mut self, chunk: &StackerDBChunkData) -> Result<StackerDBChunkAckData, RPCError> {
202,989✔
266
        let body =
202,989✔
267
            serde_json::to_vec(chunk).map_err(|e| RPCError::Deserialize(format!("{e:?}")))?;
202,989✔
268
        let path = stackerdb_post_chunk_path(self.stackerdb_contract_id.clone());
202,989✔
269
        let resp_bytes = self.rpc_request("POST", &path, Some("application/json"), &body)?;
202,989✔
270
        let ack: StackerDBChunkAckData = serde_json::from_slice(&resp_bytes)
202,326✔
271
            .map_err(|e| RPCError::Deserialize(format!("{e:?}")))?;
202,326✔
272
        Ok(ack)
202,326✔
273
    }
202,989✔
274
}
275

276
#[cfg(test)]
mod tests {
    use std::io::Write;
    use std::net::TcpListener;
    use std::thread;

    use super::*;

    /// A server that accepts the connection but replies only after twice the
    /// configured socket timeout must make the request fail with a `WouldBlock`
    /// I/O error.
    #[test]
    fn socket_timeout_works_as_expected() {
        let short_timeout = Duration::from_millis(200);

        let listener = TcpListener::bind("127.0.0.1:0").expect("bind failed");
        let addr = listener.local_addr().unwrap();

        thread::spawn(move || {
            let Ok((mut stream, _)) = listener.accept() else {
                return;
            };
            // Sleep long enough so the client should hit its timeout
            thread::sleep(short_timeout * 2);
            let _ = stream.write_all(b"HTTP/1.1 200 OK\r\n\r\n");
        });

        let mut session = StackerDBSession::new(
            &addr.to_string(),
            QualifiedContractIdentifier::transient(),
            short_timeout,
        );
        session.connect_or_reconnect().expect("connect failed");

        // This should fail due to the timeout
        match session.rpc_request("GET", "/", None, &[]) {
            Err(RPCError::IO(e)) => assert_eq!(e.kind(), std::io::ErrorKind::WouldBlock),
            other => panic!("expected timeout error, got {other:?}"),
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc