• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

0xmichalis / nftbk / 18659805528

20 Oct 2025 05:25PM UTC coverage: 37.563% (+1.5%) from 36.045%
18659805528

push

github

0xmichalis
chore: more log cleanup

0 of 2 new or added lines in 1 file covered. (0.0%)

129 existing lines in 4 files now uncovered.

1714 of 4563 relevant lines covered (37.56%)

7.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/bin/server.rs
1
use clap::Parser;
2
use dotenv::dotenv;
3
use std::env;
4
use std::io::Write;
5
use std::net::SocketAddr;
6
use std::sync::atomic::{AtomicBool, Ordering};
7
use std::sync::Arc;
8
use tokio::signal;
9
use tokio::sync::mpsc;
10
use tracing::{error, info};
11

12
use nftbk::envvar::is_defined;
13
use nftbk::logging;
14
use nftbk::logging::LogLevel;
15
use nftbk::server::auth::x402::X402Config;
16
use nftbk::server::auth::{load_auth_config, JwtCredential};
17
use nftbk::server::pin_monitor::run_pin_monitor;
18
use nftbk::server::pruner::run_pruner;
19
use nftbk::server::router::build_router;
20
use nftbk::server::{
21
    recover_incomplete_tasks, spawn_backup_workers, AppState, BackupTaskOrShutdown,
22
};
23

24
#[derive(Parser, Debug)]
25
#[command(author, version, about, long_about = None)]
26
struct Args {
27
    /// The address to listen on
28
    #[arg(long, default_value = "127.0.0.1:8080")]
29
    listen_address: String,
30

31
    /// The path to the chains configuration file
32
    #[arg(short = 'c', long, default_value = "config_chains.toml")]
33
    chain_config: String,
34

35
    /// The base directory to save the backup to
36
    #[arg(long, default_value = "/tmp")]
37
    base_dir: String,
38

39
    /// Set the log level
40
    #[arg(short, long, value_enum, default_value = "info")]
41
    log_level: LogLevel,
42

43
    /// Skip checksum verification
44
    #[arg(long, default_value_t = false, action = clap::ArgAction::Set)]
45
    unsafe_skip_checksum_check: bool,
46

47
    /// Enable the pruner thread
48
    #[arg(long, default_value_t = true, action = clap::ArgAction::Set)]
49
    enable_pruner: bool,
50

51
    /// Pruner retention period in days
52
    #[arg(long, default_value_t = 3)]
53
    pruner_retention_days: u64,
54

55
    /// Pruner interval in seconds
56
    #[arg(long, default_value_t = 3600)]
57
    pruner_interval_seconds: u64,
58

59
    /// Pruner regex pattern for file names to prune
60
    #[arg(long, default_value = "^nftbk-")]
61
    pruner_pattern: String,
62

63
    /// Pin monitor interval in seconds
64
    #[arg(long, default_value_t = 120)]
65
    pin_monitor_interval_seconds: u64,
66

67
    /// Number of backup worker threads to run in parallel
68
    #[arg(long, default_value_t = 4)]
69
    backup_parallelism: usize,
70

71
    /// Maximum number of backup tasks to queue before blocking
72
    #[arg(long, default_value_t = 10000)]
73
    backup_queue_size: usize,
74

75
    /// Disable colored log output
76
    #[arg(long, default_value_t = false, action = clap::ArgAction::Set)]
77
    no_color: bool,
78

79
    /// Path to a TOML file with one or more JWT credential sets
80
    #[arg(long)]
81
    auth_config: Option<String>,
82

83
    /// Path to a TOML file with IPFS provider configuration
84
    /// When provided, this is used instead of IPFS_* env vars
85
    #[arg(long)]
86
    ipfs_config: Option<String>,
87
}
88

89
#[derive(serde::Deserialize)]
90
struct IpfsConfigFile {
91
    ipfs_pinning_provider: Vec<nftbk::ipfs::IpfsPinningConfig>,
92
}
93

94
#[tokio::main]
UNCOV
95
async fn main() {
×
96
    // We are consuming config both from the environment and from the command line
UNCOV
97
    dotenv().ok();
×
UNCOV
98
    let args = Args::parse();
×
UNCOV
99
    logging::init(args.log_level.clone(), !args.no_color);
×
UNCOV
100
    info!(
×
UNCOV
101
        "Version: {} {} (commit {})",
×
102
        env!("CARGO_BIN_NAME"),
103
        env!("CARGO_PKG_VERSION"),
104
        env!("GIT_COMMIT")
105
    );
106
    info!("Initializing server with options: {:?}", args);
×
107

108
    // Load authentication config
109
    let auth_token = env::var("NFTBK_AUTH_TOKEN").ok();
×
110
    info!(
×
111
        "Symmetric authentication enabled: {}",
×
112
        is_defined(&auth_token)
×
113
    );
UNCOV
114
    let mut jwt_credentials: Vec<JwtCredential> = Vec::new();
×
UNCOV
115
    let mut x402_config: Option<X402Config> = None;
×
UNCOV
116
    if let Some(path) = &args.auth_config {
×
117
        match load_auth_config(std::path::Path::new(path)) {
×
UNCOV
118
            Ok(auth_config) => {
×
UNCOV
119
                jwt_credentials = auth_config.jwt_credentials;
×
120
                x402_config = auth_config.x402_config;
×
121
            }
122
            Err(e) => {
×
123
                error!("Failed to load auth config from '{}': {}", path, e);
×
124
                std::process::exit(1);
×
125
            }
126
        }
127
    }
128

129
    // Load IPFS provider configuration from file if provided
130
    let ipfs_pinning_configs = if args.ipfs_config.is_none() {
×
131
        // No config file, use empty list (AppState will fall back to env vars)
UNCOV
132
        Vec::new()
×
133
    } else {
134
        let path = args.ipfs_config.as_ref().unwrap();
×
UNCOV
135
        match std::fs::read_to_string(path) {
×
UNCOV
136
            Ok(contents) => match toml::from_str::<IpfsConfigFile>(&contents) {
×
UNCOV
137
                Ok(file) => {
×
UNCOV
138
                    info!(
×
UNCOV
139
                        "Loaded {} IPFS pinning provider(s) from config file '{}'",
×
140
                        file.ipfs_pinning_provider.len(),
×
141
                        path
142
                    );
UNCOV
143
                    file.ipfs_pinning_provider
×
144
                }
145
                Err(e) => {
×
146
                    error!("Failed to parse IPFS config file '{}': {}", path, e);
×
147
                    std::process::exit(1);
×
148
                }
149
            },
150
            Err(e) => {
×
UNCOV
151
                error!("Failed to read IPFS config file '{}': {}", path, e);
×
UNCOV
152
                std::process::exit(1);
×
153
            }
154
        }
155
    };
156

157
    let (backup_task_sender, backup_task_receiver) =
×
UNCOV
158
        mpsc::channel::<BackupTaskOrShutdown>(args.backup_queue_size);
×
UNCOV
159
    let db_url =
×
160
        std::env::var("DATABASE_URL").expect("DATABASE_URL env var must be set for Postgres");
×
161
    let shutdown_flag = Arc::new(AtomicBool::new(false));
×
162
    let state = AppState::new(
UNCOV
163
        &args.chain_config,
×
UNCOV
164
        &args.base_dir,
×
UNCOV
165
        args.unsafe_skip_checksum_check,
×
UNCOV
166
        auth_token.clone(),
×
167
        args.enable_pruner,
×
168
        args.pruner_retention_days,
×
169
        backup_task_sender.clone(),
×
170
        &db_url,
×
171
        (args.backup_queue_size + 1) as u32,
×
UNCOV
172
        shutdown_flag.clone(),
×
173
        ipfs_pinning_configs,
×
174
    )
175
    .await;
×
176

177
    // Spawn worker pool for backup tasks
178
    let worker_handles =
×
179
        spawn_backup_workers(args.backup_parallelism, backup_task_receiver, state.clone());
×
180

181
    // Recover incomplete backup tasks from previous server runs
182
    match recover_incomplete_tasks(&*state.db, &state.backup_task_sender).await {
×
183
        Ok(count) => {
×
UNCOV
184
            if count > 0 {
×
185
                info!("Successfully recovered {} incomplete backup tasks", count);
×
186
            }
187
        }
188
        Err(e) => {
×
189
            error!("Failed to recover incomplete backup tasks: {}", e);
×
190
            // Don't exit the server, just log the error and continue
191
        }
192
    }
193

194
    // Start the pruner thread
195
    let pruner_handle = if !args.enable_pruner {
×
UNCOV
196
        None
×
197
    } else {
UNCOV
198
        let db = state.db.clone();
×
199
        let base_dir = args.base_dir.clone();
×
200
        let interval = args.pruner_interval_seconds;
×
UNCOV
201
        let shutdown_flag = state.shutdown_flag.clone();
×
UNCOV
202
        Some(tokio::spawn(async move {
×
203
            run_pruner(db, base_dir, interval, shutdown_flag).await;
×
204
        }))
205
    };
206

207
    // Start the pin monitor thread if IPFS providers are configured
UNCOV
208
    let pin_monitor_handle = if state.ipfs_pinning_instances.is_empty() {
×
209
        None
×
210
    } else {
UNCOV
211
        let db = state.db.clone();
×
UNCOV
212
        let providers = state.ipfs_pinning_instances.clone();
×
UNCOV
213
        let interval = args.pin_monitor_interval_seconds;
×
UNCOV
214
        let shutdown_flag = state.shutdown_flag.clone();
×
UNCOV
215
        info!(
×
216
            "Starting pin monitor with {} IPFS provider(s) and {} second interval",
×
217
            providers.len(),
×
218
            interval
219
        );
220
        Some(tokio::spawn(async move {
×
221
            run_pin_monitor(db, providers.to_vec(), interval, shutdown_flag).await;
×
222
        }))
223
    };
224

225
    // Add graceful shutdown
UNCOV
226
    let shutdown_signal = async move {
×
UNCOV
227
        let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
×
228
            .expect("failed to install SIGTERM handler");
229

230
        tokio::select! {
×
UNCOV
231
            _ = signal::ctrl_c() => {
×
232
                info!("Received SIGINT (Ctrl+C), shutting down server...");
×
233
            }
234
            _ = sigterm.recv() => {
×
235
                info!("Received SIGTERM, shutting down server...");
×
236
            }
237
        }
238
        shutdown_flag.store(true, Ordering::SeqCst);
×
239
    };
240

241
    // Start the server
242
    let addr: SocketAddr = args.listen_address.parse().expect("Invalid listen address");
×
UNCOV
243
    let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
×
244
    let app = build_router(
UNCOV
245
        state.clone(),
×
UNCOV
246
        jwt_credentials
×
247
            .into_iter()
×
248
            .map(|c| (c.issuer, c.audience, c.verification_key))
×
UNCOV
249
            .collect(),
×
UNCOV
250
        x402_config.clone(),
×
251
    );
252
    info!("Listening on {}", addr);
×
253
    axum::serve(listener, app)
×
UNCOV
254
        .with_graceful_shutdown(shutdown_signal)
×
255
        .await
×
256
        .unwrap();
UNCOV
257
    info!("Server has exited");
×
258

259
    if let Some(handle) = pruner_handle {
×
UNCOV
260
        let _ = handle.await;
×
261
    }
UNCOV
262
    info!("Pruner has exited");
×
263

264
    if let Some(handle) = pin_monitor_handle {
×
UNCOV
265
        let _ = handle.await;
×
266
    }
267
    info!("Pin monitor has exited");
×
268

269
    // On shutdown, send one Shutdown message per worker
270
    for _ in 0..args.backup_parallelism {
×
UNCOV
271
        let _ = state
×
272
            .backup_task_sender
×
273
            .send(BackupTaskOrShutdown::Shutdown)
×
274
            .await;
×
275
    }
276
    // Drop the last sender to close the channel and signal workers to exit
277
    drop(state.backup_task_sender);
×
UNCOV
278
    info!("Backup task sender has exited");
×
279

280
    // Wait for all workers to finish
UNCOV
281
    for handle in worker_handles {
×
282
        let _ = handle.await;
×
283
    }
284
    info!("Backup workers have exited");
×
285

286
    // Give time for final logs to flush
287
    let _ = std::io::stdout().flush();
×
UNCOV
288
    std::thread::sleep(std::time::Duration::from_millis(200));
×
289
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc