• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 16990089413

15 Aug 2025 12:36PM UTC coverage: 54.497% (+0.06%) from 54.441%
16990089413

push

github

web-flow
chore: cleanup indexes (#7411)

Description
---
Forces clean indexs

970 of 2919 new or added lines in 369 files covered. (33.23%)

60 existing lines in 33 files now uncovered.

76698 of 140739 relevant lines covered (54.5%)

193749.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.74
/common_sqlite/src/connection.rs
1
// Copyright 2020. The Tari Project
2
//
3
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
// following conditions are met:
5
//
6
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
// disclaimer.
8
//
9
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
// following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
// products derived from this software without specific prior written permission.
14
//
15
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{
24
    convert::TryFrom,
25
    env::temp_dir,
26
    fs,
27
    iter,
28
    path::{Path, PathBuf},
29
    sync::{Arc, RwLock, RwLockWriteGuard},
30
    time::Duration,
31
};
32

33
use diesel::{
34
    r2d2::{ConnectionManager, PooledConnection},
35
    SqliteConnection,
36
};
37
use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
38
use log::*;
39
use rand::{distributions::Alphanumeric, thread_rng, Rng};
40
use serde::{Deserialize, Serialize};
41

42
use crate::{
43
    error::{SqliteStorageError, StorageError},
44
    sqlite_connection_pool::{PooledDbConnection, SqliteConnectionPool},
45
};
46

47
const LOG_TARGET: &str = "common_sqlite::connection";
48

49
/// Describes how to connect to the database (currently, SQLite).
50
#[derive(Clone, Debug, Serialize, Deserialize)]
×
51
#[serde(into = "String", try_from = "String")]
52
pub enum DbConnectionUrl {
53
    /// In-memory database. Each connection has it's own database
54
    Memory,
55
    /// In-memory database shared with more than one in-process connection according to the given identifier
56
    MemoryShared(String),
57
    /// Database persisted on disk
58
    File(PathBuf),
59
}
60

61
impl DbConnectionUrl {
62
    /// Use a file to store the database
63
    pub fn file<P: AsRef<Path>>(path: P) -> Self {
×
64
        DbConnectionUrl::File(path.as_ref().to_path_buf())
×
65
    }
×
66

67
    /// Returns a database connection string
68
    pub fn to_url_string(&self) -> String {
408✔
69
        use DbConnectionUrl::{File, Memory, MemoryShared};
70
        match self {
408✔
71
            Memory => ":memory:".to_owned(),
5✔
72
            MemoryShared(identifier) => format!("file:{identifier}?mode=memory&cache=shared"),
215✔
73
            File(path) => path
188✔
74
                .to_str()
188✔
75
                .expect("Invalid non-UTF8 character in database path")
188✔
76
                .to_owned(),
188✔
77
        }
78
    }
408✔
79

80
    /// Sets relative paths to use a common base path
81
    pub fn set_base_path<P: AsRef<Path>>(&mut self, base_path: P) {
×
82
        if let DbConnectionUrl::File(inner) = self {
×
83
            if !inner.is_absolute() {
×
84
                *inner = base_path.as_ref().join(inner.as_path());
×
85
            }
×
86
        }
×
87
    }
×
88
}
89

90
impl From<DbConnectionUrl> for String {
91
    fn from(source: DbConnectionUrl) -> Self {
×
92
        source.to_url_string()
×
93
    }
×
94
}
95

96
impl TryFrom<String> for DbConnectionUrl {
97
    type Error = String;
98

99
    fn try_from(value: String) -> Result<Self, Self::Error> {
×
100
        if value.as_str() == ":memory:" {
×
101
            Ok(Self::Memory)
×
102
        } else {
103
            Ok(Self::File(PathBuf::from(value)))
×
104
        }
105
    }
×
106
}
107

108
lazy_static::lazy_static! {
109
    static ref DB_WRITE_LOCK: Arc<RwLock<()>> = Arc::new(RwLock::new(()));
110
}
111

112
/// An SQLite database connection using the Diesel ORM with its r2d2 connection pool and SQLite WAL backend.
113
/// --------------------------------------------------------------------------------------------------------------------
114
/// Notes on SQLite’s Concurrency Limitations (causes of intermittent “Database is Locked” errors)
115
///
116
/// SQLite allows only one writer at a time, even in WAL mode. Under high concurrency (e.g. many threads doing writes),
117
/// collisions are inevitable – one transaction holds an exclusive write lock while others must wait. If a write lock
118
/// cannot be acquired within the busy_timeout, SQLite returns a SQLITE_BUSY (“database is locked”) error. In WAL mode,
119
/// readers don’t block writers and vice versa, but still only one writer can commit at any given moment. This
120
/// single-writer bottleneck means that bursts of simultaneous writes can lead to contention. If a transaction takes too
121
/// long (holding the lock), queued writers may time out (even with a 60s timeout). In short, heavy write concurrency
122
/// can exceed SQLite’s design limits, causing intermittent “database is locked” errors during high load.
123
///
124
/// “Busy Timeout” Not Always Honored – Deferred Write Pitfall: Even with WAL + a busy timeout, you can still get
125
/// immediate lock errors in certain cases. A known scenario involves deferred transactions upgrading to writes, often
126
/// called the “write-after-read” pattern. By default, BEGIN in SQLite is deferred – the transaction starts as read-only
127
/// if the first statement is a SELECT. If you later issue a write in that same transaction, SQLite will try to upgrade
128
/// it to a write transaction.
129
///
130
/// Mitigations and Best Practices for Write Concurrency with SQLite
131
/// - Use WAL Mode and Busy Timeout
132
/// - Start Write Transactions in IMMEDIATE Mode (`SqliteConnection::immediate_transaction(...)`)
133
/// - Keep Transactions Short and Optimize Write Duration
134
/// - Limit Write Concurrency & Pool Sizing
135
/// - Handle and Retry Busy Errors Gracefully
136
/// -
137
/// --------------------------------------------------------------------------------------------------------------------
138
#[derive(Clone)]
139
pub struct DbConnection {
140
    pool: SqliteConnectionPool,
141
}
142

143
impl DbConnection {
144
    /// Connect using the given [DbConnectionUrl](self::DbConnectionUrl), optionally using the given pool size to
145
    /// override the default setting of 1.
146
    /// Note: See https://github.com/launchbadge/sqlx/issues/362#issuecomment-636661146
147
    pub fn connect_url(db_url: &DbConnectionUrl, sqlite_pool_size: Option<usize>) -> Result<Self, StorageError> {
408✔
148
        debug!(target: LOG_TARGET, "Connecting to database using '{db_url:?}'");
408✔
149

150
        // Ensure the path exists
151
        if let DbConnectionUrl::File(ref path) = db_url {
408✔
152
            if let Some(parent) = path.parent() {
188✔
153
                std::fs::create_dir_all(parent)?;
188✔
154
            }
×
155
        }
220✔
156

157
        let mut pool = SqliteConnectionPool::new(
408✔
158
            db_url.to_url_string(),
408✔
159
            sqlite_pool_size.unwrap_or(1),
408✔
160
            true,
408✔
161
            true,
408✔
162
            Duration::from_secs(60),
408✔
163
        );
408✔
164
        pool.create_pool()?;
408✔
165

166
        Ok(Self::new(pool))
408✔
167
    }
408✔
168

169
    fn acquire_migration_write_lock() -> Result<RwLockWriteGuard<'static, ()>, StorageError> {
223✔
170
        match DB_WRITE_LOCK.write() {
223✔
171
            Ok(value) => Ok(value),
223✔
172
            Err(err) => Err(StorageError::DatabaseMigrationLockError(format!(
×
NEW
173
                "Failed to acquire write lock for database migration: {err}"
×
174
            ))),
×
175
        }
176
    }
223✔
177

178
    /// Connect and migrate the database, once complete, then return a handle to the migrated database.
179
    pub fn connect_and_migrate(
223✔
180
        db_url: &DbConnectionUrl,
223✔
181
        migrations: EmbeddedMigrations,
223✔
182
        sqlite_pool_size: Option<usize>,
223✔
183
    ) -> Result<Self, StorageError> {
223✔
184
        let _lock = Self::acquire_migration_write_lock()?;
223✔
185
        let conn = Self::connect_url(db_url, sqlite_pool_size)?;
223✔
186
        let output = conn.migrate(migrations)?;
223✔
187
        debug!(target: LOG_TARGET, "Database migration: {}", output.trim());
223✔
188
        Ok(conn)
223✔
189
    }
223✔
190

191
    fn temp_db_dir() -> PathBuf {
372✔
192
        temp_dir().join("tari-temp")
372✔
193
    }
372✔
194

195
    /// Connect and migrate the database in a temporary location, then return a handle to the migrated database.
196
    pub fn connect_temp_file_and_migrate(migrations: EmbeddedMigrations) -> Result<Self, StorageError> {
186✔
197
        fn prefixed_string(prefix: &str, len: usize) -> String {
186✔
198
            let mut rng = thread_rng();
186✔
199
            let rand_str = iter::repeat(())
186✔
200
                .map(|_| rng.sample(Alphanumeric) as char)
3,720✔
201
                .take(len)
186✔
202
                .collect::<String>();
186✔
203
            format!("{prefix}{rand_str}")
186✔
204
        }
186✔
205

206
        let path = DbConnection::temp_db_dir().join(prefixed_string("data-", 20));
186✔
207
        fs::create_dir_all(&path)?;
186✔
208
        let db_url = DbConnectionUrl::File(path.join("my_temp.db"));
186✔
209
        DbConnection::connect_and_migrate(&db_url, migrations, Some(10))
186✔
210
    }
186✔
211

212
    fn new(pool: SqliteConnectionPool) -> Self {
408✔
213
        Self { pool }
408✔
214
    }
408✔
215

216
    /// Fetch a connection from the pool. This function synchronously blocks the current thread for up to 60 seconds or
217
    /// until a connection is available.
218
    pub fn get_pooled_connection(&self) -> Result<PooledConnection<ConnectionManager<SqliteConnection>>, StorageError> {
12,198✔
219
        self.pool.get_pooled_connection().map_err(StorageError::DieselR2d2Error)
12,198✔
220
    }
12,198✔
221

222
    /// Run database migrations
223
    pub fn migrate(&self, migrations: EmbeddedMigrations) -> Result<String, StorageError> {
223✔
224
        let mut conn = self.get_pooled_connection()?;
223✔
225
        let result: Vec<String> = conn
223✔
226
            .run_pending_migrations(migrations)
223✔
227
            .map(|v| v.into_iter().map(|b| format!("Running migration {b}")).collect())
1,018✔
228
            .map_err(|err| StorageError::DatabaseMigrationFailed(format!("Database migration failed {err}")))?;
223✔
229

230
        Ok(result.join("\r\n"))
223✔
231
    }
223✔
232

233
    #[cfg(test)]
234
    pub(crate) fn db_path(&self) -> PathBuf {
1✔
235
        self.pool.db_path()
1✔
236
    }
1✔
237
}
238

239
impl Drop for DbConnection {
240
    fn drop(&mut self) {
2,124✔
241
        let path = self.pool.db_path();
2,124✔
242

2,124✔
243
        if path.exists() {
2,124✔
244
            if let Some(parent) = path.parent() {
186✔
245
                if parent.starts_with(DbConnection::temp_db_dir()) {
186✔
246
                    debug!(target: LOG_TARGET, "DbConnection - Dropping database: {}", path.display());
186✔
247
                    // Explicitly cleanup and drop the connection pool to ensure all connections are released
248
                    let pool_state = self.pool.cleanup();
186✔
249
                    debug!(target: LOG_TARGET, "DbConnection - Pool stats before cleanup: {pool_state:?}");
186✔
250
                    debug!(target: LOG_TARGET, "DbConnection - Cleaning up tempdir: {}", parent.display());
186✔
251
                    if let Err(e) = fs::remove_dir_all(parent) {
186✔
NEW
252
                        error!(target: LOG_TARGET, "Failed to clean up temp dir: {e}");
×
253
                    } else {
254
                        debug!(target: LOG_TARGET, "Temp dir cleaned up: {}", parent.display());
186✔
255
                    }
256
                }
×
257
            }
×
258
        }
1,938✔
259
    }
2,124✔
260
}
261

262
impl PooledDbConnection for DbConnection {
263
    type Error = SqliteStorageError;
264

265
    fn get_pooled_connection(&self) -> Result<PooledConnection<ConnectionManager<SqliteConnection>>, Self::Error> {
7,398✔
266
        let conn = self.pool.get_pooled_connection()?;
7,398✔
267
        Ok(conn)
7,398✔
268
    }
7,398✔
269
}
270

271
#[cfg(test)]
272
mod test {
273
    use diesel::{dsl::sql, sql_types::Integer, RunQueryDsl};
274
    use diesel_migrations::embed_migrations;
275

276
    use super::*;
277

278
    #[tokio::test]
279
    async fn connect_and_migrate() {
1✔
280
        const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./test/migrations");
1✔
281

1✔
282
        let db_conn = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
1✔
283
        let path = db_conn.db_path();
1✔
284
        let mut pool_conn = db_conn.get_pooled_connection().unwrap();
1✔
285
        let count: i32 = sql::<Integer>("SELECT COUNT(*) FROM test_table")
1✔
286
            .get_result(&mut pool_conn)
1✔
287
            .unwrap();
1✔
288
        assert_eq!(count, 0);
1✔
289

1✔
290
        // Test temporary file cleanup
1✔
291
        assert!(path.exists());
1✔
292
        drop(pool_conn);
1✔
293
        drop(db_conn);
1✔
294
        assert!(!path.exists());
1✔
295
    }
1✔
296
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc