• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 15280118615

27 May 2025 04:01PM UTC coverage: 73.59% (+0.4%) from 73.233%
15280118615

push

github

web-flow
feat: add base node HTTP wallet service (#7061)

Description
---
Added a new HTTP server for base node that exposes some wallet related
query functionality.

Current new endpoints (examples on **esmeralda** network):
 - http://127.0.0.1:9005/get_tip_info
 - http://127.0.0.1:9005/get_header_by_height?height=6994
 - http://127.0.0.1:9005/get_height_at_time?time=1747739959

Default ports for http service (by network):
```
MainNet: 9000,
StageNet: 9001,
NextNet: 9002,
LocalNet: 9003,
Igor: 9004,
Esmeralda: 9005,
```

New configuration needs to be set in base node:
```toml
[base_node.http_wallet_query_service]
port = 9000
external_address = "http://127.0.0.1:9000" # this is optional, but if not set, when someone requests for the external address, just returns a None, so wallets can't contact base node
```

Motivation and Context
---


How Has This Been Tested?
---
### Manually

#### Basic test
1. Build new base node
2. Set base node configuration by adding the following:
```toml
[base_node.http_wallet_query_service]
port = 9000
external_address = "http://127.0.0.1:9000"
```
This way we set the port and external address (which is sent to wallet
client when requesting, so in real world it must be public)
3. Set logging level of base node logs to DEBUG
4. Start base node
5. Build and start console wallet
6. See that it is still able to synchronize
7. Check logs of base node (with `tail -f ...` command for instance) and
see that the HTTP endpoints are used

#### Use RPC fallback test
1. Build new base node
2. Set base node configuration by adding the following:
```toml
[base_node.http_wallet_query_service]
port = 9000
external_address = "http://127.0.0.1:9001"
```
This way we set the port and external address (which is sent to wallet
client when requesting, so in real world it must be public)
3. Set logging level of base node logs to DEBUG
4. Start base node
5. Build and start console wallet
6. See that it is still able to synchronize
7. Check logs of base nod... (continued)

9 of 114 new or added lines in 4 files covered. (7.89%)

1592 existing lines in 62 files now uncovered.

82227 of 111736 relevant lines covered (73.59%)

272070.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.09
/common_sqlite/src/connection.rs
1
// Copyright 2020. The Tari Project
2
//
3
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
// following conditions are met:
5
//
6
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
// disclaimer.
8
//
9
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
// following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
// products derived from this software without specific prior written permission.
14
//
15
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{
24
    convert::TryFrom,
25
    env::temp_dir,
26
    fs,
27
    iter,
28
    path::{Path, PathBuf},
29
    sync::{Arc, RwLock, RwLockWriteGuard},
30
    time::Duration,
31
};
32

33
use diesel::{
34
    r2d2::{ConnectionManager, PooledConnection},
35
    SqliteConnection,
36
};
37
use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
38
use log::*;
39
use rand::{distributions::Alphanumeric, thread_rng, Rng};
40
use serde::{Deserialize, Serialize};
41

42
use crate::{
43
    error::{SqliteStorageError, StorageError},
44
    sqlite_connection_pool::{PooledDbConnection, SqliteConnectionPool},
45
};
46

47
const LOG_TARGET: &str = "common_sqlite::connection";
48

49
/// Describes how to connect to the database (currently, SQLite).
UNCOV
50
#[derive(Clone, Debug, Serialize, Deserialize)]
×
51
#[serde(into = "String", try_from = "String")]
52
pub enum DbConnectionUrl {
53
    /// In-memory database. Each connection has it's own database
54
    Memory,
55
    /// In-memory database shared with more than one in-process connection according to the given identifier
56
    MemoryShared(String),
57
    /// Database persisted on disk
58
    File(PathBuf),
59
}
60

61
impl DbConnectionUrl {
62
    /// Use a file to store the database
63
    pub fn file<P: AsRef<Path>>(path: P) -> Self {
37✔
64
        DbConnectionUrl::File(path.as_ref().to_path_buf())
37✔
65
    }
37✔
66

67
    /// Returns a database connection string
68
    pub fn to_url_string(&self) -> String {
743✔
69
        use DbConnectionUrl::{File, Memory, MemoryShared};
70
        match self {
743✔
71
            Memory => ":memory:".to_owned(),
84✔
72
            MemoryShared(identifier) => format!("file:{}?mode=memory&cache=shared", identifier),
394✔
73
            File(path) => path
265✔
74
                .to_str()
265✔
75
                .expect("Invalid non-UTF8 character in database path")
265✔
76
                .to_owned(),
265✔
77
        }
78
    }
743✔
79

80
    /// Sets relative paths to use a common base path
UNCOV
81
    pub fn set_base_path<P: AsRef<Path>>(&mut self, base_path: P) {
×
82
        if let DbConnectionUrl::File(inner) = self {
×
UNCOV
83
            if !inner.is_absolute() {
×
UNCOV
84
                *inner = base_path.as_ref().join(inner.as_path());
×
UNCOV
85
            }
×
UNCOV
86
        }
×
87
    }
×
88
}
89

90
impl From<DbConnectionUrl> for String {
UNCOV
91
    fn from(source: DbConnectionUrl) -> Self {
×
UNCOV
92
        source.to_url_string()
×
UNCOV
93
    }
×
94
}
95

96
impl TryFrom<String> for DbConnectionUrl {
97
    type Error = String;
98

99
    fn try_from(value: String) -> Result<Self, Self::Error> {
×
UNCOV
100
        if value.as_str() == ":memory:" {
×
101
            Ok(Self::Memory)
×
102
        } else {
UNCOV
103
            Ok(Self::File(PathBuf::from(value)))
×
104
        }
UNCOV
105
    }
×
106
}
107

108
lazy_static::lazy_static! {
109
    static ref DB_WRITE_LOCK: Arc<RwLock<()>> = Arc::new(RwLock::new(()));
110
}
111

112
/// An SQLite database connection using the Diesel ORM with its r2d2 connection pool and SQLite WAL backend.
113
/// --------------------------------------------------------------------------------------------------------------------
114
/// Notes on SQLite’s Concurrency Limitations (causes of intermittent “Database is Locked” errors)
115
///
116
/// SQLite allows only one writer at a time, even in WAL mode. Under high concurrency (e.g. many threads doing writes),
117
/// collisions are inevitable – one transaction holds an exclusive write lock while others must wait. If a write lock
118
/// cannot be acquired within the busy_timeout, SQLite returns a SQLITE_BUSY (“database is locked”) error. In WAL mode,
119
/// readers don’t block writers and vice versa, but still only one writer can commit at any given moment. This
120
/// single-writer bottleneck means that bursts of simultaneous writes can lead to contention. If a transaction takes too
121
/// long (holding the lock), queued writers may time out (even with a 60s timeout). In short, heavy write concurrency
122
/// can exceed SQLite’s design limits, causing intermittent “database is locked” errors during high load.
123
///
124
/// “Busy Timeout” Not Always Honored – Deferred Write Pitfall: Even with WAL + a busy timeout, you can still get
125
/// immediate lock errors in certain cases. A known scenario involves deferred transactions upgrading to writes, often
126
/// called the “write-after-read” pattern. By default, BEGIN in SQLite is deferred – the transaction starts as read-only
127
/// if the first statement is a SELECT. If you later issue a write in that same transaction, SQLite will try to upgrade
128
/// it to a write transaction.
129
///
130
/// Mitigations and Best Practices for Write Concurrency with SQLite
131
/// - Use WAL Mode and Busy Timeout
132
/// - Start Write Transactions in IMMEDIATE Mode (`SqliteConnection::immediate_transaction(...)`)
133
/// - Keep Transactions Short and Optimize Write Duration
134
/// - Limit Write Concurrency & Pool Sizing
135
/// - Handle and Retry Busy Errors Gracefully
136
/// -
137
/// --------------------------------------------------------------------------------------------------------------------
138
#[derive(Clone)]
139
pub struct DbConnection {
140
    pool: SqliteConnectionPool,
141
}
142

143
impl DbConnection {
144
    /// Connect using the given [DbConnectionUrl](self::DbConnectionUrl), optionally using the given pool size to
145
    /// override the default setting of 1.
146
    /// Note: See https://github.com/launchbadge/sqlx/issues/362#issuecomment-636661146
147
    pub fn connect_url(db_url: &DbConnectionUrl, sqlite_pool_size: Option<usize>) -> Result<Self, StorageError> {
743✔
148
        debug!(target: LOG_TARGET, "Connecting to database using '{:?}'", db_url);
743✔
149

150
        // Ensure the path exists
151
        if let DbConnectionUrl::File(ref path) = db_url {
743✔
152
            if let Some(parent) = path.parent() {
265✔
153
                std::fs::create_dir_all(parent)?;
265✔
UNCOV
154
            }
×
155
        }
478✔
156

157
        let mut pool = SqliteConnectionPool::new(
743✔
158
            db_url.to_url_string(),
743✔
159
            sqlite_pool_size.unwrap_or(1),
743✔
160
            true,
743✔
161
            true,
743✔
162
            Duration::from_secs(60),
743✔
163
        );
743✔
164
        pool.create_pool()?;
743✔
165

166
        Ok(Self::new(pool))
743✔
167
    }
743✔
168

169
    fn acquire_migration_write_lock() -> Result<RwLockWriteGuard<'static, ()>, StorageError> {
379✔
170
        match DB_WRITE_LOCK.write() {
379✔
171
            Ok(value) => Ok(value),
379✔
UNCOV
172
            Err(err) => Err(StorageError::DatabaseMigrationLockError(format!(
×
UNCOV
173
                "Failed to acquire write lock for database migration: {}",
×
UNCOV
174
                err
×
UNCOV
175
            ))),
×
176
        }
177
    }
379✔
178

179
    /// Connect and migrate the database, once complete, then return a handle to the migrated database.
180
    pub fn connect_and_migrate(
379✔
181
        db_url: &DbConnectionUrl,
379✔
182
        migrations: EmbeddedMigrations,
379✔
183
        sqlite_pool_size: Option<usize>,
379✔
184
    ) -> Result<Self, StorageError> {
379✔
185
        let _lock = Self::acquire_migration_write_lock()?;
379✔
186
        let conn = Self::connect_url(db_url, sqlite_pool_size)?;
379✔
187
        let output = conn.migrate(migrations)?;
379✔
188
        debug!(target: LOG_TARGET, "Database migration: {}", output.trim());
379✔
189
        Ok(conn)
379✔
190
    }
379✔
191

192
    fn temp_db_dir() -> PathBuf {
381✔
193
        temp_dir().join("tari-temp")
381✔
194
    }
381✔
195

196
    /// Connect and migrate the database in a temporary location, then return a handle to the migrated database.
197
    pub fn connect_temp_file_and_migrate(migrations: EmbeddedMigrations) -> Result<Self, StorageError> {
184✔
198
        fn prefixed_string(prefix: &str, len: usize) -> String {
184✔
199
            let mut rng = thread_rng();
184✔
200
            let rand_str = iter::repeat(())
184✔
201
                .map(|_| rng.sample(Alphanumeric) as char)
3,680✔
202
                .take(len)
184✔
203
                .collect::<String>();
184✔
204
            format!("{}{}", prefix, rand_str)
184✔
205
        }
184✔
206

207
        let path = DbConnection::temp_db_dir().join(prefixed_string("data-", 20));
184✔
208
        fs::create_dir_all(&path)?;
184✔
209
        let db_url = DbConnectionUrl::File(path.join("my_temp.db"));
184✔
210
        DbConnection::connect_and_migrate(&db_url, migrations, Some(10))
184✔
211
    }
184✔
212

213
    fn new(pool: SqliteConnectionPool) -> Self {
743✔
214
        Self { pool }
743✔
215
    }
743✔
216

217
    /// Fetch a connection from the pool. This function synchronously blocks the current thread for up to 60 seconds or
218
    /// until a connection is available.
219
    pub fn get_pooled_connection(&self) -> Result<PooledConnection<ConnectionManager<SqliteConnection>>, StorageError> {
3,693,590✔
220
        self.pool.get_pooled_connection().map_err(StorageError::DieselR2d2Error)
3,693,590✔
221
    }
3,693,590✔
222

223
    /// Run database migrations
224
    pub fn migrate(&self, migrations: EmbeddedMigrations) -> Result<String, StorageError> {
379✔
225
        let mut conn = self.get_pooled_connection()?;
379✔
226
        let result: Vec<String> = conn
379✔
227
            .run_pending_migrations(migrations)
379✔
228
            .map(|v| v.into_iter().map(|b| format!("Running migration {}", b)).collect())
1,111✔
229
            .map_err(|err| StorageError::DatabaseMigrationFailed(format!("Database migration failed {}", err)))?;
379✔
230

231
        Ok(result.join("\r\n"))
379✔
232
    }
379✔
233

234
    #[cfg(test)]
235
    pub(crate) fn db_path(&self) -> PathBuf {
1✔
236
        self.pool.db_path()
1✔
237
    }
1✔
238
}
239

240
impl Drop for DbConnection {
241
    fn drop(&mut self) {
920,244✔
242
        let path = self.pool.db_path();
920,244✔
243

920,244✔
244
        if path.exists() {
920,244✔
245
            if let Some(parent) = path.parent() {
197✔
246
                if parent.starts_with(DbConnection::temp_db_dir()) {
197✔
247
                    debug!(target: LOG_TARGET, "DbConnection - Dropping database: {}", path.display());
184✔
248
                    // Explicitly cleanup and drop the connection pool to ensure all connections are released
249
                    let pool_state = self.pool.cleanup();
184✔
250
                    debug!(target: LOG_TARGET, "DbConnection - Pool stats before cleanup: {:?}", pool_state);
184✔
251
                    debug!(target: LOG_TARGET, "DbConnection - Cleaning up tempdir: {}", parent.display());
184✔
252
                    if let Err(e) = fs::remove_dir_all(parent) {
184✔
UNCOV
253
                        error!(target: LOG_TARGET, "Failed to clean up temp dir: {}", e);
×
254
                    } else {
255
                        debug!(target: LOG_TARGET, "Temp dir cleaned up: {}", parent.display());
184✔
256
                    }
257
                }
13✔
UNCOV
258
            }
×
259
        }
920,047✔
260
    }
920,244✔
261
}
262

263
impl PooledDbConnection for DbConnection {
264
    type Error = SqliteStorageError;
265

266
    fn get_pooled_connection(&self) -> Result<PooledConnection<ConnectionManager<SqliteConnection>>, Self::Error> {
29,348✔
267
        let conn = self.pool.get_pooled_connection()?;
29,348✔
268
        Ok(conn)
29,348✔
269
    }
29,348✔
270
}
271

272
#[cfg(test)]
273
mod test {
274
    use diesel::{dsl::sql, sql_types::Integer, RunQueryDsl};
275
    use diesel_migrations::embed_migrations;
276

277
    use super::*;
278

279
    #[tokio::test]
280
    async fn connect_and_migrate() {
1✔
281
        const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./test/migrations");
1✔
282

1✔
283
        let db_conn = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
1✔
284
        let path = db_conn.db_path();
1✔
285
        let mut pool_conn = db_conn.get_pooled_connection().unwrap();
1✔
286
        let count: i32 = sql::<Integer>("SELECT COUNT(*) FROM test_table")
1✔
287
            .get_result(&mut pool_conn)
1✔
288
            .unwrap();
1✔
289
        assert_eq!(count, 0);
1✔
290

1✔
291
        // Test temporary file cleanup
1✔
292
        assert!(path.exists());
1✔
293
        drop(pool_conn);
1✔
294
        drop(db_conn);
1✔
295
        assert!(!path.exists());
1✔
296
    }
1✔
297
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc