• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

yaleman / maremma / #249

02 Feb 2025 10:09PM UTC coverage: 75.046% (+0.8%) from 74.233%
#249

push

web-flow
New fixes (#131)

14 of 25 new or added lines in 3 files covered. (56.0%)

2 existing lines in 1 file now uncovered.

2033 of 2709 relevant lines covered (75.05%)

2.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.78
/src/check_loop.rs
1
//! Runs the service checks on a loop
2

3
use crate::db::get_next_service_check;
4
use crate::prelude::*;
5
use opentelemetry::metrics::Counter;
6
use opentelemetry::KeyValue;
7
use rand::seq::IteratorRandom;
8
use tokio::sync::Semaphore;
9

10
/// Starting delay for the check loop's back-off, and the step it grows by.
const DEFAULT_BACKOFF: std::time::Duration = std::time::Duration::from_millis(50);
/// Ceiling the check loop's back-off is clamped to.
const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_secs(1);
12

13
#[derive(Clone, Debug)]
14
/// The end result of a service check
15
pub struct CheckResult {
16
    /// When the check finished
17
    pub timestamp: chrono::DateTime<Utc>,
18
    /// How long it took
19
    pub time_elapsed: Duration,
20
    /// The result
21
    pub status: ServiceStatus,
22
    /// Any explanatory/returned text
23
    pub result_text: String,
24
}
25

26
impl Default for CheckResult {
27
    fn default() -> Self {
2✔
28
        Self {
29
            timestamp: chrono::Utc::now(),
2✔
30
            time_elapsed: Duration::zero(),
2✔
31
            status: ServiceStatus::Unknown,
32
            result_text: String::new(),
2✔
33
        }
34
    }
35
}
36

37
#[instrument(level = "INFO", skip_all, fields(service_check_id=%service_check.id, service_id=%service.id))]
38
/// Does what it says on the tin
39
pub(crate) async fn run_service_check(
40
    db: Arc<RwLock<DatabaseConnection>>,
41
    service_check: &entities::service_check::Model,
42
    service: entities::service::Model,
43
) -> Result<(), Error> {
44
    let db_writer = db.write().await;
4✔
45
    let check = match Service::try_from_service_model(&service, &db_writer).await {
5✔
46
        Ok(check) => check,
1✔
47
        Err(err) => {
×
NEW
48
            let errmsg = format!(
×
49
                "Failed to convert service check {} to service: {:?}",
50
                service_check.id, err
51
            );
NEW
52
            error!(errmsg);
×
NEW
53
            return Err(Error::Generic(errmsg));
×
54
        }
55
    };
56

57
    let host: entities::host::Model = match service_check
7✔
58
        .find_related(entities::host::Entity)
1✔
59
        .one(&*db_writer)
4✔
60
        .await?
7✔
61
    {
62
        Some(host) => host,
1✔
63
        None => {
64
            error!(
×
65
                "Failed to get host for service check: {:?}",
66
                service_check.id
67
            );
68
            return Err(Error::HostNotFound(service_check.host_id));
×
69
        }
70
    };
71

72
    #[cfg(not(tarpaulin_include))]
73
    let service_to_run = check.config().ok_or_else(|| {
74
        error!(
75
            "Failed to get service config for {}",
76
            service.id.hyphenated()
77
        );
78
        Error::ServiceConfigNotFound(service.id.hyphenated().to_string())
79
    })?;
80
    drop(db_writer);
1✔
81

82
    debug!("Starting service_check={:?}", service_check);
10✔
83
    let start_time = chrono::Utc::now();
1✔
84
    let result = match service_to_run.run(&host).await {
3✔
85
        Ok(val) => val,
×
86
        Err(err) => CheckResult {
87
            status: ServiceStatus::Error,
88
            time_elapsed: chrono::Utc::now() - start_time,
3✔
89
            result_text: format!("Error: {:?}", err),
2✔
90
            ..Default::default()
91
        },
92
    };
93
    debug!(
22✔
94
        "Completed service_check={:?} result={:?}",
95
        service_check, result.status
96
    );
97
    let max_jitter = service_to_run.jitter_value();
4✔
98

99
    let db_writer = db.write().await;
4✔
100
    entities::service_check_history::Model::from_service_check_result(service_check.id, &result)
11✔
101
        .into_active_model()
102
        .insert(&*db_writer)
4✔
103
        .await?;
8✔
104
    let mut model = service_check.clone().into_active_model();
1✔
105
    model.last_check.set_if_not_equals(chrono::Utc::now());
1✔
106
    model.status.set_if_not_equals(result.status);
1✔
107

108
    // get a number between 0 and jitter
109
    let jitter: i64 = (0..max_jitter).choose(&mut rand::thread_rng()).unwrap_or(0) as i64;
1✔
110

111
    let next_check: DateTime<Utc> = Cron::new(&service.cron_schedule)
3✔
112
        .parse()
113
        .inspect_err(|err| {
1✔
NEW
114
            error!(
×
115
                "Failed to parse cron schedule while setting next occurrence of {:?}: {:?}",
116
                model.id, err
117
            );
118
        })?
119
        .find_next_occurrence(&chrono::Utc::now(), false)?
1✔
120
        + chrono::Duration::seconds(jitter);
2✔
121
    model.next_check.set_if_not_equals(next_check);
1✔
122

123
    if model.is_changed() {
2✔
124
        debug!("Saving {:?}", model);
10✔
125
        model.save(&*db_writer).await.map_err(|err| {
3✔
126
            error!("{} error saving {:?}", service.id.hyphenated(), err);
×
127
            Error::from(err)
×
128
        })?;
129
    } else {
NEW
130
        warn!("set_last_check with no change? {:?}", model);
×
131
    }
132
    drop(db_writer);
1✔
133

134
    Ok(())
1✔
135
}
136

137
#[instrument(level = "DEBUG", skip_all, fields(service_check_id = %service_check.id, service_id = %service.id))]
138
async fn run_inner(
139
    db: Arc<RwLock<DatabaseConnection>>,
140
    service_check: entities::service_check::Model,
141
    service: entities::service::Model,
142
    checks_run_since_startup: Arc<Counter<u64>>,
143
) -> Result<(), Error> {
144
    let sc_id = service_check.id.hyphenated().to_string();
×
145
    if let Err(err) = run_service_check(db.clone(), &service_check, service).await {
×
146
        error!("Failed to run service_check {} error={:?}", sc_id, err);
×
147

NEW
148
        let db_writer: tokio::sync::RwLockWriteGuard<'_, DatabaseConnection> = db.write().await;
×
149
        if let Some(service_check) = entities::service_check::Entity::find()
×
150
            .filter(entities::service_check::Column::Id.eq(&sc_id))
×
151
            .one(&*db_writer)
×
152
            .await
×
153
            .map_err(Error::from)?
×
154
        {
NEW
155
            service_check
×
NEW
156
                .set_status(ServiceStatus::Error, &db_writer)
×
NEW
157
                .await?;
×
158
        } else {
NEW
159
            error!(
×
160
                "Trying to set error status but couldn't find service check {}",
161
                sc_id
162
            );
163
        }
164
        drop(db_writer);
×
165

166
        checks_run_since_startup.add(
×
167
            1,
168
            &[
×
169
                KeyValue::new("result", ToString::to_string(&ServiceStatus::Error)),
×
170
                KeyValue::new("id", sc_id),
×
171
            ],
172
        );
173
    } else {
174
        checks_run_since_startup.add(
×
175
            1,
176
            &[
×
177
                KeyValue::new("result", ToString::to_string(&ServiceStatus::Ok)),
×
178
                KeyValue::new("id", sc_id),
×
179
            ],
180
        );
181
    }
182
    Ok(())
×
183
}
184

185
#[cfg(not(tarpaulin_include))]
186
/// Loop around and do the checks, keeping it to a limit based on `max_permits`
187
pub async fn run_check_loop(
188
    db: Arc<RwLock<DatabaseConnection>>,
189
    max_permits: usize,
190
    metrics_meter: Arc<Meter>,
191
) -> Result<(), Error> {
192
    // Create a Counter Instrument.
193

194
    let checks_run_since_startup = metrics_meter
195
        .u64_counter("checks_run_since_startup")
196
        .build();
197
    let checks_run_since_startup = Arc::new(checks_run_since_startup);
198

199
    let mut backoff: std::time::Duration = DEFAULT_BACKOFF;
200
    // Limit to n concurrent tasks
201
    let semaphore = Arc::new(Semaphore::new(max_permits));
202
    info!("Max concurrent tasks set to {}", max_permits);
203
    loop {
204
        while semaphore.available_permits() == 0 {
205
            warn!("No spare task slots, something might be running slow!");
206
            tokio::time::sleep(backoff).await;
207
        }
208
        match semaphore.clone().acquire_owned().await {
209
            Ok(permit) => {
210
                let db_lock = db.write().await;
211
                let next_service = get_next_service_check(&db_lock).await?;
212

213
                if let Some((service_check, service)) = next_service {
214
                    // set the service_check to running
215
                    service_check
216
                        .set_status(ServiceStatus::Checking, &db_lock)
217
                        .await?;
218
                    drop(db_lock);
219
                    tokio::spawn(run_inner(
220
                        db.clone(),
221
                        service_check,
222
                        service,
223
                        checks_run_since_startup.clone(),
224
                    ));
225
                    // we did a thing, so we can reset the back-off time, because there might be another
226
                    backoff = DEFAULT_BACKOFF;
227
                } else {
228
                    // didn't get a task, increase backoff a little, but don't overflow the max
229
                    backoff += DEFAULT_BACKOFF;
230
                    if backoff > MAX_BACKOFF {
231
                        backoff = MAX_BACKOFF;
232
                    }
233
                    drop(db_lock);
234
                }
235
                drop(permit); // Release the semaphore when the task is done
236
            }
237
            Err(err) => {
238
                error!("Failed to acquire semaphore permit: {:?}", err);
239
                // something went wrong so we want to chill a bit
240
                backoff = std::cmp::max(MAX_BACKOFF / 2, DEFAULT_BACKOFF);
241
            }
242
        };
243
    }
244
}
245

246
#[cfg(test)]
mod tests {
    use entities::service_check;

    use super::*;
    use crate::db::tests::test_setup;

    /// Looks up the ping service and one service check that belongs to it,
    /// panicking with a descriptive message if either is missing.
    async fn ping_service_and_check(
        db: &DatabaseConnection,
    ) -> (entities::service::Model, service_check::Model) {
        let service = entities::service::Entity::find()
            .filter(entities::service::Column::ServiceType.eq(ServiceType::Ping))
            .one(db)
            .await
            .expect("Failed to query ping service")
            .expect("Failed to find ping service");

        let service_check = service_check::Entity::find()
            .filter(service_check::Column::ServiceId.eq(service.id))
            .one(db)
            .await
            .expect("Failed to query service check")
            .expect("Failed to find service check");

        (service, service_check)
    }

    /// Running a ping service check from the default test fixtures succeeds.
    #[tokio::test]
    async fn test_run_service_check() {
        let (db, _config) = test_setup().await.expect("Failed to setup test");

        let db_lock = db.write().await;
        let (service, service_check) = ping_service_and_check(&db_lock).await;
        // run_service_check takes the write lock itself, so release it first.
        drop(db_lock);

        run_service_check(db.clone(), &service_check, service)
            .await
            .expect("Failed to run service check");
    }

    /// Running a ping service check succeeds when every check is `Pending`.
    #[tokio::test]
    async fn test_run_pending_service_check() {
        let (db, _config) = test_setup().await.expect("Failed to setup test");

        let db_writer = db.write().await;

        service_check::Entity::update_many()
            .col_expr(
                service_check::Column::Status,
                Expr::value(ServiceStatus::Pending),
            )
            .exec(&*db_writer)
            .await
            .expect("Failed to update service checks to pending");

        let (service, service_check) = ping_service_and_check(&db_writer).await;
        // run_service_check takes the write lock itself, so release it first.
        drop(db_writer);

        run_service_check(db.clone(), &service_check, service)
            .await
            .expect("Failed to run service check");
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc