• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6299724219

25 Sep 2023 12:58PM UTC coverage: 77.81% (+0.5%) from 77.275%
6299724219

push

github

chubei
fix: Add `BINDGEN_EXTRA_CLANG_ARGS` to cross compile rocksdb

50223 of 64546 relevant lines covered (77.81%)

148909.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.16
/dozer-api/src/grpc/shared_impl/mod.rs
1
use std::collections::HashMap;
2

3
use dozer_cache::cache::expression::{FilterExpression, QueryExpression};
4
use dozer_cache::cache::CacheRecord;
5
use dozer_cache::CacheReader;
6
use dozer_types::grpc_types::types::Operation;
7
use dozer_types::log::warn;
8
use dozer_types::serde_json;
9
use dozer_types::tonic::{Code, Response, Status};
10
use dozer_types::types::Schema;
11
use tokio::sync::broadcast::error::RecvError;
12
use tokio::sync::broadcast::Receiver;
13
use tokio_stream::wrappers::ReceiverStream;
14

15
use crate::api_helper::{get_records, get_records_count};
16
use crate::auth::Access;
17

18
mod filter;
19

20
pub fn from_error(error: impl std::error::Error) -> Status {
×
21
    Status::new(Code::Internal, error.to_string())
×
22
}
×
23

24
fn parse_query(
30✔
25
    query: Option<&str>,
30✔
26
    default: impl FnOnce() -> QueryExpression,
30✔
27
) -> Result<QueryExpression, Status> {
30✔
28
    match query {
30✔
29
        Some(query) => {
16✔
30
            if query.is_empty() {
16✔
31
                Ok(default())
4✔
32
            } else {
33
                serde_json::from_str(query).map_err(from_error)
12✔
34
            }
35
        }
36
        None => Ok(default()),
14✔
37
    }
38
}
30✔
39

40
pub fn count(
9✔
41
    reader: &CacheReader,
9✔
42
    query: Option<&str>,
9✔
43
    endpoint: &str,
9✔
44
    access: Option<Access>,
9✔
45
) -> Result<usize, Status> {
9✔
46
    let mut query = parse_query(query, QueryExpression::with_no_limit)?;
9✔
47
    Ok(get_records_count(reader, &mut query, endpoint, access)?)
9✔
48
}
9✔
49

50
pub fn query(
21✔
51
    reader: &CacheReader,
21✔
52
    query: Option<&str>,
21✔
53
    endpoint: &str,
21✔
54
    access: Option<Access>,
21✔
55
    default_max_num_records: usize,
21✔
56
) -> Result<Vec<CacheRecord>, Status> {
21✔
57
    let mut query = parse_query(query, || {
21✔
58
        QueryExpression::with_limit(default_max_num_records)
15✔
59
    })?;
21✔
60
    if query.limit.is_none() {
21✔
61
        query.limit = Some(default_max_num_records);
3✔
62
    }
18✔
63
    let records = get_records(reader, &mut query, endpoint, access)?;
21✔
64
    Ok(records)
21✔
65
}
21✔
66

67
#[derive(Debug)]
×
68
pub struct EndpointFilter {
69
    schema: Schema,
70
    filter: Option<FilterExpression>,
71
}
72

73
impl EndpointFilter {
74
    pub fn new(schema: Schema, filter: Option<&str>) -> Result<Self, Status> {
4✔
75
        let filter = filter
4✔
76
            .and_then(|filter| {
4✔
77
                if filter.is_empty() {
4✔
78
                    None
1✔
79
                } else {
80
                    Some(serde_json::from_str(filter))
3✔
81
                }
82
            })
4✔
83
            .transpose()
4✔
84
            .map_err(from_error)?;
4✔
85
        Ok(Self { schema, filter })
4✔
86
    }
4✔
87
}
88

89
pub fn on_event<T: Send + 'static>(
4✔
90
    endpoints: HashMap<String, EndpointFilter>,
4✔
91
    mut broadcast_receiver: Option<Receiver<Operation>>,
4✔
92
    _access: Option<Access>,
4✔
93
    event_mapper: impl Fn(Operation) -> T + Send + Sync + 'static,
4✔
94
) -> Result<Response<ReceiverStream<T>>, Status> {
4✔
95
    // TODO: Use access.
4✔
96

4✔
97
    if broadcast_receiver.is_none() {
4✔
98
        return Err(Status::unavailable(
×
99
            "on_event is not enabled. This is currently an experimental feature. Enable it in the config.",
×
100
        ));
×
101
    }
4✔
102

4✔
103
    if endpoints.is_empty() {
4✔
104
        return Err(Status::invalid_argument("empty endpoints array"));
×
105
    }
4✔
106

4✔
107
    let (tx, rx) = tokio::sync::mpsc::channel(1);
4✔
108

4✔
109
    tokio::spawn(async move {
4✔
110
        loop {
111
            if let Some(broadcast_receiver) = broadcast_receiver.as_mut() {
17✔
112
                let event = broadcast_receiver.recv().await;
17✔
113
                match event {
13✔
114
                    Ok(op) => {
13✔
115
                        if let Some(filter) = endpoints.get(&op.endpoint_name) {
13✔
116
                            if filter::op_satisfies_filter(
13✔
117
                                &op,
13✔
118
                                filter.filter.as_ref(),
13✔
119
                                &filter.schema,
13✔
120
                            ) && (tx.send(event_mapper(op)).await).is_err()
13✔
121
                            {
122
                                // receiver dropped
123
                                break;
×
124
                            }
13✔
125
                        }
×
126
                    }
127
                    Err(e) => {
×
128
                        warn!("Failed to receive event from broadcast channel: {}", e);
×
129
                        if e == RecvError::Closed {
×
130
                            break;
×
131
                        }
×
132
                    }
133
                }
134
            }
×
135
        }
136
    });
4✔
137

4✔
138
    Ok(Response::new(ReceiverStream::new(rx)))
4✔
139
}
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc