• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

divviup / divviup-api / 25874010908

14 May 2026 05:09PM UTC coverage: 58.423% (+1.2%) from 57.254%
25874010908

push

github

web-flow
Migrate from Trillium [part 8]: remove Trillium server, serve Axum directly (#2251)

Axum is now the primary HTTP listener. The Trillium server, which was a pure pass-through proxy since Part 7C, is removed from the request path.

Production changes:
- `build_app(Config) -> BuiltApp` replaces `DivviupApi` for production use
- `bin.rs` rewritten: Axum serves directly, monitoring server is a separate Axum router, graceful shutdown via tokio signals + CancellationToken
- `Queue` uses `CancellationToken` instead of Trillium's `Stopper`/`CloneCounterObserver`
- `telemetry.rs` and `trace.rs` handlers converted from Trillium to Axum
- `Config` gains `listen_address` field (from HOST/PORT env vars, default [::]:8080)
- Replace the Trillium-based static asset handler (`trillium-static-compiled` + `OriginRouter`) with an Axum middleware using `tower-http`'s `ServeDir` and `ServeFile`.

The middleware intercepts requests whose `Host` (or `X-Forwarded-Host`) matches the configured `app_url` and serves the React SPA with appropriate `cache-control` headers (`max-age=1year` for `/assets/*`, `no-cache` for everything else). Unmatched hosts pass through to the API routes.

Dead code removal:
- Deleted: `handler/logger.rs`, `handler/opentelemetry.rs`, `axum_proxy` test
- Removed `FromConn` impls from: `Db`, `User`, `PermissionsActor`, `AccountBearerToken`
- Removed Trillium `Handler` impls from: `Db`, `ErrorHandler`, `CorsHeaders`, `ReplaceMimeTypes`, `SessionStore`
- Unified `PermissionsActor::is_allowed`/`if_allowed` to use `http::Method`
- Removed deps: `trillium-compression`, `trillium-conn-id`, `trillium-forwarding`, `trillium-opentelemetry`, `trillium-prometheus`, `trillium-redirect`, `trillium-sessions`, `trillium-cookies`, `async-session`
- Added deps: `cookie` (key-expansion feature, for session key derivation)

`DivviupApi` is kept as a thin test-only shim that spawns Axum on IPv6 Localhost and proxies via the existing `AxumProxy`, preserving test-support co... (continued)

112 of 227 new or added lines in 14 files covered. (49.34%)

8 existing lines in 4 files now uncovered.

4335 of 7420 relevant lines covered (58.42%)

63.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.12
/src/queue.rs
1
mod job;
2
pub use crate::entity::queue::JobStatus;
3
pub use job::*;
4

5
use crate::{
6
    entity::queue::{ActiveModel, Column, Entity, Model},
7
    Config, Db,
8
};
9
use sea_orm::{
10
    sea_query::{all, Expr},
11
    ActiveModelTrait, ActiveValue, ColumnTrait, DbErr, EntityTrait, IntoActiveModel,
12
    PaginatorTrait, QueryFilter, TransactionTrait,
13
};
14
use std::{ops::Range, sync::Arc, time::Duration};
15
use time::OffsetDateTime;
16
use tokio::{
17
    task::{JoinHandle, JoinSet},
18
    time::sleep,
19
};
20
use tokio_util::sync::CancellationToken;
21

22
#[derive(Clone, Debug)]
23
pub struct Queue {
24
    cancel: CancellationToken,
25
    db: Db,
26
    job_state: Arc<SharedJobState>,
27
}
28
// Queue tuning knobs. They are hard-coded for now, but may eventually be
// useful to put on Config.

/// Failure count at which a retryable job is no longer rescheduled.
const MAX_RETRY: i32 = 5;
/// Range, in milliseconds, from which an idle worker draws how long to sleep
/// before it checks the queue again.
const QUEUE_CHECK_INTERVAL: Range<u64> = 60_000..120_000;
/// Range, in milliseconds, of the random delay added to every retry time.
const SCHEDULE_RANDOMNESS: Range<u64> = 0..15_000;
/// Number of worker tasks the supervisor starts.
const QUEUE_WORKER_COUNT: u8 = 2;
35

36
fn reschedule_based_on_failure_count(failure_count: i32) -> Option<OffsetDateTime> {
×
37
    if failure_count >= MAX_RETRY {
×
38
        None
×
39
    } else {
40
        let duration = Duration::from_millis(
×
41
            1000 * 4_u64.pow(failure_count.try_into().unwrap())
×
42
                + fastrand::u64(SCHEDULE_RANDOMNESS),
×
43
        );
44
        Some(OffsetDateTime::now_utc() + duration)
×
45
    }
46
}
×
47

48
impl Queue {
49
    pub fn new(db: &Db, config: &Config, cancel: CancellationToken) -> Self {
2✔
50
        Self {
2✔
51
            cancel,
2✔
52
            db: db.clone(),
2✔
53
            job_state: Arc::new(config.into()),
2✔
54
        }
2✔
55
    }
2✔
56

57
    pub async fn schedule_recurring_tasks_if_needed(&self) -> Result<(), DbErr> {
×
58
        let tx = self.db.begin().await?;
×
59

60
        let session_cleanup_jobs = Entity::find()
×
61
            .filter(all![
×
62
                Expr::cust_with_expr("job->>'type' = $1", "SessionCleanup"),
×
63
                Column::ScheduledAt.gt(OffsetDateTime::now_utc()),
×
64
            ])
×
65
            .count(&tx)
×
66
            .await?;
×
67

68
        if session_cleanup_jobs == 0 {
×
69
            Job::from(SessionCleanup).insert(&tx).await?;
×
70
        }
×
71
        tx.commit().await?;
×
72

73
        let tx = self.db.begin().await?;
×
74
        let queue_cleanup_jobs = Entity::find()
×
75
            .filter(all![
×
76
                Expr::cust_with_expr("job->>'type' = $1", "QueueCleanup"),
×
77
                Column::ScheduledAt.gt(OffsetDateTime::now_utc()),
×
78
            ])
×
79
            .count(&tx)
×
80
            .await?;
×
81

82
        if queue_cleanup_jobs == 0 {
×
83
            Job::from(QueueCleanup).insert(&tx).await?;
×
84
        }
×
85
        tx.commit().await?;
×
86

87
        Ok(())
×
88
    }
×
89

90
    // TODO(#2262): use TaskTracker to wait for in-flight jobs during graceful shutdown
91
    pub async fn perform_one_queue_job(&self) -> Result<Option<Model>, DbErr> {
5✔
92
        let tx = self.db.begin().await?;
5✔
93
        let model = if let Some(queue_item) = Entity::next(&tx).await? {
5✔
94
            let mut queue_item = queue_item.into_active_model();
4✔
95

96
            let mut job = queue_item.job.take().ok_or_else(|| {
4✔
97
                DbErr::Custom(String::from(
×
98
                    r#"Queue item found without a job.
×
99
                       We believe this to be unreachable"#,
×
100
                ))
×
101
            })?;
×
102

103
            let result = job.perform(&self.job_state, &tx).await;
4✔
104
            queue_item.job = ActiveValue::Set(job);
4✔
105

106
            match result {
4✔
107
                Ok(Some(next_job)) => {
3✔
108
                    queue_item.status = ActiveValue::Set(JobStatus::Success);
3✔
109
                    queue_item.scheduled_at = ActiveValue::Set(None);
3✔
110

111
                    let mut next_job = ActiveModel::from(next_job);
3✔
112
                    next_job.parent_id = ActiveValue::Set(Some(*queue_item.id.as_ref()));
3✔
113
                    let next_job = next_job.insert(&tx).await?;
3✔
114
                    queue_item.child_id = ActiveValue::Set(Some(next_job.id));
3✔
115
                }
116

117
                Ok(None) => {
1✔
118
                    queue_item.scheduled_at = ActiveValue::Set(None);
1✔
119
                    queue_item.status = ActiveValue::Set(JobStatus::Success);
1✔
120
                }
1✔
121

122
                Err(e) if e.is_retryable() => {
×
123
                    queue_item.failure_count =
×
124
                        ActiveValue::Set(queue_item.failure_count.as_ref() + 1);
×
125
                    let reschedule =
×
126
                        reschedule_based_on_failure_count(*queue_item.failure_count.as_ref());
×
127
                    queue_item.status = ActiveValue::Set(
128
                        reschedule.map_or(JobStatus::Failed, |_| JobStatus::Pending),
×
129
                    );
130
                    queue_item.scheduled_at = ActiveValue::Set(reschedule);
×
131
                    queue_item.error_message = ActiveValue::Set(Some(e.into()));
×
132
                }
133

134
                Err(e) => {
×
135
                    queue_item.failure_count =
×
136
                        ActiveValue::Set(queue_item.failure_count.as_ref() + 1);
×
137
                    queue_item.scheduled_at = ActiveValue::Set(None);
×
138
                    queue_item.status = ActiveValue::Set(JobStatus::Failed);
×
139
                    queue_item.error_message = ActiveValue::Set(Some(e.into()));
×
140
                }
×
141
            }
142

143
            queue_item.updated_at = ActiveValue::Set(OffsetDateTime::now_utc());
4✔
144
            Some(queue_item.update(&tx).await?)
4✔
145
        } else {
146
            None
1✔
147
        };
148
        tx.commit().await?;
5✔
149
        Ok(model)
5✔
150
    }
5✔
151

152
    fn spawn_worker(self, join_set: &mut JoinSet<()>) {
×
153
        join_set.spawn(async move {
×
154
            loop {
NEW
155
                if self.cancel.is_cancelled() {
×
156
                    break;
×
157
                }
×
158

159
                match self.perform_one_queue_job().await {
×
160
                    Err(e) => {
×
161
                        tracing::error!("job error {e}");
×
162
                    }
163

164
                    Ok(Some(_)) => {}
×
165

166
                    Ok(None) => {
NEW
167
                        let sleep_duration =
×
NEW
168
                            Duration::from_millis(fastrand::u64(QUEUE_CHECK_INTERVAL));
×
NEW
169
                        tokio::select! {
×
NEW
170
                            () = self.cancel.cancelled() => break,
×
NEW
171
                            () = sleep(sleep_duration) => {}
×
172
                        }
173
                    }
174
                }
175
            }
176
        });
×
177
    }
×
178

179
    async fn supervise_workers(self) {
×
180
        self.schedule_recurring_tasks_if_needed().await.unwrap();
×
181
        let mut join_set = JoinSet::new();
×
182
        for _ in 0..QUEUE_WORKER_COUNT {
×
183
            self.clone().spawn_worker(&mut join_set);
×
184
        }
×
185

186
        while join_set.join_next().await.is_some() {
×
NEW
187
            if !self.cancel.is_cancelled() {
×
188
                tracing::error!("Worker task shut down. Restarting.");
×
189
                self.clone().spawn_worker(&mut join_set);
×
190
            }
×
191
        }
192
    }
×
193

194
    pub fn spawn_workers(self) -> JoinHandle<()> {
×
195
        tokio::task::spawn(self.supervise_workers())
×
196
    }
×
197
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc