• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4112409903

pending completion
4112409903

Pull #818

github

GitHub
Merge 0e6d61bff into c160ec41f
Pull Request #818: chore: fix dag issues

212 of 212 new or added lines in 23 files covered. (100.0%)

23352 of 37718 relevant lines covered (61.91%)

31647.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-ingestion/src/connectors/ethereum/connector.rs
1
use std::collections::HashMap;
2
use std::{str::FromStr, sync::Arc};
3

4
use crate::connectors::{Connector, ValidationResults};
5
use crate::ingestion::Ingestor;
6
use crate::{
7
    connectors::{ethereum::helper, TableInfo},
8
    errors::ConnectorError,
9
};
10
use dozer_types::ingestion_types::{EthConfig, EthFilter};
11

12
use dozer_types::parking_lot::RwLock;
13
use dozer_types::serde_json;
14

15
use super::sender::{run, EthDetails};
16
use dozer_types::types::ReplicationChangesTrackingType;
17
use tokio::runtime::Runtime;
18
use web3::ethabi::{Contract, Event};
19
use web3::types::{Address, BlockNumber, Filter, FilterBuilder, H256, U64};
20

21
pub struct EthConnector {
22
    pub id: u64,
23
    config: EthConfig,
24
    // Address -> (contract, contract_name)
25
    contracts: HashMap<String, ContractTuple>,
26
    tables: Option<Vec<TableInfo>>,
27
    // contract_signacture -> SchemaID
28
    schema_map: HashMap<H256, usize>,
29
    ingestor: Option<Arc<RwLock<Ingestor>>>,
30
    conn_name: String,
31
}
32

33
#[derive(Debug, Clone)]
×
34
// (Contract, Name)
35
pub struct ContractTuple(pub Contract, pub String);
36

37
/// Name under which the schema returned by `helper::get_eth_schema()` is exposed.
pub const ETH_LOGS_TABLE: &str = "eth_logs";
38
impl EthConnector {
39
    pub fn build_filter(filter: &EthFilter) -> Filter {
×
40
        let builder = FilterBuilder::default();
×
41

42
        // Optionally add a from_block filter
43
        let builder = match filter.from_block {
×
44
            Some(block_no) => builder.from_block(BlockNumber::Number(U64::from(block_no))),
×
45
            None => builder,
×
46
        };
47
        // Optionally add a to_block filter
48
        let builder = match filter.to_block {
×
49
            Some(block_no) => builder.to_block(BlockNumber::Number(U64::from(block_no))),
×
50
            None => builder,
×
51
        };
52

53
        // Optionally Add Address filter
54
        let builder = match filter.addresses.is_empty() {
×
55
            false => {
56
                let addresses = filter
×
57
                    .addresses
×
58
                    .iter()
×
59
                    .map(|a| Address::from_str(a).unwrap())
×
60
                    .collect();
×
61
                builder.address(addresses)
×
62
            }
63
            true => builder,
×
64
        };
65

66
        // Optionally add topics
67
        let builder = match filter.topics.is_empty() {
×
68
            false => {
69
                let topics: Vec<Vec<H256>> = filter
×
70
                    .topics
×
71
                    .iter()
×
72
                    .map(|t| vec![H256::from_str(t).unwrap()])
×
73
                    .collect();
×
74
                builder.topics(
×
75
                    topics.get(0).cloned(),
×
76
                    topics.get(1).cloned(),
×
77
                    topics.get(2).cloned(),
×
78
                    topics.get(3).cloned(),
×
79
                )
×
80
            }
81
            true => builder,
×
82
        };
83

84
        builder.build()
×
85
    }
×
86

87
    pub fn new(id: u64, config: EthConfig, conn_name: String) -> Self {
×
88
        let mut contracts = HashMap::new();
×
89

90
        for c in &config.contracts {
×
91
            let contract = serde_json::from_str(&c.abi).expect("unable to parse contract from abi");
×
92
            contracts.insert(
×
93
                c.address.to_string().to_lowercase(),
×
94
                ContractTuple(contract, c.name.to_string()),
×
95
            );
×
96
        }
×
97

98
        let schema_map = Self::build_schema_map(&contracts);
×
99
        Self {
×
100
            id,
×
101
            config,
×
102
            contracts,
×
103
            schema_map,
×
104
            tables: None,
×
105
            ingestor: None,
×
106
            conn_name,
×
107
        }
×
108
    }
×
109

110
    fn build_schema_map(contracts: &HashMap<String, ContractTuple>) -> HashMap<H256, usize> {
×
111
        let mut schema_map = HashMap::new();
×
112

×
113
        let mut signatures = vec![];
×
114
        for contract_tuple in contracts.values() {
×
115
            let contract = contract_tuple.0.clone();
×
116
            let events: Vec<&Event> = contract.events.values().flatten().collect();
×
117
            for evt in events {
×
118
                signatures.push(evt.signature());
×
119
            }
×
120
        }
121
        signatures.sort();
×
122

123
        for (idx, signature) in signatures.iter().enumerate() {
×
124
            schema_map.insert(signature.to_owned(), 2 + idx);
×
125
        }
×
126
        schema_map
×
127
    }
×
128
}
129

130
impl Connector for EthConnector {
131
    fn get_schemas(
×
132
        &self,
×
133
        tables: Option<Vec<TableInfo>>,
×
134
    ) -> Result<
×
135
        Vec<(
×
136
            String,
×
137
            dozer_types::types::Schema,
×
138
            ReplicationChangesTrackingType,
×
139
        )>,
×
140
        ConnectorError,
×
141
    > {
×
142
        let mut schemas = vec![(
×
143
            ETH_LOGS_TABLE.to_string(),
×
144
            helper::get_eth_schema(),
×
145
            ReplicationChangesTrackingType::FullChanges,
×
146
        )];
×
147

×
148
        let event_schemas = helper::get_contract_event_schemas(
×
149
            self.contracts.to_owned(),
×
150
            self.schema_map.to_owned(),
×
151
        );
×
152
        schemas.extend(event_schemas);
×
153

154
        let schemas = if let Some(tables) = tables {
×
155
            schemas
×
156
                .iter()
×
157
                .filter(|(n, _, _)| tables.iter().any(|t| t.table_name == *n))
×
158
                .cloned()
×
159
                .collect()
×
160
        } else {
161
            schemas
×
162
        };
163

164
        Ok(schemas)
×
165
    }
×
166

167
    fn get_tables(&self) -> Result<Vec<TableInfo>, ConnectorError> {
×
168
        let schemas = self.get_schemas(None)?;
×
169

170
        let tables = schemas
×
171
            .iter()
×
172
            .enumerate()
×
173
            .map(|(id, (name, schema, _))| TableInfo {
×
174
                name: name.to_string(),
×
175
                table_name: name.to_string(),
×
176
                id: id as u32,
×
177
                columns: Some(schema.fields.iter().map(|f| f.name.to_owned()).collect()),
×
178
            })
×
179
            .collect();
×
180
        Ok(tables)
×
181
    }
×
182

183
    fn initialize(
×
184
        &mut self,
×
185
        ingestor: Arc<RwLock<Ingestor>>,
×
186
        tables: Option<Vec<TableInfo>>,
×
187
    ) -> Result<(), ConnectorError> {
×
188
        self.ingestor = Some(ingestor);
×
189
        self.tables = tables;
×
190
        Ok(())
×
191
    }
×
192

193
    fn start(&self, from_seq: Option<(u64, u64)>) -> Result<(), ConnectorError> {
×
194
        // Start a new thread that interfaces with ETH node
×
195
        let wss_url = self.config.wss_url.to_owned();
×
196
        let filter = self.config.filter.to_owned().unwrap_or_default();
×
197

198
        let ingestor = self
×
199
            .ingestor
×
200
            .as_ref()
×
201
            .map_or(Err(ConnectorError::InitializationError), Ok)?
×
202
            .clone();
×
203

×
204
        Runtime::new().unwrap().block_on(async {
×
205
            let details = Arc::new(EthDetails::new(
×
206
                wss_url,
×
207
                filter,
×
208
                ingestor,
×
209
                self.contracts.to_owned(),
×
210
                self.tables.to_owned(),
×
211
                self.schema_map.to_owned(),
×
212
                from_seq,
×
213
                self.conn_name.clone(),
×
214
            ));
×
215
            run(details).await
×
216
        })
×
217
    }
×
218

219
    fn stop(&self) {}
×
220

221
    fn test_connection(&self) -> Result<(), ConnectorError> {
×
222
        Ok(())
×
223
    }
×
224

225
    fn validate(&self, _tables: Option<Vec<TableInfo>>) -> Result<(), ConnectorError> {
×
226
        // Return contract parsing error
227
        for contract in &self.config.contracts {
×
228
            let res: Result<Contract, serde_json::Error> = serde_json::from_str(&contract.abi);
×
229
            if let Err(e) = res {
×
230
                return Err(ConnectorError::map_serialization_error(e));
×
231
            }
×
232
        }
233
        Ok(())
×
234
    }
×
235

236
    fn validate_schemas(&self, _tables: &[TableInfo]) -> Result<ValidationResults, ConnectorError> {
×
237
        Ok(HashMap::new())
×
238
    }
×
239
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc