• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12466060820

23 Dec 2024 11:26AM UTC coverage: 90.353% (-0.2%) from 90.512%
12466060820

Pull #998

github

web-flow
Merge 66ab0655c into 34e12969f
Pull Request #998: Quota and Data usage Logging

834 of 1211 new or added lines in 66 files covered. (68.87%)

222 existing lines in 18 files now uncovered.

133834 of 148123 relevant lines covered (90.35%)

54353.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.77
/services/src/pro/workflows/postgres_workflow_registry.rs
1
use crate::error::Result;
2
use crate::pro::contexts::ProPostgresDb;
3
use crate::workflows::registry::TxWorkflowRegistry;
4
use crate::workflows::workflow::{Workflow, WorkflowId};
5
use crate::{error, workflows::registry::WorkflowRegistry};
6
use async_trait::async_trait;
7
use bb8_postgres::{
8
    tokio_postgres::tls::MakeTlsConnect, tokio_postgres::tls::TlsConnect, tokio_postgres::Socket,
9
};
10
use snafu::ResultExt;
11

12
#[async_trait]
13
impl<Tls> TxWorkflowRegistry for ProPostgresDb<Tls>
14
where
15
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
16
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
17
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
18
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
19
{
20
    async fn register_workflow_in_tx(
21
        &self,
22
        workflow: Workflow,
23
        tx: &tokio_postgres::Transaction<'_>,
24
    ) -> Result<WorkflowId> {
24✔
25
        let workflow_id = WorkflowId::from_hash(&workflow);
24✔
26

24✔
27
        tx.execute(
24✔
28
            "INSERT INTO workflows (id, workflow) VALUES ($1, $2) 
24✔
29
            ON CONFLICT DO NOTHING;",
24✔
30
            &[
24✔
31
                &workflow_id,
24✔
32
                &serde_json::to_value(&workflow).context(error::SerdeJson)?,
24✔
33
            ],
34
        )
35
        .await?;
34✔
36

37
        Ok(workflow_id)
24✔
38
    }
48✔
39
}
40

41
#[async_trait]
42
impl<Tls> WorkflowRegistry for ProPostgresDb<Tls>
43
where
44
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
45
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
46
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
47
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
48
{
49
    async fn register_workflow(&self, workflow: Workflow) -> Result<WorkflowId> {
8✔
50
        let mut conn = self.conn_pool.get().await?;
8✔
51
        let tx = conn.transaction().await?;
8✔
52

53
        let id = self.register_workflow_in_tx(workflow, &tx).await?;
16✔
54

55
        tx.commit().await?;
8✔
56

57
        Ok(id)
8✔
58
    }
16✔
59

60
    async fn load_workflow(&self, id: &WorkflowId) -> Result<Workflow> {
8✔
61
        // TODO: add authorization beyond the fact that you can only access workflows if you happen to know the id.
62
        let conn = self.conn_pool.get().await?;
8✔
63
        let stmt = conn
8✔
64
            .prepare("SELECT workflow FROM workflows WHERE id = $1")
8✔
65
            .await?;
8✔
66

67
        let row = conn.query(&stmt, &[&id]).await?;
8✔
68

69
        if row.is_empty() {
8✔
UNCOV
70
            return Err(error::Error::NoWorkflowForGivenId);
×
71
        }
8✔
72

8✔
73
        Ok(serde_json::from_value(row[0].get(0)).context(error::SerdeJson)?)
8✔
74
    }
16✔
75
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc