• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

yaleman / maremma / #272

02 Nov 2025 10:24AM UTC coverage: 77.069% (+2.2%) from 74.826%
#272

push

web-flow
chore(deps): bump actions/download-artifact from 5 to 6 (#187)

Bumps [actions/download-artifact](https://github.com/actions/download-artifact) from 5 to 6.
- [Release notes](https://github.com/actions/download-artifact/releases)
- [Commits](https://github.com/actions/download-artifact/compare/v5...v6)

---
updated-dependencies:
- dependency-name: actions/download-artifact
  dependency-version: '6'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

2198 of 2852 relevant lines covered (77.07%)

2.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.39
/src/check_loop.rs
1
//! Runs the service checks on a loop
2

3
use crate::db::get_next_service_check;
4
use crate::prelude::*;
5
use opentelemetry::metrics::Counter;
6
use opentelemetry::KeyValue;
7
use rand::seq::IteratorRandom;
8
use std::str::FromStr;
9
use tokio::sync::Semaphore;
10

11
/// Starting delay for the check loop's back-off, and the step by which it grows.
const DEFAULT_BACKOFF: std::time::Duration = std::time::Duration::from_millis(50);
/// Upper bound for the check loop's back-off delay.
const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_secs(1);
13

14
#[derive(Clone, Debug)]
15
/// The end result of a service check
16
pub struct CheckResult {
17
    /// When the check finished
18
    pub timestamp: chrono::DateTime<Utc>,
19
    /// How long it took
20
    pub time_elapsed: Duration,
21
    /// The result
22
    pub status: ServiceStatus,
23
    /// Any explanatory/returned text
24
    pub result_text: String,
25
}
26

27
impl Default for CheckResult {
28
    fn default() -> Self {
1✔
29
        Self {
30
            timestamp: chrono::Utc::now(),
1✔
31
            time_elapsed: Duration::zero(),
2✔
32
            status: ServiceStatus::Unknown,
33
            result_text: String::new(),
2✔
34
        }
35
    }
36
}
37

38
#[instrument(level = "INFO", skip_all, fields(service_check_id=%service_check.id, service_id=%service.id))]
39
/// Does what it says on the tin
40
pub(crate) async fn run_service_check(
41
    db: Arc<RwLock<DatabaseConnection>>,
42
    service_check: &entities::service_check::Model,
43
    service: entities::service::Model,
44
) -> Result<(), Error> {
45
    let db_writer = db.write().await;
2✔
46
    let check = match Service::try_from_service_model(&service, &db_writer).await {
4✔
47
        Ok(check) => check,
1✔
48
        Err(err) => {
×
49
            let errmsg = format!(
×
50
                "Failed to convert service check {} to service: {:?}",
51
                service_check.id, err
52
            );
53
            error!(errmsg);
×
54
            return Err(Error::Generic(errmsg));
×
55
        }
56
    };
57

58
    let host: entities::host::Model = match service_check
5✔
59
        .find_related(entities::host::Entity)
3✔
60
        .one(&*db_writer)
3✔
61
        .await?
4✔
62
    {
63
        Some(host) => host,
1✔
64
        None => {
65
            error!(
×
66
                "Failed to get host for service check: {:?}",
67
                service_check.id
68
            );
69
            return Err(Error::HostNotFound(service_check.host_id));
×
70
        }
71
    };
72

73
    #[cfg(not(tarpaulin_include))]
74
    let service_to_run = check.config().ok_or_else(|| {
75
        error!(
76
            "Failed to get service config for {}",
77
            service.id.hyphenated()
78
        );
79
        Error::ServiceConfigNotFound(service.id.hyphenated().to_string())
80
    })?;
81
    drop(db_writer);
1✔
82

83
    debug!("Starting service_check={:?}", service_check);
2✔
84
    let start_time = chrono::Utc::now();
1✔
85
    // Here we actually run the check
86
    let result = match service_to_run.run(&host).await {
3✔
87
        Ok(val) => val,
×
88
        Err(err) => CheckResult {
89
            status: ServiceStatus::Error,
90
            time_elapsed: chrono::Utc::now() - start_time,
2✔
91
            result_text: format!("Error: {err:?}"),
1✔
92
            ..Default::default()
93
        },
94
    };
95
    debug!(
6✔
96
        "Completed service_check={:?} result={:?}",
97
        service_check, result.status
98
    );
99
    let max_jitter = service_to_run.jitter_value();
3✔
100

101
    let db_writer = db.write().await;
2✔
102
    entities::service_check_history::Model::from_service_check_result(service_check.id, &result)
8✔
103
        .into_active_model()
104
        .insert(&*db_writer)
4✔
105
        .await?;
8✔
106

107
    // // explicitly set status to work around the weird issues that are happening
108
    // service_check
109
    //     .set_status(result.status, &db_writer)
110
    //     .await
111
    //     .inspect_err(|err| error!("Failed to set service status at end of check: {:?}", err))?;
112

113
    let mut model = service_check.clone().into_active_model();
1✔
114
    model.last_check.set_if_not_equals(chrono::Utc::now());
1✔
115
    model.status.set_if_not_equals(result.status);
1✔
116

117
    // get a number between 0 and jitter
118
    let jitter: i64 = (0..max_jitter).choose(&mut rand::rng()).unwrap_or(0) as i64;
1✔
119

120
    let next_check: DateTime<Utc> = Cron::from_str(&service.cron_schedule)
4✔
121
        .inspect_err(|err| {
1✔
122
            error!(
×
123
                "Failed to parse cron schedule while setting next occurrence of {:?}: {:?}",
124
                model.id, err
125
            );
126
        })?
127
        .find_next_occurrence(&chrono::Utc::now(), false)
2✔
128
        .inspect_err(|err| {
1✔
129
            error!(
×
130
                "Failed to get next occurrence for check {}! {err:?}",
131
                service_check.id
132
            )
133
        })?
134
        + chrono::Duration::seconds(jitter);
2✔
135
    model.next_check.set_if_not_equals(next_check);
1✔
136

137
    model.update(&*db_writer).await.inspect_err(|err| {
2✔
138
        error!("{} error saving {:?}", service.id.hyphenated(), err);
×
139
    })?;
140

141
    drop(db_writer);
1✔
142

143
    Ok(())
1✔
144
}
145

146
#[instrument(level = "DEBUG", skip_all, fields(service_check_id = %service_check.id, service_id = %service.id))]
147
async fn run_inner(
148
    db: Arc<RwLock<DatabaseConnection>>,
149
    service_check: entities::service_check::Model,
150
    service: entities::service::Model,
151
    checks_run_since_startup: Arc<Counter<u64>>,
152
) -> Result<(), Error> {
153
    let sc_id = service_check.id.hyphenated().to_string();
×
154
    match run_service_check(db.clone(), &service_check, service).await {
×
155
        Err(err) => {
×
156
            error!("Failed to run service_check {} error={:?}", sc_id, err);
×
157

158
            let db_writer: tokio::sync::RwLockWriteGuard<'_, DatabaseConnection> = db.write().await;
×
159
            match entities::service_check::Entity::find()
×
160
                .filter(entities::service_check::Column::Id.eq(&sc_id))
×
161
                .one(&*db_writer)
×
162
                .await
×
163
                .map_err(Error::from)?
×
164
            {
165
                Some(service_check) => {
×
166
                    service_check
×
167
                        .set_status(ServiceStatus::Error, &db_writer)
×
168
                        .await?;
×
169
                }
170
                _ => {
171
                    error!(
×
172
                        "Trying to set error status but couldn't find service check {}",
173
                        sc_id
174
                    );
175
                }
176
            }
177
            drop(db_writer);
×
178

179
            checks_run_since_startup.add(
×
180
                1,
181
                &[
×
182
                    KeyValue::new("result", ToString::to_string(&ServiceStatus::Error)),
×
183
                    KeyValue::new("id", sc_id),
×
184
                ],
185
            );
186
        }
187
        Ok(_) => {
188
            checks_run_since_startup.add(
×
189
                1,
190
                &[
×
191
                    KeyValue::new("result", ToString::to_string(&ServiceStatus::Ok)),
×
192
                    KeyValue::new("id", sc_id),
×
193
                ],
194
            );
195
        }
196
    };
197

198
    Ok(())
×
199
}
200

201
#[cfg(not(tarpaulin_include))]
202
/// Loop around and do the checks, keeping it to a limit based on `max_permits`
203
pub async fn run_check_loop(
204
    db: Arc<RwLock<DatabaseConnection>>,
205
    max_permits: usize,
206
    metrics_meter: Arc<Meter>,
207
) -> Result<(), Error> {
208
    // Create a Counter Instrument.
209

210
    use std::cmp::min;
211

212
    let checks_run_since_startup = metrics_meter
213
        .u64_counter("checks_run_since_startup")
214
        .build();
215
    let checks_run_since_startup = Arc::new(checks_run_since_startup);
216

217
    let mut backoff: std::time::Duration = DEFAULT_BACKOFF;
218
    // Limit to n concurrent tasks
219
    let semaphore = Arc::new(Semaphore::new(max_permits));
220
    info!("Max concurrent tasks set to {}", max_permits);
221
    loop {
222
        while semaphore.available_permits() == 0 {
223
            warn!("No spare task slots, something might be running slow!");
224
            tokio::time::sleep(backoff).await;
225
        }
226
        match semaphore.clone().acquire_owned().await {
227
            Ok(permit) => {
228
                let db_lock = db.write().await;
229
                let next_service = get_next_service_check(&db_lock).await?;
230

231
                match next_service {
232
                    Some((service_check, service)) => {
233
                        // set the service_check to running
234
                        service_check
235
                            .set_status(ServiceStatus::Checking, &db_lock)
236
                            .await?;
237
                        drop(db_lock);
238
                        tokio::spawn(run_inner(
239
                            db.clone(),
240
                            service_check,
241
                            service,
242
                            checks_run_since_startup.clone(),
243
                        ));
244
                        // we did a thing, so we can reset the back-off time, because there might be another
245
                        backoff = DEFAULT_BACKOFF;
246
                    }
247
                    None => {
248
                        drop(db_lock);
249
                        // didn't get a task, increase backoff a little, but don't overflow the max
250
                        backoff = min(MAX_BACKOFF, backoff + DEFAULT_BACKOFF);
251
                    }
252
                };
253
                drop(permit); // Release the semaphore when the task is done
254
            }
255
            Err(err) => {
256
                error!("Failed to acquire semaphore permit: {:?}", err);
257
                // something went wrong so we want to chill a bit
258
                backoff = std::cmp::max(MAX_BACKOFF / 2, DEFAULT_BACKOFF);
259
            }
260
        };
261
    }
262
}
263

264
#[cfg(test)]
mod tests {
    use entities::service_check;

    use super::*;
    use crate::db::tests::test_setup;

    /// Fetches the ping service and one service check belonging to it from the
    /// test database, panicking with a descriptive message if either is missing.
    async fn find_ping_service_and_check(
        db: &DatabaseConnection,
    ) -> (entities::service::Model, service_check::Model) {
        let service = entities::service::Entity::find()
            .filter(entities::service::Column::ServiceType.eq(ServiceType::Ping))
            .one(db)
            .await
            .expect("Failed to query ping service")
            .expect("Failed to find ping service");

        let service_check = service_check::Entity::find()
            .filter(service_check::Column::ServiceId.eq(service.id))
            .one(db)
            .await
            .expect("Failed to query service check")
            .expect("Failed to find service check");

        (service, service_check)
    }

    /// Running the ping service check from the default test fixtures succeeds.
    #[tokio::test]
    async fn test_run_service_check() {
        let (db, _config) = test_setup().await.expect("Failed to setup test");

        let db_lock = db.write().await;
        let (service, service_check) = find_ping_service_and_check(&db_lock).await;
        // run_service_check takes the lock itself, so release it first
        drop(db_lock);

        run_service_check(db.clone(), &service_check, service)
            .await
            .expect("Failed to run service check");
    }

    /// Running the ping service check still succeeds when every check is pending.
    #[tokio::test]
    async fn test_run_pending_service_check() {
        let (db, _config) = test_setup().await.expect("Failed to setup test");

        let db_writer = db.write().await;

        service_check::Entity::update_many()
            .col_expr(
                service_check::Column::Status,
                Expr::value(ServiceStatus::Pending),
            )
            .exec(&*db_writer)
            .await
            .expect("Failed to update service checks to pending");

        let (service, service_check) = find_ping_service_and_check(&db_writer).await;
        // run_service_check takes the lock itself, so release it first
        drop(db_writer);

        run_service_check(db.clone(), &service_check, service)
            .await
            .expect("Failed to run service check");
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc