• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

yaleman / maremma / #247

01 Feb 2025 03:10PM UTC coverage: 74.233% (+0.2%) from 74.023%
#247

push

web-flow
Another attempt at fixing the DB locks (#130)

* fix: updating a comment about get_next_service_check ordering
* fix: Moving to only using write locks on the DB
* fix: Cleanup of how OrderFields is turned into a String
* fix: clippy all the things
* fix: less hand-written strings more constants
* fix: adding gha rust caching

64 of 70 new or added lines in 12 files covered. (91.43%)

1 existing line in 1 file now uncovered.

2008 of 2705 relevant lines covered (74.23%)

2.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.24
/src/check_loop.rs
1
//! Runs the service checks on a loop
2

3
use crate::db::get_next_service_check;
4
use crate::prelude::*;
5
use opentelemetry::metrics::Counter;
6
use opentelemetry::KeyValue;
7
use rand::seq::IteratorRandom;
8
use tokio::sync::Semaphore;
9

10
/// Starting delay of the check loop's back-off, and the amount it grows by each time
/// the loop finds no work.
const DEFAULT_BACKOFF: std::time::Duration = std::time::Duration::from_millis(50);
/// Ceiling for the check loop's back-off delay.
const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_secs(1);
12

13
#[derive(Clone, Debug)]
14
/// The end result of a service check
15
pub struct CheckResult {
16
    /// When the check finished
17
    pub timestamp: chrono::DateTime<Utc>,
18
    /// How long it took
19
    pub time_elapsed: Duration,
20
    /// The result
21
    pub status: ServiceStatus,
22
    /// Any explanatory/returned text
23
    pub result_text: String,
24
}
25

26
#[instrument(level = "INFO", skip_all, fields(service_check_id=%service_check.id, service_id=%service.id))]
27
/// Does what it says on the tin
28
pub(crate) async fn run_service_check(
29
    db: Arc<RwLock<DatabaseConnection>>,
30
    service_check: &entities::service_check::Model,
31
    service: entities::service::Model,
32
) -> Result<(), Error> {
33
    let db_writer = db.write().await;
4✔
34
    let check = match Service::try_from_service_model(&service, &db_writer).await {
5✔
35
        Ok(check) => check,
1✔
36
        Err(err) => {
×
37
            error!(
×
38
                "Failed to convert service check {} to service: {:?}",
39
                service_check.id, err
40
            );
41
            return Err(Error::Generic(format!(
×
42
                "Failed to convert service check {} to service: {:?}",
43
                service_check.id, err
44
            )));
45
        }
46
    };
47

48
    let host: entities::host::Model = match service_check
5✔
49
        .find_related(entities::host::Entity)
1✔
50
        .one(&*db_writer)
2✔
51
        .await?
4✔
52
    {
53
        Some(host) => {
1✔
54
            debug!(
1✔
55
                "Found host: {} for service_check={}",
56
                host.name,
57
                service_check.id.hyphenated()
58
            );
59
            host
1✔
60
        }
61
        None => {
62
            error!(
×
63
                "Failed to get host for service check: {:?}",
64
                service_check.id
65
            );
66
            return Err(Error::HostNotFound(service_check.host_id));
×
67
        }
68
    };
69

70
    #[cfg(not(tarpaulin_include))]
71
    let service_to_run = check.config().ok_or_else(|| {
72
        error!(
73
            "Failed to get service config for {}",
74
            service.id.hyphenated()
75
        );
76
        Error::ServiceConfigNotFound(service.id.hyphenated().to_string())
77
    })?;
78
    drop(db_writer);
1✔
79
    debug!("Starting service_check={:?}", service_check);
13✔
80
    let result = match service_to_run.run(&host).await {
4✔
81
        Ok(val) => val,
×
82
        Err(err) => CheckResult {
83
            timestamp: chrono::Utc::now(),
2✔
84
            time_elapsed: Duration::zero(),
2✔
85
            status: ServiceStatus::Error,
86
            result_text: format!("Error: {:?}", err),
2✔
87
        },
88
    };
89
    debug!(
18✔
90
        "Completed service_check={:?} result={:?}",
91
        service_check, result.status
92
    );
93
    let jitter = service_to_run.jitter_value();
3✔
94

95
    let db_writer = db.write().await;
2✔
96
    entities::service_check_history::Model::from_service_check_result(service_check.id, &result)
8✔
97
        .into_active_model()
98
        .insert(&*db_writer)
2✔
99
        .await?;
7✔
100
    let mut model = service_check.clone().into_active_model();
2✔
101
    model.last_check.set_if_not_equals(chrono::Utc::now());
2✔
102
    model.status.set_if_not_equals(result.status);
2✔
103

104
    // get a number between 0 and jitter
105
    let jitter: i64 = (0..jitter).choose(&mut rand::thread_rng()).unwrap_or(0) as i64;
2✔
106

107
    let next_check = Cron::new(&service.cron_schedule)
5✔
108
        .parse()?
×
109
        .find_next_occurrence(&chrono::Utc::now(), false)?
2✔
110
        + chrono::Duration::seconds(jitter);
2✔
111
    model.next_check.set_if_not_equals(next_check);
1✔
112

113
    if model.is_changed() {
4✔
114
        debug!("Saving {:?}", model);
20✔
115
        model.save(&*db_writer).await.map_err(|err| {
6✔
116
            error!("{} error saving {:?}", service.id.hyphenated(), err);
×
117
            Error::from(err)
×
118
        })?;
119
    } else {
120
        debug!("set_last_check with no change? {:?}", model);
×
121
    }
122
    drop(db_writer);
1✔
123

124
    Ok(())
1✔
125
}
126

127
#[instrument(level = "DEBUG", skip_all, fields(service_check_id = %service_check.id, service_id = %service.id))]
128
async fn run_inner(
129
    db: Arc<RwLock<DatabaseConnection>>,
130
    service_check: entities::service_check::Model,
131
    service: entities::service::Model,
132
    checks_run_since_startup: Arc<Counter<u64>>,
133
) -> Result<(), Error> {
134
    let sc_id = service_check.id.hyphenated().to_string();
×
135
    if let Err(err) = run_service_check(db.clone(), &service_check, service).await {
×
136
        error!("Failed to run service_check {} error={:?}", sc_id, err);
×
137

138
        let db_writer = db.write().await;
×
139
        if let Some(service_check) = entities::service_check::Entity::find()
×
140
            .filter(entities::service_check::Column::Id.eq(&sc_id))
×
141
            .one(&*db_writer)
×
142
            .await
×
143
            .map_err(Error::from)?
×
144
        {
145
            let mut service_check = service_check.into_active_model();
×
146
            service_check.status.set_if_not_equals(ServiceStatus::Error);
×
147
            service_check.update(&*db_writer).await?;
×
148
        }
NEW
149
        drop(db_writer);
×
150

151
        checks_run_since_startup.add(
×
152
            1,
153
            &[
×
154
                KeyValue::new("result", ToString::to_string(&ServiceStatus::Error)),
×
155
                KeyValue::new("id", sc_id),
×
156
            ],
157
        );
158
    } else {
159
        checks_run_since_startup.add(
×
160
            1,
161
            &[
×
162
                KeyValue::new("result", ToString::to_string(&ServiceStatus::Ok)),
×
163
                KeyValue::new("id", sc_id),
×
164
            ],
165
        );
166
    }
167
    Ok(())
×
168
}
169

170
#[cfg(not(tarpaulin_include))]
171
/// Loop around and do the checks, keeping it to a limit based on `max_permits`
172
pub async fn run_check_loop(
173
    db: Arc<RwLock<DatabaseConnection>>,
174
    max_permits: usize,
175
    metrics_meter: Arc<Meter>,
176
) -> Result<(), Error> {
177
    // Create a Counter Instrument.
178

179
    let checks_run_since_startup = metrics_meter
180
        .u64_counter("checks_run_since_startup")
181
        .build();
182
    let checks_run_since_startup = Arc::new(checks_run_since_startup);
183

184
    let mut backoff: std::time::Duration = DEFAULT_BACKOFF;
185
    // Limit to n concurrent tasks
186
    let semaphore = Arc::new(Semaphore::new(max_permits));
187
    info!("Max concurrent tasks set to {}", max_permits);
188
    loop {
189
        while semaphore.available_permits() == 0 {
190
            warn!("No spare task slots, something might be running slow!");
191
            tokio::time::sleep(backoff).await;
192
        }
193
        match semaphore.clone().acquire_owned().await {
194
            Ok(permit) => {
195
                let db_lock = db.write().await;
196
                let next_service = get_next_service_check(&db_lock).await?;
197

198
                if let Some((service_check, service)) = next_service {
199
                    // set the service_check to running
200
                    service_check
201
                        .set_status(ServiceStatus::Checking, &db_lock)
202
                        .await?;
203
                    drop(db_lock);
204
                    tokio::spawn(run_inner(
205
                        db.clone(),
206
                        service_check,
207
                        service,
208
                        checks_run_since_startup.clone(),
209
                    ));
210
                    // we did a thing, so we can reset the back-off time, because there might be another
211
                    backoff = DEFAULT_BACKOFF;
212
                } else {
213
                    // didn't get a task, increase backoff a little, but don't overflow the max
214
                    backoff += DEFAULT_BACKOFF;
215
                    if backoff > MAX_BACKOFF {
216
                        backoff = MAX_BACKOFF;
217
                    }
218
                }
219
                drop(permit); // Release the semaphore when the task is done
220
            }
221
            Err(err) => {
222
                error!("Failed to acquire semaphore permit: {:?}", err);
223
                // something went wrong so we want to chill a bit
224
                backoff = std::cmp::max(MAX_BACKOFF / 2, DEFAULT_BACKOFF);
225
            }
226
        };
227
    }
228
}
229

230
#[cfg(test)]
mod tests {
    use entities::service_check;

    use super::*;
    use crate::db::tests::test_setup;

    /// Finds the ping service in the test fixtures and a service check that belongs to it.
    async fn ping_service_and_check(
        db: &DatabaseConnection,
    ) -> (entities::service::Model, service_check::Model) {
        let service = entities::service::Entity::find()
            .filter(entities::service::Column::ServiceType.eq(ServiceType::Ping))
            .one(db)
            .await
            .expect("Failed to query ping service")
            .expect("Failed to find ping service");

        let service_check = service_check::Entity::find()
            .filter(service_check::Column::ServiceId.eq(service.id))
            .one(db)
            .await
            .expect("Failed to query service check")
            .expect("Failed to find service check");

        (service, service_check)
    }

    /// Runs a ping service check end-to-end against the test database.
    #[tokio::test]
    async fn test_run_service_check() {
        let (db, _config) = test_setup().await.expect("Failed to setup test");

        let db_lock = db.write().await;
        let (service, service_check) = ping_service_and_check(&db_lock).await;
        drop(db_lock);

        run_service_check(db.clone(), &service_check, service)
            .await
            .expect("Failed to run service check");
    }

    /// Same as `test_run_service_check`, but with every service check set to `Pending` first.
    #[tokio::test]
    async fn test_run_pending_service_check() {
        let (db, _config) = test_setup().await.expect("Failed to setup test");

        let db_writer = db.write().await;

        service_check::Entity::update_many()
            .col_expr(
                service_check::Column::Status,
                Expr::value(ServiceStatus::Pending),
            )
            .exec(&*db_writer)
            .await
            .expect("Failed to update service checks to pending");

        let (service, service_check) = ping_service_and_check(&db_writer).await;
        drop(db_writer);

        run_service_check(db.clone(), &service_check, service)
            .await
            .expect("Failed to run service check");
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc