• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4414871180

pending completion
4414871180

push

github

GitHub
feat: add object store validation (#1140)

40 of 40 new or added lines in 2 files covered. (100.0%)

28658 of 39135 relevant lines covered (73.23%)

92989.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.25
/dozer-api/src/grpc/shared_impl/mod.rs
1
use dozer_cache::cache::expression::{default_limit_for_query, QueryExpression};
2
use dozer_cache::cache::RecordWithId;
3
use dozer_cache::CacheReader;
4
use dozer_types::grpc_types::types::Operation;
5
use dozer_types::log::warn;
6
use dozer_types::serde_json;
7
use tokio::sync::broadcast::error::RecvError;
8
use tokio::sync::broadcast::Receiver;
9
use tokio_stream::wrappers::ReceiverStream;
10
use tonic::{Code, Response, Status};
11

12
use crate::api_helper::{get_records, get_records_count};
13
use crate::auth::Access;
14

15
mod filter;
16

17
pub fn from_error(error: impl std::error::Error) -> Status {
×
18
    Status::new(Code::Internal, error.to_string())
×
19
}
×
20

21
fn parse_query(
28✔
22
    query: Option<&str>,
28✔
23
    default: impl FnOnce() -> QueryExpression,
28✔
24
) -> Result<QueryExpression, Status> {
28✔
25
    match query {
28✔
26
        Some(query) => {
16✔
27
            if query.is_empty() {
16✔
28
                Ok(default())
4✔
29
            } else {
30
                serde_json::from_str(query).map_err(from_error)
12✔
31
            }
32
        }
33
        None => Ok(default()),
12✔
34
    }
35
}
28✔
36

37
pub fn count(
9✔
38
    reader: &CacheReader,
9✔
39
    query: Option<&str>,
9✔
40
    access: Option<Access>,
9✔
41
) -> Result<usize, Status> {
9✔
42
    let mut query = parse_query(query, QueryExpression::with_no_limit)?;
9✔
43
    Ok(get_records_count(reader, &mut query, access)?)
9✔
44
}
9✔
45

46
pub fn query(
19✔
47
    reader: &CacheReader,
19✔
48
    query: Option<&str>,
19✔
49
    access: Option<Access>,
19✔
50
) -> Result<Vec<RecordWithId>, Status> {
19✔
51
    let mut query = parse_query(query, QueryExpression::with_default_limit)?;
19✔
52
    if query.limit.is_none() {
19✔
53
        query.limit = Some(default_limit_for_query());
3✔
54
    }
16✔
55
    let records = get_records(reader, &mut query, access)?;
19✔
56
    Ok(records)
19✔
57
}
19✔
58

59
pub fn on_event<T: Send + 'static>(
4✔
60
    reader: &CacheReader,
4✔
61
    filter: Option<&str>,
4✔
62
    mut broadcast_receiver: Option<Receiver<Operation>>,
4✔
63
    _access: Option<Access>,
4✔
64
    event_mapper: impl Fn(Operation) -> Option<T> + Send + Sync + 'static,
4✔
65
) -> Result<Response<ReceiverStream<T>>, Status> {
4✔
66
    // TODO: Use access.
4✔
67

4✔
68
    if broadcast_receiver.is_none() {
4✔
69
        return Err(Status::unavailable(
×
70
            "on_event is not enabled. This is currently an experimental feature. Enable it in the config.",
×
71
        ));
×
72
    }
4✔
73

74
    let filter = match filter {
4✔
75
        Some(filter) => {
4✔
76
            if filter.is_empty() {
4✔
77
                None
1✔
78
            } else {
79
                Some(serde_json::from_str(filter).map_err(from_error)?)
3✔
80
            }
81
        }
82
        None => None,
×
83
    };
84
    let schema = reader.get_schema().0.clone();
4✔
85

4✔
86
    let (tx, rx) = tokio::sync::mpsc::channel(1);
4✔
87

4✔
88
    tokio::spawn(async move {
4✔
89
        loop {
90
            if let Some(broadcast_receiver) = broadcast_receiver.as_mut() {
17✔
91
                let event = broadcast_receiver.recv().await;
17✔
92
                match event {
13✔
93
                    Ok(op) => {
13✔
94
                        if filter::op_satisfies_filter(&op, filter.as_ref(), &schema) {
13✔
95
                            if let Some(event) = event_mapper(op) {
3✔
96
                                if (tx.send(event).await).is_err() {
3✔
97
                                    // receiver dropped
98
                                    break;
×
99
                                }
3✔
100
                            }
×
101
                        }
10✔
102
                    }
103
                    Err(e) => {
×
104
                        warn!("Failed to receive event from broadcast channel: {}", e);
×
105
                        if e == RecvError::Closed {
×
106
                            break;
×
107
                        }
×
108
                    }
109
                }
110
            }
×
111
        }
112
    });
4✔
113

4✔
114
    Ok(Response::new(ReceiverStream::new(rx)))
4✔
115
}
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc