• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4101480021

pending completion
4101480021

Pull #808

github

GitHub
Merge d70e9b907 into 2203cbbcb
Pull Request #808: feat: Introduce DataFusion Connector

74 of 74 new or added lines in 2 files covered. (100.0%)

24538 of 37144 relevant lines covered (66.06%)

38855.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.52
/dozer-api/src/grpc/shared_impl/mod.rs
1
use dozer_cache::cache::expression::{default_limit_for_query, QueryExpression};
2
use dozer_types::log::warn;
3
use dozer_types::serde_json;
4
use dozer_types::types::{Record, Schema};
5
use tokio::sync::broadcast::error::RecvError;
6
use tokio::sync::broadcast::Receiver;
7
use tokio_stream::wrappers::ReceiverStream;
8
use tonic::{Code, Response, Status};
9

10
use crate::auth::Access;
11
use crate::{api_helper::ApiHelper, PipelineDetails};
12

13
use super::internal_grpc::pipeline_response::ApiEvent;
14
use super::internal_grpc::PipelineResponse;
15
use super::types::Operation;
16

17
mod filter;
18

19
pub fn from_error(error: impl std::error::Error) -> Status {
×
20
    Status::new(Code::Internal, error.to_string())
×
21
}
×
22

23
fn parse_query(
18✔
24
    query: Option<&str>,
18✔
25
    default: impl FnOnce() -> QueryExpression,
18✔
26
) -> Result<QueryExpression, Status> {
18✔
27
    match query {
18✔
28
        Some(query) => {
16✔
29
            if query.is_empty() {
16✔
30
                Ok(default())
4✔
31
            } else {
32
                serde_json::from_str(query).map_err(from_error)
12✔
33
            }
34
        }
35
        None => Ok(default()),
2✔
36
    }
37
}
18✔
38

39
pub fn count(
9✔
40
    pipeline_details: &PipelineDetails,
9✔
41
    query: Option<&str>,
9✔
42
    access: Option<Access>,
9✔
43
) -> Result<usize, Status> {
9✔
44
    let query = parse_query(query, QueryExpression::with_no_limit)?;
9✔
45
    let api_helper = ApiHelper::new(pipeline_details, access)?;
9✔
46
    api_helper.get_records_count(query).map_err(from_error)
9✔
47
}
9✔
48

49
pub fn query(
9✔
50
    pipeline_details: &PipelineDetails,
9✔
51
    query: Option<&str>,
9✔
52
    access: Option<Access>,
9✔
53
) -> Result<(Schema, Vec<Record>), Status> {
9✔
54
    let mut query = parse_query(query, QueryExpression::with_default_limit)?;
9✔
55
    if query.limit.is_none() {
9✔
56
        query.limit = Some(default_limit_for_query());
3✔
57
    }
6✔
58
    let api_helper = ApiHelper::new(pipeline_details, access)?;
9✔
59
    let (schema, records) = api_helper.get_records(query).map_err(from_error)?;
9✔
60
    Ok((schema, records))
9✔
61
}
9✔
62

63
pub fn on_event<T: Send + 'static>(
4✔
64
    pipeline_details: &PipelineDetails,
4✔
65
    filter: Option<&str>,
4✔
66
    mut broadcast_receiver: Option<Receiver<PipelineResponse>>,
4✔
67
    access: Option<Access>,
4✔
68
    event_mapper: impl Fn(Operation, String) -> Option<T> + Send + Sync + 'static,
4✔
69
) -> Result<Response<ReceiverStream<T>>, Status> {
4✔
70
    if broadcast_receiver.is_none() {
4✔
71
        return Err(Status::unavailable(
×
72
            "on_event is not enabled. This is currently an experimental feature. Enable it in the config.",
×
73
        ));
×
74
    }
4✔
75

76
    let filter = match filter {
4✔
77
        Some(filter) => {
4✔
78
            if filter.is_empty() {
4✔
79
                None
1✔
80
            } else {
81
                Some(serde_json::from_str(filter).map_err(from_error)?)
3✔
82
            }
83
        }
84
        None => None,
×
85
    };
86
    let api_helper = ApiHelper::new(pipeline_details, access)?;
4✔
87
    let schema = api_helper
4✔
88
        .get_schema()
4✔
89
        .map_err(|_| Status::invalid_argument(&pipeline_details.cache_endpoint.endpoint.name))?;
4✔
90

91
    let (tx, rx) = tokio::sync::mpsc::channel(1);
4✔
92

4✔
93
    tokio::spawn(async move {
4✔
94
        loop {
95
            if let Some(broadcast_receiver) = broadcast_receiver.as_mut() {
6✔
96
                let event = broadcast_receiver.recv().await;
6✔
97
                match event {
4✔
98
                    Ok(event) => {
2✔
99
                        if let Some(ApiEvent::Op(op)) = event.api_event {
2✔
100
                            if filter::op_satisfies_filter(&op, filter.as_ref(), &schema) {
2✔
101
                                if let Some(event) = event_mapper(op, event.endpoint) {
2✔
102
                                    if (tx.send(event).await).is_err() {
2✔
103
                                        // receiver dropped
104
                                        break;
×
105
                                    }
2✔
106
                                }
×
107
                            }
×
108
                        }
×
109
                    }
110
                    Err(e) => {
2✔
111
                        warn!("Failed to receive event from broadcast channel: {}", e);
2✔
112
                        if e == RecvError::Closed {
2✔
113
                            break;
2✔
114
                        }
×
115
                    }
116
                }
117
            }
×
118
        }
119
    });
4✔
120

4✔
121
    Ok(Response::new(ReceiverStream::new(rx)))
4✔
122
}
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc