• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4112409903

pending completion
4112409903

Pull #818

github

GitHub
Merge 0e6d61bff into c160ec41f
Pull Request #818: chore: fix dag issues

212 of 212 new or added lines in 23 files covered. (100.0%)

23352 of 37718 relevant lines covered (61.91%)

31647.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-ingestion/src/connectors/ethereum/helper.rs
1
use dozer_types::types::{
2
    Field, FieldDefinition, FieldType, Operation, OperationEvent, Record,
3
    ReplicationChangesTrackingType, Schema, SchemaIdentifier, SchemaWithChangesType,
4
    SourceDefinition,
5
};
6
use std::collections::HashMap;
7
use std::sync::Arc;
8

9
use web3::ethabi::RawLog;
10
use web3::transports::WebSocket;
11
use web3::types::{Log, H256};
12

13
use crate::connectors::TableInfo;
14

15
use super::connector::{ContractTuple, ETH_LOGS_TABLE};
16
use super::sender::EthDetails;
17

18
pub async fn get_wss_client(url: &str) -> Result<web3::Web3<WebSocket>, web3::Error> {
×
19
    Ok(web3::Web3::new(
20
        web3::transports::WebSocket::new(url).await?,
×
21
    ))
22
}
×
23

24
pub fn get_contract_event_schemas(
×
25
    contracts: HashMap<String, ContractTuple>,
×
26
    schema_map: HashMap<H256, usize>,
×
27
) -> Vec<SchemaWithChangesType> {
×
28
    let mut schemas = vec![];
×
29

30
    for (_, contract_tuple) in contracts {
×
31
        for event in contract_tuple.0.events.values().flatten() {
×
32
            let mut fields = vec![];
×
33
            for input in event.inputs.iter().cloned() {
×
34
                fields.push(FieldDefinition {
35
                    name: input.name,
×
36
                    typ: match input.kind {
×
37
                        web3::ethabi::ParamType::Address => FieldType::String,
×
38
                        web3::ethabi::ParamType::Bytes => FieldType::Binary,
×
39
                        web3::ethabi::ParamType::FixedBytes(_) => FieldType::Binary,
×
40
                        web3::ethabi::ParamType::Int(_) => FieldType::UInt,
×
41
                        web3::ethabi::ParamType::Uint(_) => FieldType::UInt,
×
42
                        web3::ethabi::ParamType::Bool => FieldType::Boolean,
×
43
                        web3::ethabi::ParamType::String => FieldType::String,
×
44
                        // TODO: These are to be mapped to appropriate types
45
                        web3::ethabi::ParamType::Array(_)
46
                        | web3::ethabi::ParamType::FixedArray(_, _)
47
                        | web3::ethabi::ParamType::Tuple(_) => FieldType::Text,
×
48
                    },
49
                    nullable: false,
50
                    source: SourceDefinition::Dynamic,
×
51
                });
52
            }
53

54
            let schema_id = schema_map
×
55
                .get(&event.signature())
×
56
                .expect("schema is missing")
×
57
                .to_owned();
×
58

×
59
            schemas.push((
×
60
                get_table_name(&contract_tuple, &event.name),
×
61
                Schema {
×
62
                    identifier: Some(SchemaIdentifier {
×
63
                        id: schema_id as u32,
×
64
                        version: 1,
×
65
                    }),
×
66
                    fields,
×
67
                    primary_index: vec![0],
×
68
                },
×
69
                ReplicationChangesTrackingType::FullChanges,
×
70
            ));
×
71
        }
72
    }
73

74
    schemas
×
75
}
×
76

77
pub fn decode_event(
×
78
    log: Log,
×
79
    contracts: HashMap<String, ContractTuple>,
×
80
    tables: Option<Vec<TableInfo>>,
×
81
    schema_map: HashMap<H256, usize>,
×
82
) -> Option<OperationEvent> {
×
83
    let address = format!("{:?}", log.address);
×
84

85
    if let Some(contract_tuple) = contracts.get(&address) {
×
86
        // Topics 0, 1, 2 should be name, buyer, seller in most cases
87
        let name = log
×
88
            .topics
×
89
            .get(0)
×
90
            .expect("name is expected")
×
91
            .to_owned()
×
92
            .to_string();
×
93
        let opt_event = contract_tuple
×
94
            .0
×
95
            .events
×
96
            .values()
×
97
            .flatten()
×
98
            .into_iter()
×
99
            .find(|evt| evt.signature().to_string() == name);
×
100

101
        if let Some(event) = opt_event {
×
102
            let schema_id = schema_map
×
103
                .get(&event.signature())
×
104
                .expect("schema is missing")
×
105
                .to_owned();
×
106

×
107
            let seq_no = get_id(&log) + schema_id as u64;
×
108
            let table_name = get_table_name(contract_tuple, &event.name);
×
109
            let is_table_required = tables.map_or(true, |tables| {
×
110
                tables.iter().any(|t| t.table_name == table_name)
×
111
            });
×
112
            if is_table_required {
×
113
                let parsed_event = event
×
114
                    .parse_log(RawLog {
×
115
                        topics: log.topics,
×
116
                        data: log.data.0,
×
117
                    })
×
118
                    .unwrap_or_else(|_| {
×
119
                        panic!(
×
120
                            "parsing event failed: block_no: {}, txn_hash: {}. Have you included the right abi to address mapping ?",
×
121
                            log.block_number.unwrap(),
×
122
                            log.transaction_hash.unwrap()
×
123
                        )
×
124
                    });
×
125

×
126
                // let columns_idx = get_columns_idx(&table_name, default_columns, tables.clone());
×
127
                let values = parsed_event
×
128
                    .params
×
129
                    .into_iter()
×
130
                    .map(|p| map_abitype_to_field(p.value))
×
131
                    .collect();
×
132
                return Some(OperationEvent {
×
133
                    seq_no,
×
134
                    operation: Operation::Insert {
×
135
                        new: Record {
×
136
                            schema_id: Some(SchemaIdentifier {
×
137
                                id: schema_id as u32,
×
138
                                version: 1,
×
139
                            }),
×
140
                            values,
×
141
                            version: None,
×
142
                        },
×
143
                    },
×
144
                });
×
145
            }
×
146
        }
×
147
    }
×
148

149
    None
×
150
}
×
151

152
pub fn get_table_name(contract_tuple: &ContractTuple, event_name: &str) -> String {
×
153
    format!("{}_{}", contract_tuple.1, event_name)
×
154
}
×
155

156
pub fn map_abitype_to_field(f: web3::ethabi::Token) -> Field {
×
157
    match f {
×
158
        web3::ethabi::Token::Address(f) => Field::String(format!("{f:?}")),
×
159
        web3::ethabi::Token::FixedBytes(f) => Field::Binary(f),
×
160
        web3::ethabi::Token::Bytes(f) => Field::Binary(f),
×
161
        // TODO: Convert i64 appropriately
162
        web3::ethabi::Token::Int(f) => Field::UInt(f.low_u64()),
×
163
        web3::ethabi::Token::Uint(f) => Field::UInt(f.low_u64()),
×
164
        web3::ethabi::Token::Bool(f) => Field::Boolean(f),
×
165
        web3::ethabi::Token::String(f) => Field::String(f),
×
166
        web3::ethabi::Token::FixedArray(f)
×
167
        | web3::ethabi::Token::Array(f)
×
168
        | web3::ethabi::Token::Tuple(f) => Field::Text(
×
169
            f.iter()
×
170
                .map(|f| f.to_string())
×
171
                .collect::<Vec<String>>()
×
172
                .join(","),
×
173
        ),
×
174
    }
175
}
×
176
pub fn map_log_to_event(log: Log, details: Arc<EthDetails>) -> Option<OperationEvent> {
×
177
    // Check if table is requested
×
178
    let is_table_required = details.tables.as_ref().map_or(true, |tables| {
×
179
        tables.iter().any(|t| t.table_name == ETH_LOGS_TABLE)
×
180
    });
×
181

×
182
    if !is_table_required {
×
183
        None
×
184
    } else if log.log_index.is_some() {
×
185
        let (idx, values) = map_log_to_values(log);
×
186
        Some(OperationEvent {
×
187
            seq_no: idx,
×
188
            operation: Operation::Insert {
×
189
                new: Record {
×
190
                    schema_id: Some(SchemaIdentifier { id: 1, version: 1 }),
×
191
                    values,
×
192
                    version: None,
×
193
                },
×
194
            },
×
195
        })
×
196
    } else {
197
        None
×
198
    }
199
}
×
200

201
pub fn get_id(log: &Log) -> u64 {
×
202
    let block_no = log
×
203
        .block_number
×
204
        .expect("expected for non pendning")
×
205
        .as_u64();
×
206

×
207
    let log_idx = log.log_index.expect("expected for non pendning").as_u64();
×
208

×
209
    block_no * 100_000 + log_idx * 2
×
210
}
×
211
pub fn map_log_to_values(log: Log) -> (u64, Vec<Field>) {
×
212
    let block_no = log.block_number.expect("expected for non pending").as_u64();
×
213
    let txn_idx = log
×
214
        .transaction_index
×
215
        .expect("expected for non pending")
×
216
        .as_u64();
×
217
    let log_idx = log.log_index.expect("expected for non pending").as_u64();
×
218

×
219
    let idx = get_id(&log);
×
220

×
221
    let values = vec![
×
222
        Field::UInt(idx),
×
223
        Field::String(format!("{:?}", log.address)),
×
224
        Field::Text(
×
225
            log.topics
×
226
                .iter()
×
227
                .map(|t| t.to_string())
×
228
                .collect::<Vec<String>>()
×
229
                .join(" "),
×
230
        ),
×
231
        Field::Binary(log.data.0),
×
232
        log.block_hash
×
233
            .map_or(Field::Null, |f| Field::String(f.to_string())),
×
234
        Field::UInt(block_no),
×
235
        log.transaction_hash
×
236
            .map_or(Field::Null, |f| Field::String(f.to_string())),
×
237
        Field::UInt(txn_idx),
×
238
        Field::UInt(log_idx),
×
239
        log.transaction_log_index
×
240
            .map_or(Field::Null, |f| Field::Int(f.try_into().unwrap())),
×
241
        log.log_type.map_or(Field::Null, Field::String),
×
242
        log.removed.map_or(Field::Null, Field::Boolean),
×
243
    ];
×
244

×
245
    (idx, values)
×
246
}
×
247

×
248
pub fn get_columns_idx(
×
249
    table_name: &str,
×
250
    default_columns: Vec<String>,
×
251
    tables: Option<Vec<TableInfo>>,
×
252
) -> Vec<usize> {
×
253
    let columns = tables.as_ref().map_or(vec![], |tables| {
×
254
        tables
×
255
            .iter()
×
256
            .find(|t| t.table_name == table_name)
×
257
            .map_or(vec![], |t| {
×
258
                t.columns.as_ref().map_or(vec![], |cols| cols.clone())
×
259
            })
×
260
    });
×
261
    let columns = if columns.is_empty() {
×
262
        default_columns.clone()
×
263
    } else {
×
264
        columns
×
265
    };
×
266

×
267
    columns
×
268
        .iter()
×
269
        .map(|c| {
×
270
            default_columns
×
271
                .iter()
×
272
                .position(|f| f == c)
×
273
                .unwrap_or_else(|| panic!("column not found: {c}"))
×
274
        })
×
275
        .collect::<Vec<usize>>()
×
276
}
×
277

×
278
pub fn get_eth_schema() -> Schema {
×
279
    Schema {
×
280
        identifier: Some(SchemaIdentifier { id: 1, version: 1 }),
×
281
        fields: vec![
×
282
            FieldDefinition {
×
283
                name: "id".to_string(),
×
284
                typ: FieldType::UInt,
×
285
                nullable: false,
×
286
                source: SourceDefinition::Dynamic,
×
287
            },
×
288
            FieldDefinition {
×
289
                name: "address".to_string(),
×
290
                typ: FieldType::String,
×
291
                nullable: false,
×
292
                source: SourceDefinition::Dynamic,
×
293
            },
×
294
            FieldDefinition {
×
295
                name: "topics".to_string(),
×
296
                typ: FieldType::String,
×
297
                nullable: false,
×
298
                source: SourceDefinition::Dynamic,
×
299
            },
×
300
            FieldDefinition {
×
301
                name: "data".to_string(),
×
302
                typ: FieldType::Binary,
×
303
                nullable: false,
×
304
                source: SourceDefinition::Dynamic,
×
305
            },
×
306
            FieldDefinition {
×
307
                name: "block_hash".to_string(),
×
308
                typ: FieldType::String,
×
309
                nullable: true,
×
310
                source: SourceDefinition::Dynamic,
×
311
            },
×
312
            FieldDefinition {
×
313
                name: "block_number".to_string(),
×
314
                typ: FieldType::UInt,
×
315
                nullable: true,
×
316
                source: SourceDefinition::Dynamic,
×
317
            },
×
318
            FieldDefinition {
×
319
                name: "transaction_hash".to_string(),
×
320
                typ: FieldType::String,
×
321
                nullable: true,
×
322
                source: SourceDefinition::Dynamic,
×
323
            },
×
324
            FieldDefinition {
×
325
                name: "transaction_index".to_string(),
×
326
                typ: FieldType::Int,
×
327
                nullable: true,
×
328
                source: SourceDefinition::Dynamic,
×
329
            },
×
330
            FieldDefinition {
×
331
                name: "log_index".to_string(),
×
332
                typ: FieldType::Int,
×
333
                nullable: true,
×
334
                source: SourceDefinition::Dynamic,
×
335
            },
×
336
            FieldDefinition {
×
337
                name: "transaction_log_index".to_string(),
×
338
                typ: FieldType::Int,
×
339
                nullable: true,
×
340
                source: SourceDefinition::Dynamic,
×
341
            },
×
342
            FieldDefinition {
×
343
                name: "log_type".to_string(),
×
344
                typ: FieldType::String,
×
345
                nullable: true,
×
346
                source: SourceDefinition::Dynamic,
×
347
            },
×
348
            FieldDefinition {
×
349
                name: "removed".to_string(),
×
350
                typ: FieldType::Boolean,
×
351
                nullable: true,
×
352
                source: SourceDefinition::Dynamic,
×
353
            },
×
354
        ],
×
355

×
356
        primary_index: vec![0],
×
357
    }
×
358
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc