• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4763031797

pending completion
4763031797

Pull #1458

github

GitHub
Merge 6302acc92 into c58df4a0b
Pull Request #1458: fix: uuid postgres type support

20 of 20 new or added lines in 3 files covered. (100.0%)

34612 of 43909 relevant lines covered (78.83%)

11679.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.37
/dozer-api/src/grpc/shared_impl/mod.rs
1
use dozer_cache::cache::expression::{default_limit_for_query, QueryExpression};
2
use dozer_cache::cache::CacheRecord;
3
use dozer_cache::CacheReader;
4
use dozer_types::grpc_types::types::Operation;
5
use dozer_types::log::warn;
6
use dozer_types::serde_json;
7
use tokio::sync::broadcast::error::RecvError;
8
use tokio::sync::broadcast::Receiver;
9
use tokio_stream::wrappers::ReceiverStream;
10
use tonic::{Code, Response, Status};
11

12
use crate::api_helper::{get_records, get_records_count};
13
use crate::auth::Access;
14

15
mod filter;
16

17
pub fn from_error(error: impl std::error::Error) -> Status {
×
18
    Status::new(Code::Internal, error.to_string())
×
19
}
×
20

21
fn parse_query(
26✔
22
    query: Option<&str>,
26✔
23
    default: impl FnOnce() -> QueryExpression,
26✔
24
) -> Result<QueryExpression, Status> {
26✔
25
    match query {
26✔
26
        Some(query) => {
16✔
27
            if query.is_empty() {
16✔
28
                Ok(default())
4✔
29
            } else {
30
                serde_json::from_str(query).map_err(from_error)
12✔
31
            }
32
        }
33
        None => Ok(default()),
10✔
34
    }
35
}
26✔
36

37
pub fn count(
9✔
38
    reader: &CacheReader,
9✔
39
    query: Option<&str>,
9✔
40
    endpoint: &str,
9✔
41
    access: Option<Access>,
9✔
42
) -> Result<usize, Status> {
9✔
43
    let mut query = parse_query(query, QueryExpression::with_no_limit)?;
9✔
44
    Ok(get_records_count(reader, &mut query, endpoint, access)?)
9✔
45
}
9✔
46

47
pub fn query(
17✔
48
    reader: &CacheReader,
17✔
49
    query: Option<&str>,
17✔
50
    endpoint: &str,
17✔
51
    access: Option<Access>,
17✔
52
) -> Result<Vec<CacheRecord>, Status> {
17✔
53
    let mut query = parse_query(query, QueryExpression::with_default_limit)?;
17✔
54
    if query.limit.is_none() {
17✔
55
        query.limit = Some(default_limit_for_query());
3✔
56
    }
14✔
57
    let records = get_records(reader, &mut query, endpoint, access)?;
17✔
58
    Ok(records)
17✔
59
}
17✔
60

61
pub fn on_event<T: Send + 'static>(
4✔
62
    reader: &CacheReader,
4✔
63
    filter: Option<&str>,
4✔
64
    mut broadcast_receiver: Option<Receiver<Operation>>,
4✔
65
    _access: Option<Access>,
4✔
66
    event_mapper: impl Fn(Operation) -> Option<T> + Send + Sync + 'static,
4✔
67
) -> Result<Response<ReceiverStream<T>>, Status> {
4✔
68
    // TODO: Use access.
4✔
69

4✔
70
    if broadcast_receiver.is_none() {
4✔
71
        return Err(Status::unavailable(
×
72
            "on_event is not enabled. This is currently an experimental feature. Enable it in the config.",
×
73
        ));
×
74
    }
4✔
75

76
    let filter = match filter {
4✔
77
        Some(filter) => {
4✔
78
            if filter.is_empty() {
4✔
79
                None
1✔
80
            } else {
81
                Some(serde_json::from_str(filter).map_err(from_error)?)
3✔
82
            }
83
        }
84
        None => None,
×
85
    };
86
    let schema = reader.get_schema().0.clone();
4✔
87

4✔
88
    let (tx, rx) = tokio::sync::mpsc::channel(1);
4✔
89

4✔
90
    tokio::spawn(async move {
4✔
91
        loop {
92
            if let Some(broadcast_receiver) = broadcast_receiver.as_mut() {
6✔
93
                let event = broadcast_receiver.recv().await;
6✔
94
                match event {
4✔
95
                    Ok(op) => {
2✔
96
                        if filter::op_satisfies_filter(&op, filter.as_ref(), &schema) {
2✔
97
                            if let Some(event) = event_mapper(op) {
2✔
98
                                if (tx.send(event).await).is_err() {
2✔
99
                                    // receiver dropped
100
                                    break;
×
101
                                }
2✔
102
                            }
×
103
                        }
×
104
                    }
105
                    Err(e) => {
2✔
106
                        warn!("Failed to receive event from broadcast channel: {}", e);
2✔
107
                        if e == RecvError::Closed {
2✔
108
                            break;
2✔
109
                        }
×
110
                    }
111
                }
112
            }
×
113
        }
114
    });
4✔
115

4✔
116
    Ok(Response::new(ReceiverStream::new(rx)))
4✔
117
}
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc