
tari-project / tari / build 19491336680 (push, github, committer: web-flow)

19 Nov 2025 05:51AM UTC coverage: 61.332% (+10.0%) from 51.294%

feat: make key manager stateless (#7550)

Description
---
Changes the key manager to make it stateless.
This allows the key manager to work from only the view and spend keys.
Removes all async from the key manager.

Motivation and Context
---
Simplifies the usage of the key manager.
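
As a purely hypothetical illustration (the type name, key representation, and derivation scheme below are not from the Tari codebase), a stateless, synchronous key manager along these lines could hold only the view and spend keys and derive everything else deterministically on demand:

```rust
use blake2::{Blake2b512, Digest};

/// Hypothetical sketch: no persisted key-index state, no async.
pub struct StatelessKeyManager {
    view_key: [u8; 32],
    spend_key: [u8; 32],
}

impl StatelessKeyManager {
    pub fn new(view_key: [u8; 32], spend_key: [u8; 32]) -> Self {
        Self { view_key, spend_key }
    }

    /// Deterministic derivation is what lets the manager stay stateless: the same
    /// (branch, index) inputs always produce the same child key, so nothing has to be
    /// stored or awaited between calls.
    pub fn derive_child_key(&self, branch: &str, index: u64) -> Vec<u8> {
        let mut hasher = Blake2b512::new();
        hasher.update(self.view_key);
        hasher.update(self.spend_key);
        hasher.update(branch.as_bytes());
        hasher.update(index.to_le_bytes());
        hasher.finalize().to_vec()
    }
}
```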


## Summary by CodeRabbit

* **New Features**
  * Hardware ledger wallet support and new wallet types.

* **Improvements**
  * Key-management and transaction APIs now operate synchronously for more predictable behavior.
  * Memo construction, transaction signing, and error reporting refined for more reliable submissions and clearer diagnostics.

* **Bug Fixes**
  * Improved memo creation and wallet address error handling to reduce failed transactions.

* **Refactor**
  * Large internal modernization of key-management, wallet, and test infrastructure (performance and maintainability).

---------

Co-authored-by: stringhandler <stringhandler@protonmail.com>

2156 of 4401 new or added lines in 81 files covered. (48.99%)

271 existing lines in 19 files now uncovered.

70402 of 114788 relevant lines covered (61.33%)

228492.92 hits per line

Source File: /common_sqlite/src/connection.rs (86.49% covered)
// Copyright 2020. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
    convert::TryFrom,
    env::temp_dir,
    fs,
    iter,
    path::{Path, PathBuf},
    sync::{Arc, RwLock, RwLockWriteGuard},
};

use diesel::{
    r2d2::{ConnectionManager, PooledConnection},
    SqliteConnection,
};
use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
use log::*;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use serde::{Deserialize, Serialize};

use crate::{
    connection_options::PRAGMA_BUSY_TIMEOUT,
    error::{SqliteStorageError, StorageError},
    sqlite_connection_pool::{PooledDbConnection, SqliteConnectionPool},
};

const LOG_TARGET: &str = "common_sqlite::connection";

/// Describes how to connect to the database (currently, SQLite).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(into = "String", try_from = "String")]
pub enum DbConnectionUrl {
    /// In-memory database. Each connection has its own database
    Memory,
    /// In-memory database shared with more than one in-process connection according to the given identifier
    MemoryShared(String),
    /// Database persisted on disk
    File(PathBuf),
}

impl DbConnectionUrl {
    /// Use a file to store the database
    pub fn file<P: AsRef<Path>>(path: P) -> Self {
        DbConnectionUrl::File(path.as_ref().to_path_buf())
    }

    /// Returns a database connection string
    pub fn to_url_string(&self) -> String {
        use DbConnectionUrl::{File, Memory, MemoryShared};
        match self {
            Memory => ":memory:".to_owned(),
            MemoryShared(identifier) => format!("file:{identifier}?mode=memory&cache=shared"),
            File(path) => path
                .to_str()
                .expect("Invalid non-UTF8 character in database path")
                .to_owned(),
        }
    }

    /// Sets relative paths to use a common base path
    pub fn set_base_path<P: AsRef<Path>>(&mut self, base_path: P) {
        if let DbConnectionUrl::File(inner) = self {
            if !inner.is_absolute() {
                *inner = base_path.as_ref().join(inner.as_path());
            }
        }
    }
}

impl From<DbConnectionUrl> for String {
    fn from(source: DbConnectionUrl) -> Self {
        source.to_url_string()
    }
}

impl TryFrom<String> for DbConnectionUrl {
    type Error = String;

    fn try_from(value: String) -> Result<Self, Self::Error> {
        if value.as_str() == ":memory:" {
            Ok(Self::Memory)
        } else {
            Ok(Self::File(PathBuf::from(value)))
        }
    }
}
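
// Illustrative usage sketch (added for exposition; not part of the upstream file): how a
// caller can build each `DbConnectionUrl` variant and inspect the connection string it
// produces. The paths and the shared-memory identifier are example values only.
#[cfg(test)]
#[allow(dead_code)]
fn example_db_connection_url_usage() {
    // Private in-memory database: every connection gets its own database.
    assert_eq!(DbConnectionUrl::Memory.to_url_string(), ":memory:");

    // In-memory database shared by all in-process connections that use the same identifier.
    let shared = DbConnectionUrl::MemoryShared("example-db".to_string());
    assert_eq!(shared.to_url_string(), "file:example-db?mode=memory&cache=shared");

    // On-disk database; a relative path can be rebased onto a common data directory.
    let mut file_url = DbConnectionUrl::file("wallet/data.db");
    file_url.set_base_path("/tmp/tari-example");
    assert_eq!(file_url.to_url_string(), "/tmp/tari-example/wallet/data.db");
}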

lazy_static::lazy_static! {
    static ref DB_WRITE_LOCK: Arc<RwLock<()>> = Arc::new(RwLock::new(()));
}

/// An SQLite database connection using the Diesel ORM with its r2d2 connection pool and SQLite WAL backend.
/// --------------------------------------------------------------------------------------------------------------------
/// Notes on SQLite’s Concurrency Limitations (causes of intermittent “Database is Locked” errors)
///
/// SQLite allows only one writer at a time, even in WAL mode. Under high concurrency (e.g. many threads doing writes),
/// collisions are inevitable – one transaction holds an exclusive write lock while others must wait. If a write lock
/// cannot be acquired within the busy_timeout, SQLite returns a SQLITE_BUSY (“database is locked”) error. In WAL mode,
/// readers don’t block writers and vice versa, but still only one writer can commit at any given moment. This
/// single-writer bottleneck means that bursts of simultaneous writes can lead to contention. If a transaction takes too
/// long (holding the lock), queued writers may time out (even with a 60s timeout). In short, heavy write concurrency
/// can exceed SQLite’s design limits, causing intermittent “database is locked” errors during high load.
///
/// “Busy Timeout” Not Always Honored – Deferred Write Pitfall: Even with WAL + a busy timeout, you can still get
/// immediate lock errors in certain cases. A known scenario involves deferred transactions upgrading to writes, often
/// called the “write-after-read” pattern. By default, BEGIN in SQLite is deferred – the transaction starts as read-only
/// if the first statement is a SELECT. If you later issue a write in that same transaction, SQLite will try to upgrade
/// it to a write transaction; if another connection has committed a write in the meantime, that upgrade fails
/// immediately with SQLITE_BUSY, without waiting for the busy timeout.
///
/// Mitigations and Best Practices for Write Concurrency with SQLite (an illustrative sketch follows the struct below):
/// - Use WAL Mode and Busy Timeout
/// - Start Write Transactions in IMMEDIATE Mode (`SqliteConnection::immediate_transaction(...)`)
/// - Keep Transactions Short and Optimize Write Duration
/// - Limit Write Concurrency & Pool Sizing
/// - Handle and Retry Busy Errors Gracefully
/// --------------------------------------------------------------------------------------------------------------------
#[derive(Clone)]
pub struct DbConnection {
    pool: SqliteConnectionPool,
}
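
// Illustrative sketch (added for exposition; not part of the upstream file): applying two of
// the mitigations listed above (an IMMEDIATE-mode write transaction plus a bounded retry on
// "database is locked") on top of `DbConnection::get_pooled_connection`. The table name,
// retry count and back-off are example values only.
#[cfg(test)]
#[allow(dead_code)]
fn example_immediate_write_with_retry(db: &DbConnection) -> Result<(), diesel::result::Error> {
    use diesel::{result::Error as DieselError, RunQueryDsl};

    const MAX_ATTEMPTS: u64 = 5;
    // Pool checkout errors are outside the scope of this sketch.
    let mut conn = db.get_pooled_connection().expect("pool checkout");

    for attempt in 1..=MAX_ATTEMPTS {
        // `immediate_transaction` issues `BEGIN IMMEDIATE`, taking the write lock up front and
        // avoiding the deferred "write-after-read" upgrade pitfall described above.
        let result = conn.immediate_transaction(|conn| {
            diesel::sql_query("INSERT INTO test_table DEFAULT VALUES")
                .execute(conn)
                .map(|_| ())
        });

        match result {
            Ok(()) => return Ok(()),
            // Retry with a short back-off only while SQLite reports the database as locked.
            Err(DieselError::DatabaseError(_, ref info))
                if info.message().contains("database is locked") && attempt < MAX_ATTEMPTS =>
            {
                std::thread::sleep(std::time::Duration::from_millis(50 * attempt));
            },
            // Any other error, or a busy error on the final attempt, is surfaced to the caller.
            Err(err) => return Err(err),
        }
    }
    unreachable!("the loop always returns on success or on the final attempt")
}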

impl DbConnection {
    /// Connect using the given [DbConnectionUrl](self::DbConnectionUrl), optionally using the given pool size to
    /// override the default setting of 1.
    /// Note: See https://github.com/launchbadge/sqlx/issues/362#issuecomment-636661146
    pub fn connect_url(db_url: &DbConnectionUrl, sqlite_pool_size: Option<usize>) -> Result<Self, StorageError> {
        debug!(target: LOG_TARGET, "Connecting to database using '{db_url:?}'");

        // Ensure the parent directory of the database file exists
        if let DbConnectionUrl::File(ref path) = db_url {
            if let Some(parent) = path.parent() {
                std::fs::create_dir_all(parent)?;
            }
        }

        let mut pool = SqliteConnectionPool::new(
            db_url.to_url_string(),
            sqlite_pool_size.unwrap_or(1),
            true,
            true,
            PRAGMA_BUSY_TIMEOUT,
        );
        pool.create_pool()?;

        debug!(target: LOG_TARGET, "{}", pool);

        Ok(Self::new(pool))
    }

    fn acquire_migration_write_lock() -> Result<RwLockWriteGuard<'static, ()>, StorageError> {
        match DB_WRITE_LOCK.write() {
            Ok(value) => Ok(value),
            Err(err) => Err(StorageError::DatabaseMigrationLockError(format!(
                "Failed to acquire write lock for database migration: {err}"
            ))),
        }
    }

    /// Returns true **if** the migration write lock is currently held by *some* writer in this
    /// process. We detect this by attempting a non-blocking read; it fails while a write lock is
    /// held.
    #[inline]
    pub fn migration_lock_active() -> bool {
        DB_WRITE_LOCK.try_read().is_err()
    }

    /// Connect and migrate the database; once complete, return a handle to the migrated database.
    pub fn connect_and_migrate(
        db_url: &DbConnectionUrl,
        migrations: EmbeddedMigrations,
        sqlite_pool_size: Option<usize>,
    ) -> Result<Self, StorageError> {
        let _lock = Self::acquire_migration_write_lock()?;
        let conn = Self::connect_url(db_url, sqlite_pool_size)?;
        let output = conn.migrate(migrations)?;
        debug!(target: LOG_TARGET, "Database migration: {}", output.trim());
        Ok(conn)
    }

    fn temp_db_dir() -> PathBuf {
        temp_dir().join("tari-temp")
    }

    /// Connect and migrate the database in a temporary location, then return a handle to the migrated database.
    pub fn connect_temp_file_and_migrate(migrations: EmbeddedMigrations) -> Result<Self, StorageError> {
        fn prefixed_string(prefix: &str, len: usize) -> String {
            let mut rng = thread_rng();
            let rand_str = iter::repeat(())
                .map(|_| rng.sample(Alphanumeric) as char)
                .take(len)
                .collect::<String>();
            format!("{prefix}{rand_str}")
        }

        let path = DbConnection::temp_db_dir().join(prefixed_string("data-", 20));
        fs::create_dir_all(&path)?;
        let db_url = DbConnectionUrl::File(path.join("my_temp.db"));
        DbConnection::connect_and_migrate(&db_url, migrations, Some(10))
    }

    fn new(pool: SqliteConnectionPool) -> Self {
        Self { pool }
    }

    /// Fetch a connection from the pool. This function synchronously blocks the current thread for up to 60 seconds or
    /// until a connection is available.
    pub fn get_pooled_connection(&self) -> Result<PooledConnection<ConnectionManager<SqliteConnection>>, StorageError> {
        self.pool.get_pooled_connection().map_err(StorageError::DieselR2d2Error)
    }

    /// Run database migrations
    pub fn migrate(&self, migrations: EmbeddedMigrations) -> Result<String, StorageError> {
        let mut conn = self.get_pooled_connection()?;
        let result: Vec<String> = conn
            .run_pending_migrations(migrations)
            .map(|v| v.into_iter().map(|b| format!("Running migration {b}")).collect())
            .map_err(|err| StorageError::DatabaseMigrationFailed(format!("Database migration failed {err}")))?;

        Ok(result.join("\r\n"))
    }

    #[cfg(test)]
    pub(crate) fn db_path(&self) -> PathBuf {
        self.pool.db_path()
    }
}
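
// Illustrative sketch (added for exposition; not part of the upstream file): the typical call
// sequence for opening an on-disk database. The migrations path reuses this crate's test
// migrations; the database path and pool size are example values only.
#[cfg(test)]
#[allow(dead_code)]
fn example_connect_and_migrate() -> Result<DbConnection, StorageError> {
    use diesel_migrations::embed_migrations;

    const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./test/migrations");

    let db_url = DbConnectionUrl::file("/tmp/tari-example/wallet/data.db");

    // Takes the process-wide migration write lock, connects (creating parent directories as
    // needed), runs any pending migrations, and returns a handle backed by a pool of 5
    // connections.
    DbConnection::connect_and_migrate(&db_url, MIGRATIONS, Some(5))
}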

impl Drop for DbConnection {
    fn drop(&mut self) {
        let path = self.pool.db_path();

        if path.exists() {
            if let Some(parent) = path.parent() {
                if parent.starts_with(DbConnection::temp_db_dir()) {
                    debug!(target: LOG_TARGET, "DbConnection - Dropping database: {}", path.display());
                    // Explicitly clean up and drop the connection pool to ensure all connections are released
                    let pool_state = self.pool.cleanup();
                    debug!(target: LOG_TARGET, "DbConnection - Pool stats before cleanup: {pool_state:?}");
                    debug!(target: LOG_TARGET, "DbConnection - Cleaning up tempdir: {}", parent.display());
                    if let Err(e) = fs::remove_dir_all(parent) {
                        error!(target: LOG_TARGET, "Failed to clean up temp dir: {e}");
                    } else {
                        debug!(target: LOG_TARGET, "Temp dir cleaned up: {}", parent.display());
                    }
                }
            }
        }
    }
}

impl PooledDbConnection for DbConnection {
    type Error = SqliteStorageError;

    fn get_pooled_connection(&self) -> Result<PooledConnection<ConnectionManager<SqliteConnection>>, Self::Error> {
        let conn = self.pool.get_pooled_connection()?;
        Ok(conn)
    }
}
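
// Illustrative sketch (added for exposition; not part of the upstream file): implementing
// `PooledDbConnection` lets storage code be written against the trait rather than against the
// concrete `DbConnection`, e.g. a hypothetical helper like this one. The table name is an
// example only; query errors are surfaced with `expect` to keep the focus on the trait bound
// rather than on error conversion.
#[cfg(test)]
#[allow(dead_code)]
fn example_count_rows<C: PooledDbConnection>(db: &C) -> Result<i32, C::Error> {
    use diesel::{dsl::sql, sql_types::Integer, RunQueryDsl};

    let mut conn = db.get_pooled_connection()?;
    Ok(sql::<Integer>("SELECT COUNT(*) FROM test_table")
        .get_result(&mut conn)
        .expect("count query"))
}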

#[cfg(test)]
mod test {
    use std::sync::Arc;

    use diesel::{
        connection::SimpleConnection,
        dsl::sql,
        sql_types::{Integer, Text},
        RunQueryDsl,
    };
    use diesel_migrations::embed_migrations;
    use tokio::{sync::Barrier, task::JoinSet};

    use super::*;

    #[tokio::test]
    async fn connect_and_migrate() {
        const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./test/migrations");

        let db_conn = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
        let path = db_conn.db_path();
        let mut pool_conn = db_conn.get_pooled_connection().unwrap();
        let count: i32 = sql::<Integer>("SELECT COUNT(*) FROM test_table")
            .get_result(&mut pool_conn)
            .unwrap();
        assert_eq!(count, 0);

        // Test temporary file cleanup
        assert!(path.exists());
        drop(pool_conn);
        drop(db_conn);
        assert!(!path.exists());
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn stress_connect_and_migrate_contention() {
        const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./test/migrations");
        let db = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();

        // Force very frequent WAL checkpoints to increase write pressure.
        // "PRAGMA wal_autocheckpoint = 1;" checkpoints the Write-Ahead Log (WAL) after every
        // transaction. This increases the frequency of checkpointing, which helps test write
        // contention and durability in high-concurrency scenarios.
        let mut c = db.get_pooled_connection().unwrap();
        sql::<Integer>("PRAGMA wal_autocheckpoint = 1;")
            .execute(&mut c)
            .unwrap();
        let mode: String = sql::<Text>("PRAGMA journal_mode;").get_result(&mut c).unwrap();
        assert!(mode.eq_ignore_ascii_case("wal"));

        let busy: String = sql::<Text>("PRAGMA busy_timeout;").get_result(&mut c).unwrap();
        assert!(busy.parse::<u128>().unwrap() >= PRAGMA_BUSY_TIMEOUT.as_millis());

        // We have 'sqlite_pool_size = Some(10)', so '160 writers + 320 readers' must queue.
        const WRITERS: usize = 160;
        const READERS: usize = 320;
        const HOLD_MS: u64 = 100;

        let barrier = Arc::new(Barrier::new(WRITERS + READERS));
        let mut tasks = JoinSet::new();

        // Writers
        for _ in 0..WRITERS {
            // Each spawned async task gets its own reference to the same synchronization barrier.
            let synchronization_barrier = barrier.clone();
            let db2 = db.clone();
            tasks.spawn(async move {
                // The synchronization barrier allows all tasks to wait at the barrier and proceed together once all
                // have reached it, enabling coordinated concurrent execution for this test.
                synchronization_barrier.wait().await;
                // IMPORTANT: await the blocking job
                tokio::task::spawn_blocking(move || {
                    let mut conn = db2.get_pooled_connection().expect("writer checkout");
                    // Acquires an immediate exclusive lock on the database for this write
                    conn.batch_execute("BEGIN EXCLUSIVE;").unwrap();
                    sql::<Integer>("INSERT INTO test_table DEFAULT VALUES;")
                        .execute(&mut conn)
                        .unwrap();
                    std::thread::sleep(std::time::Duration::from_millis(HOLD_MS));
                    conn.batch_execute("COMMIT;").unwrap();
                })
                .await
                .expect("writer join");
            });
        }
        // Readers
        for _ in 0..READERS {
            let b = barrier.clone();
            let db2 = db.clone();
            tasks.spawn(async move {
                b.wait().await;
                tokio::task::spawn_blocking(move || {
                    let mut conn = db2.get_pooled_connection().expect("reader checkout");
                    for _ in 0..3 {
                        let _: i32 = sql::<Integer>("SELECT COUNT(*) FROM test_table")
                            .get_result(&mut conn)
                            .expect("reader select");
                        // Small pause between reads (an async sleep can't be used inside spawn_blocking)
                        std::thread::sleep(std::time::Duration::from_millis(10));
                    }
                })
                .await
                .expect("reader join");
            });
        }

        while let Some(res) = tasks.join_next().await {
            res.expect("task panicked");
        }

        // Verify row count
        let mut c = db.get_pooled_connection().unwrap();
        let count: i32 = sql::<Integer>("SELECT COUNT(*) FROM test_table")
            .get_result(&mut c)
            .unwrap();
        assert_eq!(count as usize, WRITERS);
    }
}