• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 8248404462

12 Mar 2024 12:01PM UTC coverage: 76.009% (+0.3%) from 75.719%
8248404462

push

github

web-flow
chore: new release v1.0.0-pre.11a (#6205)

Description
---
new release with windows terminal fix

75213 of 98953 relevant lines covered (76.01%)

312193.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.86
/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs
1
// Copyright 2019. The Tari Project
2
//
3
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
// following conditions are met:
5
//
6
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
// disclaimer.
8
//
9
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
// following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
// products derived from this software without specific prior written permission.
14
//
15
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{convert::TryFrom, str::FromStr};
24

25
use chrono::{NaiveDateTime, Utc};
26
use derivative::Derivative;
27
use diesel::{
28
    prelude::*,
29
    r2d2::{ConnectionManager, PooledConnection},
30
    result::Error as DieselError,
31
    SqliteConnection,
32
};
33
use log::*;
34
pub use new_output_sql::NewOutputSql;
35
pub use output_sql::OutputSql;
36
use tari_common_sqlite::{sqlite_connection_pool::PooledDbConnection, util::diesel_ext::ExpectedRowsExtension};
37
use tari_common_types::{
38
    transaction::TxId,
39
    types::{Commitment, FixedHash},
40
};
41
use tari_core::transactions::{
42
    key_manager::TariKeyId,
43
    transaction_components::{OutputType, TransactionOutput},
44
};
45
use tari_crypto::tari_utilities::{hex::Hex, ByteArray};
46
use tari_script::{ExecutionStack, TariScript};
47
use tokio::time::Instant;
48

49
use crate::{
50
    output_manager_service::{
51
        error::OutputManagerStorageError,
52
        service::Balance,
53
        storage::{
54
            database::{DbKey, DbKeyValuePair, DbValue, OutputBackendQuery, OutputManagerBackend, WriteOperation},
55
            models::{DbWalletOutput, KnownOneSidedPaymentScript},
56
            OutputStatus,
57
        },
58
        UtxoSelectionCriteria,
59
    },
60
    schema::{known_one_sided_payment_scripts, outputs},
61
    storage::sqlite_utilities::wallet_db_connection::WalletDbConnection,
62
};
63
mod new_output_sql;
64
mod output_sql;
65
const LOG_TARGET: &str = "wallet::output_manager_service::database::wallet";
66

67
/// A Sqlite backend for the Output Manager Service. The backend is accessed via a connection pool to the Sqlite file.
68
#[derive(Clone)]
37✔
69
pub struct OutputManagerSqliteDatabase {
70
    // Connection handle; the methods in this file call `get_pooled_connection()` on it for each operation.
    database_connection: WalletDbConnection,
71
}
72

73
impl OutputManagerSqliteDatabase {
74
    pub fn new(database_connection: WalletDbConnection) -> Self {
71✔
75
        Self { database_connection }
71✔
76
    }
71✔
77

78
    fn insert(
8,548✔
79
        &self,
8,548✔
80
        key_value_pair: DbKeyValuePair,
8,548✔
81
        conn: &mut SqliteConnection,
8,548✔
82
    ) -> Result<(), OutputManagerStorageError> {
8,548✔
83
        match key_value_pair {
8,548✔
84
            DbKeyValuePair::UnspentOutput(c, o) => {
8,511✔
85
                if OutputSql::find_by_commitment_and_cancelled(&c.to_vec(), false, conn).is_ok() {
8,511✔
86
                    return Err(OutputManagerStorageError::DuplicateOutput);
1✔
87
                }
8,510✔
88
                let new_output = NewOutputSql::new(*o, Some(OutputStatus::UnspentMinedUnconfirmed), None)?;
8,510✔
89
                new_output.commit(conn)?
8,510✔
90
            },
91
            DbKeyValuePair::UnspentOutputWithTxId(c, (tx_id, o)) => {
18✔
92
                if OutputSql::find_by_commitment_and_cancelled(&c.to_vec(), false, conn).is_ok() {
18✔
93
                    return Err(OutputManagerStorageError::DuplicateOutput);
1✔
94
                }
17✔
95
                let new_output = NewOutputSql::new(*o, Some(OutputStatus::UnspentMinedUnconfirmed), Some(tx_id))?;
17✔
96
                new_output.commit(conn)?
17✔
97
            },
98
            DbKeyValuePair::OutputToBeReceived(c, (tx_id, o)) => {
18✔
99
                if OutputSql::find_by_commitment_and_cancelled(&c.to_vec(), false, conn).is_ok() {
18✔
100
                    return Err(OutputManagerStorageError::DuplicateOutput);
×
101
                }
18✔
102
                let new_output = NewOutputSql::new(*o, Some(OutputStatus::EncumberedToBeReceived), Some(tx_id))?;
18✔
103
                new_output.commit(conn)?
18✔
104
            },
105

106
            DbKeyValuePair::KnownOneSidedPaymentScripts(script) => {
1✔
107
                let script_sql = KnownOneSidedPaymentScriptSql::from_known_one_sided_payment_script(script)?;
1✔
108
                if KnownOneSidedPaymentScriptSql::find(&script_sql.script_hash, conn).is_ok() {
1✔
109
                    return Err(OutputManagerStorageError::DuplicateScript);
×
110
                }
1✔
111
                script_sql.commit(conn)?
1✔
112
            },
113
        }
114
        Ok(())
8,545✔
115
    }
8,548✔
116
}
117

118
impl OutputManagerBackend for OutputManagerSqliteDatabase {
119
    #[allow(clippy::cognitive_complexity)]
120
    #[allow(clippy::too_many_lines)]
121
    fn fetch(&self, key: &DbKey) -> Result<Option<DbValue>, OutputManagerStorageError> {
32✔
122
        let start = Instant::now();
32✔
123
        let mut conn = self.database_connection.get_pooled_connection()?;
32✔
124
        let acquire_lock = start.elapsed();
32✔
125

126
        let result = match key {
32✔
127
            DbKey::SpentOutput(k) => match OutputSql::find_status(k, OutputStatus::Spent, &mut conn) {
×
128
                Ok(o) => Some(DbValue::SpentOutput(Box::new(o.to_db_wallet_output()?))),
×
129
                Err(e) => {
×
130
                    match e {
×
131
                        OutputManagerStorageError::DieselError(DieselError::NotFound) => (),
×
132
                        e => return Err(e),
×
133
                    };
134
                    None
×
135
                },
136
            },
137
            DbKey::UnspentOutput(k) => match OutputSql::find_status(k, OutputStatus::Unspent, &mut conn) {
×
138
                Ok(o) => Some(DbValue::UnspentOutput(Box::new(o.to_db_wallet_output()?))),
×
139
                Err(e) => {
×
140
                    match e {
×
141
                        OutputManagerStorageError::DieselError(DieselError::NotFound) => (),
×
142
                        e => return Err(e),
×
143
                    };
144
                    None
×
145
                },
146
            },
147
            DbKey::UnspentOutputHash(hash) => {
×
148
                match OutputSql::find_by_hash(hash.as_slice(), OutputStatus::Unspent, &mut conn) {
×
149
                    Ok(o) => Some(DbValue::UnspentOutput(Box::new(o.to_db_wallet_output()?))),
×
150
                    Err(e) => {
×
151
                        match e {
×
152
                            OutputManagerStorageError::DieselError(DieselError::NotFound) => (),
×
153
                            e => return Err(e),
×
154
                        };
155
                        None
×
156
                    },
157
                }
158
            },
159
            DbKey::AnyOutputByCommitment(commitment) => {
9✔
160
                match OutputSql::find_by_commitment(&commitment.to_vec(), &mut conn) {
9✔
161
                    Ok(o) => Some(DbValue::AnyOutput(Box::new(o.to_db_wallet_output()?))),
2✔
162
                    Err(e) => {
7✔
163
                        match e {
7✔
164
                            OutputManagerStorageError::DieselError(DieselError::NotFound) => (),
7✔
165
                            e => return Err(e),
×
166
                        };
167
                        None
7✔
168
                    },
169
                }
170
            },
171
            DbKey::OutputsByTxIdAndStatus(tx_id, status) => {
×
172
                let outputs = OutputSql::find_by_tx_id_and_status(*tx_id, *status, &mut conn)?;
×
173

174
                Some(DbValue::AnyOutputs(
175
                    outputs
×
176
                        .iter()
×
177
                        .map(|o| o.clone().to_db_wallet_output())
×
178
                        .collect::<Result<Vec<_>, _>>()?,
×
179
                ))
180
            },
181
            DbKey::UnspentOutputs => {
182
                let outputs = OutputSql::index_status(
14✔
183
                    vec![OutputStatus::Unspent, OutputStatus::UnspentMinedUnconfirmed],
14✔
184
                    &mut conn,
14✔
185
                )?;
14✔
186

187
                Some(DbValue::UnspentOutputs(
188
                    outputs
14✔
189
                        .iter()
14✔
190
                        .map(|o| o.clone().to_db_wallet_output())
78✔
191
                        .collect::<Result<Vec<_>, _>>()?,
14✔
192
                ))
193
            },
194
            DbKey::SpentOutputs => {
195
                let outputs = OutputSql::index_status(vec![OutputStatus::Spent], &mut conn)?;
2✔
196

197
                Some(DbValue::SpentOutputs(
198
                    outputs
2✔
199
                        .iter()
2✔
200
                        .map(|o| o.clone().to_db_wallet_output())
4✔
201
                        .collect::<Result<Vec<_>, _>>()?,
2✔
202
                ))
203
            },
204
            DbKey::TimeLockedUnspentOutputs(tip) => {
2✔
205
                let outputs = OutputSql::index_time_locked(*tip, &mut conn)?;
2✔
206

207
                Some(DbValue::UnspentOutputs(
208
                    outputs
2✔
209
                        .iter()
2✔
210
                        .map(|o| o.clone().to_db_wallet_output())
2✔
211
                        .collect::<Result<Vec<_>, _>>()?,
2✔
212
                ))
213
            },
214
            DbKey::InvalidOutputs => {
215
                let outputs = OutputSql::index_status(vec![OutputStatus::Invalid], &mut conn)?;
1✔
216

217
                Some(DbValue::InvalidOutputs(
218
                    outputs
1✔
219
                        .iter()
1✔
220
                        .map(|o| o.clone().to_db_wallet_output())
1✔
221
                        .collect::<Result<Vec<_>, _>>()?,
1✔
222
                ))
223
            },
224
            DbKey::KnownOneSidedPaymentScripts => {
225
                let known_one_sided_payment_scripts = KnownOneSidedPaymentScriptSql::index(&mut conn)?;
4✔
226

227
                Some(DbValue::KnownOneSidedPaymentScripts(
228
                    known_one_sided_payment_scripts
4✔
229
                        .iter()
4✔
230
                        .map(|script| script.clone().to_known_one_sided_payment_script())
4✔
231
                        .collect::<Result<Vec<_>, _>>()?,
4✔
232
                ))
233
            },
234
        };
235
        if start.elapsed().as_millis() > 0 {
32✔
236
            trace!(
18✔
237
                target: LOG_TARGET,
238
                "sqlite profile - fetch '{}': lock {} + db_op {} = {} ms",
×
239
                key,
×
240
                acquire_lock.as_millis(),
×
241
                (start.elapsed() - acquire_lock).as_millis(),
×
242
                start.elapsed().as_millis()
×
243
            );
244
        }
14✔
245

246
        Ok(result)
32✔
247
    }
32✔
248

249
    /// Returns the outputs that `OutputSql::index_by_output_type` yields for `output_type`, converted to wallet
    /// models.
    ///
    /// # Errors
    /// Returns an error if no pooled connection can be obtained, if the query fails, or if a row cannot be
    /// converted.
    fn fetch_with_features(&self, output_type: OutputType) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
        let mut conn = self.database_connection.get_pooled_connection()?;
        let outputs = OutputSql::index_by_output_type(output_type, &mut conn)?;

        // Consume the rows; the conversion takes each one by value, so cloning is unnecessary.
        outputs
            .into_iter()
            .map(|o| o.to_db_wallet_output())
            .collect::<Result<Vec<_>, _>>()
    }
×
258

259
    fn fetch_sorted_unspent_outputs(&self) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
3✔
260
        let mut conn = self.database_connection.get_pooled_connection()?;
3✔
261
        let outputs = OutputSql::index_unspent(&mut conn)?;
3✔
262

263
        outputs
3✔
264
            .into_iter()
3✔
265
            .map(|o| o.to_db_wallet_output())
21✔
266
            .collect::<Result<Vec<_>, _>>()
3✔
267
    }
3✔
268

269
    fn fetch_mined_unspent_outputs(&self) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
14✔
270
        let start = Instant::now();
14✔
271
        let mut conn = self.database_connection.get_pooled_connection()?;
14✔
272
        let acquire_lock = start.elapsed();
14✔
273
        let outputs = OutputSql::index_marked_deleted_in_block_is_null(&mut conn)?;
14✔
274

275
        if start.elapsed().as_millis() > 0 {
14✔
276
            trace!(
×
277
                target: LOG_TARGET,
278
                "sqlite profile - fetch_mined_unspent_outputs: lock {} + db_op {} = {} ms",
×
279
                acquire_lock.as_millis(),
×
280
                (start.elapsed() - acquire_lock).as_millis(),
×
281
                start.elapsed().as_millis()
×
282
            );
283
        }
14✔
284

285
        outputs
14✔
286
            .into_iter()
14✔
287
            .map(|o| o.to_db_wallet_output())
36✔
288
            .collect::<Result<Vec<_>, _>>()
14✔
289
    }
14✔
290

291
    fn fetch_invalid_outputs(&self, timestamp: i64) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
6✔
292
        let start = Instant::now();
6✔
293
        let mut conn = self.database_connection.get_pooled_connection()?;
6✔
294
        let acquire_lock = start.elapsed();
6✔
295
        let outputs = OutputSql::index_invalid(&NaiveDateTime::from_timestamp_opt(timestamp, 0).unwrap(), &mut conn)?;
6✔
296

297
        if start.elapsed().as_millis() > 0 {
6✔
298
            trace!(
1✔
299
                target: LOG_TARGET,
300
                "sqlite profile - fetch_invalid_outputs: lock {} + db_op {} = {} ms",
×
301
                acquire_lock.as_millis(),
×
302
                (start.elapsed() - acquire_lock).as_millis(),
×
303
                start.elapsed().as_millis()
×
304
            );
305
        }
5✔
306

307
        outputs
6✔
308
            .into_iter()
6✔
309
            .map(|o| o.to_db_wallet_output())
6✔
310
            .collect::<Result<Vec<_>, _>>()
6✔
311
    }
6✔
312

313
    fn fetch_unspent_mined_unconfirmed_outputs(&self) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
12✔
314
        let start = Instant::now();
12✔
315
        let mut conn = self.database_connection.get_pooled_connection()?;
12✔
316
        let acquire_lock = start.elapsed();
12✔
317
        let outputs = OutputSql::index_unconfirmed(&mut conn)?;
12✔
318

319
        if start.elapsed().as_millis() > 0 {
12✔
320
            trace!(
1✔
321
                target: LOG_TARGET,
322
                "sqlite profile - fetch_unspent_mined_unconfirmed_outputs: lock {} + db_op {} = {} ms",
×
323
                acquire_lock.as_millis(),
×
324
                (start.elapsed() - acquire_lock).as_millis(),
×
325
                start.elapsed().as_millis()
×
326
            );
327
        }
11✔
328

329
        outputs
12✔
330
            .into_iter()
12✔
331
            .map(|o| o.to_db_wallet_output())
47✔
332
            .collect::<Result<Vec<_>, _>>()
12✔
333
    }
12✔
334

335
    fn write(&self, op: WriteOperation) -> Result<Option<DbValue>, OutputManagerStorageError> {
8,549✔
336
        let start = Instant::now();
8,549✔
337
        let mut conn = self.database_connection.get_pooled_connection()?;
8,549✔
338
        let acquire_lock = start.elapsed();
8,549✔
339

8,549✔
340
        let mut msg = "".to_string();
8,549✔
341
        let result = match op {
8,549✔
342
            WriteOperation::Insert(kvp) => {
8,548✔
343
                msg.push_str("Insert");
8,548✔
344
                self.insert(kvp, &mut conn)?;
8,548✔
345
                Ok(None)
8,545✔
346
            },
347
            WriteOperation::Remove(k) => match k {
1✔
348
                DbKey::AnyOutputByCommitment(commitment) => {
1✔
349
                    conn.transaction::<_, _, _>(|conn| {
1✔
350
                        msg.push_str("Remove");
1✔
351
                        // Used by coinbase when mining.
1✔
352
                        match OutputSql::find_by_commitment(&commitment.to_vec(), conn) {
1✔
353
                            Ok(o) => {
1✔
354
                                o.delete(conn)?;
1✔
355
                                Ok(Some(DbValue::AnyOutput(Box::new(o.to_db_wallet_output()?))))
1✔
356
                            },
357
                            Err(e) => match e {
×
358
                                OutputManagerStorageError::DieselError(DieselError::NotFound) => Ok(None),
×
359
                                e => Err(e),
×
360
                            },
361
                        }
362
                    })
1✔
363
                },
364
                DbKey::SpentOutput(_s) => Err(OutputManagerStorageError::OperationNotSupported),
×
365
                DbKey::UnspentOutputHash(_h) => Err(OutputManagerStorageError::OperationNotSupported),
×
366
                DbKey::UnspentOutput(_k) => Err(OutputManagerStorageError::OperationNotSupported),
×
367
                DbKey::UnspentOutputs => Err(OutputManagerStorageError::OperationNotSupported),
×
368
                DbKey::SpentOutputs => Err(OutputManagerStorageError::OperationNotSupported),
×
369
                DbKey::InvalidOutputs => Err(OutputManagerStorageError::OperationNotSupported),
×
370
                DbKey::TimeLockedUnspentOutputs(_) => Err(OutputManagerStorageError::OperationNotSupported),
×
371
                DbKey::KnownOneSidedPaymentScripts => Err(OutputManagerStorageError::OperationNotSupported),
×
372
                DbKey::OutputsByTxIdAndStatus(_, _) => Err(OutputManagerStorageError::OperationNotSupported),
×
373
            },
374
        };
375
        if start.elapsed().as_millis() > 0 {
8,546✔
376
            trace!(
8,038✔
377
                target: LOG_TARGET,
378
                "sqlite profile - write {}: lock {} + db_op {} = {} ms",
×
379
                msg,
×
380
                acquire_lock.as_millis(),
×
381
                (start.elapsed() - acquire_lock).as_millis(),
×
382
                start.elapsed().as_millis()
×
383
            );
384
        }
508✔
385

386
        result
8,546✔
387
    }
8,549✔
388

389
    fn fetch_pending_incoming_outputs(&self) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
1✔
390
        let start = Instant::now();
1✔
391
        let mut conn = self.database_connection.get_pooled_connection()?;
1✔
392
        let acquire_lock = start.elapsed();
1✔
393

394
        let outputs = OutputSql::index_status(
1✔
395
            vec![
1✔
396
                OutputStatus::EncumberedToBeReceived,
1✔
397
                OutputStatus::UnspentMinedUnconfirmed,
1✔
398
                OutputStatus::ShortTermEncumberedToBeReceived,
1✔
399
            ],
1✔
400
            &mut conn,
1✔
401
        )?;
1✔
402

403
        if start.elapsed().as_millis() > 0 {
1✔
404
            trace!(
×
405
                target: LOG_TARGET,
406
                "sqlite profile - fetch_pending_incoming_outputs: lock {} + db_op {} = {} ms",
×
407
                acquire_lock.as_millis(),
×
408
                (start.elapsed() - acquire_lock).as_millis(),
×
409
                start.elapsed().as_millis()
×
410
            );
411
        }
1✔
412
        outputs
1✔
413
            .iter()
1✔
414
            .map(|o| o.clone().to_db_wallet_output())
2✔
415
            .collect::<Result<Vec<_>, _>>()
1✔
416
    }
1✔
417

418
    fn set_received_output_mined_height_and_status(
25✔
419
        &self,
25✔
420
        hash: FixedHash,
25✔
421
        mined_height: u64,
25✔
422
        mined_in_block: FixedHash,
25✔
423
        confirmed: bool,
25✔
424
        mined_timestamp: u64,
25✔
425
    ) -> Result<(), OutputManagerStorageError> {
25✔
426
        let start = Instant::now();
25✔
427
        let mut conn = self.database_connection.get_pooled_connection()?;
25✔
428
        let acquire_lock = start.elapsed();
25✔
429
        let status = if confirmed {
25✔
430
            OutputStatus::Unspent as i32
15✔
431
        } else {
432
            OutputStatus::UnspentMinedUnconfirmed as i32
10✔
433
        };
434
        debug!(
25✔
435
            target: LOG_TARGET,
436
            "`set_received_output_mined_height` status: {}", status
×
437
        );
438
        let hash = hash.to_vec();
25✔
439
        let mined_in_block = mined_in_block.to_vec();
25✔
440
        let timestamp = NaiveDateTime::from_timestamp_opt(mined_timestamp as i64, 0).ok_or(
25✔
441
            OutputManagerStorageError::ConversionError {
25✔
442
                reason: format!("Could not create timestamp mined_timestamp: {}", mined_timestamp),
25✔
443
            },
25✔
444
        )?;
25✔
445
        diesel::update(outputs::table.filter(outputs::hash.eq(hash)))
25✔
446
            .set((
25✔
447
                outputs::mined_height.eq(mined_height as i64),
25✔
448
                outputs::mined_in_block.eq(mined_in_block),
25✔
449
                outputs::status.eq(status),
25✔
450
                outputs::mined_timestamp.eq(timestamp),
25✔
451
                outputs::marked_deleted_at_height.eq::<Option<i64>>(None),
25✔
452
                outputs::marked_deleted_in_block.eq::<Option<Vec<u8>>>(None),
25✔
453
                outputs::last_validation_timestamp.eq::<Option<NaiveDateTime>>(None),
25✔
454
            ))
25✔
455
            .execute(&mut conn)
25✔
456
            .num_rows_affected_or_not_found(1)?;
25✔
457
        if start.elapsed().as_millis() > 0 {
25✔
458
            trace!(
3✔
459
                target: LOG_TARGET,
460
                "sqlite profile - set_received_output_mined_height: lock {} + db_op {} = {} ms",
×
461
                acquire_lock.as_millis(),
×
462
                (start.elapsed() - acquire_lock).as_millis(),
×
463
                start.elapsed().as_millis()
×
464
            );
465
        }
22✔
466

467
        Ok(())
25✔
468
    }
25✔
469

470
    fn set_output_to_unmined_and_invalid(&self, hash: FixedHash) -> Result<(), OutputManagerStorageError> {
3✔
471
        let start = Instant::now();
3✔
472
        let mut conn = self.database_connection.get_pooled_connection()?;
3✔
473
        let acquire_lock = start.elapsed();
3✔
474
        let hash = hash.to_vec();
3✔
475
        diesel::update(outputs::table.filter(outputs::hash.eq(hash)))
3✔
476
            .set((
3✔
477
                outputs::mined_height.eq::<Option<i64>>(None),
3✔
478
                outputs::mined_in_block.eq::<Option<Vec<u8>>>(None),
3✔
479
                outputs::status.eq(OutputStatus::Invalid as i32),
3✔
480
                outputs::mined_timestamp.eq::<Option<NaiveDateTime>>(None),
3✔
481
                outputs::marked_deleted_at_height.eq::<Option<i64>>(None),
3✔
482
                outputs::marked_deleted_in_block.eq::<Option<Vec<u8>>>(None),
3✔
483
            ))
3✔
484
            .execute(&mut conn)
3✔
485
            .num_rows_affected_or_not_found(1)?;
3✔
486
        if start.elapsed().as_millis() > 0 {
3✔
487
            trace!(
×
488
                target: LOG_TARGET,
489
                "sqlite profile - set_output_to_unmined: lock {} + db_op {} = {} ms",
×
490
                acquire_lock.as_millis(),
×
491
                (start.elapsed() - acquire_lock).as_millis(),
×
492
                start.elapsed().as_millis()
×
493
            );
494
        }
3✔
495

496
        Ok(())
3✔
497
    }
3✔
498

499
    fn set_outputs_to_be_revalidated(&self) -> Result<(), OutputManagerStorageError> {
2✔
500
        let start = Instant::now();
2✔
501
        let mut conn = self.database_connection.get_pooled_connection()?;
2✔
502
        let acquire_lock = start.elapsed();
2✔
503
        let result = diesel::update(outputs::table)
2✔
504
            .set((
2✔
505
                outputs::mined_height.eq::<Option<i64>>(None),
2✔
506
                outputs::mined_in_block.eq::<Option<Vec<u8>>>(None),
2✔
507
                outputs::status.eq(OutputStatus::Invalid as i32),
2✔
508
                outputs::mined_timestamp.eq::<Option<NaiveDateTime>>(None),
2✔
509
                outputs::marked_deleted_at_height.eq::<Option<i64>>(None),
2✔
510
                outputs::marked_deleted_in_block.eq::<Option<Vec<u8>>>(None),
2✔
511
            ))
2✔
512
            .execute(&mut conn)?;
2✔
513

514
        trace!(target: LOG_TARGET, "rows updated: {:?}", result);
2✔
515
        if start.elapsed().as_millis() > 0 {
2✔
516
            trace!(
×
517
                target: LOG_TARGET,
518
                "sqlite profile - set_outputs_to_be_revalidated: lock {} + db_op {} = {} ms",
×
519
                acquire_lock.as_millis(),
×
520
                (start.elapsed() - acquire_lock).as_millis(),
×
521
                start.elapsed().as_millis()
×
522
            );
523
        }
2✔
524

525
        Ok(())
2✔
526
    }
2✔
527

528
    fn update_last_validation_timestamp(&self, hash: FixedHash) -> Result<(), OutputManagerStorageError> {
1✔
529
        let start = Instant::now();
1✔
530
        let mut conn = self.database_connection.get_pooled_connection()?;
1✔
531
        let acquire_lock = start.elapsed();
1✔
532
        let hash = hash.to_vec();
1✔
533
        diesel::update(outputs::table.filter(outputs::hash.eq(hash)))
1✔
534
            .set((outputs::last_validation_timestamp
1✔
535
                .eq::<Option<NaiveDateTime>>(NaiveDateTime::from_timestamp_opt(Utc::now().timestamp(), 0)),))
1✔
536
            .execute(&mut conn)
1✔
537
            .num_rows_affected_or_not_found(1)?;
1✔
538
        if start.elapsed().as_millis() > 0 {
1✔
539
            trace!(
×
540
                target: LOG_TARGET,
541
                "sqlite profile - set_output_to_be_revalidated_in_the_future: lock {} + db_op {} = {} ms",
×
542
                acquire_lock.as_millis(),
×
543
                (start.elapsed() - acquire_lock).as_millis(),
×
544
                start.elapsed().as_millis()
×
545
            );
546
        }
1✔
547

548
        Ok(())
1✔
549
    }
1✔
550

551
    fn mark_output_as_spent(
14✔
552
        &self,
14✔
553
        hash: FixedHash,
14✔
554
        mark_deleted_at_height: u64,
14✔
555
        mark_deleted_in_block: FixedHash,
14✔
556
        confirmed: bool,
14✔
557
    ) -> Result<(), OutputManagerStorageError> {
14✔
558
        let start = Instant::now();
14✔
559
        let mut conn = self.database_connection.get_pooled_connection()?;
14✔
560
        let acquire_lock = start.elapsed();
14✔
561
        let hash = hash.to_vec();
14✔
562
        let mark_deleted_in_block = mark_deleted_in_block.to_vec();
14✔
563
        let status = if confirmed {
14✔
564
            OutputStatus::Spent as i32
6✔
565
        } else {
566
            OutputStatus::SpentMinedUnconfirmed as i32
8✔
567
        };
568
        diesel::update(outputs::table.filter(outputs::hash.eq(hash)))
14✔
569
            .set((
14✔
570
                outputs::marked_deleted_at_height.eq(mark_deleted_at_height as i64),
14✔
571
                outputs::marked_deleted_in_block.eq(mark_deleted_in_block),
14✔
572
                outputs::status.eq(status),
14✔
573
            ))
14✔
574
            .execute(&mut conn)
14✔
575
            .num_rows_affected_or_not_found(1)?;
14✔
576
        if start.elapsed().as_millis() > 0 {
14✔
577
            trace!(
×
578
                target: LOG_TARGET,
579
                "sqlite profile - mark_output_as_spent: lock {} + db_op {} = {} ms",
×
580
                acquire_lock.as_millis(),
×
581
                (start.elapsed() - acquire_lock).as_millis(),
×
582
                start.elapsed().as_millis()
×
583
            );
584
        }
14✔
585

586
        Ok(())
14✔
587
    }
14✔
588

589
    fn mark_output_as_unspent(&self, hash: FixedHash, confirmed: bool) -> Result<(), OutputManagerStorageError> {
8,511✔
590
        let start = Instant::now();
8,511✔
591
        let mut conn = self.database_connection.get_pooled_connection()?;
8,511✔
592
        let acquire_lock = start.elapsed();
8,511✔
593
        let hash = hash.to_vec();
8,511✔
594
        let status = if confirmed {
8,511✔
595
            OutputStatus::Unspent
8,511✔
596
        } else {
597
            OutputStatus::UnspentMinedUnconfirmed
×
598
        };
599
        debug!(target: LOG_TARGET, "mark_output_as_unspent({})", hash.to_hex());
8,511✔
600
        diesel::update(outputs::table.filter(outputs::hash.eq(hash)))
8,511✔
601
            .set((
8,511✔
602
                outputs::marked_deleted_at_height.eq::<Option<i64>>(None),
8,511✔
603
                outputs::marked_deleted_in_block.eq::<Option<Vec<u8>>>(None),
8,511✔
604
                outputs::status.eq(status as i32),
8,511✔
605
            ))
8,511✔
606
            .execute(&mut conn)
8,511✔
607
            .num_rows_affected_or_not_found(1)?;
8,511✔
608
        if start.elapsed().as_millis() > 0 {
8,511✔
609
            trace!(
6,423✔
610
                target: LOG_TARGET,
611
                "sqlite profile - mark_output_as_unspent: lock {} + db_op {} = {} ms",
×
612
                acquire_lock.as_millis(),
×
613
                (start.elapsed() - acquire_lock).as_millis(),
×
614
                start.elapsed().as_millis()
×
615
            );
616
        }
2,088✔
617

618
        Ok(())
8,511✔
619
    }
8,511✔
620

621
    fn short_term_encumber_outputs(
51✔
622
        &self,
51✔
623
        tx_id: TxId,
51✔
624
        outputs_to_send: &[DbWalletOutput],
51✔
625
        outputs_to_receive: &[DbWalletOutput],
51✔
626
    ) -> Result<(), OutputManagerStorageError> {
51✔
627
        let start = Instant::now();
51✔
628
        let mut conn = self.database_connection.get_pooled_connection()?;
51✔
629
        let acquire_lock = start.elapsed();
51✔
630

51✔
631
        let mut commitments = Vec::with_capacity(outputs_to_send.len());
51✔
632
        for output in outputs_to_send {
8,148✔
633
            commitments.push(output.commitment.as_bytes());
8,097✔
634
        }
8,097✔
635
        conn.transaction::<_, _, _>(|conn| {
51✔
636
            // Any output in the list without the `Unspent` status will invalidate the encumberance
51✔
637
            if !OutputSql::find_by_commitments_excluding_status(commitments.clone(), OutputStatus::Unspent, conn)?
51✔
638
                .is_empty()
49✔
639
            {
640
                return Err(OutputManagerStorageError::OutputAlreadySpent);
×
641
            };
49✔
642

643
            let count = OutputSql::update_by_commitments(
49✔
644
                commitments,
49✔
645
                UpdateOutput {
49✔
646
                    status: Some(OutputStatus::ShortTermEncumberedToBeSpent),
49✔
647
                    spent_in_tx_id: Some(Some(tx_id)),
49✔
648
                    ..Default::default()
49✔
649
                },
49✔
650
                conn,
49✔
651
            )?;
49✔
652
            if count != outputs_to_send.len() {
49✔
653
                let msg = format!(
×
654
                    "Inconsistent short term encumbering! Lengths do not match - {} vs {}",
×
655
                    count,
×
656
                    outputs_to_send.len()
×
657
                );
×
658
                error!(target: LOG_TARGET, "{}", msg,);
×
659
                return Err(OutputManagerStorageError::UnexpectedResult(msg));
×
660
            }
49✔
661

49✔
662
            Ok(())
49✔
663
        })?;
51✔
664

665
        for co in outputs_to_receive {
631✔
666
            let new_output = NewOutputSql::new(
583✔
667
                co.clone(),
583✔
668
                Some(OutputStatus::ShortTermEncumberedToBeReceived),
583✔
669
                Some(tx_id),
583✔
670
            )?;
583✔
671
            new_output.commit(&mut conn)?;
583✔
672
        }
673
        if start.elapsed().as_millis() > 0 {
48✔
674
            trace!(
45✔
675
                target: LOG_TARGET,
676
                "sqlite profile - short_term_encumber_outputs (TxId: {}): lock {} + db_op {} = {} ms",
×
677
                tx_id,
×
678
                acquire_lock.as_millis(),
×
679
                (start.elapsed() - acquire_lock).as_millis(),
×
680
                start.elapsed().as_millis()
×
681
            );
682
        }
3✔
683

684
        Ok(())
48✔
685
    }
51✔
686

687
    fn confirm_encumbered_outputs(&self, tx_id: TxId) -> Result<(), OutputManagerStorageError> {
34✔
688
        let start = Instant::now();
34✔
689
        let mut conn = self.database_connection.get_pooled_connection()?;
34✔
690
        let acquire_lock = start.elapsed();
34✔
691

34✔
692
        conn.transaction::<_, _, _>(|conn| {
34✔
693
            update_outputs_with_tx_id_and_status_to_new_status(
34✔
694
                conn,
34✔
695
                tx_id,
34✔
696
                OutputStatus::ShortTermEncumberedToBeReceived,
34✔
697
                OutputStatus::EncumberedToBeReceived,
34✔
698
            )?;
34✔
699

700
            update_outputs_with_tx_id_and_status_to_new_status(
34✔
701
                conn,
34✔
702
                tx_id,
34✔
703
                OutputStatus::ShortTermEncumberedToBeSpent,
34✔
704
                OutputStatus::EncumberedToBeSpent,
34✔
705
            )
34✔
706
        })?;
34✔
707

708
        if start.elapsed().as_millis() > 0 {
34✔
709
            trace!(
4✔
710
                target: LOG_TARGET,
711
                "sqlite profile - confirm_encumbered_outputs (TxId: {}): lock {} + db_op {} = {} ms",
×
712
                tx_id,
×
713
                acquire_lock.as_millis(),
×
714
                (start.elapsed() - acquire_lock).as_millis(),
×
715
                start.elapsed().as_millis()
×
716
            );
717
        }
30✔
718

719
        Ok(())
34✔
720
    }
34✔
721

722
    fn clear_short_term_encumberances(&self) -> Result<(), OutputManagerStorageError> {
71✔
723
        let start = Instant::now();
71✔
724
        let mut conn = self.database_connection.get_pooled_connection()?;
71✔
725
        let acquire_lock = start.elapsed();
71✔
726

71✔
727
        conn.transaction::<_, _, _>(|conn| {
71✔
728
            diesel::update(
71✔
729
                outputs::table.filter(outputs::status.eq(OutputStatus::ShortTermEncumberedToBeReceived as i32)),
71✔
730
            )
71✔
731
            .set((
71✔
732
                outputs::status.eq(OutputStatus::CancelledInbound as i32),
71✔
733
                outputs::last_validation_timestamp
71✔
734
                    .eq(NaiveDateTime::from_timestamp_opt(Utc::now().timestamp(), 0).unwrap()),
71✔
735
            ))
71✔
736
            .execute(conn)?;
71✔
737

738
            diesel::update(outputs::table.filter(outputs::status.eq(OutputStatus::ShortTermEncumberedToBeSpent as i32)))
71✔
739
                .set((outputs::status.eq(OutputStatus::Unspent as i32),))
71✔
740
                .execute(conn)
71✔
741
        })?;
71✔
742

743
        if start.elapsed().as_millis() > 0 {
71✔
744
            trace!(
4✔
745
                target: LOG_TARGET,
746
                "sqlite profile - clear_short_term_encumberances: lock {} + db_op {} = {} ms",
×
747
                acquire_lock.as_millis(),
×
748
                (start.elapsed() - acquire_lock).as_millis(),
×
749
                start.elapsed().as_millis()
×
750
            );
751
        }
67✔
752

753
        Ok(())
71✔
754
    }
71✔
755

756
    fn get_last_mined_output(&self) -> Result<Option<DbWalletOutput>, OutputManagerStorageError> {
16✔
757
        let start = Instant::now();
16✔
758
        let mut conn = self.database_connection.get_pooled_connection()?;
16✔
759
        let acquire_lock = start.elapsed();
16✔
760

761
        let output = OutputSql::first_by_mined_height_desc(&mut conn)?;
16✔
762
        if start.elapsed().as_millis() > 0 {
16✔
763
            trace!(
×
764
                target: LOG_TARGET,
765
                "sqlite profile - get_last_mined_output: lock {} + db_op {} = {} ms",
×
766
                acquire_lock.as_millis(),
×
767
                (start.elapsed() - acquire_lock).as_millis(),
×
768
                start.elapsed().as_millis()
×
769
            );
770
        }
16✔
771
        match output {
16✔
772
            Some(o) => Ok(Some(o.to_db_wallet_output()?)),
9✔
773
            None => Ok(None),
7✔
774
        }
775
    }
16✔
776

777
    fn get_last_spent_output(&self) -> Result<Option<DbWalletOutput>, OutputManagerStorageError> {
13✔
778
        let start = Instant::now();
13✔
779
        let mut conn = self.database_connection.get_pooled_connection()?;
13✔
780
        let acquire_lock = start.elapsed();
13✔
781

782
        let output = OutputSql::first_by_marked_deleted_height_desc(&mut conn)?;
13✔
783
        if start.elapsed().as_millis() > 0 {
13✔
784
            trace!(
1✔
785
                target: LOG_TARGET,
786
                "sqlite profile - get_last_spent_output: lock {} + db_op {} = {} ms",
×
787
                acquire_lock.as_millis(),
×
788
                (start.elapsed() - acquire_lock).as_millis(),
×
789
                start.elapsed().as_millis()
×
790
            );
791
        }
12✔
792
        match output {
13✔
793
            Some(o) => Ok(Some(o.to_db_wallet_output()?)),
3✔
794
            None => Ok(None),
10✔
795
        }
796
    }
13✔
797

798
    fn get_balance(
58✔
799
        &self,
58✔
800
        current_tip_for_time_lock_calculation: Option<u64>,
58✔
801
    ) -> Result<Balance, OutputManagerStorageError> {
58✔
802
        let start = Instant::now();
58✔
803
        let mut conn = self.database_connection.get_pooled_connection()?;
58✔
804
        let acquire_lock = start.elapsed();
58✔
805

58✔
806
        let result = OutputSql::get_balance(current_tip_for_time_lock_calculation, &mut conn);
58✔
807
        if start.elapsed().as_millis() > 0 {
58✔
808
            trace!(
6✔
809
                target: LOG_TARGET,
810
                "sqlite profile - get_balance: lock {} + db_op {} = {} ms",
×
811
                acquire_lock.as_millis(),
×
812
                (start.elapsed() - acquire_lock).as_millis(),
×
813
                start.elapsed().as_millis()
×
814
            );
815
        }
52✔
816
        result
58✔
817
    }
58✔
818

819
    fn cancel_pending_transaction(&self, tx_id: TxId) -> Result<(), OutputManagerStorageError> {
10✔
820
        let start = Instant::now();
10✔
821
        let mut conn = self.database_connection.get_pooled_connection()?;
10✔
822
        let acquire_lock = start.elapsed();
10✔
823

10✔
824
        conn.transaction::<_, _, _>(|conn| {
10✔
825
            let outputs = OutputSql::find_by_tx_id_and_encumbered(tx_id, conn)?;
10✔
826

827
            if outputs.is_empty() {
10✔
828
                return Err(OutputManagerStorageError::ValueNotFound);
2✔
829
            }
8✔
830

831
            for output in &outputs {
2,801✔
832
                if output.received_in_tx_id == Some(tx_id.as_i64_wrapped()) {
2,793✔
833
                    info!(
9✔
834
                        target: LOG_TARGET,
835
                        "Cancelling pending inbound output with Commitment: {} - from TxId: {}",
×
836
                        output.commitment.to_hex(),
×
837
                        tx_id
838
                    );
839
                    output.update(
9✔
840
                        UpdateOutput {
9✔
841
                            status: Some(OutputStatus::CancelledInbound),
9✔
842
                            last_validation_timestamp: Some(Some(
9✔
843
                                NaiveDateTime::from_timestamp_opt(Utc::now().timestamp(), 0).unwrap(),
9✔
844
                            )),
9✔
845
                            ..Default::default()
9✔
846
                        },
9✔
847
                        conn,
9✔
848
                    )?;
9✔
849
                } else if output.spent_in_tx_id == Some(tx_id.as_i64_wrapped()) {
2,784✔
850
                    info!(
2,784✔
851
                        target: LOG_TARGET,
852
                        "Cancelling pending outbound output with Commitment: {} - from TxId: {}",
×
853
                        output.commitment.to_hex(),
×
854
                        tx_id
855
                    );
856
                    output.update(
2,784✔
857
                        UpdateOutput {
2,784✔
858
                            status: Some(OutputStatus::Unspent),
2,784✔
859
                            spent_in_tx_id: Some(None),
2,784✔
860
                            // We clear these so that the output will be revalidated the next time a validation is done.
2,784✔
861
                            mined_height: Some(None),
2,784✔
862
                            mined_in_block: Some(None),
2,784✔
863
                            ..Default::default()
2,784✔
864
                        },
2,784✔
865
                        conn,
2,784✔
866
                    )?;
2,784✔
867
                } else {
×
868
                    // can only be one of the two
×
869
                }
×
870
            }
871

872
            Ok(())
8✔
873
        })?;
10✔
874

875
        if start.elapsed().as_millis() > 0 {
8✔
876
            trace!(
8✔
877
                target: LOG_TARGET,
878
                "sqlite profile - cancel_pending_transaction: lock {} + db_op {} = {} ms",
×
879
                acquire_lock.as_millis(),
×
880
                (start.elapsed() - acquire_lock).as_millis(),
×
881
                start.elapsed().as_millis()
×
882
            );
883
        }
×
884

885
        Ok(())
8✔
886
    }
10✔
887

888
    // This is typically used by a receiver after the finalized transaction has been broadcast/returned by the sender
889
    // as the sender has to finalize the signature that was partially constructed by the receiver
890
    fn update_output_metadata_signature(&self, output: &TransactionOutput) -> Result<(), OutputManagerStorageError> {
6✔
891
        let start = Instant::now();
6✔
892
        let mut conn = self.database_connection.get_pooled_connection()?;
6✔
893
        let acquire_lock = start.elapsed();
6✔
894

6✔
895
        conn.transaction::<_, OutputManagerStorageError, _>(|conn| {
6✔
896
            let db_output = OutputSql::find_by_commitment_and_cancelled(&output.commitment.to_vec(), false, conn)?;
6✔
897
            db_output.update(
5✔
898
                // Note: Only the `ephemeral_pubkey` and `u_y` portion needs to be updated at this time as the rest was
5✔
899
                // already correct
5✔
900
                UpdateOutput {
5✔
901
                    metadata_signature_ephemeral_pubkey: Some(output.metadata_signature.ephemeral_pubkey().to_vec()),
5✔
902
                    metadata_signature_u_y: Some(output.metadata_signature.u_y().to_vec()),
5✔
903
                    hash: Some(output.hash().to_vec()),
5✔
904
                    ..Default::default()
5✔
905
                },
5✔
906
                conn,
5✔
907
            )?;
5✔
908

909
            Ok(())
5✔
910
        })?;
6✔
911
        if start.elapsed().as_millis() > 0 {
5✔
912
            trace!(
4✔
913
                target: LOG_TARGET,
914
                "sqlite profile - update_output_metadata_signature: lock {} + db_op {} = {} ms",
×
915
                acquire_lock.as_millis(),
×
916
                (start.elapsed() - acquire_lock).as_millis(),
×
917
                start.elapsed().as_millis()
×
918
            );
919
        }
1✔
920

921
        Ok(())
5✔
922
    }
6✔
923

924
    fn revalidate_unspent_output(&self, commitment: &Commitment) -> Result<(), OutputManagerStorageError> {
×
925
        let start = Instant::now();
×
926
        let mut conn = self.database_connection.get_pooled_connection()?;
×
927
        let acquire_lock = start.elapsed();
×
928

×
929
        conn.transaction::<_, _, _>(|conn| {
×
930
            let output = OutputSql::find_by_commitment_and_cancelled(&commitment.to_vec(), false, conn)?;
×
931

932
            if OutputStatus::try_from(output.status)? != OutputStatus::Invalid {
×
933
                return Err(OutputManagerStorageError::ValuesNotFound);
×
934
            }
×
935
            output.update(
×
936
                UpdateOutput {
×
937
                    status: Some(OutputStatus::Unspent),
×
938
                    ..Default::default()
×
939
                },
×
940
                conn,
×
941
            )?;
×
942

943
            Ok(())
×
944
        })?;
×
945
        if start.elapsed().as_millis() > 0 {
×
946
            trace!(
×
947
                target: LOG_TARGET,
948
                "sqlite profile - revalidate_unspent_output: lock {} + db_op {} = {} ms",
×
949
                acquire_lock.as_millis(),
×
950
                (start.elapsed() - acquire_lock).as_millis(),
×
951
                start.elapsed().as_millis()
×
952
            );
953
        }
×
954
        Ok(())
×
955
    }
×
956

957
    fn reinstate_cancelled_inbound_output(&self, tx_id: TxId) -> Result<(), OutputManagerStorageError> {
1✔
958
        let start = Instant::now();
1✔
959
        let mut conn = self.database_connection.get_pooled_connection()?;
1✔
960
        let acquire_lock = start.elapsed();
1✔
961

1✔
962
        update_outputs_with_tx_id_and_status_to_new_status(
1✔
963
            &mut conn,
1✔
964
            tx_id,
1✔
965
            OutputStatus::CancelledInbound,
1✔
966
            OutputStatus::EncumberedToBeReceived,
1✔
967
        )?;
1✔
968

969
        if start.elapsed().as_millis() > 0 {
1✔
970
            trace!(
×
971
                target: LOG_TARGET,
972
                "sqlite profile - reinstate_cancelled_inbound_output: lock {} + db_op {} = {} ms",
×
973
                acquire_lock.as_millis(),
×
974
                (start.elapsed() - acquire_lock).as_millis(),
×
975
                start.elapsed().as_millis()
×
976
            );
977
        }
1✔
978
        Ok(())
1✔
979
    }
1✔
980

981
    fn add_unvalidated_output(&self, output: DbWalletOutput, tx_id: TxId) -> Result<(), OutputManagerStorageError> {
2✔
982
        let start = Instant::now();
2✔
983
        let mut conn = self.database_connection.get_pooled_connection()?;
2✔
984
        let acquire_lock = start.elapsed();
2✔
985

2✔
986
        if OutputSql::find_by_commitment_and_cancelled(&output.commitment.to_vec(), false, &mut conn).is_ok() {
2✔
987
            return Err(OutputManagerStorageError::DuplicateOutput);
×
988
        }
2✔
989
        let new_output = NewOutputSql::new(output, Some(OutputStatus::EncumberedToBeReceived), Some(tx_id))?;
2✔
990
        new_output.commit(&mut conn)?;
2✔
991

992
        if start.elapsed().as_millis() > 0 {
2✔
993
            trace!(
2✔
994
                target: LOG_TARGET,
995
                "sqlite profile - add_unvalidated_output: lock {} + db_op {} = {} ms",
×
996
                acquire_lock.as_millis(),
×
997
                (start.elapsed() - acquire_lock).as_millis(),
×
998
                start.elapsed().as_millis()
×
999
            );
1000
        }
×
1001
        Ok(())
2✔
1002
    }
2✔
1003

1004
    /// Retrieves UTXOs than can be spent, sorted by priority, then value from smallest to largest.
1005
    fn fetch_unspent_outputs_for_spending(
60✔
1006
        &self,
60✔
1007
        selection_criteria: &UtxoSelectionCriteria,
60✔
1008
        amount: u64,
60✔
1009
        tip_height: Option<u64>,
60✔
1010
    ) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
60✔
1011
        let start = Instant::now();
60✔
1012
        let mut conn = self.database_connection.get_pooled_connection()?;
60✔
1013
        let acquire_lock = start.elapsed();
60✔
1014

1015
        let outputs = OutputSql::fetch_unspent_outputs_for_spending(selection_criteria, amount, tip_height, &mut conn)?;
60✔
1016

1017
        trace!(
60✔
1018
            target: LOG_TARGET,
1019
            "sqlite profile - fetch_unspent_outputs_for_spending: lock {} + db_op {} = {} ms",
×
1020
            acquire_lock.as_millis(),
×
1021
            (start.elapsed() - acquire_lock).as_millis(),
×
1022
            start.elapsed().as_millis()
×
1023
        );
1024
        outputs
60✔
1025
            .iter()
60✔
1026
            .map(|o| o.clone().to_db_wallet_output())
9,211✔
1027
            .collect::<Result<Vec<_>, _>>()
60✔
1028
    }
60✔
1029

1030
    fn fetch_outputs_by_tx_id(&self, tx_id: TxId) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
5✔
1031
        let mut conn = self.database_connection.get_pooled_connection()?;
5✔
1032
        let outputs = OutputSql::find_by_tx_id(tx_id, &mut conn)?;
5✔
1033

1034
        outputs
5✔
1035
            .iter()
5✔
1036
            .map(|o| o.clone().to_db_wallet_output())
5✔
1037
            .collect::<Result<Vec<_>, _>>()
5✔
1038
    }
5✔
1039

1040
    fn fetch_outputs_by_query(&self, q: OutputBackendQuery) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
×
1041
        let mut conn = self.database_connection.get_pooled_connection()?;
×
1042
        Ok(OutputSql::fetch_outputs_by_query(q, &mut conn)?
×
1043
            .into_iter()
×
1044
            .filter_map(|x| {
×
1045
                x.to_db_wallet_output()
×
1046
                    .map_err(|e| {
×
1047
                        error!(
×
1048
                            target: LOG_TARGET,
1049
                            "failed to convert `OutputSql` to `DbWalletOutput`: {:#?}", e
×
1050
                        );
1051
                        e
×
1052
                    })
×
1053
                    .ok()
×
1054
            })
×
1055
            .collect())
×
1056
    }
×
1057
}
1058

1059
fn update_outputs_with_tx_id_and_status_to_new_status(
1060
    conn: &mut PooledConnection<ConnectionManager<SqliteConnection>>,
1061
    tx_id: TxId,
1062
    from_status: OutputStatus,
1063
    to_status: OutputStatus,
1064
) -> Result<(), OutputManagerStorageError> {
1065
    diesel::update(
69✔
1066
        outputs::table
69✔
1067
            .filter(
69✔
1068
                outputs::received_in_tx_id
69✔
1069
                    .eq(Some(tx_id.as_u64() as i64))
69✔
1070
                    .or(outputs::spent_in_tx_id.eq(Some(tx_id.as_u64() as i64))),
69✔
1071
            )
69✔
1072
            .filter(outputs::status.eq(from_status as i32)),
69✔
1073
    )
69✔
1074
    .set(outputs::status.eq(to_status as i32))
69✔
1075
    .execute(conn)?;
69✔
1076
    Ok(())
69✔
1077
}
69✔
1078

1079
/// These are the fields that can be updated for an Output
1080
#[derive(Clone, Default)]
2,849✔
1081
pub struct UpdateOutput {
1082
    status: Option<OutputStatus>,
1083
    hash: Option<Vec<u8>>,
1084
    received_in_tx_id: Option<Option<TxId>>,
1085
    spent_in_tx_id: Option<Option<TxId>>,
1086
    metadata_signature_ephemeral_commitment: Option<Vec<u8>>,
1087
    metadata_signature_ephemeral_pubkey: Option<Vec<u8>>,
1088
    metadata_signature_u_a: Option<Vec<u8>>,
1089
    metadata_signature_u_x: Option<Vec<u8>>,
1090
    metadata_signature_u_y: Option<Vec<u8>>,
1091
    mined_height: Option<Option<u64>>,
1092
    mined_in_block: Option<Option<Vec<u8>>>,
1093
    last_validation_timestamp: Option<Option<NaiveDateTime>>,
1094
}
1095

1096
#[derive(AsChangeset)]
2,849✔
1097
#[diesel(table_name = outputs)]
1098
pub struct UpdateOutputSql {
1099
    status: Option<i32>,
1100
    hash: Option<Vec<u8>>,
1101
    received_in_tx_id: Option<Option<i64>>,
1102
    spent_in_tx_id: Option<Option<i64>>,
1103
    metadata_signature_ephemeral_commitment: Option<Vec<u8>>,
1104
    metadata_signature_ephemeral_pubkey: Option<Vec<u8>>,
1105
    metadata_signature_u_a: Option<Vec<u8>>,
1106
    metadata_signature_u_x: Option<Vec<u8>>,
1107
    metadata_signature_u_y: Option<Vec<u8>>,
1108
    mined_height: Option<Option<i64>>,
1109
    mined_in_block: Option<Option<Vec<u8>>>,
1110
    last_validation_timestamp: Option<Option<NaiveDateTime>>,
1111
}
1112

1113
/// Map a Rust friendly UpdateOutput to the Sql data type form
1114
impl From<UpdateOutput> for UpdateOutputSql {
1115
    fn from(u: UpdateOutput) -> Self {
2,849✔
1116
        Self {
2,849✔
1117
            status: u.status.map(|t| t as i32),
2,849✔
1118
            hash: u.hash,
2,849✔
1119
            metadata_signature_ephemeral_commitment: u.metadata_signature_ephemeral_commitment,
2,849✔
1120
            metadata_signature_ephemeral_pubkey: u.metadata_signature_ephemeral_pubkey,
2,849✔
1121
            metadata_signature_u_a: u.metadata_signature_u_a,
2,849✔
1122
            metadata_signature_u_x: u.metadata_signature_u_x,
2,849✔
1123
            metadata_signature_u_y: u.metadata_signature_u_y,
2,849✔
1124
            received_in_tx_id: u.received_in_tx_id.map(|o| o.map(TxId::as_i64_wrapped)),
2,849✔
1125
            spent_in_tx_id: u.spent_in_tx_id.map(|o| o.map(TxId::as_i64_wrapped)),
2,849✔
1126
            mined_height: u.mined_height.map(|t| t.map(|h| h as i64)),
2,849✔
1127
            mined_in_block: u.mined_in_block,
2,849✔
1128
            last_validation_timestamp: u.last_validation_timestamp,
2,849✔
1129
        }
2,849✔
1130
    }
2,849✔
1131
}
1132

1133
#[derive(Clone, Derivative, Queryable, Insertable, Identifiable, PartialEq, AsChangeset)]
3✔
1134
#[derivative(Debug)]
×
1135
#[diesel(table_name = known_one_sided_payment_scripts)]
×
1136
#[diesel(primary_key(script_hash))]
×
1137
// #[identifiable_options(primary_key(hash))]
×
1138
pub struct KnownOneSidedPaymentScriptSql {
×
1139
    pub script_hash: Vec<u8>,
×
1140
    pub private_key: String,
×
1141
    pub script: Vec<u8>,
×
1142
    pub input: Vec<u8>,
×
1143
    pub script_lock_height: i64,
×
1144
}
×
1145

1146
/// These are the fields that can be updated for an Output
1147
#[derive(AsChangeset)]
×
1148
#[diesel(table_name = known_one_sided_payment_scripts)]
1149
pub struct UpdateKnownOneSidedPaymentScript {
1150
    script: Option<Vec<u8>>,
1151
    input: Option<Vec<u8>>,
1152
}
1153

1154
impl KnownOneSidedPaymentScriptSql {
1155
    /// Write this struct to the database
1156
    pub fn commit(&self, conn: &mut SqliteConnection) -> Result<(), OutputManagerStorageError> {
1157
        diesel::insert_into(known_one_sided_payment_scripts::table)
1✔
1158
            .values(self.clone())
1✔
1159
            .execute(conn)?;
1✔
1160
        Ok(())
1✔
1161
    }
1✔
1162

1163
    /// Find a particular script, if it exists
1164
    pub fn find(
1✔
1165
        hash: &[u8],
1✔
1166
        conn: &mut SqliteConnection,
1✔
1167
    ) -> Result<KnownOneSidedPaymentScriptSql, OutputManagerStorageError> {
1✔
1168
        Ok(known_one_sided_payment_scripts::table
1✔
1169
            .filter(known_one_sided_payment_scripts::script_hash.eq(hash))
1✔
1170
            .first::<KnownOneSidedPaymentScriptSql>(conn)?)
1✔
1171
    }
1✔
1172

1173
    /// Return all known scripts
1174
    pub fn index(conn: &mut SqliteConnection) -> Result<Vec<KnownOneSidedPaymentScriptSql>, OutputManagerStorageError> {
4✔
1175
        Ok(known_one_sided_payment_scripts::table.load::<KnownOneSidedPaymentScriptSql>(conn)?)
4✔
1176
    }
4✔
1177

1178
    pub fn delete(&self, conn: &mut SqliteConnection) -> Result<(), OutputManagerStorageError> {
×
1179
        let num_deleted = diesel::delete(
×
1180
            known_one_sided_payment_scripts::table
×
1181
                .filter(known_one_sided_payment_scripts::script_hash.eq(&self.script_hash)),
×
1182
        )
×
1183
        .execute(conn)?;
×
1184

1185
        if num_deleted == 0 {
×
1186
            return Err(OutputManagerStorageError::ValuesNotFound);
×
1187
        }
×
1188

×
1189
        Ok(())
×
1190
    }
×
1191

1192
    pub fn update(
1193
        &self,
1194
        updated_known_script: UpdateKnownOneSidedPaymentScript,
1195
        conn: &mut SqliteConnection,
1196
    ) -> Result<KnownOneSidedPaymentScriptSql, OutputManagerStorageError> {
1197
        diesel::update(
×
1198
            known_one_sided_payment_scripts::table
×
1199
                .filter(known_one_sided_payment_scripts::script_hash.eq(&self.script_hash)),
×
1200
        )
×
1201
        .set(updated_known_script)
×
1202
        .execute(conn)
×
1203
        .num_rows_affected_or_not_found(1)?;
×
1204

1205
        KnownOneSidedPaymentScriptSql::find(&self.script_hash, conn)
×
1206
    }
×
1207

1208
    /// Conversion from an KnownOneSidedPaymentScriptSQL to the datatype form
1209
    pub fn to_known_one_sided_payment_script(self) -> Result<KnownOneSidedPaymentScript, OutputManagerStorageError> {
2✔
1210
        let script_hash = self.script_hash.clone();
2✔
1211
        let private_key =
2✔
1212
            TariKeyId::from_str(&self.private_key).map_err(|_| OutputManagerStorageError::ConversionError {
2✔
1213
                reason: "Could not convert private key to TariKeyId".to_string(),
×
1214
            })?;
2✔
1215

1216
        let script = TariScript::from_bytes(&self.script).map_err(|_| {
2✔
1217
            error!(target: LOG_TARGET, "Could not create tari script from stored bytes");
×
1218
            OutputManagerStorageError::ConversionError {
×
1219
                reason: "Tari Script could not be converted from bytes".to_string(),
×
1220
            }
×
1221
        })?;
2✔
1222
        let input = ExecutionStack::from_bytes(&self.input).map_err(|_| {
2✔
1223
            error!(target: LOG_TARGET, "Could not create execution stack from stored bytes");
×
1224
            OutputManagerStorageError::ConversionError {
×
1225
                reason: "ExecutionStack could not be converted from bytes".to_string(),
×
1226
            }
×
1227
        })?;
2✔
1228
        let script_lock_height = self.script_lock_height as u64;
2✔
1229

2✔
1230
        Ok(KnownOneSidedPaymentScript {
2✔
1231
            script_hash,
2✔
1232
            script_key_id: private_key,
2✔
1233
            script,
2✔
1234
            input,
2✔
1235
            script_lock_height,
2✔
1236
        })
2✔
1237
    }
2✔
1238

1239
    /// Conversion from an KnownOneSidedPaymentScriptSQL to the datatype form
1240
    pub fn from_known_one_sided_payment_script(
1✔
1241
        known_script: KnownOneSidedPaymentScript,
1✔
1242
    ) -> Result<Self, OutputManagerStorageError> {
1✔
1243
        let script_lock_height = known_script.script_lock_height as i64;
1✔
1244
        let script_hash = known_script.script_hash;
1✔
1245
        let private_key = known_script.script_key_id.to_string();
1✔
1246
        let script = known_script.script.to_bytes().to_vec();
1✔
1247
        let input = known_script.input.to_bytes().to_vec();
1✔
1248

1✔
1249
        let payment_script = KnownOneSidedPaymentScriptSql {
1✔
1250
            script_hash,
1✔
1251
            private_key,
1✔
1252
            script,
1✔
1253
            input,
1✔
1254
            script_lock_height,
1✔
1255
        };
1✔
1256

1✔
1257
        Ok(payment_script)
1✔
1258
    }
1✔
1259
}
1260

1261
#[cfg(test)]
1262
mod test {
1263

1264
    use diesel::{sql_query, Connection, RunQueryDsl, SqliteConnection};
1265
    use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
1266
    use rand::{rngs::OsRng, RngCore};
1267
    use tari_core::transactions::{
1268
        key_manager::{create_memory_db_key_manager, MemoryDbKeyManager},
1269
        tari_amount::MicroMinotari,
1270
        test_helpers::{create_wallet_output_with_data, TestParams},
1271
        transaction_components::{OutputFeatures, TransactionInput, WalletOutput},
1272
    };
1273
    use tari_script::script;
1274
    use tari_test_utils::random;
1275
    use tempfile::tempdir;
1276

1277
    use crate::output_manager_service::storage::{
1278
        models::DbWalletOutput,
1279
        sqlite_db::{new_output_sql::NewOutputSql, output_sql::OutputSql, OutputStatus, UpdateOutput},
1280
        OutputSource,
1281
    };
1282

1283
    pub async fn make_input(val: MicroMinotari, key_manager: &MemoryDbKeyManager) -> (TransactionInput, WalletOutput) {
5✔
1284
        let test_params = TestParams::new(key_manager).await;
5✔
1285

1286
        let wallet_output =
5✔
1287
            create_wallet_output_with_data(script!(Nop), OutputFeatures::default(), &test_params, val, key_manager)
5✔
1288
                .await
1✔
1289
                .unwrap();
5✔
1290
        let input = wallet_output.to_transaction_input(key_manager).await.unwrap();
5✔
1291

5✔
1292
        (input, wallet_output)
5✔
1293
    }
5✔
1294

1295
    #[allow(clippy::too_many_lines)]
1296
    #[tokio::test]
1✔
1297
    async fn test_crud() {
1✔
1298
        let db_name = format!("{}.sqlite3", random::string(8).as_str());
1✔
1299
        let temp_dir = tempdir().unwrap();
1✔
1300
        let db_folder = temp_dir.path().to_str().unwrap().to_string();
1✔
1301
        let db_path = format!("{}{}", db_folder, db_name);
1✔
1302

1✔
1303
        const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
1✔
1304

1✔
1305
        let mut conn =
1✔
1306
            SqliteConnection::establish(&db_path).unwrap_or_else(|_| panic!("Error connecting to {}", db_path));
1✔
1307

1✔
1308
        conn.run_pending_migrations(MIGRATIONS)
1✔
1309
            .map(|v| {
1✔
1310
                v.into_iter()
1✔
1311
                    .map(|b| {
12✔
1312
                        let m = format!("Running migration {}", b);
12✔
1313
                        // std::io::stdout()
12✔
1314
                        //     .write_all(m.as_ref())
12✔
1315
                        //     .expect("Couldn't write migration number to stdout");
12✔
1316
                        m
12✔
1317
                    })
12✔
1318
                    .collect::<Vec<String>>()
1✔
1319
            })
1✔
1320
            .expect("Migrations failed");
1✔
1321

1✔
1322
        sql_query("PRAGMA foreign_keys = ON").execute(&mut conn).unwrap();
1✔
1323

1✔
1324
        let mut outputs = Vec::new();
1✔
1325
        let mut outputs_spent = Vec::new();
1✔
1326
        let mut outputs_unspent = Vec::new();
1✔
1327

1✔
1328
        let key_manager = create_memory_db_key_manager();
1✔
1329
        for _i in 0..2 {
3✔
1330
            let (_, uo) = make_input(MicroMinotari::from(100 + OsRng.next_u64() % 1000), &key_manager).await;
2✔
1331
            let uo = DbWalletOutput::from_wallet_output(uo, &key_manager, None, OutputSource::Standard, None, None)
2✔
1332
                .await
×
1333
                .unwrap();
2✔
1334
            let o = NewOutputSql::new(uo, Some(OutputStatus::Unspent), None).unwrap();
2✔
1335
            outputs.push(o.clone());
2✔
1336
            outputs_unspent.push(o.clone());
2✔
1337
            o.commit(&mut conn).unwrap();
2✔
1338
        }
1339

1340
        for _i in 0..3 {
4✔
1341
            let (_, uo) = make_input(MicroMinotari::from(100 + OsRng.next_u64() % 1000), &key_manager).await;
3✔
1342
            let uo = DbWalletOutput::from_wallet_output(uo, &key_manager, None, OutputSource::Standard, None, None)
3✔
1343
                .await
×
1344
                .unwrap();
3✔
1345
            let o = NewOutputSql::new(uo, Some(OutputStatus::Spent), None).unwrap();
3✔
1346
            outputs.push(o.clone());
3✔
1347
            outputs_spent.push(o.clone());
3✔
1348
            o.commit(&mut conn).unwrap();
3✔
1349
        }
1350

1351
        assert_eq!(
1✔
1352
            OutputSql::index(&mut conn)
1✔
1353
                .unwrap()
1✔
1354
                .iter()
1✔
1355
                .map(|o| o.spending_key.clone())
5✔
1356
                .collect::<Vec<String>>(),
1✔
1357
            outputs.iter().map(|o| o.spending_key.clone()).collect::<Vec<String>>()
5✔
1358
        );
1✔
1359
        assert_eq!(
1✔
1360
            OutputSql::index_status(vec!(OutputStatus::Unspent), &mut conn)
1✔
1361
                .unwrap()
1✔
1362
                .iter()
1✔
1363
                .map(|o| o.spending_key.clone())
2✔
1364
                .collect::<Vec<String>>(),
1✔
1365
            outputs_unspent
1✔
1366
                .iter()
1✔
1367
                .map(|o| o.spending_key.clone())
2✔
1368
                .collect::<Vec<String>>()
1✔
1369
        );
1✔
1370
        assert_eq!(
1✔
1371
            OutputSql::index_status(vec!(OutputStatus::Spent), &mut conn)
1✔
1372
                .unwrap()
1✔
1373
                .iter()
1✔
1374
                .map(|o| o.spending_key.clone())
3✔
1375
                .collect::<Vec<String>>(),
1✔
1376
            outputs_spent
1✔
1377
                .iter()
1✔
1378
                .map(|o| o.spending_key.clone())
3✔
1379
                .collect::<Vec<String>>()
1✔
1380
        );
1✔
1381

1382
        assert_eq!(
1✔
1383
            OutputSql::find(&outputs[0].spending_key, &mut conn)
1✔
1384
                .unwrap()
1✔
1385
                .spending_key,
1✔
1386
            outputs[0].spending_key
1✔
1387
        );
1✔
1388

1389
        assert_eq!(
1✔
1390
            OutputSql::find_status(&outputs[0].spending_key, OutputStatus::Unspent, &mut conn)
1✔
1391
                .unwrap()
1✔
1392
                .spending_key,
1✔
1393
            outputs[0].spending_key
1✔
1394
        );
1✔
1395

1396
        assert!(OutputSql::find_status(&outputs[0].spending_key, OutputStatus::Spent, &mut conn).is_err());
1✔
1397

1398
        let _result = OutputSql::find(&outputs[4].spending_key, &mut conn)
1✔
1399
            .unwrap()
1✔
1400
            .delete(&mut conn);
1✔
1401

1✔
1402
        assert_eq!(OutputSql::index(&mut conn).unwrap().len(), 4);
1✔
1403

1404
        let _updated1 = OutputSql::find(&outputs[0].spending_key, &mut conn)
1✔
1405
            .unwrap()
1✔
1406
            .update(
1✔
1407
                UpdateOutput {
1✔
1408
                    status: Some(OutputStatus::Unspent),
1✔
1409
                    received_in_tx_id: Some(Some(44u64.into())),
1✔
1410
                    ..Default::default()
1✔
1411
                },
1✔
1412
                &mut conn,
1✔
1413
            )
1✔
1414
            .unwrap();
1✔
1415

1✔
1416
        let _updated2 = OutputSql::find(&outputs[1].spending_key, &mut conn)
1✔
1417
            .unwrap()
1✔
1418
            .update(
1✔
1419
                UpdateOutput {
1✔
1420
                    status: Some(OutputStatus::EncumberedToBeReceived),
1✔
1421
                    received_in_tx_id: Some(Some(44u64.into())),
1✔
1422
                    ..Default::default()
1✔
1423
                },
1✔
1424
                &mut conn,
1✔
1425
            )
1✔
1426
            .unwrap();
1✔
1427

1✔
1428
        let result = OutputSql::find_by_tx_id_and_encumbered(44u64.into(), &mut conn).unwrap();
1✔
1429
        assert_eq!(result.len(), 1);
1✔
1430
        assert_eq!(result[0].spending_key, outputs[1].spending_key);
1✔
1431
    }
1432
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc