• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

heathcliff26 / cerberus-mergeguard / 19329024062

13 Nov 2025 10:50AM UTC coverage: 70.605% (-0.1%) from 70.703%
19329024062

push

github

heathcliff26
🔄 synced file(s) with heathcliff26/ci

723 of 1024 relevant lines covered (70.61%)

2.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.0
/src/server.rs
1
use crate::{
2
    client::Client,
3
    error::Error,
4
    types::{CheckRunEvent, IssueCommentEvent, PullRequestEvent},
5
};
6
use axum::{
7
    Json, Router,
8
    extract::State,
9
    http::{HeaderMap, HeaderValue, StatusCode},
10
    routing::{get, post},
11
};
12
use hmac::{Hmac, Mac};
13
use serde::{Deserialize, Serialize};
14
use std::net::SocketAddr;
15
use std::sync::Arc;
16
use tokio::{net::TcpListener, signal, sync::Mutex, time::Duration};
17
use tower_http::trace::TraceLayer;
18
use tracing::{debug, error, info, warn};
19

20
mod hex;
21
#[cfg(test)]
22
mod test;
23
mod tls;
24

25
pub const SERVER_STATUS_OK: &str = "ok";
26
pub const SERVER_STATUS_ERROR: &str = "error";
27
pub const SERVER_MESSAGE_OK: &str = "Server is running fine";
28

29
/// Options for the http server
30
#[derive(Serialize, Deserialize, Debug)]
31
#[serde(default, rename_all = "kebab-case")]
32
pub struct ServerOptions {
33
    /// Port to bind to, defaults to 8080
34
    #[serde(default = "default_port")]
35
    pub port: u16,
36

37
    /// Optional ssl configuration for the server
38
    pub ssl: SSLOptions,
39

40
    /// Shared webhook secret for verifying the webhook sender
41
    pub webhook_secret: Option<String>,
42

43
    /// Refresh check runs periodically instead of on every webhook event
44
    /// This is useful for reducing the number of API calls to GitHub.
45
    /// When set to zero, periodic refresh is disabled.
46
    /// Unit is in seconds.
47
    #[serde(default = "Default::default")]
48
    pub periodic_refresh: u64,
49
}
50

51
fn default_port() -> u16 {
10✔
52
    8080
10✔
53
}
10✔
54

55
impl ServerOptions {
56
    /// Validate the server options
57
    pub fn validate(&self) -> Result<(), &'static str> {
5✔
58
        if self.port == 0 {
5✔
59
            return Err("Port can't be 0");
×
60
        }
5✔
61
        self.ssl.validate()
5✔
62
    }
5✔
63
}
64

65
impl Default for ServerOptions {
66
    fn default() -> Self {
8✔
67
        Self {
8✔
68
            port: default_port(),
8✔
69
            webhook_secret: std::env::var("CERBERUS_WEBHOOK_SECRET").ok(),
8✔
70
            ssl: SSLOptions::default(),
8✔
71
            periodic_refresh: 0,
8✔
72
        }
8✔
73
    }
8✔
74
}
75

76
/// SSL configuration for the server
77
#[derive(Serialize, Deserialize, Debug, Default)]
78
#[serde(default)]
79
pub struct SSLOptions {
80
    /// Whether to enable SSL, defaults to false
81
    pub enabled: bool,
82
    /// Path to the SSL private key file
83
    pub key: String,
84
    /// Path to the SSL certificate file
85
    pub cert: String,
86
}
87

88
impl SSLOptions {
89
    /// Validate the SSL options
90
    pub fn validate(&self) -> Result<(), &'static str> {
5✔
91
        if !self.enabled {
5✔
92
            return Ok(());
5✔
93
        }
×
94
        if self.key.is_empty() || self.cert.is_empty() {
×
95
            return Err("Incomplete SSL configuration: cert and key must be set if SSL is enabled");
×
96
        }
×
97
        Ok(())
×
98
    }
5✔
99
}
100

101
/// Job for refreshing check runs
102
#[derive(Debug, Ord, PartialEq, PartialOrd, Eq)]
103
struct Job {
104
    app_installation_id: u64,
105
    repo: String,
106
    commit: String,
107
}
108

109
/// HTTP Server for receiving webhook events from GitHub
110
pub struct Server {
111
    options: ServerOptions,
112
}
113

114
#[derive(Clone)]
115
struct ServerState {
116
    webhook_secret: Option<String>,
117
    github: Arc<Client>,
118
    job_queue: Arc<Mutex<Vec<Job>>>,
119
    use_job_queue: bool,
120
}
121

122
impl ServerState {
123
    /// Create a new server state with the given webhook secret and GitHub client
124
    fn new(webhook_secret: Option<String>, github: Client) -> Self {
9✔
125
        let github = Arc::new(github);
9✔
126
        Self {
9✔
127
            webhook_secret,
9✔
128
            github,
9✔
129
            job_queue: Arc::new(Mutex::new(Vec::new())),
9✔
130
            use_job_queue: false,
9✔
131
        }
9✔
132
    }
9✔
133

134
    /// Create a new pending job and add it to the job queue
135
    async fn new_job(&self, app_installation_id: u64, repo: &str, commit: &str) {
2✔
136
        let job = Job {
2✔
137
            app_installation_id,
2✔
138
            repo: repo.to_string(),
2✔
139
            commit: commit.to_string(),
2✔
140
        };
2✔
141
        let mut job_queue = self.job_queue.lock().await;
2✔
142
        job_queue.push(job);
2✔
143
    }
2✔
144

145
    /// Start a background task that periodically runs all jobs in the queue
146
    fn periodically_run_job_queue(&mut self, period: u64) {
1✔
147
        let job_queue = self.job_queue.clone();
1✔
148
        let github = self.github.clone();
1✔
149

150
        info!(
1✔
151
            "Periodic refresh of check runs enabled with a period of {} seconds",
×
152
            period,
153
        );
154

155
        self.use_job_queue = true;
1✔
156
        tokio::spawn(async move {
1✔
157
            let period = Duration::from_secs(period);
1✔
158
            loop {
159
                tokio::time::sleep(period).await;
2✔
160

161
                let mut job_queue = job_queue.lock().await;
1✔
162
                if job_queue.is_empty() {
1✔
163
                    continue;
×
164
                }
1✔
165

166
                deduplicate_jobs(job_queue.as_mut());
1✔
167

168
                info!("Running {} jobs in the queue", job_queue.len());
1✔
169

170
                for job in job_queue.drain(..) {
1✔
171
                    if let Err(e) = github
1✔
172
                        .refresh_check_run_status(job.app_installation_id, &job.repo, &job.commit)
1✔
173
                        .await
1✔
174
                    {
175
                        error!(
×
176
                            "Failed to refresh check run status for job: '{}' - '{}': {}",
×
177
                            job.repo, job.commit, e
178
                        );
179
                    }
1✔
180
                }
181
            }
182
        });
183
    }
1✔
184
}
185

186
impl Server {
187
    /// Create a new server with the given options and GitHub client
188
    pub fn new(options: ServerOptions) -> Self {
3✔
189
        Self { options }
3✔
190
    }
3✔
191

192
    /// Run the server
193
    /// Server will shutdown gracefully on Ctrl+C or SIGTERM
194
    pub async fn run(&self, github: Client) -> Result<(), Error> {
3✔
195
        let mut state = ServerState::new(self.options.webhook_secret.clone(), github);
3✔
196
        if self.options.periodic_refresh > 0 {
3✔
197
            state.periodically_run_job_queue(self.options.periodic_refresh);
×
198
        }
3✔
199
        let router = new_router(state);
3✔
200

201
        let addr = SocketAddr::from(([0, 0, 0, 0, 0, 0, 0, 0], self.options.port));
3✔
202
        info!("Starting server on {}", addr);
3✔
203

204
        if self.options.ssl.enabled {
3✔
205
            let listener =
×
206
                tls::TlsListener::bind(addr, &self.options.ssl.key, &self.options.ssl.cert)
×
207
                    .await
×
208
                    .map_err(|e| Error::BindPort(Box::new(e)))?;
×
209

210
            axum::serve(listener, router)
×
211
                .with_graceful_shutdown(shutdown_signal())
×
212
                .await
×
213
                .map_err(Error::Serve)
×
214
        } else {
215
            let listener = TcpListener::bind(addr)
3✔
216
                .await
3✔
217
                .map_err(|e| Error::BindPort(Box::new(e)))?;
3✔
218

219
            axum::serve(listener, router)
3✔
220
                .with_graceful_shutdown(shutdown_signal())
3✔
221
                .await
3✔
222
                .map_err(Error::Serve)
×
223
        }
224
    }
×
225
}
226

227
fn new_router(state: ServerState) -> Router {
3✔
228
    let webhook_router: Router = Router::new()
3✔
229
        .route("/webhook", post(webhook_handler))
3✔
230
        .with_state(state)
3✔
231
        .layer(TraceLayer::new_for_http());
3✔
232

233
    // Do not use tracing for the health check endpoint
234
    let health_router: Router = Router::new().route("/healthz", get(healthz));
3✔
235

236
    Router::new().merge(webhook_router).merge(health_router)
3✔
237
}
3✔
238

239
/// Expose health check endpoint
240
/// Can be used when running under kubernetes to check if the server is running
241
/// GET /healthz
242
async fn healthz() -> (StatusCode, Json<Response>) {
×
243
    (StatusCode::OK, Json(Response::new()))
×
244
}
×
245

246
/// Handle the webhook events send from GitHub
247
/// POST /webhook
248
async fn webhook_handler(
7✔
249
    headers: HeaderMap,
7✔
250
    state: State<ServerState>,
7✔
251
    payload: String,
7✔
252
) -> (StatusCode, Json<Response>) {
7✔
253
    let event = match headers.get("X-GitHub-Event") {
7✔
254
        Some(event) => event
7✔
255
            .to_str()
7✔
256
            .unwrap_or("could not read X-GitHub-Event header"),
7✔
257
        None => {
258
            return (
×
259
                StatusCode::BAD_REQUEST,
×
260
                Json(Response::error("Missing X-GitHub-Event header")),
×
261
            );
×
262
        }
263
    };
264
    debug!("Received webhook event: {}", event);
7✔
265
    if let Err(e) = verify_webhook(
7✔
266
        headers.get("X-Hub-Signature-256"),
7✔
267
        state.webhook_secret.as_deref(),
7✔
268
        &payload,
7✔
269
    ) {
7✔
270
        warn!("Failed to verify webhook signature: {}", e.1.message);
×
271
        return e;
×
272
    }
7✔
273

274
    match event {
7✔
275
        "check_run" => handle_check_run_event(state.0, &payload).await,
7✔
276
        "pull_request" => handle_pull_request_event(&state.github, &payload).await,
4✔
277
        "issue_comment" => handle_issue_comment_event(&state.github, &payload).await,
3✔
278
        "check_suite" => (StatusCode::OK, Json(Response::new())), // Ignore check_suite events
1✔
279
        event => {
×
280
            let message = format!("Received unsupported event: {event}");
×
281
            info!("{message}");
×
282
            (StatusCode::NOT_IMPLEMENTED, Json(Response::error(&message)))
×
283
        }
284
    }
285
}
7✔
286

287
/// Verify the webhook request against the shared secret
288
fn verify_webhook(
13✔
289
    signature: Option<&HeaderValue>,
13✔
290
    secret: Option<&str>,
13✔
291
    payload: &str,
13✔
292
) -> Result<(), (StatusCode, Json<Response>)> {
13✔
293
    let secret = match secret {
13✔
294
        Some(s) => s,
4✔
295
        None => {
296
            return Ok(());
9✔
297
        }
298
    };
299

300
    let signature = match signature {
4✔
301
        Some(s) => s.to_str().map_err(|e| {
3✔
302
            info!("Failed to read X-Hub-Signature-256 header: {e}");
×
303
            (
×
304
                StatusCode::FORBIDDEN,
×
305
                Json(Response::error("Invalid X-Hub-Signature-256 header")),
×
306
            )
×
307
        })?,
×
308
        None => {
309
            return Err((
1✔
310
                StatusCode::FORBIDDEN,
1✔
311
                Json(Response::error("Missing X-Hub-Signature-256 header")),
1✔
312
            ));
1✔
313
        }
314
    };
315
    let signature = signature.strip_prefix("sha256=").unwrap_or(signature);
3✔
316
    let signature = hex::decode_hex(signature).map_err(|_| {
3✔
317
        (
1✔
318
            StatusCode::FORBIDDEN,
1✔
319
            Json(Response::error("Invalid X-Hub-Signature-256 header")),
1✔
320
        )
1✔
321
    })?;
1✔
322

323
    let mut mac = Hmac::<sha2::Sha256>::new_from_slice(secret.as_bytes()).map_err(|e| {
2✔
324
        error!("Failed to create HMAC from secret: {e}");
×
325
        (
×
326
            StatusCode::INTERNAL_SERVER_ERROR,
×
327
            Json(Response::error("Failed to create HMAC from secret")),
×
328
        )
×
329
    })?;
×
330
    mac.update(payload.as_bytes());
2✔
331

332
    mac.verify_slice(signature.as_slice()).map_err(|_| {
2✔
333
        (
1✔
334
            StatusCode::FORBIDDEN,
1✔
335
            Json(Response::error("Invalid webhook signature")),
1✔
336
        )
1✔
337
    })?;
1✔
338

339
    Ok(())
1✔
340
}
13✔
341

342
/// Handle webhook pull_request events
343
async fn handle_pull_request_event(client: &Client, payload: &str) -> (StatusCode, Json<Response>) {
1✔
344
    let payload: PullRequestEvent = match serde_json::from_str(payload) {
1✔
345
        Ok(event) => event,
1✔
346
        Err(e) => {
×
347
            warn!("Failed to parse pull_request event payload: {e}");
×
348
            return (
×
349
                StatusCode::BAD_REQUEST,
×
350
                Json(Response::error("Invalid pull_request event payload")),
×
351
            );
×
352
        }
353
    };
354

355
    match payload.action.as_str() {
1✔
356
        "opened" | "synchronize" => {}
1✔
357
        action => {
×
358
            debug!("Ignoring pull_request event with action: {action}");
×
359
            return (StatusCode::OK, Json(Response::new()));
×
360
        }
361
    }
362

363
    let app_id = match payload.installation {
1✔
364
        Some(installation) => installation.id,
1✔
365
        None => {
366
            warn!("Missing app installation id in pull_request event");
×
367
            return (
×
368
                StatusCode::BAD_REQUEST,
×
369
                Json(Response::error("Missing app installation id")),
×
370
            );
×
371
        }
372
    };
373

374
    if let Err(e) = client
1✔
375
        .create_check_run(
1✔
376
            app_id,
1✔
377
            &payload.repository.full_name,
1✔
378
            &payload.pull_request.head.sha,
1✔
379
        )
380
        .await
1✔
381
    {
382
        error!("Failed to create check run: {e}");
×
383
        return (
×
384
            StatusCode::INTERNAL_SERVER_ERROR,
×
385
            Json(Response::error("Failed to create check-run")),
×
386
        );
×
387
    };
1✔
388
    info!(
1✔
389
        "Created check run for pull request {} - {}",
1✔
390
        payload.repository.full_name, payload.pull_request.number
391
    );
392
    (StatusCode::OK, Json(Response::new()))
1✔
393
}
1✔
394

395
/// Handle webhook check_run events
396
async fn handle_check_run_event(state: ServerState, payload: &str) -> (StatusCode, Json<Response>) {
4✔
397
    let payload: CheckRunEvent = match serde_json::from_str(payload) {
4✔
398
        Ok(event) => event,
4✔
399
        Err(e) => {
×
400
            warn!("Failed to parse check_run event payload: {e}");
×
401
            return (
×
402
                StatusCode::BAD_REQUEST,
×
403
                Json(Response::error("Invalid check_run event payload")),
×
404
            );
×
405
        }
406
    };
407

408
    if payload
4✔
409
        .check_run
4✔
410
        .app
4✔
411
        .is_some_and(|app| app.client_id == state.github.client_id())
4✔
412
    {
413
        debug!("Ignoring check_run event from our own app");
2✔
414
        return (StatusCode::OK, Json(Response::new()));
2✔
415
    }
2✔
416

417
    let app_id = match payload.installation {
2✔
418
        Some(installation) => installation.id,
2✔
419
        None => {
420
            warn!("Missing app installation id in check_run event");
×
421
            return (
×
422
                StatusCode::BAD_REQUEST,
×
423
                Json(Response::error("Missing app installation id")),
×
424
            );
×
425
        }
426
    };
427

428
    if state.use_job_queue {
2✔
429
        state
1✔
430
            .new_job(
1✔
431
                app_id,
1✔
432
                &payload.repository.full_name,
1✔
433
                &payload.check_run.head_sha,
1✔
434
            )
1✔
435
            .await;
1✔
436
        return (StatusCode::OK, Json(Response::new()));
1✔
437
    }
1✔
438

439
    match state
1✔
440
        .github
1✔
441
        .refresh_check_run_status(
1✔
442
            app_id,
1✔
443
            &payload.repository.full_name,
1✔
444
            &payload.check_run.head_sha,
1✔
445
        )
1✔
446
        .await
1✔
447
    {
448
        Ok(_) => (StatusCode::OK, Json(Response::new())),
1✔
449
        Err(e) => {
×
450
            error!("Failed to refresh check-run status: {e}");
×
451
            (
×
452
                StatusCode::INTERNAL_SERVER_ERROR,
×
453
                Json(Response::error("Failed to refresh check-run status")),
×
454
            )
×
455
        }
456
    }
457
}
4✔
458

459
/// Handle webhook issue_comment events
460
async fn handle_issue_comment_event(
2✔
461
    client: &Client,
2✔
462
    payload: &str,
2✔
463
) -> (StatusCode, Json<Response>) {
2✔
464
    let payload: IssueCommentEvent = match serde_json::from_str(payload) {
2✔
465
        Ok(event) => event,
2✔
466
        Err(e) => {
×
467
            warn!("Failed to parse issue_comment event payload: {e}");
×
468
            return (
×
469
                StatusCode::BAD_REQUEST,
×
470
                Json(Response::error("Invalid issue_comment event payload")),
×
471
            );
×
472
        }
473
    };
474

475
    let app_id = match payload.installation {
2✔
476
        Some(installation) => installation.id,
2✔
477
        None => {
478
            warn!("Missing app installation id in issue_comment event");
×
479
            return (
×
480
                StatusCode::BAD_REQUEST,
×
481
                Json(Response::error("Missing app installation id")),
×
482
            );
×
483
        }
484
    };
485

486
    if payload.action != "created" {
2✔
487
        debug!(
×
488
            "Ignoring issue_comment event with action: {}",
×
489
            payload.action
490
        );
491
        return (StatusCode::OK, Json(Response::new()));
×
492
    }
2✔
493

494
    if !payload.comment.body.contains("/cerberus refresh") {
2✔
495
        debug!("Ignoring issue comment without '/cerberus' command");
1✔
496
        return (StatusCode::OK, Json(Response::new()));
1✔
497
    }
1✔
498
    info!(
1✔
499
        "Received issue_comment event for issue {}: {}",
×
500
        payload.issue.number, payload.comment.body
501
    );
502

503
    let commit = match client
1✔
504
        .get_pull_request_head_commit(app_id, &payload.repository.full_name, payload.issue.number)
1✔
505
        .await
1✔
506
    {
507
        Ok(commit) => commit,
1✔
508
        Err(e) => {
×
509
            error!("Failed to get pull request head commit: {e}");
×
510
            return (
×
511
                StatusCode::INTERNAL_SERVER_ERROR,
×
512
                Json(Response::error("Failed to get pull request head commit")),
×
513
            );
×
514
        }
515
    };
516

517
    if let Err(e) = client
1✔
518
        .refresh_check_run_status(app_id, &payload.repository.full_name, &commit)
1✔
519
        .await
1✔
520
    {
521
        error!("Failed to refresh check-run status: {e}");
×
522
        return (
×
523
            StatusCode::INTERNAL_SERVER_ERROR,
×
524
            Json(Response::error("Failed to refresh check-run status")),
×
525
        );
×
526
    }
1✔
527

528
    (StatusCode::OK, Json(Response::new()))
1✔
529
}
2✔
530

531
/// Detailed status of the Webserver
532
#[derive(Debug, Serialize, Deserialize)]
533
pub struct Response {
534
    /// Status of the server.
535
    /// "ok" if everything is running fine, "error" if something is wrong.
536
    pub status: String,
537
    /// Optional message providing more details about the status.
538
    pub message: String,
539
}
540

541
impl Response {
542
    /// Create a new response with ok status.
543
    pub fn new() -> Self {
8✔
544
        Self {
8✔
545
            status: SERVER_STATUS_OK.to_string(),
8✔
546
            message: SERVER_MESSAGE_OK.to_string(),
8✔
547
        }
8✔
548
    }
8✔
549

550
    /// Create a new response with the error status.
551
    pub fn error(message: &str) -> Self {
6✔
552
        Self {
6✔
553
            status: SERVER_STATUS_ERROR.to_string(),
6✔
554
            message: message.to_string(),
6✔
555
        }
6✔
556
    }
6✔
557
}
558

559
/// Asynchronously wait for a shutdown signal (Ctrl+C or SIGTERM).
560
async fn shutdown_signal() {
3✔
561
    let ctrl_c = async {
3✔
562
        tokio::signal::ctrl_c()
3✔
563
            .await
3✔
564
            .expect("failed to install Ctrl+C handler");
×
565
    };
×
566

567
    #[cfg(unix)]
568
    let terminate = async {
3✔
569
        signal::unix::signal(signal::unix::SignalKind::terminate())
3✔
570
            .expect("failed to install signal handler")
3✔
571
            .recv()
3✔
572
            .await;
3✔
573
    };
×
574

575
    #[cfg(not(unix))]
576
    let terminate = std::future::pending::<()>();
577

578
    tokio::select! {
3✔
579
        _ = ctrl_c => {},
3✔
580
        _ = terminate => {},
3✔
581
    }
582
}
×
583

584
/// Remove duplicates from job queue
585
fn deduplicate_jobs(job_queue: &mut Vec<Job>) {
2✔
586
    job_queue.sort();
2✔
587
    job_queue.dedup();
2✔
588
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc