• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4283045331

pending completion
4283045331

push

github

GitHub
feat: Support timestamp diff (#1074)

58 of 58 new or added lines in 2 files covered. (100.0%)

27146 of 37535 relevant lines covered (72.32%)

33460.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.1
/dozer-api/src/grpc/shared_impl/mod.rs
1
use dozer_cache::cache::expression::{default_limit_for_query, QueryExpression};
2
use dozer_cache::cache::RecordWithId;
3
use dozer_cache::CacheReader;
4
use dozer_types::grpc_types::internal::PipelineResponse;
5
use dozer_types::grpc_types::types::Operation;
6
use dozer_types::log::warn;
7
use dozer_types::serde_json;
8
use dozer_types::types::Schema;
9
use tokio::sync::broadcast::error::RecvError;
10
use tokio::sync::broadcast::Receiver;
11
use tokio_stream::wrappers::ReceiverStream;
12
use tonic::{Code, Response, Status};
13

14
use crate::api_helper::{get_records, get_records_count};
15
use crate::auth::Access;
16

17
use dozer_types::grpc_types::internal::pipeline_response::ApiEvent;
18

19
mod filter;
20

21
pub fn from_error(error: impl std::error::Error) -> Status {
×
22
    Status::new(Code::Internal, error.to_string())
×
23
}
×
24

25
fn parse_query(
23✔
26
    query: Option<&str>,
23✔
27
    default: impl FnOnce() -> QueryExpression,
23✔
28
) -> Result<QueryExpression, Status> {
23✔
29
    match query {
23✔
30
        Some(query) => {
16✔
31
            if query.is_empty() {
16✔
32
                Ok(default())
4✔
33
            } else {
34
                serde_json::from_str(query).map_err(from_error)
12✔
35
            }
36
        }
37
        None => Ok(default()),
7✔
38
    }
39
}
23✔
40

41
pub fn count(
9✔
42
    reader: &CacheReader,
9✔
43
    endpoint_name: &str,
9✔
44
    query: Option<&str>,
9✔
45
    access: Option<Access>,
9✔
46
) -> Result<usize, Status> {
9✔
47
    let mut query = parse_query(query, QueryExpression::with_no_limit)?;
9✔
48
    Ok(get_records_count(
9✔
49
        reader,
9✔
50
        endpoint_name,
9✔
51
        &mut query,
9✔
52
        access,
9✔
53
    )?)
9✔
54
}
9✔
55

56
pub fn query<'a>(
14✔
57
    reader: &'a CacheReader,
14✔
58
    endpoint_name: &'a str,
14✔
59
    query: Option<&str>,
14✔
60
    access: Option<Access>,
14✔
61
) -> Result<(&'a Schema, Vec<RecordWithId>), Status> {
14✔
62
    let mut query = parse_query(query, QueryExpression::with_default_limit)?;
14✔
63
    if query.limit.is_none() {
14✔
64
        query.limit = Some(default_limit_for_query());
3✔
65
    }
11✔
66
    let (schema, records) = get_records(reader, endpoint_name, &mut query, access)?;
14✔
67
    Ok((schema, records))
14✔
68
}
14✔
69

70
pub fn on_event<T: Send + 'static>(
4✔
71
    reader: &CacheReader,
4✔
72
    endpoint_name: &str,
4✔
73
    filter: Option<&str>,
4✔
74
    mut broadcast_receiver: Option<Receiver<PipelineResponse>>,
4✔
75
    _access: Option<Access>,
4✔
76
    event_mapper: impl Fn(Operation, String) -> Option<T> + Send + Sync + 'static,
4✔
77
) -> Result<Response<ReceiverStream<T>>, Status> {
4✔
78
    // TODO: Use access.
4✔
79

4✔
80
    if broadcast_receiver.is_none() {
4✔
81
        return Err(Status::unavailable(
×
82
            "on_event is not enabled. This is currently an experimental feature. Enable it in the config.",
×
83
        ));
×
84
    }
4✔
85

86
    let filter = match filter {
4✔
87
        Some(filter) => {
4✔
88
            if filter.is_empty() {
4✔
89
                None
1✔
90
            } else {
91
                Some(serde_json::from_str(filter).map_err(from_error)?)
3✔
92
            }
93
        }
94
        None => None,
×
95
    };
96
    let schema = reader
4✔
97
        .get_schema_and_indexes_by_name(endpoint_name)
4✔
98
        .map_err(|_| Status::invalid_argument(endpoint_name))?
4✔
99
        .0
100
        .clone();
4✔
101

4✔
102
    let (tx, rx) = tokio::sync::mpsc::channel(1);
4✔
103

4✔
104
    tokio::spawn(async move {
4✔
105
        loop {
106
            if let Some(broadcast_receiver) = broadcast_receiver.as_mut() {
16✔
107
                let event = broadcast_receiver.recv().await;
16✔
108
                match event {
13✔
109
                    Ok(event) => {
12✔
110
                        if let Some(ApiEvent::Op(op)) = event.api_event {
12✔
111
                            if filter::op_satisfies_filter(&op, filter.as_ref(), &schema) {
12✔
112
                                if let Some(event) = event_mapper(op, event.endpoint) {
2✔
113
                                    if (tx.send(event).await).is_err() {
2✔
114
                                        // receiver dropped
115
                                        break;
×
116
                                    }
2✔
117
                                }
×
118
                            }
10✔
119
                        }
×
120
                    }
121
                    Err(e) => {
1✔
122
                        warn!("Failed to receive event from broadcast channel: {}", e);
1✔
123
                        if e == RecvError::Closed {
1✔
124
                            break;
1✔
125
                        }
×
126
                    }
127
                }
128
            }
×
129
        }
130
    });
4✔
131

4✔
132
    Ok(Response::new(ReceiverStream::new(rx)))
4✔
133
}
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc