• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 19468834672

18 Nov 2025 02:01PM UTC coverage: 51.294% (-0.3%) from 51.544%
19468834672

push

github

web-flow
feat: add coin selection and spending via bins or buckets (#7584)

Description
---

Add range limit coin-join:
- Added an unspent output coin distribution gRPC method
('CoinHistogramRequest'), whereby the wallet returns the number of coins
and their total value for each bucket in a pre-set range of buckets.
- Added a range limit coin-join gRPC method ('RangeLimitCoinJoin') to
the wallet, whereby the user can specify the minimum target amount,
maximum number of inputs, dust lower bound (inclusive), dust upper bound
(exclusive) and fee. Transaction size will be limited to the specified
maximum number of inputs, and multiple outputs will be created according
to the minimum target amount. All the inputs in the range will be spent,
unless the total available amount does not meet the minimum target
amount.

Closes #7582.

Motivation and Context
---
See #7582.

How Has This Been Tested?
---
System-level testing

gRPC **CoinHistogram** method
```
{
  "buckets": [
    {
      "count": "0",
      "total_amount": "0",
      "lower_bound": "0",
      "upper_bound": "1000"
    },
    {
      "count": "0",
      "total_amount": "0",
      "lower_bound": "1000",
      "upper_bound": "100000"
    },
    {
      "count": "2",
      "total_amount": "1165548",
      "lower_bound": "100000",
      "upper_bound": "1000000"
    },
    {
      "count": "158",
      "total_amount": "1455989209",
      "lower_bound": "1000000",
      "upper_bound": "1000000000"
    },
    {
      "count": "0",
      "total_amount": "0",
      "lower_bound": "1000000000",
      "upper_bound": "100000000000"
    },
    {
      "count": "0",
      "total_amount": "0",
      "lower_bound": "100000000000",
      "upper_bound": "21000000000000000"
    }
  ]
}
```

gRPC **RangeLimitCoinJoin** method

In this example, two transactions were created, bounded by the 350 input
size limit. gRPC client view:

<img width="897" height="396" alt="image"
src="https://github.com/user-attachments/assets/6c5ae857-8a01-4c90-9c55-1eee2fbd... (continued)

0 of 636 new or added lines in 11 files covered. (0.0%)

17 existing lines in 8 files now uncovered.

59180 of 115373 relevant lines covered (51.29%)

7948.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

11.06
/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs
1
// Copyright 2019. The Tari Project
2
//
3
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
// following conditions are met:
5
//
6
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
// disclaimer.
8
//
9
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
// following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
// products derived from this software without specific prior written permission.
14
//
15
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{convert::TryFrom, ops::Range, str::FromStr};
24

25
use chrono::{DateTime, NaiveDateTime, Utc};
26
use derivative::Derivative;
27
use diesel::{connection::SimpleConnection, prelude::*, result::Error as DieselError};
28
use log::*;
29
pub use new_output_sql::NewOutputSql;
30
pub use output_sql::OutputSql;
31
use tari_common_sqlite::{sqlite_connection_pool::PooledDbConnection, util::diesel_ext::ExpectedRowsExtension};
32
use tari_common_types::{
33
    transaction::TxId,
34
    types::{CompressedCommitment, FixedHash},
35
};
36
use tari_crypto::tari_utilities::{hex::Hex, ByteArray};
37
use tari_script::{ExecutionStack, TariScript};
38
use tari_transaction_components::{
39
    key_manager::TariKeyId,
40
    transaction_components::{OutputType, TransactionOutput},
41
    MicroMinotari,
42
};
43
use tokio::time::Instant;
44

45
use crate::{
46
    output_manager_service::{
47
        error::OutputManagerStorageError,
48
        service::Balance,
49
        storage::{
50
            database::{DbKey, DbKeyValuePair, DbValue, OutputBackendQuery, OutputManagerBackend, WriteOperation},
51
            models::{DbWalletOutput, KnownOneSidedPaymentScript},
52
            OutputStatus,
53
        },
54
        UtxoSelectionCriteria,
55
    },
56
    schema::{known_one_sided_payment_scripts, outputs, scanned_blocks},
57
    storage::sqlite_utilities::wallet_db_connection::WalletDbConnection,
58
};
59

60
mod new_output_sql;
61
mod output_sql;
62
const LOG_TARGET: &str = "wallet::output_manager_service::database::wallet";
63

64
/// A Sqlite backend for the Output Manager Service. The Backend is accessed via a connection pool to the Sqlite file.
///
/// The type derives `Clone`; cloning copies the wrapped [`WalletDbConnection`] handle.
65
#[derive(Clone)]
66
pub struct OutputManagerSqliteDatabase {
67
    // Handle from which each backend operation obtains a pooled connection (`get_pooled_connection`).
    database_connection: WalletDbConnection,
68
}
69

70
impl OutputManagerSqliteDatabase {
71
    pub fn new(database_connection: WalletDbConnection) -> Self {
×
72
        Self { database_connection }
×
73
    }
×
74

75
    fn insert(
×
76
        &self,
×
77
        key_value_pair: DbKeyValuePair,
×
78
        conn: &mut SqliteConnection,
×
79
    ) -> Result<(), OutputManagerStorageError> {
×
80
        match key_value_pair {
×
81
            DbKeyValuePair::UnspentOutput(c, o) => {
×
82
                if OutputSql::find_by_commitment_and_cancelled(&c.to_vec(), false, conn).is_ok() {
×
83
                    return Err(OutputManagerStorageError::DuplicateOutput);
×
84
                }
×
85
                let new_output = NewOutputSql::new(*o, Some(OutputStatus::UnspentMinedUnconfirmed), None)?;
×
86
                new_output.commit(conn)?
×
87
            },
88
            DbKeyValuePair::UnspentOutputWithTxId(c, (tx_id, o)) => {
×
89
                if OutputSql::find_by_commitment_and_cancelled(&c.to_vec(), false, conn).is_ok() {
×
90
                    return Err(OutputManagerStorageError::DuplicateOutput);
×
91
                }
×
92
                let new_output = NewOutputSql::new(*o, Some(OutputStatus::UnspentMinedUnconfirmed), Some(tx_id))?;
×
93
                new_output.commit(conn)?
×
94
            },
95
            DbKeyValuePair::OutputToBeReceived(c, (tx_id, o)) => {
×
96
                if OutputSql::find_by_commitment_and_cancelled(&c.to_vec(), false, conn).is_ok() {
×
97
                    return Err(OutputManagerStorageError::DuplicateOutput);
×
98
                }
×
99
                let new_output = NewOutputSql::new(*o, Some(OutputStatus::EncumberedToBeReceived), Some(tx_id))?;
×
100
                new_output.commit(conn)?
×
101
            },
102

103
            DbKeyValuePair::KnownOneSidedPaymentScripts(script) => {
×
104
                let script_sql = KnownOneSidedPaymentScriptSql::from_known_one_sided_payment_script(script)?;
×
105
                if KnownOneSidedPaymentScriptSql::find(&script_sql.script_hash, conn).is_ok() {
×
106
                    return Err(OutputManagerStorageError::DuplicateScript);
×
107
                }
×
108
                script_sql.commit(conn)?
×
109
            },
110
        }
111
        Ok(())
×
112
    }
×
113
}
114

115
impl OutputManagerBackend for OutputManagerSqliteDatabase {
116
    #[allow(clippy::cognitive_complexity)]
117
    #[allow(clippy::too_many_lines)]
118
    fn fetch(&self, key: &DbKey) -> Result<Option<DbValue>, OutputManagerStorageError> {
×
119
        let start = Instant::now();
×
120
        let mut conn = self.database_connection.get_pooled_connection()?;
×
121
        let acquire_lock = start.elapsed();
×
122

123
        let result = match key {
×
124
            DbKey::SpentOutput(k) => match OutputSql::find_status(k, OutputStatus::Spent, &mut conn) {
×
125
                Ok(o) => Some(DbValue::SpentOutput(Box::new(o.to_db_wallet_output()?))),
×
126
                Err(e) => {
×
127
                    match e {
×
128
                        OutputManagerStorageError::DieselError(DieselError::NotFound) => (),
×
129
                        e => return Err(e),
×
130
                    };
131
                    None
×
132
                },
133
            },
134
            DbKey::UnspentOutput(k) => match OutputSql::find_status(k, OutputStatus::Unspent, &mut conn) {
×
135
                Ok(o) => Some(DbValue::UnspentOutput(Box::new(o.to_db_wallet_output()?))),
×
136
                Err(e) => {
×
137
                    match e {
×
138
                        OutputManagerStorageError::DieselError(DieselError::NotFound) => (),
×
139
                        e => return Err(e),
×
140
                    };
141
                    None
×
142
                },
143
            },
144
            DbKey::UnspentOutputHash(hash) => {
×
145
                match OutputSql::find_by_hash(hash.as_slice(), OutputStatus::Unspent, &mut conn) {
×
146
                    Ok(o) => Some(DbValue::UnspentOutput(Box::new(o.to_db_wallet_output()?))),
×
147
                    Err(e) => {
×
148
                        match e {
×
149
                            OutputManagerStorageError::DieselError(DieselError::NotFound) => (),
×
150
                            e => return Err(e),
×
151
                        };
152
                        None
×
153
                    },
154
                }
155
            },
156
            DbKey::AnyOutputByCommitment(commitment) => {
×
157
                match OutputSql::find_by_commitment(&commitment.to_vec(), &mut conn) {
×
158
                    Ok(o) => Some(DbValue::AnyOutput(Box::new(o.to_db_wallet_output()?))),
×
159
                    Err(e) => {
×
160
                        match e {
×
161
                            OutputManagerStorageError::DieselError(DieselError::NotFound) => (),
×
162
                            e => return Err(e),
×
163
                        };
164
                        None
×
165
                    },
166
                }
167
            },
168
            DbKey::OutputsByTxIdAndStatus(tx_id, status) => {
×
169
                let outputs = OutputSql::find_by_tx_id_and_status(*tx_id, *status, &mut conn)?;
×
170

171
                Some(DbValue::AnyOutputs(
172
                    outputs
×
173
                        .iter()
×
174
                        .map(|o| o.clone().to_db_wallet_output())
×
175
                        .collect::<Result<Vec<_>, _>>()?,
×
176
                ))
177
            },
178
            DbKey::UnspentOutputs => {
179
                let outputs = OutputSql::index_status(
×
180
                    vec![OutputStatus::Unspent, OutputStatus::UnspentMinedUnconfirmed],
×
181
                    &mut conn,
×
182
                )?;
×
183

184
                Some(DbValue::UnspentOutputs(
185
                    outputs
×
186
                        .iter()
×
187
                        .map(|o| o.clone().to_db_wallet_output())
×
188
                        .collect::<Result<Vec<_>, _>>()?,
×
189
                ))
190
            },
191
            DbKey::SpentOutputs => {
192
                let outputs = OutputSql::index_status(vec![OutputStatus::Spent], &mut conn)?;
×
193

194
                Some(DbValue::SpentOutputs(
195
                    outputs
×
196
                        .iter()
×
197
                        .map(|o| o.clone().to_db_wallet_output())
×
198
                        .collect::<Result<Vec<_>, _>>()?,
×
199
                ))
200
            },
201
            DbKey::TimeLockedUnspentOutputs(tip) => {
×
202
                let outputs = OutputSql::index_time_locked(*tip, &mut conn)?;
×
203

204
                Some(DbValue::UnspentOutputs(
205
                    outputs
×
206
                        .iter()
×
207
                        .map(|o| o.clone().to_db_wallet_output())
×
208
                        .collect::<Result<Vec<_>, _>>()?,
×
209
                ))
210
            },
211
            DbKey::InvalidOutputs => {
212
                let outputs = OutputSql::index_status(vec![OutputStatus::Invalid], &mut conn)?;
×
213

214
                Some(DbValue::InvalidOutputs(
215
                    outputs
×
216
                        .iter()
×
217
                        .map(|o| o.clone().to_db_wallet_output())
×
218
                        .collect::<Result<Vec<_>, _>>()?,
×
219
                ))
220
            },
221
            DbKey::KnownOneSidedPaymentScripts => {
222
                let known_one_sided_payment_scripts = KnownOneSidedPaymentScriptSql::index(&mut conn)?;
×
223

224
                Some(DbValue::KnownOneSidedPaymentScripts(
225
                    known_one_sided_payment_scripts
×
226
                        .iter()
×
227
                        .map(|script| script.clone().to_known_one_sided_payment_script())
×
228
                        .collect::<Result<Vec<_>, _>>()?,
×
229
                ))
230
            },
231
        };
232
        if start.elapsed().as_millis() > 0 {
×
233
            trace!(
×
234
                target: LOG_TARGET,
×
235
                "sqlite profile - fetch '{}': lock {} + db_op {} = {} ms",
×
236
                key,
237
                acquire_lock.as_millis(),
×
238
                (start.elapsed() - acquire_lock).as_millis(),
×
239
                start.elapsed().as_millis()
×
240
            );
241
        }
×
242

243
        Ok(result)
×
244
    }
×
245

246
    /// Returns the outputs that `OutputSql::index_by_output_type` yields for `output_type`, converted to wallet
    /// outputs.
    ///
    /// The rows are consumed during conversion, so no row is cloned.
    ///
    /// # Errors
    /// Returns an error if no pooled connection can be obtained, if the query fails, or if a row cannot be converted
    /// (conversion stops at the first failing row).
    fn fetch_with_features(&self, output_type: OutputType) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
        let mut conn = self.database_connection.get_pooled_connection()?;
        let outputs = OutputSql::index_by_output_type(output_type, &mut conn)?;

        outputs
            .into_iter()
            .map(|o| o.to_db_wallet_output())
            .collect::<Result<Vec<_>, _>>()
    }
×
255

256
    /// Returns the rows produced by `OutputSql::index_unspent`, converted to wallet outputs in the order the query
    /// returned them.
    ///
    /// # Errors
    /// Returns an error if no pooled connection can be obtained, if the query fails, or at the first row that cannot
    /// be converted.
    fn fetch_sorted_unspent_outputs(&self) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
        let mut conn = self.database_connection.get_pooled_connection()?;
        let rows = OutputSql::index_unspent(&mut conn)?;

        let mut wallet_outputs = Vec::with_capacity(rows.len());
        for row in rows {
            wallet_outputs.push(row.to_db_wallet_output()?);
        }
        Ok(wallet_outputs)
    }
×
265

266
    fn fetch_mined_unspent_outputs(&self) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
×
267
        let start = Instant::now();
×
268
        let mut conn = self.database_connection.get_pooled_connection()?;
×
269
        let acquire_lock = start.elapsed();
×
270
        let outputs = OutputSql::index_marked_deleted_in_block_is_null(&mut conn)?;
×
271

272
        if start.elapsed().as_millis() > 0 {
×
273
            trace!(
×
274
                target: LOG_TARGET,
×
275
                "sqlite profile - fetch_mined_unspent_outputs: lock {} + db_op {} = {} ms",
×
276
                acquire_lock.as_millis(),
×
277
                (start.elapsed() - acquire_lock).as_millis(),
×
278
                start.elapsed().as_millis()
×
279
            );
280
        }
×
281

282
        outputs
×
283
            .into_iter()
×
284
            .map(|o| o.to_db_wallet_output())
×
285
            .collect::<Result<Vec<_>, _>>()
×
286
    }
×
287

288
    fn fetch_invalid_outputs(&self, timestamp: i64) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
×
289
        let start = Instant::now();
×
290
        let mut conn = self.database_connection.get_pooled_connection()?;
×
291
        let acquire_lock = start.elapsed();
×
292
        let outputs = OutputSql::index_invalid(
×
293
            &DateTime::<Utc>::from_timestamp(timestamp, 0).unwrap().naive_utc(),
×
294
            &mut conn,
×
295
        )?;
×
296

297
        if start.elapsed().as_millis() > 0 {
×
298
            trace!(
×
299
                target: LOG_TARGET,
×
300
                "sqlite profile - fetch_invalid_outputs: lock {} + db_op {} = {} ms",
×
301
                acquire_lock.as_millis(),
×
302
                (start.elapsed() - acquire_lock).as_millis(),
×
303
                start.elapsed().as_millis()
×
304
            );
305
        }
×
306

307
        outputs
×
308
            .into_iter()
×
309
            .map(|o| o.to_db_wallet_output())
×
310
            .collect::<Result<Vec<_>, _>>()
×
311
    }
×
312

313
    fn fetch_many_outputs(&self, outputs: &[FixedHash]) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
×
314
        let start = Instant::now();
×
315
        let mut conn = self.database_connection.get_pooled_connection()?;
×
316
        let acquire_lock = start.elapsed();
×
317
        let outputs = OutputSql::index_by_output_hashes(&mut conn, outputs)?;
×
318

319
        if start.elapsed().as_millis() > 0 {
×
320
            trace!(
×
321
                target: LOG_TARGET,
×
322
                "sqlite profile - fetch_many_outputs: lock {} + db_op {} = {} ms",
×
323
                acquire_lock.as_millis(),
×
324
                (start.elapsed() - acquire_lock).as_millis(),
×
325
                start.elapsed().as_millis()
×
326
            );
327
        }
×
328

329
        outputs.into_iter().map(|o| o.to_db_wallet_output()).collect()
×
330
    }
×
331

332
    fn fetch_unspent_mined_unconfirmed_outputs(&self) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
×
333
        let start = Instant::now();
×
334
        let mut conn = self.database_connection.get_pooled_connection()?;
×
335
        let acquire_lock = start.elapsed();
×
336
        let outputs = OutputSql::index_unconfirmed(&mut conn)?;
×
337

338
        if start.elapsed().as_millis() > 0 {
×
339
            trace!(
×
340
                target: LOG_TARGET,
×
341
                "sqlite profile - fetch_unspent_mined_unconfirmed_outputs: lock {} + db_op {} = {} ms",
×
342
                acquire_lock.as_millis(),
×
343
                (start.elapsed() - acquire_lock).as_millis(),
×
344
                start.elapsed().as_millis()
×
345
            );
346
        }
×
347

348
        outputs
×
349
            .into_iter()
×
350
            .map(|o| o.to_db_wallet_output())
×
351
            .collect::<Result<Vec<_>, _>>()
×
352
    }
×
353

354
    fn write(&self, op: WriteOperation) -> Result<Option<DbValue>, OutputManagerStorageError> {
×
355
        let start = Instant::now();
×
356
        let mut conn = self.database_connection.get_pooled_connection()?;
×
357
        let acquire_lock = start.elapsed();
×
358

359
        let mut msg = "".to_string();
×
360
        let result = match op {
×
361
            WriteOperation::Insert(kvp) => {
×
362
                msg.push_str("Insert");
×
363
                self.insert(kvp, &mut conn)?;
×
364
                Ok(None)
×
365
            },
366
            WriteOperation::Remove(k) => match k {
×
367
                DbKey::AnyOutputByCommitment(commitment) => {
×
368
                    conn.transaction::<_, _, _>(|conn| {
×
369
                        msg.push_str("Remove");
×
370
                        // Used by coinbase when mining.
371
                        match OutputSql::find_by_commitment(&commitment.to_vec(), conn) {
×
372
                            Ok(o) => {
×
373
                                o.delete(conn)?;
×
374
                                Ok(Some(DbValue::AnyOutput(Box::new(o.to_db_wallet_output()?))))
×
375
                            },
376
                            Err(e) => match e {
×
377
                                OutputManagerStorageError::DieselError(DieselError::NotFound) => Ok(None),
×
378
                                e => Err(e),
×
379
                            },
380
                        }
381
                    })
×
382
                },
383
                DbKey::SpentOutput(_s) => Err(OutputManagerStorageError::OperationNotSupported),
×
384
                DbKey::UnspentOutputHash(_h) => Err(OutputManagerStorageError::OperationNotSupported),
×
385
                DbKey::UnspentOutput(_k) => Err(OutputManagerStorageError::OperationNotSupported),
×
386
                DbKey::UnspentOutputs => Err(OutputManagerStorageError::OperationNotSupported),
×
387
                DbKey::SpentOutputs => Err(OutputManagerStorageError::OperationNotSupported),
×
388
                DbKey::InvalidOutputs => Err(OutputManagerStorageError::OperationNotSupported),
×
389
                DbKey::TimeLockedUnspentOutputs(_) => Err(OutputManagerStorageError::OperationNotSupported),
×
390
                DbKey::KnownOneSidedPaymentScripts => Err(OutputManagerStorageError::OperationNotSupported),
×
391
                DbKey::OutputsByTxIdAndStatus(_, _) => Err(OutputManagerStorageError::OperationNotSupported),
×
392
            },
393
        };
394
        if start.elapsed().as_millis() > 0 {
×
395
            trace!(
×
396
                target: LOG_TARGET,
×
397
                "sqlite profile - write {}: lock {} + db_op {} = {} ms",
×
398
                msg,
399
                acquire_lock.as_millis(),
×
400
                (start.elapsed() - acquire_lock).as_millis(),
×
401
                start.elapsed().as_millis()
×
402
            );
403
        }
×
404

405
        result
×
406
    }
×
407

408
    fn fetch_pending_incoming_outputs(&self) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
×
409
        let start = Instant::now();
×
410
        let mut conn = self.database_connection.get_pooled_connection()?;
×
411
        let acquire_lock = start.elapsed();
×
412

413
        let outputs = OutputSql::index_status(
×
414
            vec![
×
415
                OutputStatus::EncumberedToBeReceived,
×
416
                OutputStatus::UnspentMinedUnconfirmed,
×
417
                OutputStatus::ShortTermEncumberedToBeReceived,
×
418
            ],
419
            &mut conn,
×
420
        )?;
×
421

422
        if start.elapsed().as_millis() > 0 {
×
423
            trace!(
×
424
                target: LOG_TARGET,
×
425
                "sqlite profile - fetch_pending_incoming_outputs: lock {} + db_op {} = {} ms",
×
426
                acquire_lock.as_millis(),
×
427
                (start.elapsed() - acquire_lock).as_millis(),
×
428
                start.elapsed().as_millis()
×
429
            );
430
        }
×
431
        outputs
×
432
            .iter()
×
433
            .map(|o| o.clone().to_db_wallet_output())
×
434
            .collect::<Result<Vec<_>, _>>()
×
435
    }
×
436

437
    // Perform a batch update of the received outputs; this is more efficient than updating each output individually.
438
    fn set_received_outputs_mined_height_and_statuses(
×
439
        &self,
×
440
        updates: Vec<ReceivedOutputInfoForBatch>,
×
441
    ) -> Result<(), OutputManagerStorageError> {
×
442
        let start = Instant::now();
×
443
        let mut conn = self.database_connection.get_pooled_connection()?;
×
444
        let acquire_lock = start.elapsed();
×
445

446
        let commitments: Vec<CompressedCommitment> = updates.iter().map(|update| update.commitment.clone()).collect();
×
447
        if !OutputSql::verify_outputs_exist(&commitments, &mut conn)? {
×
448
            return Err(OutputManagerStorageError::ValuesNotFound);
×
449
        }
×
450

451
        // This SQL query is a dummy `INSERT INTO` statement combined with an `ON CONFLICT` clause and `UPDATE` action.
452
        // It specifies what action should be taken if a unique constraint violation occurs during the execution of the
453
        // `INSERT INTO` statement. The `INSERT INTO` statement must list all columns that cannot be NULL should it
454
        // succeed. We provide `commitment` values that will cause a unique constraint violation, triggering the
455
        // `ON CONFLICT` clause. The `ON CONFLICT` clause ensures that if a row with a matching commitment already
456
        // exists, the specified columns (`mined_height`, `mined_in_block`, `status`, `mined_timestamp`,
457
        // `marked_deleted_at_height`, `marked_deleted_in_block`, `last_validation_timestamp`) will be updated with the
458
        // provided values. The `UPDATE` action updates the existing row with the new values provided by the
459
        // `INSERT INTO` statement. The `excluded` keyword refers to the new data being inserted or updated and allows
460
        // accessing the values provided in the `VALUES` clause of the `INSERT INTO` statement.
461
        // Note:
462
        //   `diesel` does not support batch updates, so we have to do it manually. For example, this
463
        //   `diesel::insert_into(...).values(&...).on_conflict(outputs::hash).do_update().set((...)).execute(&mut
464
        // conn)?;`   errors with
465
        //   `the trait bound `BatchInsert<Vec<....>` is not satisfied`
466
        let mut query = String::from(
×
467
            "INSERT INTO outputs ( commitment, mined_height, mined_in_block, status, mined_timestamp, spending_key, \
468
             value, output_type, maturity, hash, script, input_data, script_private_key, sender_offset_public_key, \
469
             metadata_signature_ephemeral_commitment, metadata_signature_ephemeral_pubkey, metadata_signature_u_a, \
470
             metadata_signature_u_x, metadata_signature_u_y, spending_priority, covenant, encrypted_data, \
471
             minimum_value_promise
472
            )
473
             VALUES ",
474
        );
475

476
        query.push_str(
×
477
            &updates
×
478
                .iter()
×
479
                .map(|update| {
×
480
                    format!(
×
481
                        "(x'{}', {}, x'{}', {}, '{}', 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)",
×
482
                        update.commitment.to_hex(),
×
483
                        update.mined_height as i64,
×
484
                        update.mined_in_block.to_hex(),
×
485
                        if update.confirmed {
×
486
                            OutputStatus::Unspent as i32
×
487
                        } else {
488
                            OutputStatus::UnspentMinedUnconfirmed as i32
×
489
                        },
490
                        if let Some(val) = DateTime::from_timestamp(update.mined_timestamp as i64, 0) {
×
491
                            val.naive_utc().to_string()
×
492
                        } else {
493
                            "NULL".to_string()
×
494
                        },
495
                    )
496
                })
×
497
                .collect::<Vec<String>>()
×
498
                .join(", "),
×
499
        );
500

501
        query.push_str(
×
502
            " ON CONFLICT (commitment) DO UPDATE SET mined_height = excluded.mined_height, mined_in_block = \
×
503
             excluded.mined_in_block, status = excluded.status, mined_timestamp = excluded.mined_timestamp, \
×
504
             marked_deleted_at_height = NULL, marked_deleted_in_block = NULL, last_validation_timestamp = NULL",
×
505
        );
506

507
        conn.batch_execute(&query)?;
×
508

509
        if start.elapsed().as_millis() > 0 {
×
510
            trace!(
×
511
                target: LOG_TARGET,
×
512
                "sqlite profile - set_received_outputs_mined_height_and_statuses: lock {} + db_op {} = {} ms \
×
513
                ({} outputs)",
×
514
                acquire_lock.as_millis(),
×
515
                (start.elapsed() - acquire_lock).as_millis(),
×
516
                start.elapsed().as_millis(),
×
517
                updates.len()
×
518
            );
519
        }
×
520

521
        Ok(())
×
522
    }
×
523

524
    fn set_outputs_to_unmined_and_invalid(&self, hashes: Vec<FixedHash>) -> Result<(), OutputManagerStorageError> {
×
525
        let start = Instant::now();
×
526
        let mut conn = self.database_connection.get_pooled_connection()?;
×
527
        let acquire_lock = start.elapsed();
×
528

529
        diesel::update(outputs::table.filter(outputs::hash.eq_any(hashes.iter().map(|hash| hash.to_vec()))))
×
530
            .set((
×
531
                outputs::mined_height.eq::<Option<i64>>(None),
×
532
                outputs::mined_in_block.eq::<Option<Vec<u8>>>(None),
×
533
                outputs::status.eq(OutputStatus::Invalid as i32),
×
534
                outputs::mined_timestamp.eq::<Option<NaiveDateTime>>(None),
×
535
                outputs::marked_deleted_at_height.eq::<Option<i64>>(None),
×
536
                outputs::marked_deleted_in_block.eq::<Option<Vec<u8>>>(None),
×
537
            ))
×
538
            .execute(&mut conn)
×
539
            .num_rows_affected_or_not_found(hashes.len())?;
×
540

541
        if start.elapsed().as_millis() > 0 {
×
542
            trace!(
×
543
                target: LOG_TARGET,
×
544
                "sqlite profile - set_outputs_to_unmined_and_invalid: lock {} + db_op {} = {} ms ({} outputs)",
×
545
                acquire_lock.as_millis(),
×
546
                (start.elapsed() - acquire_lock).as_millis(),
×
547
                start.elapsed().as_millis(),
×
548
                hashes.len()
×
549
            );
550
        }
×
551

552
        Ok(())
×
553
    }
×
554

555
    fn get_last_scanned_height(&self) -> Result<Option<u64>, OutputManagerStorageError> {
×
556
        let start = Instant::now();
×
557
        let mut conn = self.database_connection.get_pooled_connection()?;
×
558
        let acquire_lock = start.elapsed();
×
559

560
        let last_scanned_height: Option<i64> = scanned_blocks::table
×
561
            .order_by(scanned_blocks::height.desc())
×
562
            .select(scanned_blocks::height)
×
563
            .first(&mut conn)
×
564
            .optional()?;
×
565
        if start.elapsed().as_millis() > 0 {
×
566
            trace!(
×
567
                target: LOG_TARGET,
×
568
                "sqlite profile - get_last_scanned_height: lock {} + db_op {} = {} ms",
×
569
                acquire_lock.as_millis(),
×
570
                (start.elapsed() - acquire_lock).as_millis(),
×
571
                start.elapsed().as_millis()
×
572
            );
573
        }
×
574

575
        Ok(last_scanned_height.map(|h| h as u64))
×
576
    }
×
577

578
    fn save_last_scanned_height(
×
579
        &self,
×
580
        scanned_block: crate::utxo_scanner_service::service::ScannedBlock,
×
581
    ) -> Result<(), OutputManagerStorageError> {
×
582
        let mut conn = self.database_connection.get_pooled_connection()?;
×
583
        Ok(crate::storage::sqlite_db::scanned_blocks::ScannedBlockSql::from(scanned_block).commit(&mut conn)?)
×
584
    }
×
585

586
    fn set_outputs_to_be_revalidated(&self) -> Result<(), OutputManagerStorageError> {
×
587
        let start = Instant::now();
×
588
        let mut conn = self.database_connection.get_pooled_connection()?;
×
589
        let acquire_lock = start.elapsed();
×
590
        let result = diesel::update(outputs::table)
×
591
            .set((
×
592
                outputs::mined_height.eq::<Option<i64>>(None),
×
593
                outputs::mined_in_block.eq::<Option<Vec<u8>>>(None),
×
594
                outputs::status.eq(OutputStatus::Invalid as i32),
×
595
                outputs::mined_timestamp.eq::<Option<NaiveDateTime>>(None),
×
596
                outputs::marked_deleted_at_height.eq::<Option<i64>>(None),
×
597
                outputs::marked_deleted_in_block.eq::<Option<Vec<u8>>>(None),
×
598
            ))
×
599
            .execute(&mut conn)?;
×
600

601
        trace!(target: LOG_TARGET, "rows updated: {result:?}");
×
602
        if start.elapsed().as_millis() > 0 {
×
603
            trace!(
×
604
                target: LOG_TARGET,
×
605
                "sqlite profile - set_outputs_to_be_revalidated: lock {} + db_op {} = {} ms",
×
606
                acquire_lock.as_millis(),
×
607
                (start.elapsed() - acquire_lock).as_millis(),
×
608
                start.elapsed().as_millis()
×
609
            );
610
        }
×
611

612
        Ok(())
×
613
    }
×
614

615
    fn update_last_validation_timestamps(
×
616
        &self,
×
617
        commitments: Vec<CompressedCommitment>,
×
618
    ) -> Result<(), OutputManagerStorageError> {
×
619
        let start = Instant::now();
×
620
        let mut conn = self.database_connection.get_pooled_connection()?;
×
621
        let acquire_lock = start.elapsed();
×
622

623
        if !OutputSql::verify_outputs_exist(&commitments, &mut conn)? {
×
624
            return Err(OutputManagerStorageError::ValuesNotFound);
×
625
        }
×
626

627
        let last_validation_timestamp = Utc::now().naive_utc();
×
628

629
        // Three queries were evaluated to determine the most efficient way to update the last validation timestamp
630
        // during system-level stress testing:
631
        // - Using `diesel`:
632
        //   - `diesel::update(outputs::table.filter(outputs::hash.eq_any(hashes)).set(...).execute(&mut conn)`
633
        //   - Note: `diesel` does not support batch updates, so we have to do it manually.
634
        // - Using a raw query that mimicked the `diesel` query:
635
        //   - `UPDATE outputs SET last_validation_timestamp = '{}' WHERE hash IN ({})`
636
        //   - 20% faster than `diesel` on average
637
        // - Using a raw query with a batch insert (as implemented below):
638
        //   - `INSERT INTO outputs (..) VALUES (...) ON CONFLICT (commitment) DO UPDATE SET ...`
639
        //   - 1011% faster than `diesel` on average
640

641
        let mut query = String::from(
×
642
            "INSERT INTO outputs ( commitment, last_validation_timestamp, mined_height, mined_in_block, status, \
643
             mined_timestamp, spending_key, value, output_type, maturity, hash, script, input_data, \
644
             script_private_key, sender_offset_public_key, metadata_signature_ephemeral_commitment, \
645
             metadata_signature_ephemeral_pubkey, metadata_signature_u_a, metadata_signature_u_x, \
646
             metadata_signature_u_y, spending_priority, covenant, encrypted_data, minimum_value_promise
647
                    )
648
                     VALUES ",
649
        );
650

651
        query.push_str(
×
652
            &commitments
×
653
                .iter()
×
654
                .map(|commitment| {
×
655
                    format!(
×
656
                        "(x'{}', '{}', 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)",
×
657
                        commitment.to_hex(),
×
658
                        last_validation_timestamp,
659
                    )
660
                })
×
661
                .collect::<Vec<String>>()
×
662
                .join(", "),
×
663
        );
664

665
        query.push_str(
×
666
            " ON CONFLICT (commitment) DO UPDATE SET last_validation_timestamp = excluded.last_validation_timestamp",
×
667
        );
668

669
        conn.batch_execute(&query)?;
×
670

671
        if start.elapsed().as_millis() > 0 {
×
672
            trace!(
×
673
                target: LOG_TARGET,
×
674
                "sqlite profile - update_last_validation_timestamps: lock {} + db_op {} = {} ms ({} outputs)",
×
675
                acquire_lock.as_millis(),
×
676
                (start.elapsed() - acquire_lock).as_millis(),
×
677
                start.elapsed().as_millis(),
×
678
                commitments.len(),
×
679
            );
680
        }
×
681

682
        Ok(())
×
683
    }
×
684

685
    // Perform a batch update of the spent outputs; this is more efficient than updating each output individually.
686
    fn mark_outputs_as_spent(&self, updates: Vec<SpentOutputInfoForBatch>) -> Result<(), OutputManagerStorageError> {
×
687
        let start = Instant::now();
×
688
        let mut conn = self.database_connection.get_pooled_connection()?;
×
689
        let acquire_lock = start.elapsed();
×
690

691
        let commitments: Vec<CompressedCommitment> = updates.iter().map(|update| update.commitment.clone()).collect();
×
692
        if !OutputSql::verify_outputs_exist(&commitments, &mut conn)? {
×
693
            return Err(OutputManagerStorageError::ValuesNotFound);
×
694
        }
×
695

696
        // This SQL query is a dummy `INSERT INTO` statement combined with an `ON CONFLICT` clause and `UPDATE` action.
697
        // It specifies what action should be taken if a unique constraint violation occurs during the execution of the
698
        // `INSERT INTO` statement. The `INSERT INTO` statement must list all columns that cannot be NULL should it
699
        // succeed. We provide `commitment` values that will cause a unique constraint violation, triggering the
700
        // `ON CONFLICT` clause. The `ON CONFLICT` clause ensures that if a row with a matching commitment already
701
        // exists, the specified columns (`mined_height`, `mined_in_block`, `status`, `mined_timestamp`,
702
        // `marked_deleted_at_height`, `marked_deleted_in_block`, `last_validation_timestamp`) will be updated with the
703
        // provided values. The `UPDATE` action updates the existing row with the new values provided by the
704
        // `INSERT INTO` statement. The `excluded` keyword refers to the new data being inserted or updated and allows
705
        // accessing the values provided in the `VALUES` clause of the `INSERT INTO` statement.
706
        // Note:
707
        //   `diesel` does not support batch updates, so we have to do it manually. For example, this
708
        //   `diesel::insert_into(...).values(&...).on_conflict(outputs::hash).do_update().set((...)).execute(&mut
709
        // conn)?;`   errors with
710
        //   `the trait bound `BatchInsert<Vec<....>` is not satisfied`
711
        let mut query = String::from(
×
712
            "INSERT INTO outputs ( commitment, marked_deleted_at_height, marked_deleted_in_block, status, \
713
             mined_height, mined_in_block, mined_timestamp, spending_key, value, output_type, maturity, hash, script, \
714
             input_data, script_private_key, sender_offset_public_key, metadata_signature_ephemeral_commitment, \
715
             metadata_signature_ephemeral_pubkey, metadata_signature_u_a, metadata_signature_u_x, \
716
             metadata_signature_u_y, spending_priority, covenant, encrypted_data, minimum_value_promise ) VALUES ",
717
        );
718

719
        query.push_str(
×
720
            &updates
×
721
                .iter()
×
722
                .map(|update| {
×
723
                    format!(
×
724
                        "(x'{}', {}, x'{}', {}, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)",
×
725
                        update.commitment.to_hex(),
×
726
                        update.mark_deleted_at_height as i64,
×
727
                        update.mark_deleted_in_block.to_hex(),
×
728
                        if update.confirmed {
×
729
                            OutputStatus::Spent as i32
×
730
                        } else {
731
                            OutputStatus::SpentMinedUnconfirmed as i32
×
732
                        }
733
                    )
734
                })
×
735
                .collect::<Vec<String>>()
×
736
                .join(", "),
×
737
        );
738

739
        query.push_str(
×
740
            " ON CONFLICT (commitment) DO UPDATE SET marked_deleted_at_height = excluded.marked_deleted_at_height, \
×
741
             marked_deleted_in_block = excluded.marked_deleted_in_block, status = excluded.status",
×
742
        );
743

744
        conn.batch_execute(&query)?;
×
745

746
        if start.elapsed().as_millis() > 0 {
×
747
            trace!(
×
748
                target: LOG_TARGET,
×
749
                "sqlite profile - mark_outputs_as_spent: lock {} + db_op {} = {} ms ({} outputs)",
×
750
                acquire_lock.as_millis(),
×
751
                (start.elapsed() - acquire_lock).as_millis(),
×
752
                start.elapsed().as_millis(),
×
753
                updates.len()
×
754
            );
755
        }
×
756

757
        Ok(())
×
758
    }
×
759

760
    fn mark_outputs_as_unspent(&self, hashes: Vec<(FixedHash, bool)>) -> Result<(), OutputManagerStorageError> {
×
761
        let start = Instant::now();
×
762
        let mut conn = self.database_connection.get_pooled_connection()?;
×
763
        let acquire_lock = start.elapsed();
×
764
        // Split out the confirmed and unconfirmed outputs so that we can handle each of them as a separate batch
765
        // operation
766
        let confirmed_hashes = hashes
×
767
            .iter()
×
768
            .filter(|(_hash, confirmed)| *confirmed)
×
769
            .map(|(hash, _confirmed)| hash)
×
770
            .collect::<Vec<_>>();
×
771
        let unconfirmed_hashes = hashes
×
772
            .iter()
×
773
            .filter(|(_hash, confirmed)| !*confirmed)
×
774
            .map(|(hash, _confirmed)| hash)
×
775
            .collect::<Vec<_>>();
×
776

777
        if !confirmed_hashes.is_empty() {
×
778
            diesel::update(
×
779
                outputs::table.filter(outputs::hash.eq_any(confirmed_hashes.iter().map(|hash| hash.to_vec()))),
×
780
            )
781
            .set((
×
782
                outputs::marked_deleted_at_height.eq::<Option<i64>>(None),
×
783
                outputs::marked_deleted_in_block.eq::<Option<Vec<u8>>>(None),
×
784
                outputs::status.eq(OutputStatus::Unspent as i32),
×
785
            ))
×
786
            .execute(&mut conn)
×
787
            .num_rows_affected_or_not_found(confirmed_hashes.len())?;
×
788
        }
×
789

790
        if !unconfirmed_hashes.is_empty() {
×
791
            diesel::update(
×
792
                outputs::table.filter(outputs::hash.eq_any(unconfirmed_hashes.iter().map(|hash| hash.to_vec()))),
×
793
            )
794
            .set((
×
795
                outputs::marked_deleted_at_height.eq::<Option<i64>>(None),
×
796
                outputs::marked_deleted_in_block.eq::<Option<Vec<u8>>>(None),
×
797
                outputs::status.eq(OutputStatus::UnspentMinedUnconfirmed as i32),
×
798
            ))
×
799
            .execute(&mut conn)
×
800
            .num_rows_affected_or_not_found(unconfirmed_hashes.len())?;
×
801
        }
×
802

803
        debug!(target: LOG_TARGET, "mark_outputs_as_unspent: Unspent {}, UnspentMinedUnconfirmed {}", confirmed_hashes.len(), unconfirmed_hashes.len());
×
804
        if start.elapsed().as_millis() > 0 {
×
805
            trace!(
×
806
                target: LOG_TARGET,
×
807
                "sqlite profile - mark_outputs_as_unspent: lock {} + db_op {} = {} ms (Unspent {}, UnspentMinedUnconfirmed {})",
×
808
                acquire_lock.as_millis(),
×
809
                (start.elapsed() - acquire_lock).as_millis(),
×
810
                start.elapsed().as_millis(),
×
811
                confirmed_hashes.len(), unconfirmed_hashes.len()
×
812
            );
813
        }
×
814

815
        Ok(())
×
816
    }
×
817

818
    fn short_term_encumber_outputs(
×
819
        &self,
×
820
        tx_id: TxId,
×
821
        outputs_to_send: &[DbWalletOutput],
×
822
        outputs_to_receive: &[DbWalletOutput],
×
823
    ) -> Result<(), OutputManagerStorageError> {
×
824
        let start = Instant::now();
×
825
        let mut conn = self.database_connection.get_pooled_connection()?;
×
826
        let acquire_lock = start.elapsed();
×
827

828
        let mut commitments = Vec::with_capacity(outputs_to_send.len());
×
829
        for output in outputs_to_send {
×
830
            commitments.push(output.commitment.as_bytes());
×
831
        }
×
832
        conn.transaction::<_, _, _>(|conn| {
×
833
            // Any output in the list without the `Unspent` status will invalidate the encumberance
834
            if !OutputSql::find_by_commitments_excluding_status(commitments.clone(), OutputStatus::Unspent, conn)?
×
835
                .is_empty()
×
836
            {
837
                return Err(OutputManagerStorageError::OutputAlreadySpent);
×
838
            };
×
839

840
            let count = OutputSql::update_by_commitments(
×
841
                commitments,
×
842
                UpdateOutput {
×
843
                    status: Some(OutputStatus::ShortTermEncumberedToBeSpent),
×
844
                    spent_in_tx_id: Some(Some(tx_id)),
×
845
                    ..Default::default()
×
846
                },
×
847
                conn,
×
848
            )?;
×
849
            if count != outputs_to_send.len() {
×
850
                let msg = format!(
×
851
                    "Inconsistent short term encumbering! Lengths do not match - {} vs {}",
×
852
                    count,
853
                    outputs_to_send.len()
×
854
                );
855
                error!(target: LOG_TARGET, "{msg}");
×
856
                return Err(OutputManagerStorageError::UnexpectedResult(msg));
×
857
            }
×
858

859
            Ok(())
×
860
        })?;
×
861

862
        for co in outputs_to_receive {
×
863
            let new_output = NewOutputSql::new(
×
864
                co.clone(),
×
865
                Some(OutputStatus::ShortTermEncumberedToBeReceived),
×
866
                Some(tx_id),
×
867
            )?;
×
868
            new_output.commit(&mut conn)?;
×
869
        }
870
        if start.elapsed().as_millis() > 0 {
×
871
            trace!(
×
872
                target: LOG_TARGET,
×
873
                "sqlite profile - short_term_encumber_outputs (TxId: {}): lock {} + db_op {} = {} ms",
×
874
                tx_id,
875
                acquire_lock.as_millis(),
×
876
                (start.elapsed() - acquire_lock).as_millis(),
×
877
                start.elapsed().as_millis()
×
878
            );
879
        }
×
880

881
        Ok(())
×
882
    }
×
883

884
    fn confirm_encumbered_outputs(
×
885
        &self,
×
886
        tx_id: TxId,
×
887
        tx_id_update: Option<TxId>,
×
888
        change_outputs_to_add: &[DbWalletOutput],
×
889
    ) -> Result<(), OutputManagerStorageError> {
×
890
        let start = Instant::now();
×
891
        let mut tx_id = tx_id;
×
892
        let mut conn = self.database_connection.get_pooled_connection()?;
×
893
        let acquire_lock = start.elapsed();
×
894

895
        conn.immediate_transaction::<_, OutputManagerStorageError, _>(|conn| {
×
896
            if let Some(tx_id_new) = tx_id_update {
×
897
                replace_tx_id(conn, tx_id, tx_id_new)?;
×
898
                tx_id = tx_id_new;
×
899
            }
×
900

901
            // Add the change outputs to be correct
902
            for output in change_outputs_to_add {
×
903
                let new_output =
×
904
                    NewOutputSql::new(output.clone(), Some(OutputStatus::EncumberedToBeReceived), Some(tx_id))?;
×
905
                new_output.commit(conn)?;
×
906
            }
907

908
            update_outputs_with_tx_id_and_status_to_new_status(
×
909
                conn,
×
910
                tx_id,
×
911
                OutputStatus::ShortTermEncumberedToBeReceived,
×
912
                OutputStatus::EncumberedToBeReceived,
×
913
            )?;
×
914

915
            update_outputs_with_tx_id_and_status_to_new_status(
×
916
                conn,
×
917
                tx_id,
×
918
                OutputStatus::ShortTermEncumberedToBeSpent,
×
919
                OutputStatus::EncumberedToBeSpent,
×
920
            )
921
        })?;
×
922

923
        if start.elapsed().as_millis() > 0 {
×
924
            trace!(
×
925
                target: LOG_TARGET,
×
926
                "sqlite profile - confirm_encumbered_outputs (TxId: {}): lock {} + db_op {} = {} ms",
×
927
                tx_id,
928
                acquire_lock.as_millis(),
×
929
                (start.elapsed() - acquire_lock).as_millis(),
×
930
                start.elapsed().as_millis()
×
931
            );
932
        }
×
933

934
        Ok(())
×
935
    }
×
936

937
    fn clear_short_term_encumberances(&self) -> Result<(), OutputManagerStorageError> {
×
938
        let start = Instant::now();
×
939
        let mut conn = self.database_connection.get_pooled_connection()?;
×
940
        let acquire_lock = start.elapsed();
×
941

942
        conn.transaction::<_, _, _>(|conn| {
×
943
            diesel::update(
×
944
                outputs::table.filter(outputs::status.eq(OutputStatus::ShortTermEncumberedToBeReceived as i32)),
×
945
            )
946
            .set((
×
947
                outputs::status.eq(OutputStatus::CancelledInbound as i32),
×
948
                outputs::last_validation_timestamp
×
949
                    .eq(DateTime::from_timestamp(Utc::now().timestamp(), 0).unwrap().naive_utc()),
×
950
            ))
×
951
            .execute(conn)?;
×
952

953
            diesel::update(outputs::table.filter(outputs::status.eq(OutputStatus::ShortTermEncumberedToBeSpent as i32)))
×
954
                .set((outputs::status.eq(OutputStatus::Unspent as i32),))
×
955
                .execute(conn)
×
956
        })?;
×
957

958
        if start.elapsed().as_millis() > 0 {
×
959
            trace!(
×
960
                target: LOG_TARGET,
×
961
                "sqlite profile - clear_short_term_encumberances: lock {} + db_op {} = {} ms",
×
962
                acquire_lock.as_millis(),
×
963
                (start.elapsed() - acquire_lock).as_millis(),
×
964
                start.elapsed().as_millis()
×
965
            );
966
        }
×
967

968
        Ok(())
×
969
    }
×
970

971
    fn get_last_mined_output(&self) -> Result<Option<DbWalletOutput>, OutputManagerStorageError> {
×
972
        let start = Instant::now();
×
973
        let mut conn = self.database_connection.get_pooled_connection()?;
×
974
        let acquire_lock = start.elapsed();
×
975
        let output = OutputSql::first_by_mined_height_desc(&mut conn)?;
×
976
        if start.elapsed().as_millis() > 0 {
×
977
            trace!(
×
978
                target: LOG_TARGET,
×
979
                "sqlite profile - get_last_mined_output: lock {} + db_op {} = {} ms",
×
980
                acquire_lock.as_millis(),
×
981
                (start.elapsed() - acquire_lock).as_millis(),
×
982
                start.elapsed().as_millis()
×
983
            );
984
        }
×
985
        match output {
×
986
            Some(o) => Ok(Some(o.to_db_wallet_output()?)),
×
987
            None => Ok(None),
×
988
        }
989
    }
×
990

991
    fn get_last_spent_output(&self) -> Result<Option<DbWalletOutput>, OutputManagerStorageError> {
×
992
        let start = Instant::now();
×
993
        let mut conn = self.database_connection.get_pooled_connection()?;
×
994
        let acquire_lock = start.elapsed();
×
995

996
        let output = OutputSql::first_by_marked_deleted_height_desc(&mut conn)?;
×
997
        if start.elapsed().as_millis() > 0 {
×
998
            trace!(
×
999
                target: LOG_TARGET,
×
1000
                "sqlite profile - get_last_spent_output: lock {} + db_op {} = {} ms",
×
1001
                acquire_lock.as_millis(),
×
1002
                (start.elapsed() - acquire_lock).as_millis(),
×
1003
                start.elapsed().as_millis()
×
1004
            );
1005
        }
×
1006
        match output {
×
1007
            Some(o) => Ok(Some(o.to_db_wallet_output()?)),
×
1008
            None => Ok(None),
×
1009
        }
1010
    }
×
1011

1012
    fn get_balance(
×
1013
        &self,
×
1014
        current_tip_for_time_lock_calculation: Option<u64>,
×
1015
    ) -> Result<Balance, OutputManagerStorageError> {
×
1016
        let start = Instant::now();
×
1017
        let mut conn = self.database_connection.get_pooled_connection()?;
×
1018
        let acquire_lock = start.elapsed();
×
1019

1020
        let result = OutputSql::get_balance(current_tip_for_time_lock_calculation, &mut conn);
×
1021
        if start.elapsed().as_millis() > 0 {
×
1022
            trace!(
×
1023
                target: LOG_TARGET,
×
1024
                "sqlite profile - get_balance: lock {} + db_op {} = {} ms",
×
1025
                acquire_lock.as_millis(),
×
1026
                (start.elapsed() - acquire_lock).as_millis(),
×
1027
                start.elapsed().as_millis()
×
1028
            );
1029
        }
×
1030
        result
×
1031
    }
×
1032

NEW
1033
    fn count_outputs_in_ranges(
×
NEW
1034
        &self,
×
NEW
1035
        ranges: Vec<Range<u64>>,
×
NEW
1036
        tip_height: Option<u64>,
×
NEW
1037
    ) -> Result<Vec<CoinBucket>, OutputManagerStorageError> {
×
NEW
1038
        let start = Instant::now();
×
NEW
1039
        let mut conn = self.database_connection.get_pooled_connection()?;
×
NEW
1040
        let acquire_lock = start.elapsed();
×
1041

NEW
1042
        let result =
×
NEW
1043
            OutputSql::count_outputs_in_ranges(&UtxoSelectionCriteria::default(), &ranges, tip_height, &mut conn);
×
NEW
1044
        if start.elapsed().as_millis() > 0 {
×
NEW
1045
            trace!(
×
NEW
1046
                target: LOG_TARGET,
×
NEW
1047
                "sqlite profile - count_outputs_in_ranges: lock {} + db_op {} = {} ms",
×
NEW
1048
                acquire_lock.as_millis(),
×
NEW
1049
                (start.elapsed() - acquire_lock).as_millis(),
×
NEW
1050
                start.elapsed().as_millis()
×
1051
            );
NEW
1052
        }
×
NEW
1053
        result
×
NEW
1054
    }
×
1055

1056
    fn get_balance_payment_id(
×
1057
        &self,
×
1058
        current_tip_for_time_lock_calculation: Option<u64>,
×
1059
        payment_id: Vec<u8>,
×
1060
    ) -> Result<Balance, OutputManagerStorageError> {
×
1061
        let start = Instant::now();
×
1062
        let mut conn = self.database_connection.get_pooled_connection()?;
×
1063
        let acquire_lock = start.elapsed();
×
1064

1065
        let result = OutputSql::get_balance_payment_id(current_tip_for_time_lock_calculation, payment_id, &mut conn);
×
1066
        if start.elapsed().as_millis() > 0 {
×
1067
            trace!(
×
1068
                target: LOG_TARGET,
×
1069
                "sqlite profile - get_balance_payment_id: lock {} + db_op {} = {} ms",
×
1070
                acquire_lock.as_millis(),
×
1071
                (start.elapsed() - acquire_lock).as_millis(),
×
1072
                start.elapsed().as_millis()
×
1073
            );
1074
        }
×
1075
        result
×
1076
    }
×
1077

1078
    fn cancel_pending_transaction(&self, tx_id: TxId) -> Result<(), OutputManagerStorageError> {
×
1079
        let start = Instant::now();
×
1080
        let mut conn = self.database_connection.get_pooled_connection()?;
×
1081
        let acquire_lock = start.elapsed();
×
1082

1083
        conn.transaction::<_, _, _>(|conn| {
×
1084
            let outputs = OutputSql::find_by_tx_id_and_encumbered(tx_id, conn)?;
×
1085

1086
            if outputs.is_empty() {
×
1087
                return Err(OutputManagerStorageError::ValueNotFound);
×
1088
            }
×
1089

1090
            for output in &outputs {
×
1091
                if output.received_in_tx_id == Some(tx_id.as_i64_wrapped()) {
×
1092
                    info!(
×
1093
                        target: LOG_TARGET,
×
1094
                        "Cancelling pending inbound output with Commitment: {} - from TxId: {}",
×
1095
                        output.commitment.to_hex(),
×
1096
                        tx_id
1097
                    );
1098
                    output.update(
×
1099
                        UpdateOutput {
×
1100
                            status: Some(OutputStatus::CancelledInbound),
×
1101
                            last_validation_timestamp: Some(Some(
×
1102
                                DateTime::from_timestamp(Utc::now().timestamp(), 0).unwrap().naive_utc(),
×
1103
                            )),
×
1104
                            ..Default::default()
×
1105
                        },
×
1106
                        conn,
×
1107
                    )?;
×
1108
                } else if output.spent_in_tx_id == Some(tx_id.as_i64_wrapped()) {
×
1109
                    info!(
×
1110
                        target: LOG_TARGET,
×
1111
                        "Cancelling pending outbound output with Commitment: {} - from TxId: {}",
×
1112
                        output.commitment.to_hex(),
×
1113
                        tx_id
1114
                    );
1115
                    output.update(
×
1116
                        UpdateOutput {
×
1117
                            status: Some(OutputStatus::Unspent),
×
1118
                            spent_in_tx_id: Some(None),
×
1119
                            // We clear these so that the output will be revalidated the next time a validation is done.
×
1120
                            mined_height: Some(None),
×
1121
                            mined_in_block: Some(None),
×
1122
                            ..Default::default()
×
1123
                        },
×
1124
                        conn,
×
1125
                    )?;
×
1126
                } else {
×
1127
                    // can only be one of the two
×
1128
                }
×
1129
            }
1130

1131
            Ok(())
×
1132
        })?;
×
1133

1134
        if start.elapsed().as_millis() > 0 {
×
1135
            trace!(
×
1136
                target: LOG_TARGET,
×
1137
                "sqlite profile - cancel_pending_transaction: lock {} + db_op {} = {} ms",
×
1138
                acquire_lock.as_millis(),
×
1139
                (start.elapsed() - acquire_lock).as_millis(),
×
1140
                start.elapsed().as_millis()
×
1141
            );
1142
        }
×
1143

1144
        Ok(())
×
1145
    }
×
1146

1147
    // This is typically used by a receiver after the finalized transaction has been broadcast/returned by the sender
1148
    // as the sender has to finalize the signature that was partially constructed by the receiver
1149
    fn update_output_metadata_signature(&self, output: &TransactionOutput) -> Result<(), OutputManagerStorageError> {
×
1150
        let start = Instant::now();
×
1151
        let mut conn = self.database_connection.get_pooled_connection()?;
×
1152
        let acquire_lock = start.elapsed();
×
1153

1154
        conn.transaction::<_, OutputManagerStorageError, _>(|conn| {
×
1155
            let db_output = OutputSql::find_by_commitment_and_cancelled(&output.commitment.to_vec(), false, conn)?;
×
1156
            db_output.update(
×
1157
                // Note: Only the `ephemeral_pubkey` and `u_y` portion needs to be updated at this time as the rest was
1158
                // already correct
1159
                UpdateOutput {
×
1160
                    metadata_signature_ephemeral_pubkey: Some(output.metadata_signature.ephemeral_pubkey().to_vec()),
×
1161
                    metadata_signature_u_y: Some(output.metadata_signature.u_y().to_vec()),
×
1162
                    hash: Some(output.hash().to_vec()),
×
1163
                    ..Default::default()
×
1164
                },
×
1165
                conn,
×
1166
            )?;
×
1167

1168
            Ok(())
×
1169
        })?;
×
1170
        if start.elapsed().as_millis() > 0 {
×
1171
            trace!(
×
1172
                target: LOG_TARGET,
×
1173
                "sqlite profile - update_output_metadata_signature: lock {} + db_op {} = {} ms",
×
1174
                acquire_lock.as_millis(),
×
1175
                (start.elapsed() - acquire_lock).as_millis(),
×
1176
                start.elapsed().as_millis()
×
1177
            );
1178
        }
×
1179

1180
        Ok(())
×
1181
    }
×
1182

1183
    fn revalidate_unspent_output(&self, commitment: &CompressedCommitment) -> Result<(), OutputManagerStorageError> {
×
1184
        let start = Instant::now();
×
1185
        let mut conn = self.database_connection.get_pooled_connection()?;
×
1186
        let acquire_lock = start.elapsed();
×
1187

1188
        conn.transaction::<_, _, _>(|conn| {
×
1189
            let output = OutputSql::find_by_commitment_and_cancelled(&commitment.to_vec(), false, conn)?;
×
1190

1191
            if OutputStatus::try_from(output.status)? != OutputStatus::Invalid {
×
1192
                return Err(OutputManagerStorageError::ValuesNotFound);
×
1193
            }
×
1194
            output.update(
×
1195
                UpdateOutput {
×
1196
                    status: Some(OutputStatus::Unspent),
×
1197
                    ..Default::default()
×
1198
                },
×
1199
                conn,
×
1200
            )?;
×
1201

1202
            Ok(())
×
1203
        })?;
×
1204
        if start.elapsed().as_millis() > 0 {
×
1205
            trace!(
×
1206
                target: LOG_TARGET,
×
1207
                "sqlite profile - revalidate_unspent_output: lock {} + db_op {} = {} ms",
×
1208
                acquire_lock.as_millis(),
×
1209
                (start.elapsed() - acquire_lock).as_millis(),
×
1210
                start.elapsed().as_millis()
×
1211
            );
1212
        }
×
1213
        Ok(())
×
1214
    }
×
1215

1216
    fn reinstate_cancelled_inbound_output(&self, tx_id: TxId) -> Result<(), OutputManagerStorageError> {
×
1217
        let start = Instant::now();
×
1218
        let mut conn = self.database_connection.get_pooled_connection()?;
×
1219
        let acquire_lock = start.elapsed();
×
1220

1221
        update_outputs_with_tx_id_and_status_to_new_status(
×
1222
            &mut conn,
×
1223
            tx_id,
×
1224
            OutputStatus::CancelledInbound,
×
1225
            OutputStatus::EncumberedToBeReceived,
×
1226
        )?;
×
1227

1228
        if start.elapsed().as_millis() > 0 {
×
1229
            trace!(
×
1230
                target: LOG_TARGET,
×
1231
                "sqlite profile - reinstate_cancelled_inbound_output: lock {} + db_op {} = {} ms",
×
1232
                acquire_lock.as_millis(),
×
1233
                (start.elapsed() - acquire_lock).as_millis(),
×
1234
                start.elapsed().as_millis()
×
1235
            );
1236
        }
×
1237
        Ok(())
×
1238
    }
×
1239

1240
    fn add_unvalidated_output(&self, output: DbWalletOutput, tx_id: TxId) -> Result<(), OutputManagerStorageError> {
×
1241
        let start = Instant::now();
×
1242
        let mut conn = self.database_connection.get_pooled_connection()?;
×
1243
        let acquire_lock = start.elapsed();
×
1244

1245
        if OutputSql::find_by_commitment_and_cancelled(&output.commitment.to_vec(), false, &mut conn).is_ok() {
×
1246
            return Err(OutputManagerStorageError::DuplicateOutput);
×
1247
        }
×
1248
        let new_output = NewOutputSql::new(output, Some(OutputStatus::UnspentMinedUnconfirmed), Some(tx_id))?;
×
1249
        new_output.commit(&mut conn)?;
×
1250

1251
        if start.elapsed().as_millis() > 0 {
×
1252
            trace!(
×
1253
                target: LOG_TARGET,
×
1254
                "sqlite profile - add_unvalidated_output: lock {} + db_op {} = {} ms",
×
1255
                acquire_lock.as_millis(),
×
1256
                (start.elapsed() - acquire_lock).as_millis(),
×
1257
                start.elapsed().as_millis()
×
1258
            );
1259
        }
×
1260
        Ok(())
×
1261
    }
×
1262

1263
    /// Retrieves UTXOs within a specified limited range with minimum target amount for spending
NEW
1264
    fn get_range_limited_outputs_for_spending(
×
NEW
1265
        &self,
×
NEW
1266
        selection_criteria: &UtxoSelectionCriteria,
×
NEW
1267
        tip_height: Option<u64>,
×
NEW
1268
    ) -> Result<(Vec<DbWalletOutput>, MicroMinotari), OutputManagerStorageError> {
×
NEW
1269
        let start = Instant::now();
×
NEW
1270
        let mut conn = self.database_connection.get_pooled_connection()?;
×
NEW
1271
        let acquire_lock = start.elapsed();
×
1272

NEW
1273
        let (outputs, total) =
×
NEW
1274
            OutputSql::get_range_limited_outputs_for_spending(selection_criteria, tip_height, &mut conn)?;
×
1275

NEW
1276
        trace!(
×
NEW
1277
            target: LOG_TARGET,
×
NEW
1278
            "sqlite profile - get_range_limited_outputs_for_spending: lock {} + db_op {} = {} ms",
×
NEW
1279
            acquire_lock.as_millis(),
×
NEW
1280
            (start.elapsed() - acquire_lock).as_millis(),
×
NEW
1281
            start.elapsed().as_millis()
×
1282
        );
1283
        Ok((
NEW
1284
            outputs
×
NEW
1285
                .iter()
×
NEW
1286
                .map(|o| o.clone().to_db_wallet_output())
×
NEW
1287
                .collect::<Result<Vec<_>, _>>()?,
×
NEW
1288
            total,
×
1289
        ))
NEW
1290
    }
×
1291

1292
    /// Retrieves UTXOs than can be spent, sorted by priority, then value from smallest to largest.
1293
    fn fetch_unspent_outputs_for_spending(
×
1294
        &self,
×
1295
        selection_criteria: &UtxoSelectionCriteria,
×
1296
        amount: u64,
×
1297
        tip_height: Option<u64>,
×
1298
    ) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
×
1299
        let start = Instant::now();
×
1300
        let mut conn = self.database_connection.get_pooled_connection()?;
×
1301
        let acquire_lock = start.elapsed();
×
1302

1303
        let outputs = OutputSql::fetch_unspent_outputs_for_spending(selection_criteria, amount, tip_height, &mut conn)?;
×
1304

1305
        trace!(
×
1306
            target: LOG_TARGET,
×
1307
            "sqlite profile - fetch_unspent_outputs_for_spending: lock {} + db_op {} = {} ms",
×
1308
            acquire_lock.as_millis(),
×
1309
            (start.elapsed() - acquire_lock).as_millis(),
×
1310
            start.elapsed().as_millis()
×
1311
        );
1312
        outputs
×
1313
            .iter()
×
1314
            .map(|o| o.clone().to_db_wallet_output())
×
1315
            .collect::<Result<Vec<_>, _>>()
×
1316
    }
×
1317

1318
    /// Returns the outputs found by `OutputSql::find_by_tx_id` for `tx_id`, converted to `DbWalletOutput`.
    ///
    /// # Errors
    /// Propagates connection and query errors, and the first failure to convert a row to a `DbWalletOutput`.
    fn fetch_outputs_by_tx_id(&self, tx_id: TxId) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
        let mut conn = self.database_connection.get_pooled_connection()?;
        let outputs = OutputSql::find_by_tx_id(tx_id, &mut conn)?;

        // The rows are no longer needed after conversion, so consume them instead of cloning each one.
        outputs
            .into_iter()
            .map(|o| o.to_db_wallet_output())
            .collect::<Result<Vec<_>, _>>()
    }
×
1327

1328
    fn fetch_outputs_by_query(&self, q: OutputBackendQuery) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
×
1329
        let mut conn = self.database_connection.get_pooled_connection()?;
×
1330
        Ok(OutputSql::fetch_outputs_by_query(q, &mut conn)?
×
1331
            .into_iter()
×
1332
            .filter_map(|x| {
×
1333
                x.to_db_wallet_output()
×
1334
                    .inspect_err(|e| {
×
1335
                        error!(
×
1336
                            target: LOG_TARGET,
×
1337
                            "failed to convert `OutputSql` to `DbWalletOutput`: {e:#?}"
×
1338
                        );
1339
                    })
×
1340
                    .ok()
×
1341
            })
×
1342
            .collect())
×
1343
    }
×
1344
}
1345

1346
fn replace_tx_id(
×
1347
    conn: &mut SqliteConnection,
×
1348
    tx_id_old: TxId,
×
1349
    tx_id_new: TxId,
×
1350
) -> Result<(), OutputManagerStorageError> {
×
1351
    let tx_id_new_i64 = tx_id_new.as_i64_wrapped();
×
1352
    let tx_id_old_i64 = tx_id_old.as_i64_wrapped();
×
1353
    debug!(target: LOG_TARGET, "Replacing temp tx_id in outputs '{tx_id_old_i64}' with '{tx_id_new_i64}'");
×
1354

1355
    diesel::update(outputs::table.filter(outputs::spent_in_tx_id.eq(Some(tx_id_old_i64))))
×
1356
        .set(outputs::spent_in_tx_id.eq(Some(tx_id_new_i64)))
×
1357
        .execute(conn)?;
×
1358

1359
    diesel::update(outputs::table.filter(outputs::received_in_tx_id.eq(Some(tx_id_old_i64))))
×
1360
        .set(outputs::received_in_tx_id.eq(Some(tx_id_new_i64)))
×
1361
        .execute(conn)?;
×
1362

1363
    Ok(())
×
1364
}
×
1365

1366
/// The values to be set on a single received output during the received outputs batch mode update.
/// The output is identified by its `commitment`.
1367
#[derive(Clone, Debug, Default)]
1368
pub struct ReceivedOutputInfoForBatch {
1369
    /// The Pedersen commitment of the output
1370
    pub commitment: CompressedCommitment,
1371
    /// The height at which the output was mined
1372
    pub mined_height: u64,
1373
    /// The block hash in which the output was mined
1374
    pub mined_in_block: FixedHash,
1375
    /// Whether the output is confirmed
1376
    pub confirmed: bool,
1377
    /// The timestamp at which the output was mined
1378
    // NOTE(review): the unit (seconds vs. milliseconds) is not visible here - confirm against the caller.
    pub mined_timestamp: u64,
1379
}
1380

1381
/// The values to be set on a single spent output during the spent outputs batch mode update.
/// The output is identified by its `commitment`.
1382
#[derive(Clone, Debug, Default)]
1383
pub struct SpentOutputInfoForBatch {
1384
    /// The Pedersen commitment of the output
1385
    pub commitment: CompressedCommitment,
1386
    /// Whether the output is confirmed
1387
    pub confirmed: bool,
1388
    /// The height at which the output was marked as deleted
1389
    pub mark_deleted_at_height: u64,
1390
    /// The block hash in which the output was marked as deleted
1391
    pub mark_deleted_in_block: FixedHash,
1392
}
1393

1394
fn update_outputs_with_tx_id_and_status_to_new_status(
×
1395
    conn: &mut SqliteConnection,
×
1396
    tx_id: TxId,
×
1397
    from_status: OutputStatus,
×
1398
    to_status: OutputStatus,
×
1399
) -> Result<(), OutputManagerStorageError> {
×
1400
    diesel::update(
×
1401
        outputs::table
×
1402
            .filter(
×
1403
                outputs::received_in_tx_id
×
1404
                    .eq(Some(tx_id.as_u64() as i64))
×
1405
                    .or(outputs::spent_in_tx_id.eq(Some(tx_id.as_u64() as i64))),
×
1406
            )
1407
            .filter(outputs::status.eq(from_status as i32)),
×
1408
    )
1409
    .set(outputs::status.eq(to_status as i32))
×
1410
    .execute(conn)?;
×
1411
    Ok(())
×
1412
}
×
1413

1414
/// These are the fields that can be updated for an Output.
///
/// Every field is optional and the struct derives `Default`, so callers can set only the fields they want to
/// change and fill the rest with `..Default::default()`. The struct is converted into `UpdateOutputSql` before
/// it is handed to Diesel.
///
/// NOTE(review): for the `Option<Option<_>>` fields the outer `None` presumably means "leave the column
/// unchanged" and `Some(None)` "set the column to NULL" (Diesel's default `AsChangeset` behaviour) - confirm.
1415
#[derive(Clone, Default)]
1416
pub struct UpdateOutput {
1417
    status: Option<OutputStatus>,
1418
    hash: Option<Vec<u8>>,
1419
    received_in_tx_id: Option<Option<TxId>>,
1420
    spent_in_tx_id: Option<Option<TxId>>,
1421
    metadata_signature_ephemeral_commitment: Option<Vec<u8>>,
1422
    metadata_signature_ephemeral_pubkey: Option<Vec<u8>>,
1423
    metadata_signature_u_a: Option<Vec<u8>>,
1424
    metadata_signature_u_x: Option<Vec<u8>>,
1425
    metadata_signature_u_y: Option<Vec<u8>>,
1426
    mined_height: Option<Option<u64>>,
1427
    mined_in_block: Option<Option<Vec<u8>>>,
1428
    last_validation_timestamp: Option<Option<NaiveDateTime>>,
1429
    encrypted_data: Option<Vec<u8>>,
1430
}
1431

1432
/// The SQL-typed changeset for the `outputs` table, built from an `UpdateOutput`.
///
/// It carries the same fields as `UpdateOutput`, with the status stored as `i32` and transaction ids and the
/// mined height stored as `i64`.
///
/// NOTE(review): with Diesel's default `AsChangeset` behaviour a `None` field is skipped and `Some(None)` writes
/// NULL - confirm no `treat_none_as_null` option is intended here.
#[derive(AsChangeset)]
1433
#[diesel(table_name = outputs)]
1434
pub struct UpdateOutputSql {
1435
    status: Option<i32>,
1436
    hash: Option<Vec<u8>>,
1437
    received_in_tx_id: Option<Option<i64>>,
1438
    spent_in_tx_id: Option<Option<i64>>,
1439
    metadata_signature_ephemeral_commitment: Option<Vec<u8>>,
1440
    metadata_signature_ephemeral_pubkey: Option<Vec<u8>>,
1441
    metadata_signature_u_a: Option<Vec<u8>>,
1442
    metadata_signature_u_x: Option<Vec<u8>>,
1443
    metadata_signature_u_y: Option<Vec<u8>>,
1444
    mined_height: Option<Option<i64>>,
1445
    mined_in_block: Option<Option<Vec<u8>>>,
1446
    encrypted_data: Option<Vec<u8>>,
1447
    last_validation_timestamp: Option<Option<NaiveDateTime>>,
1448
}
1449

1450
/// Map a Rust friendly UpdateOutput to the Sql data type form
1451
impl From<UpdateOutput> for UpdateOutputSql {
1452
    fn from(u: UpdateOutput) -> Self {
2✔
1453
        Self {
1454
            status: u.status.map(|t| t as i32),
2✔
1455
            hash: u.hash,
2✔
1456
            metadata_signature_ephemeral_commitment: u.metadata_signature_ephemeral_commitment,
2✔
1457
            metadata_signature_ephemeral_pubkey: u.metadata_signature_ephemeral_pubkey,
2✔
1458
            metadata_signature_u_a: u.metadata_signature_u_a,
2✔
1459
            metadata_signature_u_x: u.metadata_signature_u_x,
2✔
1460
            metadata_signature_u_y: u.metadata_signature_u_y,
2✔
1461
            received_in_tx_id: u.received_in_tx_id.map(|o| o.map(TxId::as_i64_wrapped)),
2✔
1462
            spent_in_tx_id: u.spent_in_tx_id.map(|o| o.map(TxId::as_i64_wrapped)),
2✔
1463
            mined_height: u.mined_height.map(|t| t.map(|h| h as i64)),
2✔
1464
            mined_in_block: u.mined_in_block,
2✔
1465
            encrypted_data: u.encrypted_data,
2✔
1466
            last_validation_timestamp: u.last_validation_timestamp,
2✔
1467
        }
1468
    }
2✔
1469
}
1470

1471
/// A row of the `known_one_sided_payment_scripts` table, keyed by `script_hash`.
#[derive(Clone, Derivative, Queryable, Insertable, Identifiable, PartialEq, AsChangeset)]
×
1472
#[derivative(Debug)]
1473
#[diesel(table_name = known_one_sided_payment_scripts)]
1474
#[diesel(primary_key(script_hash))]
1475
// #[identifiable_options(primary_key(hash))]
1476
pub struct KnownOneSidedPaymentScriptSql {
1477
    /// Primary key of the row.
    pub script_hash: Vec<u8>,
1478
    /// String form of the script key id; it is parsed back with `TariKeyId::from_str` when the row is converted.
    pub private_key: String,
1479
    /// The script as bytes; decoded with `TariScript::from_bytes` when the row is converted.
    pub script: Vec<u8>,
1480
    /// The script input as bytes; decoded with `ExecutionStack::from_bytes` when the row is converted.
    pub input: Vec<u8>,
1481
    /// The script lock height, stored as `i64` and cast to and from `u64` by the conversion methods.
    pub script_lock_height: i64,
1482
}
1483

1484
/// These are the fields that can be updated for a known one-sided payment script.
///
/// NOTE(review): with Diesel's default `AsChangeset` behaviour a `None` field leaves the column unchanged -
/// confirm.
1485
#[derive(AsChangeset)]
1486
#[diesel(table_name = known_one_sided_payment_scripts)]
1487
pub struct UpdateKnownOneSidedPaymentScript {
1488
    script: Option<Vec<u8>>,
1489
    input: Option<Vec<u8>>,
1490
}
1491

1492
impl KnownOneSidedPaymentScriptSql {
1493
    /// Write this struct to the database
1494
    pub fn commit(&self, conn: &mut SqliteConnection) -> Result<(), OutputManagerStorageError> {
×
1495
        diesel::insert_into(known_one_sided_payment_scripts::table)
×
1496
            .values(self.clone())
×
1497
            .execute(conn)?;
×
1498
        Ok(())
×
1499
    }
×
1500

1501
    /// Find a particular script, if it exists
1502
    pub fn find(
×
1503
        hash: &[u8],
×
1504
        conn: &mut SqliteConnection,
×
1505
    ) -> Result<KnownOneSidedPaymentScriptSql, OutputManagerStorageError> {
×
1506
        Ok(known_one_sided_payment_scripts::table
×
1507
            .filter(known_one_sided_payment_scripts::script_hash.eq(hash))
×
1508
            .first::<KnownOneSidedPaymentScriptSql>(conn)?)
×
1509
    }
×
1510

1511
    /// Return all known scripts
1512
    pub fn index(conn: &mut SqliteConnection) -> Result<Vec<KnownOneSidedPaymentScriptSql>, OutputManagerStorageError> {
×
1513
        Ok(known_one_sided_payment_scripts::table.load::<KnownOneSidedPaymentScriptSql>(conn)?)
×
1514
    }
×
1515

1516
    pub fn delete(&self, conn: &mut SqliteConnection) -> Result<(), OutputManagerStorageError> {
×
1517
        let num_deleted = diesel::delete(
×
1518
            known_one_sided_payment_scripts::table
×
1519
                .filter(known_one_sided_payment_scripts::script_hash.eq(&self.script_hash)),
×
1520
        )
1521
        .execute(conn)?;
×
1522

1523
        if num_deleted == 0 {
×
1524
            return Err(OutputManagerStorageError::ValuesNotFound);
×
1525
        }
×
1526

1527
        Ok(())
×
1528
    }
×
1529

1530
    pub fn update(
×
1531
        &self,
×
1532
        updated_known_script: UpdateKnownOneSidedPaymentScript,
×
1533
        conn: &mut SqliteConnection,
×
1534
    ) -> Result<KnownOneSidedPaymentScriptSql, OutputManagerStorageError> {
×
1535
        diesel::update(
×
1536
            known_one_sided_payment_scripts::table
×
1537
                .filter(known_one_sided_payment_scripts::script_hash.eq(&self.script_hash)),
×
1538
        )
1539
        .set(updated_known_script)
×
1540
        .execute(conn)
×
1541
        .num_rows_affected_or_not_found(1)?;
×
1542

1543
        KnownOneSidedPaymentScriptSql::find(&self.script_hash, conn)
×
1544
    }
×
1545

1546
    /// Conversion from an KnownOneSidedPaymentScriptSQL to the datatype form
1547
    pub fn to_known_one_sided_payment_script(self) -> Result<KnownOneSidedPaymentScript, OutputManagerStorageError> {
×
1548
        let script_hash = self.script_hash.clone();
×
1549
        let private_key =
×
1550
            TariKeyId::from_str(&self.private_key).map_err(|_| OutputManagerStorageError::ConversionError {
×
1551
                reason: "Could not convert private key to TariKeyId".to_string(),
×
1552
            })?;
×
1553

1554
        let script = TariScript::from_bytes(&self.script).map_err(|_| {
×
1555
            error!(target: LOG_TARGET, "Could not create tari script from stored bytes");
×
1556
            OutputManagerStorageError::ConversionError {
×
1557
                reason: "Tari Script could not be converted from bytes".to_string(),
×
1558
            }
×
1559
        })?;
×
1560
        let input = ExecutionStack::from_bytes(&self.input).map_err(|_| {
×
1561
            error!(target: LOG_TARGET, "Could not create execution stack from stored bytes");
×
1562
            OutputManagerStorageError::ConversionError {
×
1563
                reason: "ExecutionStack could not be converted from bytes".to_string(),
×
1564
            }
×
1565
        })?;
×
1566
        let script_lock_height = self.script_lock_height as u64;
×
1567

1568
        Ok(KnownOneSidedPaymentScript {
×
1569
            script_hash,
×
1570
            script_key_id: private_key,
×
1571
            script,
×
1572
            input,
×
1573
            script_lock_height,
×
1574
        })
×
1575
    }
×
1576

1577
    /// Conversion from an KnownOneSidedPaymentScriptSQL to the datatype form
1578
    pub fn from_known_one_sided_payment_script(
×
1579
        known_script: KnownOneSidedPaymentScript,
×
1580
    ) -> Result<Self, OutputManagerStorageError> {
×
1581
        let script_lock_height = known_script.script_lock_height as i64;
×
1582
        let script_hash = known_script.script_hash;
×
1583
        let private_key = known_script.script_key_id.to_string();
×
1584
        let script = known_script.script.to_bytes().to_vec();
×
1585
        let input = known_script.input.to_bytes().to_vec();
×
1586

1587
        let payment_script = KnownOneSidedPaymentScriptSql {
×
1588
            script_hash,
×
1589
            private_key,
×
1590
            script,
×
1591
            input,
×
1592
            script_lock_height,
×
1593
        };
×
1594

1595
        Ok(payment_script)
×
1596
    }
×
1597
}
1598

1599
/// A summary of coins in a particular range
1600
#[derive(Clone, Debug)]
1601
pub struct CoinBucket {
1602
    /// The number of outputs in this range
1603
    pub number_of_outputs: u64,
1604
    /// The total value of outputs in this range
1605
    pub total_value: u64,
1606
    /// The range that this bucket covers (a `Range`, so the start is inclusive and the end is exclusive)
1607
    pub range: Range<u64>,
1608
}
1609

1610
#[cfg(test)]
mod test {
    #![allow(clippy::indexing_slicing)]

    use diesel::{sql_query, Connection, RunQueryDsl, SqliteConnection};
    use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
    use rand::{rngs::OsRng, RngCore};
    use tari_script::script;
    use tari_test_utils::random;
    use tari_transaction_components::{
        test_helpers::{create_wallet_output_with_data, TestParams},
        transaction_components::{OutputFeatures, TransactionInput, WalletOutput},
        MicroMinotari,
    };
    use tari_transaction_key_manager::{create_memory_db_key_manager, MemoryDbKeyManager};
    use tempfile::tempdir;

    use crate::output_manager_service::storage::{
        models::DbWalletOutput,
        sqlite_db::{new_output_sql::NewOutputSql, output_sql::OutputSql, OutputStatus, UpdateOutput},
        OutputSource,
    };

    /// Creates a wallet output worth `val` with a `Nop` script and default features, and returns it together
    /// with the transaction input derived from it.
    pub async fn make_input(
        val: MicroMinotari,
        key_manager: &mut MemoryDbKeyManager,
    ) -> (TransactionInput, WalletOutput) {
        let test_params = TestParams::new(key_manager).await;

        let wallet_output = create_wallet_output_with_data(
            script!(Nop).unwrap(),
            OutputFeatures::default(),
            &test_params,
            val,
            key_manager,
        )
        .await
        .unwrap();
        let input = wallet_output.to_transaction_input(key_manager).await.unwrap();

        (input, wallet_output)
    }

    /// Exercises insert, index, find, delete and update on `OutputSql` against a freshly migrated database.
    #[allow(clippy::too_many_lines)]
    #[tokio::test]
    async fn test_crud() {
        let db_name = format!("{}.sqlite3", random::string(8).as_str());
        let temp_dir = tempdir().unwrap();
        // Join with the path API so that the database file is created *inside* the temporary directory and is
        // removed together with it. Plain string concatenation dropped the path separator and left the file
        // behind next to the directory.
        let db_file = temp_dir.path().join(&db_name);
        let db_path = db_file.to_str().unwrap().to_string();

        const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");

        let mut conn =
            SqliteConnection::establish(&db_path).unwrap_or_else(|_| panic!("Error connecting to {db_path}"));

        conn.run_pending_migrations(MIGRATIONS).expect("Migrations failed");

        sql_query("PRAGMA foreign_keys = ON").execute(&mut conn).unwrap();

        let mut outputs = Vec::new();
        let mut outputs_spent = Vec::new();
        let mut outputs_unspent = Vec::new();

        let mut key_manager = create_memory_db_key_manager().await.unwrap();
        for _ in 0..2 {
            let (_, uo) = make_input(MicroMinotari::from(100 + OsRng.next_u64() % 1000), &mut key_manager).await;
            let uo = DbWalletOutput::from_wallet_output(uo, None, OutputSource::Standard, None, None);
            let o = NewOutputSql::new(uo, Some(OutputStatus::Unspent), None).unwrap();
            outputs.push(o.clone());
            outputs_unspent.push(o.clone());
            o.commit(&mut conn).unwrap();
        }

        for _ in 0..3 {
            let (_, uo) = make_input(MicroMinotari::from(100 + OsRng.next_u64() % 1000), &mut key_manager).await;
            let uo = DbWalletOutput::from_wallet_output(uo, None, OutputSource::Standard, None, None);
            let o = NewOutputSql::new(uo, Some(OutputStatus::Spent), None).unwrap();
            outputs.push(o.clone());
            outputs_spent.push(o.clone());
            o.commit(&mut conn).unwrap();
        }

        assert_eq!(
            OutputSql::index(&mut conn)
                .unwrap()
                .iter()
                .map(|o| o.spending_key.clone())
                .collect::<Vec<String>>(),
            outputs.iter().map(|o| o.spending_key.clone()).collect::<Vec<String>>()
        );
        assert_eq!(
            OutputSql::index_status(vec![OutputStatus::Unspent], &mut conn)
                .unwrap()
                .iter()
                .map(|o| o.spending_key.clone())
                .collect::<Vec<String>>(),
            outputs_unspent
                .iter()
                .map(|o| o.spending_key.clone())
                .collect::<Vec<String>>()
        );
        assert_eq!(
            OutputSql::index_status(vec![OutputStatus::Spent], &mut conn)
                .unwrap()
                .iter()
                .map(|o| o.spending_key.clone())
                .collect::<Vec<String>>(),
            outputs_spent
                .iter()
                .map(|o| o.spending_key.clone())
                .collect::<Vec<String>>()
        );

        assert_eq!(
            OutputSql::find(&outputs[0].spending_key, &mut conn)
                .unwrap()
                .spending_key,
            outputs[0].spending_key
        );

        assert_eq!(
            OutputSql::find_status(&outputs[0].spending_key, OutputStatus::Unspent, &mut conn)
                .unwrap()
                .spending_key,
            outputs[0].spending_key
        );

        assert!(OutputSql::find_status(&outputs[0].spending_key, OutputStatus::Spent, &mut conn).is_err());

        // Fail right here if the delete itself reports an error instead of discarding its result.
        OutputSql::find(&outputs[4].spending_key, &mut conn)
            .unwrap()
            .delete(&mut conn)
            .unwrap();

        assert_eq!(OutputSql::index(&mut conn).unwrap().len(), 4);

        OutputSql::find(&outputs[0].spending_key, &mut conn)
            .unwrap()
            .update(
                UpdateOutput {
                    status: Some(OutputStatus::Unspent),
                    received_in_tx_id: Some(Some(44u64.into())),
                    ..Default::default()
                },
                &mut conn,
            )
            .unwrap();

        OutputSql::find(&outputs[1].spending_key, &mut conn)
            .unwrap()
            .update(
                UpdateOutput {
                    status: Some(OutputStatus::EncumberedToBeReceived),
                    received_in_tx_id: Some(Some(44u64.into())),
                    ..Default::default()
                },
                &mut conn,
            )
            .unwrap();

        // Both outputs reference tx 44, but only the second one is in an encumbered status.
        let result = OutputSql::find_by_tx_id_and_encumbered(44u64.into(), &mut conn).unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].spending_key, outputs[1].spending_key);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc