• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 26581360379

28 May 2026 02:34PM UTC coverage: 61.346% (-0.01%) from 61.356%
26581360379

push

github

web-flow
feat(wallet): stealth-address claim key for L1->L2 burns (#7861)

## Summary

Wallet-level implementation of the L1 half of
[tari-project/tari-ootle#1890](https://github.com/tari-project/tari-ootle/issues/1890)
— switches L1→L2 burn claims from "on-chain claim key = recipient's L2
account pubkey" to a stealth address scheme.

No L1 or L2 consensus changes; only L1 wallet behaviour. Matching L2
verifier/wallet changes will land separately in `tari-ootle`.

### What changes

- The on-chain `ConfidentialOutputData.claim_public_key` now carries the
stealth address `C = H(r·P)·G + P` instead of the recipient's pubkey
`P`. `C` looks like a random nonce per burn and is unlinkable to `P`
across burns; for the L2 wallets that want it, the on-chain value is
also a discoverability signal (`C` derivable from the on-chain `R` + the
account's `p`).
- The mask-key ownership signature commits to `H(commitment ‖ C)`. A
third party holding the proof material cannot construct an L2 claim —
only the L2 wallet holding `p` can derive the spend secret `s = H(R·p) +
p` against `C`.
- `SideChainFeature` is still emitted for L2-bound burns;
`sidechain_deployment_key` (when provided) still produces the
`SideChainId` Schnorr sig identifying the target L2 network. The
original "deployment key without claim public key" sanity check is
preserved.
- `sender_offset` key for L2-bound burns is seed-deterministic (derived
from the commitment mask). Plain burns keep `get_random_key`.
- `encrypted_data` for L2-bound burns is encrypted to `DH(P, r)`
(unchanged) so the existing L2 decryption with `DH(R, p)` keeps working.
Plain burns encrypt to the L1 view key.
- `PartialBurnClaimProof` wire shape is unchanged: `claim_public_key`
still carries `P`, `sender_offset_public_key` carries `R`. `C` is not on
the wire — both L1 and L2 recompute it from `(R, P, p)` (the L2 already
has `owner_stealth_dh_stealth_address` for this in
`dan/crates/wallet/crypto/src/kdfs.rs`).
- New key-manager helpers: `... (continued)

0 of 84 new or added lines in 4 files covered. (0.0%)

5 existing lines in 5 files now uncovered.

71853 of 117127 relevant lines covered (61.35%)

223235.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.36
/common_sqlite/src/connection.rs
1
// Copyright 2020. The Tari Project
2
//
3
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
// following conditions are met:
5
//
6
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
// disclaimer.
8
//
9
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
// following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
// products derived from this software without specific prior written permission.
14
//
15
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{
24
    convert::TryFrom,
25
    env::temp_dir,
26
    fs,
27
    iter,
28
    path::{Path, PathBuf},
29
    sync::{Arc, RwLock, RwLockWriteGuard},
30
};
31

32
use diesel::{
33
    SqliteConnection,
34
    r2d2::{ConnectionManager, PooledConnection},
35
};
36
use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
37
use log::*;
38
use rand::{RngExt, distr::Alphanumeric};
39
use serde::{Deserialize, Serialize};
40

41
use crate::{
42
    connection_options::PRAGMA_BUSY_TIMEOUT,
43
    error::{SqliteStorageError, StorageError},
44
    sqlite_connection_pool::{PooledDbConnection, SqliteConnectionPool},
45
};
46

47
const LOG_TARGET: &str = "common_sqlite::connection";
48

49
/// Describes how to connect to the database (currently, SQLite).
50
#[derive(Clone, Debug, Serialize, Deserialize)]
51
#[serde(into = "String", try_from = "String")]
52
pub enum DbConnectionUrl {
53
    /// In-memory database. Each connection has it's own database
54
    Memory,
55
    /// In-memory database shared with more than one in-process connection according to the given identifier
56
    MemoryShared(String),
57
    /// Database persisted on disk
58
    File(PathBuf),
59
}
60

61
impl DbConnectionUrl {
62
    /// Use a file to store the database
63
    pub fn file<P: AsRef<Path>>(path: P) -> Self {
36✔
64
        DbConnectionUrl::File(path.as_ref().to_path_buf())
36✔
65
    }
36✔
66

67
    /// Returns a database connection string
68
    pub fn to_url_string(&self) -> String {
336✔
69
        use DbConnectionUrl::{File, Memory, MemoryShared};
70
        match self {
336✔
71
            Memory => ":memory:".to_owned(),
64✔
72
            MemoryShared(identifier) => format!("file:{identifier}?mode=memory&cache=shared"),
30✔
73
            File(path) => path
242✔
74
                .to_str()
242✔
75
                .expect("Invalid non-UTF8 character in database path")
242✔
76
                .to_owned(),
242✔
77
        }
78
    }
336✔
79

80
    /// Sets relative paths to use a common base path
81
    pub fn set_base_path<P: AsRef<Path>>(&mut self, base_path: P) {
×
82
        if let DbConnectionUrl::File(inner) = self &&
×
83
            !inner.is_absolute()
×
84
        {
×
85
            *inner = base_path.as_ref().join(inner.as_path());
×
86
        }
×
87
    }
×
88
}
89

90
impl From<DbConnectionUrl> for String {
91
    fn from(source: DbConnectionUrl) -> Self {
×
92
        source.to_url_string()
×
93
    }
×
94
}
95

96
impl TryFrom<String> for DbConnectionUrl {
97
    type Error = String;
98

99
    fn try_from(value: String) -> Result<Self, Self::Error> {
×
100
        if value.as_str() == ":memory:" {
×
101
            Ok(Self::Memory)
×
102
        } else {
103
            Ok(Self::File(PathBuf::from(value)))
×
104
        }
105
    }
×
106
}
107

108
lazy_static::lazy_static! {
109
    static ref DB_WRITE_LOCK: Arc<RwLock<()>> = Arc::new(RwLock::new(()));
110
}
111

112
/// An SQLite database connection using the Diesel ORM with its r2d2 connection pool and SQLite WAL backend.
113
/// --------------------------------------------------------------------------------------------------------------------
114
/// Notes on SQLite’s Concurrency Limitations (causes of intermittent “Database is Locked” errors)
115
///
116
/// SQLite allows only one writer at a time, even in WAL mode. Under high concurrency (e.g. many threads doing writes),
117
/// collisions are inevitable – one transaction holds an exclusive write lock while others must wait. If a write lock
118
/// cannot be acquired within the busy_timeout, SQLite returns a SQLITE_BUSY (“database is locked”) error. In WAL mode,
119
/// readers don’t block writers and vice versa, but still only one writer can commit at any given moment. This
120
/// single-writer bottleneck means that bursts of simultaneous writes can lead to contention. If a transaction takes too
121
/// long (holding the lock), queued writers may time out (even with a 60s timeout). In short, heavy write concurrency
122
/// can exceed SQLite’s design limits, causing intermittent “database is locked” errors during high load.
123
///
124
/// “Busy Timeout” Not Always Honored – Deferred Write Pitfall: Even with WAL + a busy timeout, you can still get
125
/// immediate lock errors in certain cases. A known scenario involves deferred transactions upgrading to writes, often
126
/// called the “write-after-read” pattern. By default, BEGIN in SQLite is deferred – the transaction starts as read-only
127
/// if the first statement is a SELECT. If you later issue a write in that same transaction, SQLite will try to upgrade
128
/// it to a write transaction.
129
///
130
/// Mitigations and Best Practices for Write Concurrency with SQLite
131
/// - Use WAL Mode and Busy Timeout
132
/// - Start Write Transactions in IMMEDIATE Mode (`SqliteConnection::immediate_transaction(...)`)
133
/// - Keep Transactions Short and Optimize Write Duration
134
/// - Limit Write Concurrency & Pool Sizing
135
/// - Handle and Retry Busy Errors Gracefully
136
/// -
137
/// --------------------------------------------------------------------------------------------------------------------
138
#[derive(Clone)]
139
pub struct DbConnection {
140
    pool: SqliteConnectionPool,
141
}
142

143
impl DbConnection {
144
    /// Connect using the given [DbConnectionUrl](self::DbConnectionUrl), optionally using the given pool size to
145
    /// override the default setting of 1.
146
    /// Note: See https://github.com/launchbadge/sqlx/issues/362#issuecomment-636661146
147
    pub fn connect_url(db_url: &DbConnectionUrl, sqlite_pool_size: Option<usize>) -> Result<Self, StorageError> {
336✔
148
        debug!(target: LOG_TARGET, "Connecting to database using '{db_url:?}'");
336✔
149

150
        // Ensure the path exists
151
        if let DbConnectionUrl::File(path) = db_url &&
336✔
152
            let Some(parent) = path.parent()
242✔
153
        {
154
            std::fs::create_dir_all(parent)?;
242✔
155
        }
94✔
156

157
        let mut pool = SqliteConnectionPool::new(
336✔
158
            db_url.to_url_string(),
336✔
159
            sqlite_pool_size.unwrap_or(1),
336✔
160
            true,
161
            true,
162
            PRAGMA_BUSY_TIMEOUT,
163
        );
164
        pool.create_pool()?;
336✔
165

166
        debug!(target: LOG_TARGET, "{}", pool);
336✔
167

168
        Ok(Self::new(pool))
336✔
169
    }
336✔
170

171
    fn acquire_migration_write_lock() -> Result<RwLockWriteGuard<'static, ()>, StorageError> {
336✔
172
        match DB_WRITE_LOCK.write() {
336✔
173
            Ok(value) => Ok(value),
336✔
174
            Err(err) => Err(StorageError::DatabaseMigrationLockError(format!(
×
175
                "Failed to acquire write lock for database migration: {err}"
×
176
            ))),
×
177
        }
178
    }
336✔
179

180
    /// Returns true **if** the migration write lock is currently held by *some* writer in this
181
    /// process. We detect this by attempting a non-blocking read; it fails while a write lock is
182
    /// held.
183
    #[inline]
184
    pub fn migration_lock_active() -> bool {
2,232✔
185
        DB_WRITE_LOCK.try_read().is_err()
2,232✔
186
    }
2,232✔
187

188
    /// Connect and migrate the database, once complete, then return a handle to the migrated database.
189
    pub fn connect_and_migrate(
336✔
190
        db_url: &DbConnectionUrl,
336✔
191
        migrations: EmbeddedMigrations,
336✔
192
        sqlite_pool_size: Option<usize>,
336✔
193
    ) -> Result<Self, StorageError> {
336✔
194
        let _lock = Self::acquire_migration_write_lock()?;
336✔
195
        let conn = Self::connect_url(db_url, sqlite_pool_size)?;
336✔
196
        let output = conn.migrate(migrations)?;
336✔
197
        debug!(target: LOG_TARGET, "Database migration: {}", output.trim());
336✔
198
        Ok(conn)
336✔
199
    }
336✔
200

201
    fn temp_db_dir() -> PathBuf {
407✔
202
        temp_dir().join("tari-temp")
407✔
203
    }
407✔
204

205
    /// Connect and migrate the database in a temporary location, then return a handle to the migrated database.
206
    pub fn connect_temp_file_and_migrate(migrations: EmbeddedMigrations) -> Result<Self, StorageError> {
181✔
207
        fn prefixed_string(prefix: &str, len: usize) -> String {
181✔
208
            let mut rng = rand::rng();
181✔
209
            let rand_str = iter::repeat(())
181✔
210
                .map(|_| rng.sample(Alphanumeric) as char)
3,620✔
211
                .take(len)
181✔
212
                .collect::<String>();
181✔
213
            format!("{prefix}{rand_str}")
181✔
214
        }
181✔
215

216
        let path = DbConnection::temp_db_dir().join(prefixed_string("data-", 20));
181✔
217
        fs::create_dir_all(&path)?;
181✔
218
        let db_url = DbConnectionUrl::File(path.join("my_temp.db"));
181✔
219
        DbConnection::connect_and_migrate(&db_url, migrations, Some(10))
181✔
220
    }
181✔
221

222
    fn new(pool: SqliteConnectionPool) -> Self {
336✔
223
        Self { pool }
336✔
224
    }
336✔
225

226
    /// Fetch a connection from the pool. This function synchronously blocks the current thread for up to 60 seconds or
227
    /// until a connection is available.
228
    pub fn get_pooled_connection(&self) -> Result<PooledConnection<ConnectionManager<SqliteConnection>>, StorageError> {
243,797✔
229
        self.pool.get_pooled_connection().map_err(StorageError::DieselR2d2Error)
243,797✔
230
    }
243,797✔
231

232
    /// Run database migrations
233
    pub fn migrate(&self, migrations: EmbeddedMigrations) -> Result<String, StorageError> {
336✔
234
        let mut conn = self.get_pooled_connection()?;
336✔
235
        let result: Vec<String> = conn
336✔
236
            .run_pending_migrations(migrations)
336✔
237
            .map(|v| v.into_iter().map(|b| format!("Running migration {b}")).collect())
2,108✔
238
            .map_err(|err| StorageError::DatabaseMigrationFailed(format!("Database migration failed {err}")))?;
336✔
239

240
        Ok(result.join("\r\n"))
336✔
241
    }
336✔
242

243
    #[cfg(test)]
244
    pub(crate) fn db_path(&self) -> PathBuf {
1✔
245
        self.pool.db_path()
1✔
246
    }
1✔
247
}
248

249
impl Drop for DbConnection {
250
    fn drop(&mut self) {
60,064✔
251
        let path = self.pool.db_path();
60,064✔
252

253
        if path.exists() &&
60,064✔
254
            let Some(parent) = path.parent() &&
226✔
255
            parent.starts_with(DbConnection::temp_db_dir())
226✔
256
        {
257
            debug!(target: LOG_TARGET, "DbConnection - Dropping database: {}", path.display());
181✔
258
            // Explicitly cleanup and drop the connection pool to ensure all connections are released
259
            let pool_state = self.pool.cleanup();
181✔
260
            debug!(target: LOG_TARGET, "DbConnection - Pool stats before cleanup: {pool_state:?}");
181✔
261
            debug!(target: LOG_TARGET, "DbConnection - Cleaning up tempdir: {}", parent.display());
181✔
262
            if let Err(e) = fs::remove_dir_all(parent) {
181✔
UNCOV
263
                error!(target: LOG_TARGET, "Failed to clean up temp dir: {e}");
×
264
            } else {
265
                debug!(target: LOG_TARGET, "Temp dir cleaned up: {}", parent.display());
181✔
266
            }
267
        }
59,883✔
268
    }
60,064✔
269
}
270

271
impl PooledDbConnection for DbConnection {
272
    type Error = SqliteStorageError;
273

274
    fn get_pooled_connection(&self) -> Result<PooledConnection<ConnectionManager<SqliteConnection>>, Self::Error> {
×
275
        let conn = self.pool.get_pooled_connection()?;
×
276
        Ok(conn)
×
277
    }
×
278
}
279

280
#[cfg(test)]
281
mod test {
282
    use std::sync::Arc;
283

284
    use diesel::{
285
        RunQueryDsl,
286
        connection::SimpleConnection,
287
        dsl::sql,
288
        sql_types::{Integer, Text},
289
    };
290
    use diesel_migrations::embed_migrations;
291
    use tokio::{sync::Barrier, task::JoinSet};
292

293
    use super::*;
294

295
    #[tokio::test]
296
    async fn connect_and_migrate() {
1✔
297
        const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./test/migrations");
298

299
        let db_conn = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
1✔
300
        let path = db_conn.db_path();
1✔
301
        let mut pool_conn = db_conn.get_pooled_connection().unwrap();
1✔
302
        let count: i32 = sql::<Integer>("SELECT COUNT(*) FROM test_table")
1✔
303
            .get_result(&mut pool_conn)
1✔
304
            .unwrap();
1✔
305
        assert_eq!(count, 0);
1✔
306

307
        // Test temporary file cleanup
308
        assert!(path.exists());
1✔
309
        drop(pool_conn);
1✔
310
        drop(db_conn);
1✔
311
        assert!(!path.exists());
1✔
312
    }
1✔
313

314
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
315
    async fn stress_connect_and_migrate_contention() {
1✔
316
        const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./test/migrations");
317
        let db = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
1✔
318

319
        // Force very frequent WAL checkpoints to increase write pressure.
320
        // The SQLite "PRAGMA wal_autocheckpoint = 1;" executes a SQLite PRAGMA that will checkpoint
321
        // the Write-Ahead Log (WAL) after every transaction. This increases the frequency of
322
        // checkpointing, which can help test write contention and durability in high-concurrency
323
        // scenarios.
324
        let mut c = db.get_pooled_connection().unwrap();
1✔
325
        sql::<Integer>("PRAGMA wal_autocheckpoint = 1;")
1✔
326
            .execute(&mut c)
1✔
327
            .unwrap();
1✔
328
        let mode: String = sql::<Text>("PRAGMA journal_mode;").get_result(&mut c).unwrap();
1✔
329
        assert!(mode.eq_ignore_ascii_case("wal"));
1✔
330

331
        let busy: String = sql::<Text>("PRAGMA busy_timeout;").get_result(&mut c).unwrap();
1✔
332
        assert!(busy.parse::<u128>().unwrap() >= PRAGMA_BUSY_TIMEOUT.as_millis());
1✔
333

334
        // We have 'sqlite_pool_size = Some(10))', so '160 writers + 320 readers' must queue.
335
        const WRITERS: usize = 160;
336
        const READERS: usize = 320;
337
        const HOLD_MS: u64 = 100;
338

339
        let barrier = Arc::new(Barrier::new(WRITERS + READERS));
1✔
340
        let mut tasks = JoinSet::new();
1✔
341

342
        // Writers
343
        for _ in 0..WRITERS {
1✔
344
            // Let each spawned async task gets its own reference to the same synchronization barrier.
345
            let synchronization_barrier = barrier.clone();
160✔
346
            let db2 = db.clone();
160✔
347
            tasks.spawn(async move {
160✔
348
                // The synchronization barrier allows all tasks to wait at the barrier and proceed together once all
349
                // have reached it, enabling coordinated concurrent execution for this test.
350
                synchronization_barrier.wait().await;
160✔
351
                // IMPORTANT: await the blocking job
352
                tokio::task::spawn_blocking(move || {
160✔
353
                    let mut conn = db2.get_pooled_connection().expect("writer checkout");
160✔
354
                    // Acquires an immediate exclusive lock on the database for this write
355
                    conn.batch_execute("BEGIN EXCLUSIVE;").unwrap();
160✔
356
                    sql::<Integer>("INSERT INTO test_table DEFAULT VALUES;")
160✔
357
                        .execute(&mut conn)
160✔
358
                        .unwrap();
160✔
359
                    std::thread::sleep(std::time::Duration::from_millis(HOLD_MS));
160✔
360
                    conn.batch_execute("COMMIT;").unwrap();
160✔
361
                })
160✔
362
                .await
160✔
363
                .expect("writer join");
160✔
364
            });
160✔
365
        }
366
        // Readers
367
        for _ in 0..READERS {
1✔
368
            let b = barrier.clone();
320✔
369
            let db2 = db.clone();
320✔
370
            tasks.spawn(async move {
320✔
371
                b.wait().await;
320✔
372
                tokio::task::spawn_blocking(move || {
320✔
373
                    let mut conn = db2.get_pooled_connection().expect("reader checkout");
320✔
374
                    for _ in 0..3 {
960✔
375
                        let _: i32 = sql::<Integer>("SELECT COUNT(*) FROM test_table")
960✔
376
                            .get_result(&mut conn)
960✔
377
                            .expect("reader select");
960✔
378
                        // Small pause between reads (async sleep outside blocking isn’t usable here)
960✔
379
                        std::thread::sleep(std::time::Duration::from_millis(10));
960✔
380
                    }
960✔
381
                })
320✔
382
                .await
320✔
383
                .expect("reader join");
320✔
384
            });
320✔
385
        }
386

387
        while let Some(res) = tasks.join_next().await {
481✔
388
            res.expect("task panicked");
480✔
389
        }
480✔
390

391
        // Verify row count
392
        let mut c = db.get_pooled_connection().unwrap();
1✔
393
        let count: i32 = sql::<Integer>("SELECT COUNT(*) FROM test_table")
1✔
394
            .get_result(&mut c)
1✔
395
            .unwrap();
1✔
396
        assert_eq!(count as usize, WRITERS);
1✔
397
    }
1✔
398
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc