• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / github-workflows-update / 17780379842

16 Sep 2025 09:58PM UTC coverage: 50.572% (+5.7%) from 44.828%
17780379842

push

github

lpenz
Cargo.*: increment version to 0.3.26

221 of 437 relevant lines covered (50.57%)

1.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/proxy.rs
1
// Copyright (C) 2022 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
//! The proxy [`Server`] spawns a task that makes async requests and
6
//! caches the result, while async [`Client`] provides the API.
7

8
use anyhow;
9
use std::collections::HashMap;
10
use tokio::sync::mpsc;
11
use tokio::sync::oneshot;
12
use tracing::{Level, event, instrument};
13

14
use crate::resource::Resource;
15
use crate::version::Version;
16

17
/// Handle to the proxy task.
///
/// [`Server::new`] spawns the task that receives [`Message`]s and
/// [`Server::new_client`] hands out [`Client`]s that talk to it.
#[derive(Debug)]
18
pub struct Server {
19
    /// Sending half of the channel whose receiving half is consumed by the
    /// task spawned in [`Server::new`]; cloned into every [`Client`].
    server_ch: mpsc::Sender<Message>,
20
}
21

22
/// Async API used to ask the proxy task for the versions of a resource.
///
/// Instances are created with [`Server::new_client`].
#[derive(Debug)]
23
pub struct Client {
24
    /// Clone of the server's sender, used to submit [`Message::Request`]s.
    server_ch: mpsc::Sender<Message>,
25
}
26

27
/// Messages processed by the task spawned in [`Server::new`].
#[derive(Debug)]
28
pub enum Message {
29
    /// A client asks for the versions of `resource`.
    Request {
30
        /// Resource whose versions are wanted.
        resource: Resource,
31
        /// One-shot channel on which the server task sends the answer.
        client_ch: oneshot::Sender<Option<Vec<Version>>>,
32
    },
33
    /// A downloader task reports the outcome for `resource`.
    Downloaded {
34
        /// Resource that was looked up.
        resource: Resource,
35
        /// Versions found, or `None` when `Resource::get_versions` returned
        /// an error.
        versions: Option<Vec<Version>>,
36
    },
37
}
38

39
/// Results already reported by a downloader, keyed by resource; a `None`
/// value records a lookup that ended in an error.
type Cache = HashMap<Resource, Option<Vec<Version>>>;
40
/// Clients waiting for a resource, each represented by the one-shot sender
/// on which it expects its answer.
type Pending = HashMap<Resource, Vec<oneshot::Sender<Option<Vec<Version>>>>>;
41

42
impl Server {
43
    #[instrument(level = "debug")]
44
    pub fn new() -> Server {
×
45
        let (server_ch, mut queue): (mpsc::Sender<Message>, mpsc::Receiver<Message>) =
×
46
            mpsc::channel(32);
×
47
        let worker_ch = server_ch.clone();
×
48
        tokio::spawn(async move {
×
49
            event!(Level::INFO, "Server task started");
×
50
            let mut pending: Pending = Default::default();
×
51
            let mut cache: Cache = Default::default();
×
52
            while let Some(msg) = queue.recv().await {
×
53
                match msg {
×
54
                    Message::Request {
55
                        resource,
×
56
                        client_ch,
×
57
                    } => {
58
                        Server::handle_request(
×
59
                            worker_ch.clone(),
×
60
                            &cache,
×
61
                            &mut pending,
×
62
                            resource,
×
63
                            client_ch,
×
64
                        )
×
65
                        .await
×
66
                    }
67
                    Message::Downloaded { resource, versions } => {
×
68
                        cache.insert(resource.clone(), versions.clone());
×
69
                        if let Some(clients) = pending.remove(&resource) {
×
70
                            event!(
×
71
                                Level::INFO,
×
72
                                resource = %resource,
73
                                num_clients = clients.len(),
×
74
                                "retrieved, answering pending"
×
75
                            );
76
                            for client_ch in clients {
×
77
                                client_ch.send(versions.clone()).unwrap();
×
78
                            }
×
79
                        } else {
80
                            event!(
×
81
                                Level::ERROR,
×
82
                                resource = %resource,
83
                                "no pending request found"
×
84
                            );
85
                        }
86
                    }
87
                }
88
            }
89
        });
×
90
        Server { server_ch }
×
91
    }
×
92

93
    #[instrument(level = "debug")]
94
    async fn handle_request(
×
95
        worker_ch: mpsc::Sender<Message>,
×
96
        cache: &Cache,
×
97
        pending: &mut Pending,
×
98
        resource: Resource,
×
99
        client_ch: oneshot::Sender<Option<Vec<Version>>>,
×
100
    ) {
×
101
        if let Some(versions) = cache.get(&resource) {
102
            event!(Level::INFO, resource = %resource, "cache hit");
103
            Server::worker_send(worker_ch, resource, versions.clone()).await;
104
            return;
105
        }
106
        let e = pending.entry(resource.clone()).or_default();
107
        if e.is_empty() {
108
            event!(Level::INFO, resource = %resource, "downloader task started");
109
            tokio::spawn(async move {
×
110
                match resource.get_versions().await {
×
111
                    Ok(versions) => {
×
112
                        Server::worker_send(worker_ch, resource, Some(versions)).await;
×
113
                    }
114
                    Err(e) => {
×
115
                        event!(
×
116
                            Level::ERROR,
×
117
                            resource = %resource,
118
                            error = %e,
119
                            "error in get_version"
×
120
                        );
121
                        Server::worker_send(worker_ch, resource, None).await;
×
122
                    }
123
                };
124
            });
×
125
        } else {
126
            event!(
127
                Level::INFO,
128
                resource = %resource,
129
                "downloader task already present"
130
            );
131
        }
132
        e.push(client_ch);
133
    }
×
134

135
    #[instrument(level = "debug")]
136
    async fn worker_send(
×
137
        worker_ch: mpsc::Sender<Message>,
×
138
        resource: Resource,
×
139
        versions: Option<Vec<Version>>,
×
140
    ) {
×
141
        if let Err(e) = worker_ch
142
            .send(Message::Downloaded { resource, versions })
143
            .await
144
        {
145
            event!(
146
                Level::ERROR,
147
                error = %e,
148
                "error sending download to server task"
149
            );
150
        }
151
    }
×
152

153
    #[instrument(level = "debug")]
154
    pub fn new_client(&self) -> Client {
×
155
        Client {
×
156
            server_ch: self.server_ch.clone(),
×
157
        }
×
158
    }
×
159
}
160

161
impl Default for Server {
162
    fn default() -> Self {
×
163
        Self::new()
×
164
    }
×
165
}
166

167
impl Client {
168
    #[instrument(level = "debug")]
169
    pub async fn get_versions(&self, resource: &Resource) -> anyhow::Result<Option<Vec<Version>>> {
×
170
        let (client_ch, response) = oneshot::channel();
171
        self.server_ch
172
            .send(Message::Request {
173
                resource: resource.clone(),
174
                client_ch,
175
            })
176
            .await?;
177
        Ok(response.await?)
178
    }
×
179

180
    #[instrument(level = "debug")]
181
    pub async fn fetch_latest_version(
×
182
        &self,
×
183
        resource: &Resource,
×
184
        current_version: &Version,
×
185
    ) -> Option<(Resource, Version)> {
×
186
        let versions = match self.get_versions(resource).await {
187
            Ok(versions) => versions.unwrap_or_default(),
188
            Err(e) => {
189
                event!(
190
                    Level::ERROR,
191
                    resource = %resource,
192
                    error = %e,
193
                    "error getting version",
194
                );
195
                return None;
196
            }
197
        };
198
        if versions.is_empty() {
199
            event!(
200
                Level::ERROR,
201
                resource = %resource,
202
                versions = ?versions,
203
                "no version found",
204
            );
205
            return None;
206
        } else if !versions.contains(current_version) {
207
            event!(
208
                Level::WARN,
209
                resource = %resource,
210
                current = %current_version,
211
                versions = ?versions,
212
                "current version not present in version list",
213
            );
214
        }
215
        let latest = versions.iter().max().unwrap();
216
        event!(
217
            Level::INFO,
218
            resource = %resource,
219
            versions = ?versions,
220
            latest = %latest,
221
            "got versions",
222
        );
223
        Some((resource.clone(), latest.clone()))
224
    }
×
225
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc