• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4341741407

pending completion
4341741407

push

github

GitHub
chore: Remove `TableInfo::table_name` and `TableInfo::id`. Remove parameter of `Connector::get_tables`. (#1145)

282 of 282 new or added lines in 43 files covered. (100.0%)

29118 of 37709 relevant lines covered (77.22%)

41938.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.52
/dozer-api/src/grpc/shared_impl/mod.rs
1
use dozer_cache::cache::expression::{default_limit_for_query, QueryExpression};
2
use dozer_cache::cache::RecordWithId;
3
use dozer_cache::CacheReader;
4
use dozer_types::grpc_types::types::Operation;
5
use dozer_types::log::warn;
6
use dozer_types::serde_json;
7
use dozer_types::types::Schema;
8
use tokio::sync::broadcast::error::RecvError;
9
use tokio::sync::broadcast::Receiver;
10
use tokio_stream::wrappers::ReceiverStream;
11
use tonic::{Code, Response, Status};
12

13
use crate::api_helper::{get_records, get_records_count};
14
use crate::auth::Access;
15

16
mod filter;
17

18
pub fn from_error(error: impl std::error::Error) -> Status {
×
19
    Status::new(Code::Internal, error.to_string())
×
20
}
×
21

22
fn parse_query(
23✔
23
    query: Option<&str>,
23✔
24
    default: impl FnOnce() -> QueryExpression,
23✔
25
) -> Result<QueryExpression, Status> {
23✔
26
    match query {
23✔
27
        Some(query) => {
16✔
28
            if query.is_empty() {
16✔
29
                Ok(default())
4✔
30
            } else {
31
                serde_json::from_str(query).map_err(from_error)
12✔
32
            }
33
        }
34
        None => Ok(default()),
7✔
35
    }
36
}
23✔
37

38
pub fn count(
9✔
39
    reader: &CacheReader,
9✔
40
    endpoint_name: &str,
9✔
41
    query: Option<&str>,
9✔
42
    access: Option<Access>,
9✔
43
) -> Result<usize, Status> {
9✔
44
    let mut query = parse_query(query, QueryExpression::with_no_limit)?;
9✔
45
    Ok(get_records_count(
9✔
46
        reader,
9✔
47
        endpoint_name,
9✔
48
        &mut query,
9✔
49
        access,
9✔
50
    )?)
9✔
51
}
9✔
52

53
pub fn query<'a>(
14✔
54
    reader: &'a CacheReader,
14✔
55
    endpoint_name: &'a str,
14✔
56
    query: Option<&str>,
14✔
57
    access: Option<Access>,
14✔
58
) -> Result<(&'a Schema, Vec<RecordWithId>), Status> {
14✔
59
    let mut query = parse_query(query, QueryExpression::with_default_limit)?;
14✔
60
    if query.limit.is_none() {
14✔
61
        query.limit = Some(default_limit_for_query());
3✔
62
    }
11✔
63
    let (schema, records) = get_records(reader, endpoint_name, &mut query, access)?;
14✔
64
    Ok((schema, records))
14✔
65
}
14✔
66

67
pub fn on_event<T: Send + 'static>(
4✔
68
    reader: &CacheReader,
4✔
69
    endpoint_name: &str,
4✔
70
    filter: Option<&str>,
4✔
71
    mut broadcast_receiver: Option<Receiver<Operation>>,
4✔
72
    _access: Option<Access>,
4✔
73
    event_mapper: impl Fn(Operation) -> Option<T> + Send + Sync + 'static,
4✔
74
) -> Result<Response<ReceiverStream<T>>, Status> {
4✔
75
    // TODO: Use access.
4✔
76

4✔
77
    if broadcast_receiver.is_none() {
4✔
78
        return Err(Status::unavailable(
×
79
            "on_event is not enabled. This is currently an experimental feature. Enable it in the config.",
×
80
        ));
×
81
    }
4✔
82

83
    let filter = match filter {
4✔
84
        Some(filter) => {
4✔
85
            if filter.is_empty() {
4✔
86
                None
1✔
87
            } else {
88
                Some(serde_json::from_str(filter).map_err(from_error)?)
3✔
89
            }
90
        }
91
        None => None,
×
92
    };
93
    let schema = reader
4✔
94
        .get_schema_and_indexes_by_name(endpoint_name)
4✔
95
        .map_err(|_| Status::invalid_argument(endpoint_name))?
4✔
96
        .0
97
        .clone();
4✔
98

4✔
99
    let (tx, rx) = tokio::sync::mpsc::channel(1);
4✔
100

4✔
101
    tokio::spawn(async move {
4✔
102
        loop {
103
            if let Some(broadcast_receiver) = broadcast_receiver.as_mut() {
17✔
104
                let event = broadcast_receiver.recv().await;
17✔
105
                match event {
13✔
106
                    Ok(op) => {
13✔
107
                        if filter::op_satisfies_filter(&op, filter.as_ref(), &schema) {
13✔
108
                            if let Some(event) = event_mapper(op) {
3✔
109
                                if (tx.send(event).await).is_err() {
3✔
110
                                    // receiver dropped
111
                                    break;
×
112
                                }
3✔
113
                            }
×
114
                        }
10✔
115
                    }
116
                    Err(e) => {
×
117
                        warn!("Failed to receive event from broadcast channel: {}", e);
×
118
                        if e == RecvError::Closed {
×
119
                            break;
×
120
                        }
×
121
                    }
122
                }
123
            }
×
124
        }
125
    });
4✔
126

4✔
127
    Ok(Response::new(ReceiverStream::new(rx)))
4✔
128
}
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc