• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4105700744

pending completion
4105700744

Pull #814

github

GitHub
Merge d70d4d25f into 016b3ada5
Pull Request #814: Bump clap from 4.0.32 to 4.1.4

23457 of 37651 relevant lines covered (62.3%)

44725.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-admin/src/db/source.rs
1
use super::{
2
    application::Application,
3
    connection::{DbConnection, NewConnection},
4
    constants,
5
    persistable::Persistable,
6
    pool::DbPool,
7
    schema::{self, connections, sources},
8
};
9
use crate::db::schema::apps::dsl::apps;
10
use crate::db::schema::connections::dsl::connections as dsl_connections;
11

12
use crate::server::dozer_admin_grpc::Pagination;
13
use diesel::{insert_into, prelude::*, query_dsl::methods::FilterDsl, ExpressionMethods};
14
use dozer_types::serde;
15
use schema::sources::dsl::*;
16
use serde::{Deserialize, Serialize};
17
use std::error::Error;
18

19
#[derive(
20
    Queryable,
×
21
    Identifiable,
×
22
    Associations,
×
23
    PartialEq,
×
24
    Eq,
25
    Debug,
×
26
    Clone,
×
27
    Serialize,
×
28
    Deserialize,
×
29
    Default,
×
30
)]
31
#[diesel(belongs_to(Application, foreign_key = app_id))]
32
#[diesel(table_name = sources)]
33
pub struct DBSource {
34
    pub(crate) id: String,
35
    pub(crate) app_id: String,
36
    pub(crate) name: String,
37
    pub(crate) table_name: String,
38
    pub(crate) connection_id: String,
39
    pub(crate) columns: String,
40
    pub(crate) created_at: String,
41
    pub(crate) updated_at: String,
42
}
43
#[derive(Insertable, AsChangeset, PartialEq, Eq, Debug, Clone)]
×
44
#[diesel(table_name = sources)]
45
pub struct NewSource {
46
    id: String,
47
    app_id: String,
48
    name: String,
49
    table_name: String,
50
    connection_id: String,
51
    columns_: String,
52
}
53

54
fn convert_to_source(
×
55
    db_source: DBSource,
×
56
    connection: dozer_types::models::connection::Connection,
×
57
) -> dozer_types::models::source::Source {
×
58
    let columns_value: Vec<String> = db_source
×
59
        .columns
×
60
        .split(',')
×
61
        .into_iter()
×
62
        .map(|s| s.to_string())
×
63
        .collect();
×
64

×
65
    dozer_types::models::source::Source {
×
66
        id: Some(db_source.id),
×
67
        app_id: Some(db_source.app_id),
×
68
        name: db_source.name,
×
69
        table_name: db_source.table_name,
×
70
        columns: columns_value,
×
71
        connection: Some(connection),
×
72
        refresh_config: Some(dozer_types::models::source::RefreshConfig::default()),
×
73
    }
×
74
}
×
75
impl Persistable<'_, dozer_types::models::source::Source> for dozer_types::models::source::Source {
76
    fn save(
×
77
        &mut self,
×
78
        pool: DbPool,
×
79
    ) -> Result<&mut dozer_types::models::source::Source, Box<dyn Error>> {
×
80
        self.upsert(pool)
×
81
    }
×
82

83
    fn by_id(
×
84
        pool: DbPool,
×
85
        input_id: String,
×
86
        application_id: String,
×
87
    ) -> Result<dozer_types::models::source::Source, Box<dyn Error>> {
×
88
        let mut db = pool.get()?;
×
89
        let result: DBSource = FilterDsl::filter(
×
90
            FilterDsl::filter(sources, id.eq(input_id)),
×
91
            app_id.eq(application_id.to_owned()),
×
92
        )
×
93
        .first(&mut db)?;
×
94
        let connection_info = dozer_types::models::connection::Connection::by_id(
×
95
            pool,
×
96
            result.to_owned().connection_id,
×
97
            application_id,
×
98
        )?;
×
99
        let source_info = convert_to_source(result, connection_info);
×
100
        Ok(source_info)
×
101
    }
×
102

103
    fn list(
×
104
        pool: DbPool,
×
105
        application_id: String,
×
106
        limit: Option<u32>,
×
107
        offset: Option<u32>,
×
108
    ) -> Result<(Vec<dozer_types::models::source::Source>, Pagination), Box<dyn Error>> {
×
109
        let offset = offset.unwrap_or(constants::OFFSET);
×
110
        let limit = limit.unwrap_or(constants::LIMIT);
×
111
        let mut db = pool.get()?;
×
112
        let filter_dsl = FilterDsl::filter(sources, app_id.eq(application_id.to_owned()));
×
113
        let results: Vec<(DBSource, DbConnection)> = FilterDsl::filter(
×
114
            sources::table
×
115
                .inner_join(connections::table)
×
116
                .select((sources::all_columns, connections::all_columns)),
×
117
            sources::app_id.eq(application_id),
×
118
        )
×
119
        .load::<(DBSource, DbConnection)>(&mut db)?;
×
120
        let total: i64 = filter_dsl.count().get_result(&mut db)?;
×
121
        let response: Vec<dozer_types::models::source::Source> = results
×
122
            .iter()
×
123
            .map(|result| {
×
124
                convert_to_source(
×
125
                    result.to_owned().0,
×
126
                    dozer_types::models::connection::Connection::try_from(result.to_owned().1)
×
127
                        .unwrap(),
×
128
                )
×
129
            })
×
130
            .collect();
×
131
        Ok((
×
132
            response,
×
133
            Pagination {
×
134
                limit,
×
135
                total: total.try_into().unwrap(),
×
136
                offset,
×
137
            },
×
138
        ))
×
139
    }
×
140

141
    fn upsert(
×
142
        &mut self,
×
143
        pool: DbPool,
×
144
    ) -> Result<&mut dozer_types::models::source::Source, Box<dyn Error>> {
×
145
        let mut db = pool.get()?;
×
146
        if let Some(connection) = self.connection.to_owned() {
×
147
            let mut connection = connection;
×
148
            db.transaction::<(), _, _>(|conn| -> Result<(), Box<dyn Error>> {
×
149
                let _ = apps
×
150
                    .find(self.app_id.to_owned().unwrap_or_default())
×
151
                    .first::<Application>(conn)
×
152
                    .map_err(|err| format!("App_id: {err:}"))?;
×
153

154
                connection.app_id = self.app_id.to_owned();
×
155
                let new_connection = NewConnection::try_from(connection.to_owned())?;
×
156

157
                // upsert connection incase connection not created
158
                let _inserted_connection = insert_into(dsl_connections)
×
159
                    .values(&new_connection)
×
160
                    .on_conflict(connections::id)
×
161
                    .do_update()
×
162
                    .set(&new_connection)
×
163
                    .execute(conn);
×
164
                self.connection = Some(connection.to_owned());
×
165

×
166
                let new_source = NewSource {
×
167
                    name: self.name.to_owned(),
×
168
                    table_name: self.table_name.to_owned(),
×
169
                    connection_id: connection.to_owned().id.unwrap_or_default(),
×
170
                    id: self.id.to_owned().unwrap_or_default(),
×
171
                    app_id: self.app_id.to_owned().unwrap_or_default(),
×
172
                    columns_: self.columns.join(","),
×
173
                };
×
174
                insert_into(sources)
×
175
                    .values(&new_source)
×
176
                    .on_conflict(sources::id)
×
177
                    .do_update()
×
178
                    .set(&new_source)
×
179
                    .execute(conn)?;
×
180
                self.id = Some(new_source.id);
×
181
                Ok(())
×
182
            })?;
×
183
            Ok(self)
×
184
        } else {
185
            Err("Missing connection info for Source".to_owned())?
×
186
        }
187
    }
×
188

189
    fn delete(
×
190
        pool: DbPool,
×
191
        input_id: String,
×
192
        application_id: String,
×
193
    ) -> Result<bool, Box<dyn Error>> {
×
194
        let mut db = pool.get()?;
×
195
        diesel::delete(FilterDsl::filter(
×
196
            FilterDsl::filter(sources, id.eq(input_id)),
×
197
            app_id.eq(application_id),
×
198
        ))
×
199
        .execute(&mut db)?;
×
200
        Ok(true)
×
201
    }
×
202
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc