
tari-project / tari / 17807091342

17 Sep 2025 06:28PM UTC coverage: 60.975% (+1.1%) from 59.9%
push · github · web-flow
fix: always cancel transactions (#7500)

Description
---
Cancel the transaction and its outputs without assuming that either exists: a one-sided offline
send has no existing transaction to cancel.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **Bug Fixes**
  * Improved reliability of canceling pending transactions: missing or already-processed pending
    records and unlock failures are now logged as warnings and do not abort cancellation.
  * Reduces spurious errors during cancellation while preserving normal cleanup, signals, and
    event emission for more stable wallet behavior.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
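As a rough sketch of the behaviour described above (all type, trait, and method names below are hypothetical stand-ins, not the wallet's actual API), the cancellation path now treats a missing pending record as a warning and carries on with output cleanup:

```rust
use log::warn;

// Hypothetical stand-ins for the wallet's storage types, used only to sketch the control flow.
struct TxId(u64);

#[derive(Debug)]
enum DbError {
    NotFound,
    Other(String),
}

trait PendingStore {
    fn remove_pending_transaction(&self, tx_id: &TxId) -> Result<(), DbError>;
    fn unlock_outputs_for(&self, tx_id: &TxId) -> Result<(), DbError>;
}

// A one-sided offline send never stored a pending transaction, so "not found" is expected here:
// log it and keep going, rather than aborting before the outputs are cancelled.
fn cancel_transaction<S: PendingStore>(store: &S, tx_id: TxId) -> Result<(), DbError> {
    match store.remove_pending_transaction(&tx_id) {
        Ok(()) => {},
        Err(DbError::NotFound) => warn!("No pending transaction {} to cancel; continuing", tx_id.0),
        Err(e) => return Err(e),
    }
    // Unlock failures are likewise logged as warnings instead of aborting the cancellation.
    if let Err(e) = store.unlock_outputs_for(&tx_id) {
        warn!("Failed to unlock outputs for transaction {}: {e:?}", tx_id.0);
    }
    Ok(())
}
```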

0 of 12 new or added lines in 1 file covered. (0.0%)

493 existing lines in 17 files now uncovered.

74571 of 122298 relevant lines covered (60.97%)

223264.49 hits per line

Source File

/common_sqlite/src/connection.rs · 88.29% covered
// Copyright 2020. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
    convert::TryFrom,
    env::temp_dir,
    fs,
    iter,
    path::{Path, PathBuf},
    sync::{Arc, RwLock, RwLockWriteGuard},
};

use diesel::{
    r2d2::{ConnectionManager, PooledConnection},
    SqliteConnection,
};
use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
use log::*;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use serde::{Deserialize, Serialize};

use crate::{
    connection_options::PRAGMA_BUSY_TIMEOUT,
    error::{SqliteStorageError, StorageError},
    sqlite_connection_pool::{PooledDbConnection, SqliteConnectionPool},
};

const LOG_TARGET: &str = "common_sqlite::connection";

/// Describes how to connect to the database (currently, SQLite).
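///
/// A minimal sketch of the connection string each variant maps to (the shared-memory identifier
/// below is illustrative; see `to_url_string` for the actual mapping):
/// ```ignore
/// assert_eq!(DbConnectionUrl::Memory.to_url_string(), ":memory:");
/// assert_eq!(
///     DbConnectionUrl::MemoryShared("wallet".to_string()).to_url_string(),
///     "file:wallet?mode=memory&cache=shared"
/// );
/// ```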
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(into = "String", try_from = "String")]
pub enum DbConnectionUrl {
    /// In-memory database. Each connection has its own database
    Memory,
    /// In-memory database shared with more than one in-process connection according to the given identifier
    MemoryShared(String),
    /// Database persisted on disk
    File(PathBuf),
}

impl DbConnectionUrl {
    /// Use a file to store the database
    pub fn file<P: AsRef<Path>>(path: P) -> Self {
        DbConnectionUrl::File(path.as_ref().to_path_buf())
    }

    /// Returns a database connection string
    pub fn to_url_string(&self) -> String {
        use DbConnectionUrl::{File, Memory, MemoryShared};
        match self {
            Memory => ":memory:".to_owned(),
            MemoryShared(identifier) => format!("file:{identifier}?mode=memory&cache=shared"),
            File(path) => path
                .to_str()
                .expect("Invalid non-UTF8 character in database path")
                .to_owned(),
        }
    }

    /// Sets relative paths to use a common base path
    pub fn set_base_path<P: AsRef<Path>>(&mut self, base_path: P) {
        if let DbConnectionUrl::File(inner) = self {
            if !inner.is_absolute() {
                *inner = base_path.as_ref().join(inner.as_path());
            }
        }
    }
}

impl From<DbConnectionUrl> for String {
    fn from(source: DbConnectionUrl) -> Self {
        source.to_url_string()
    }
}

impl TryFrom<String> for DbConnectionUrl {
    type Error = String;

    fn try_from(value: String) -> Result<Self, Self::Error> {
        if value.as_str() == ":memory:" {
            Ok(Self::Memory)
        } else {
            Ok(Self::File(PathBuf::from(value)))
        }
    }
}

lazy_static::lazy_static! {
    static ref DB_WRITE_LOCK: Arc<RwLock<()>> = Arc::new(RwLock::new(()));
}

/// An SQLite database connection using the Diesel ORM with its r2d2 connection pool and SQLite WAL backend.
/// --------------------------------------------------------------------------------------------------------------------
/// Notes on SQLite’s Concurrency Limitations (causes of intermittent “Database is Locked” errors)
///
/// SQLite allows only one writer at a time, even in WAL mode. Under high concurrency (e.g. many threads doing writes),
/// collisions are inevitable – one transaction holds an exclusive write lock while others must wait. If a write lock
/// cannot be acquired within the busy_timeout, SQLite returns a SQLITE_BUSY (“database is locked”) error. In WAL mode,
/// readers don’t block writers and vice versa, but still only one writer can commit at any given moment. This
/// single-writer bottleneck means that bursts of simultaneous writes can lead to contention. If a transaction takes too
/// long (holding the lock), queued writers may time out (even with a 60s timeout). In short, heavy write concurrency
/// can exceed SQLite’s design limits, causing intermittent “database is locked” errors during high load.
///
/// “Busy Timeout” Not Always Honored – Deferred Write Pitfall: Even with WAL + a busy timeout, you can still get
/// immediate lock errors in certain cases. A known scenario involves deferred transactions upgrading to writes, often
/// called the “write-after-read” pattern. By default, BEGIN in SQLite is deferred – the transaction starts as read-only
/// if the first statement is a SELECT. If you later issue a write in that same transaction, SQLite will try to upgrade
/// it to a write transaction.
///
/// Mitigations and Best Practices for Write Concurrency with SQLite
/// - Use WAL Mode and Busy Timeout
/// - Start Write Transactions in IMMEDIATE Mode (`SqliteConnection::immediate_transaction(...)`)
/// - Keep Transactions Short and Optimize Write Duration
/// - Limit Write Concurrency & Pool Sizing
/// - Handle and Retry Busy Errors Gracefully
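///
/// A minimal sketch of the IMMEDIATE-mode mitigation above (assumed usage of Diesel's
/// `SqliteConnection::immediate_transaction`; the table name is illustrative):
/// ```ignore
/// use diesel::RunQueryDsl;
/// // BEGIN IMMEDIATE takes the write lock up-front, so the transaction cannot fail later on a
/// // deferred read-to-write upgrade.
/// conn.immediate_transaction(|conn| {
///     diesel::sql_query("INSERT INTO test_table DEFAULT VALUES;").execute(conn)
/// })?;
/// ```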
/// --------------------------------------------------------------------------------------------------------------------
#[derive(Clone)]
pub struct DbConnection {
    pool: SqliteConnectionPool,
}

impl DbConnection {
    /// Connect using the given [DbConnectionUrl](self::DbConnectionUrl), optionally using the given pool size to
    /// override the default setting of 1.
    /// Note: See https://github.com/launchbadge/sqlx/issues/362#issuecomment-636661146
    pub fn connect_url(db_url: &DbConnectionUrl, sqlite_pool_size: Option<usize>) -> Result<Self, StorageError> {
        debug!(target: LOG_TARGET, "Connecting to database using '{db_url:?}'");

        // Ensure the path exists
        if let DbConnectionUrl::File(ref path) = db_url {
            if let Some(parent) = path.parent() {
                std::fs::create_dir_all(parent)?;
            }
        }

        let mut pool = SqliteConnectionPool::new(
            db_url.to_url_string(),
            sqlite_pool_size.unwrap_or(1),
            true,
            true,
            PRAGMA_BUSY_TIMEOUT,
        );
        pool.create_pool()?;

        debug!(target: LOG_TARGET, "{}", pool);

        Ok(Self::new(pool))
    }

    fn acquire_migration_write_lock() -> Result<RwLockWriteGuard<'static, ()>, StorageError> {
        match DB_WRITE_LOCK.write() {
            Ok(value) => Ok(value),
            Err(err) => Err(StorageError::DatabaseMigrationLockError(format!(
                "Failed to acquire write lock for database migration: {err}"
            ))),
        }
    }

    /// Returns true **if** the migration write lock is currently held by *some* writer in this
    /// process. We detect this by attempting a non-blocking read; it fails while a write lock is
    /// held.
    #[inline]
    pub fn migration_lock_active() -> bool {
        DB_WRITE_LOCK.try_read().is_err()
    }

    /// Connect and migrate the database; once complete, return a handle to the migrated database.
    pub fn connect_and_migrate(
        db_url: &DbConnectionUrl,
        migrations: EmbeddedMigrations,
        sqlite_pool_size: Option<usize>,
    ) -> Result<Self, StorageError> {
        let _lock = Self::acquire_migration_write_lock()?;
        let conn = Self::connect_url(db_url, sqlite_pool_size)?;
        let output = conn.migrate(migrations)?;
        debug!(target: LOG_TARGET, "Database migration: {}", output.trim());
        Ok(conn)
    }

    fn temp_db_dir() -> PathBuf {
        temp_dir().join("tari-temp")
    }

    /// Connect and migrate the database in a temporary location, then return a handle to the migrated database.
    pub fn connect_temp_file_and_migrate(migrations: EmbeddedMigrations) -> Result<Self, StorageError> {
        fn prefixed_string(prefix: &str, len: usize) -> String {
            let mut rng = thread_rng();
            let rand_str = iter::repeat(())
                .map(|_| rng.sample(Alphanumeric) as char)
                .take(len)
                .collect::<String>();
            format!("{prefix}{rand_str}")
        }

        let path = DbConnection::temp_db_dir().join(prefixed_string("data-", 20));
        fs::create_dir_all(&path)?;
        let db_url = DbConnectionUrl::File(path.join("my_temp.db"));
        DbConnection::connect_and_migrate(&db_url, migrations, Some(10))
    }

    fn new(pool: SqliteConnectionPool) -> Self {
        Self { pool }
    }

    /// Fetch a connection from the pool. This function synchronously blocks the current thread for up to 60 seconds or
    /// until a connection is available.
    pub fn get_pooled_connection(&self) -> Result<PooledConnection<ConnectionManager<SqliteConnection>>, StorageError> {
        self.pool.get_pooled_connection().map_err(StorageError::DieselR2d2Error)
    }

    /// Run database migrations
    pub fn migrate(&self, migrations: EmbeddedMigrations) -> Result<String, StorageError> {
        let mut conn = self.get_pooled_connection()?;
        let result: Vec<String> = conn
            .run_pending_migrations(migrations)
            .map(|v| v.into_iter().map(|b| format!("Running migration {b}")).collect())
            .map_err(|err| StorageError::DatabaseMigrationFailed(format!("Database migration failed {err}")))?;

        Ok(result.join("\r\n"))
    }

    #[cfg(test)]
    pub(crate) fn db_path(&self) -> PathBuf {
        self.pool.db_path()
    }
}

impl Drop for DbConnection {
250
    fn drop(&mut self) {
2,528✔
251
        let path = self.pool.db_path();
2,528✔
252

2,528✔
253
        if path.exists() {
2,528✔
254
            if let Some(parent) = path.parent() {
189✔
255
                if parent.starts_with(DbConnection::temp_db_dir()) {
189✔
256
                    debug!(target: LOG_TARGET, "DbConnection - Dropping database: {}", path.display());
189✔
257
                    // Explicitly cleanup and drop the connection pool to ensure all connections are released
258
                    let pool_state = self.pool.cleanup();
189✔
259
                    debug!(target: LOG_TARGET, "DbConnection - Pool stats before cleanup: {pool_state:?}");
189✔
260
                    debug!(target: LOG_TARGET, "DbConnection - Cleaning up tempdir: {}", parent.display());
189✔
261
                    if let Err(e) = fs::remove_dir_all(parent) {
189✔
UNCOV
262
                        error!(target: LOG_TARGET, "Failed to clean up temp dir: {e}");
×
263
                    } else {
264
                        debug!(target: LOG_TARGET, "Temp dir cleaned up: {}", parent.display());
189✔
265
                    }
UNCOV
266
                }
×
UNCOV
267
            }
×
268
        }
2,339✔
269
    }
2,528✔
270
}
271

272
impl PooledDbConnection for DbConnection {
273
    type Error = SqliteStorageError;
274

275
    fn get_pooled_connection(&self) -> Result<PooledConnection<ConnectionManager<SqliteConnection>>, Self::Error> {
5,033✔
276
        let conn = self.pool.get_pooled_connection()?;
5,033✔
277
        Ok(conn)
5,033✔
278
    }
5,033✔
279
}
280

281
#[cfg(test)]
282
mod test {
283
    use std::sync::Arc;
284

285
    use diesel::{
286
        connection::SimpleConnection,
287
        dsl::sql,
288
        sql_types::{Integer, Text},
289
        RunQueryDsl,
290
    };
291
    use diesel_migrations::embed_migrations;
292
    use tokio::{sync::Barrier, task::JoinSet};
293

294
    use super::*;
295

296
    #[tokio::test]
297
    async fn connect_and_migrate() {
1✔
298
        const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./test/migrations");
1✔
299

1✔
300
        let db_conn = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
1✔
301
        let path = db_conn.db_path();
1✔
302
        let mut pool_conn = db_conn.get_pooled_connection().unwrap();
1✔
303
        let count: i32 = sql::<Integer>("SELECT COUNT(*) FROM test_table")
1✔
304
            .get_result(&mut pool_conn)
1✔
305
            .unwrap();
1✔
306
        assert_eq!(count, 0);
1✔
307

1✔
308
        // Test temporary file cleanup
1✔
309
        assert!(path.exists());
1✔
310
        drop(pool_conn);
1✔
311
        drop(db_conn);
1✔
312
        assert!(!path.exists());
1✔
313
    }
1✔
314

315
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
316
    async fn stress_connect_and_migrate_contention() {
1✔
317
        const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./test/migrations");
1✔
318
        let db = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
1✔
319

1✔
320
        // Force very frequent WAL checkpoints to increase write pressure.
1✔
321
        // The SQLite "PRAGMA wal_autocheckpoint = 1;" executes a SQLite PRAGMA that will checkpoint
1✔
322
        // the Write-Ahead Log (WAL) after every transaction. This increases the frequency of
1✔
323
        // checkpointing, which can help test write contention and durability in high-concurrency
1✔
324
        // scenarios.
1✔
325
        let mut c = db.get_pooled_connection().unwrap();
1✔
326
        sql::<Integer>("PRAGMA wal_autocheckpoint = 1;")
1✔
327
            .execute(&mut c)
1✔
328
            .unwrap();
1✔
329
        let mode: String = sql::<Text>("PRAGMA journal_mode;").get_result(&mut c).unwrap();
1✔
330
        assert!(mode.eq_ignore_ascii_case("wal"));
1✔
331

1✔
332
        let busy: String = sql::<Text>("PRAGMA busy_timeout;").get_result(&mut c).unwrap();
1✔
333
        assert!(busy.parse::<u128>().unwrap() >= PRAGMA_BUSY_TIMEOUT.as_millis());
1✔
334

1✔
335
        // We have 'sqlite_pool_size = Some(10))', so '160 writers + 320 readers' must queue.
1✔
336
        const WRITERS: usize = 160;
1✔
337
        const READERS: usize = 320;
1✔
338
        const HOLD_MS: u64 = 100;
1✔
339

1✔
340
        let barrier = Arc::new(Barrier::new(WRITERS + READERS));
1✔
341
        let mut tasks = JoinSet::new();
1✔
342

1✔
343
        // Writers
1✔
344
        for _ in 0..WRITERS {
161✔
345
            // Let each spawned async task gets its own reference to the same synchronization barrier.
160✔
346
            let synchronization_barrier = barrier.clone();
160✔
347
            let db2 = db.clone();
160✔
348
            tasks.spawn(async move {
160✔
349
                // The synchronization barrier allows all tasks to wait at the barrier and proceed together once all
160✔
350
                // have reached it, enabling coordinated concurrent execution for this test.
160✔
351
                synchronization_barrier.wait().await;
160✔
352
                // IMPORTANT: await the blocking job
1✔
353
                tokio::task::spawn_blocking(move || {
160✔
354
                    let mut conn = db2.get_pooled_connection().expect("writer checkout");
160✔
355
                    // Acquires an immediate exclusive lock on the database for this write
160✔
356
                    conn.batch_execute("BEGIN EXCLUSIVE;").unwrap();
160✔
357
                    sql::<Integer>("INSERT INTO test_table DEFAULT VALUES;")
160✔
358
                        .execute(&mut conn)
160✔
359
                        .unwrap();
160✔
360
                    std::thread::sleep(std::time::Duration::from_millis(HOLD_MS));
160✔
361
                    conn.batch_execute("COMMIT;").unwrap();
160✔
362
                })
160✔
363
                .await
160✔
364
                .expect("writer join");
160✔
365
            });
160✔
366
        }
160✔
367
        // Readers
1✔
368
        for _ in 0..READERS {
321✔
369
            let b = barrier.clone();
320✔
370
            let db2 = db.clone();
320✔
371
            tasks.spawn(async move {
320✔
372
                b.wait().await;
320✔
373
                tokio::task::spawn_blocking(move || {
320✔
374
                    let mut conn = db2.get_pooled_connection().expect("reader checkout");
320✔
375
                    for _ in 0..3 {
1,280✔
376
                        let _: i32 = sql::<Integer>("SELECT COUNT(*) FROM test_table")
960✔
377
                            .get_result(&mut conn)
960✔
378
                            .expect("reader select");
960✔
379
                        // Small pause between reads (async sleep outside blocking isn’t usable here)
960✔
380
                        std::thread::sleep(std::time::Duration::from_millis(10));
960✔
381
                    }
960✔
382
                })
320✔
383
                .await
320✔
384
                .expect("reader join");
320✔
385
            });
320✔
386
        }
320✔
387

1✔
388
        while let Some(res) = tasks.join_next().await {
481✔
389
            res.expect("task panicked");
480✔
390
        }
480✔
391

1✔
392
        // Verify row count
1✔
393
        let mut c = db.get_pooled_connection().unwrap();
1✔
394
        let count: i32 = sql::<Integer>("SELECT COUNT(*) FROM test_table")
1✔
395
            .get_result(&mut c)
1✔
396
            .unwrap();
1✔
397
        assert_eq!(count as usize, WRITERS);
1✔
398
    }
1✔
399
}
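For context, a minimal sketch of how this module's API can be driven from inside the crate (the migrations path, base directory, and database file name below are illustrative):

```rust
use diesel_migrations::{embed_migrations, EmbeddedMigrations};

// Hypothetical embedded migrations directory for this sketch; real callers embed their own.
// Assumes DbConnection, DbConnectionUrl, and StorageError (defined above) are in scope.
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");

fn open_wallet_db() -> Result<DbConnection, StorageError> {
    // Relative file paths can be anchored to a common base directory before connecting.
    let mut url = DbConnectionUrl::file("wallet.sqlite");
    url.set_base_path("/var/lib/tari");
    // connect_and_migrate runs migrations behind DB_WRITE_LOCK and sizes the pool to 10 connections.
    let db = DbConnection::connect_and_migrate(&url, MIGRATIONS, Some(10))?;
    // Callers then check out pooled connections; this blocks the calling thread until a
    // connection is available (up to 60 seconds).
    let _conn = db.get_pooled_connection()?;
    Ok(db)
}
```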