• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6011767741

29 Aug 2023 11:42AM UTC coverage: 76.517%. First build
6011767741

Pull #1881

github

chubei
feat: `on_event` can subscribe to multiple endpoints at once
Pull Request #1881: feat: `on_event` can subscribe to multiple endpoints at once

78 of 78 new or added lines in 7 files covered. (100.0%)

49003 of 64042 relevant lines covered (76.52%)

48280.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.97
/dozer-api/src/grpc/shared_impl/mod.rs
1
use std::collections::HashMap;
2

3
use dozer_cache::cache::expression::{default_limit_for_query, FilterExpression, QueryExpression};
4
use dozer_cache::cache::CacheRecord;
5
use dozer_cache::CacheReader;
6
use dozer_types::grpc_types::types::Operation;
7
use dozer_types::log::warn;
8
use dozer_types::serde_json;
9
use dozer_types::types::Schema;
10
use tokio::sync::broadcast::error::RecvError;
11
use tokio::sync::broadcast::Receiver;
12
use tokio_stream::wrappers::ReceiverStream;
13
use tonic::{Code, Response, Status};
14

15
use crate::api_helper::{get_records, get_records_count};
16
use crate::auth::Access;
17

×
18
mod filter;
×
19

×
20
pub fn from_error(error: impl std::error::Error) -> Status {
×
21
    Status::new(Code::Internal, error.to_string())
×
22
}
×
23

×
24
fn parse_query(
30✔
25
    query: Option<&str>,
30✔
26
    default: impl FnOnce() -> QueryExpression,
30✔
27
) -> Result<QueryExpression, Status> {
30✔
28
    match query {
30✔
29
        Some(query) => {
16✔
30
            if query.is_empty() {
16✔
31
                Ok(default())
4✔
32
            } else {
33
                serde_json::from_str(query).map_err(from_error)
12✔
34
            }
35
        }
×
36
        None => Ok(default()),
14✔
37
    }
×
38
}
30✔
39

×
40
pub fn count(
9✔
41
    reader: &CacheReader,
9✔
42
    query: Option<&str>,
9✔
43
    endpoint: &str,
9✔
44
    access: Option<Access>,
9✔
45
) -> Result<usize, Status> {
9✔
46
    let mut query = parse_query(query, QueryExpression::with_no_limit)?;
9✔
47
    Ok(get_records_count(reader, &mut query, endpoint, access)?)
9✔
48
}
9✔
49

×
50
pub fn query(
21✔
51
    reader: &CacheReader,
21✔
52
    query: Option<&str>,
21✔
53
    endpoint: &str,
21✔
54
    access: Option<Access>,
21✔
55
) -> Result<Vec<CacheRecord>, Status> {
21✔
56
    let mut query = parse_query(query, QueryExpression::with_default_limit)?;
21✔
57
    if query.limit.is_none() {
21✔
58
        query.limit = Some(default_limit_for_query());
3✔
59
    }
18✔
60
    let records = get_records(reader, &mut query, endpoint, access)?;
21✔
61
    Ok(records)
21✔
62
}
21✔
63

×
64
#[derive(Debug)]
×
65
pub struct EndpointFilter {
×
66
    schema: Schema,
×
67
    filter: Option<FilterExpression>,
×
68
}
×
69

×
70
impl EndpointFilter {
×
71
    pub fn new(schema: Schema, filter: Option<&str>) -> Result<Self, Status> {
4✔
72
        let filter = filter
4✔
73
            .and_then(|filter| {
4✔
74
                if filter.is_empty() {
4✔
75
                    None
1✔
76
                } else {
×
77
                    Some(serde_json::from_str(filter))
3✔
78
                }
×
79
            })
4✔
80
            .transpose()
4✔
81
            .map_err(from_error)?;
4✔
82
        Ok(Self { schema, filter })
4✔
83
    }
4✔
84
}
×
85

86
pub fn on_event<T: Send + 'static>(
4✔
87
    endpoints: HashMap<String, EndpointFilter>,
4✔
88
    mut broadcast_receiver: Option<Receiver<Operation>>,
4✔
89
    _access: Option<Access>,
4✔
90
    event_mapper: impl Fn(Operation) -> T + Send + Sync + 'static,
4✔
91
) -> Result<Response<ReceiverStream<T>>, Status> {
4✔
92
    // TODO: Use access.
4✔
93

4✔
94
    if broadcast_receiver.is_none() {
4✔
95
        return Err(Status::unavailable(
×
96
            "on_event is not enabled. This is currently an experimental feature. Enable it in the config.",
×
97
        ));
×
98
    }
4✔
99

4✔
100
    let (tx, rx) = tokio::sync::mpsc::channel(1);
4✔
101

4✔
102
    tokio::spawn(async move {
4✔
103
        loop {
×
104
            if let Some(broadcast_receiver) = broadcast_receiver.as_mut() {
16✔
105
                let event = broadcast_receiver.recv().await;
16✔
106
                match event {
12✔
107
                    Ok(op) => {
12✔
108
                        if let Some(filter) = endpoints.get(&op.endpoint_name) {
12✔
109
                            if filter::op_satisfies_filter(
12✔
110
                                &op,
12✔
111
                                filter.filter.as_ref(),
12✔
112
                                &filter.schema,
12✔
113
                            ) && (tx.send(event_mapper(op)).await).is_err()
12✔
114
                            {
×
115
                                // receiver dropped
×
116
                                break;
×
117
                            }
12✔
118
                        }
×
119
                    }
120
                    Err(e) => {
×
121
                        warn!("Failed to receive event from broadcast channel: {}", e);
×
122
                        if e == RecvError::Closed {
×
123
                            break;
×
124
                        }
×
125
                    }
126
                }
127
            }
×
128
        }
129
    });
4✔
130

4✔
131
    Ok(Response::new(ReceiverStream::new(rx)))
4✔
132
}
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc