• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

safe-global / safe-eth-py / 19324708014

13 Nov 2025 08:06AM UTC coverage: 85.227% (-8.6%) from 93.85%
19324708014

Pull #2120

github

web-flow
Merge eb6a26957 into 0a3842ae6
Pull Request #2120: Update UserOperationV07

34 of 39 new or added lines in 6 files covered. (87.18%)

919 existing lines in 26 files now uncovered.

9011 of 10573 relevant lines covered (85.23%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.55
/safe_eth/eth/oracles/oracles.py
1
import functools
1✔
2
import logging
1✔
3
from abc import ABC, abstractmethod
1✔
4
from dataclasses import dataclass
1✔
5
from functools import cached_property
1✔
6
from typing import List, Optional, Tuple
1✔
7

8
import requests
1✔
9
from eth_abi import decode as decode_abi
1✔
10
from eth_abi.exceptions import DecodingError
1✔
11
from eth_abi.packed import encode_packed
1✔
12
from eth_typing import ChecksumAddress, HexAddress, HexStr
1✔
13
from hexbytes import HexBytes
1✔
14
from web3.contract import Contract
1✔
15
from web3.exceptions import Web3Exception
1✔
16

17
from .. import EthereumClient, EthereumNetwork
1✔
18
from ..constants import NULL_ADDRESS
1✔
19
from ..contracts import (
1✔
20
    get_erc20_contract,
21
    get_uniswap_factory_contract,
22
    get_uniswap_v2_factory_contract,
23
    get_uniswap_v2_pair_contract,
24
    get_uniswap_v2_router_contract,
25
)
26
from ..utils import (
1✔
27
    bytes_to_float,
28
    fast_bytes_to_checksum_address,
29
    fast_keccak,
30
    get_empty_tx_params,
31
)
32
from .abis.aave_abis import AAVE_ATOKEN_ABI
1✔
33
from .abis.balancer_abis import balancer_pool_abi
1✔
34
from .abis.cream_abis import cream_ctoken_abi
1✔
35
from .abis.mooniswap_abis import mooniswap_abi
1✔
36
from .abis.zerion_abis import ZERION_TOKEN_ADAPTER_ABI
1✔
37
from .exceptions import CannotGetPriceFromOracle, InvalidPriceFromOracle
1✔
38
from .helpers.curve_gauge_list import CURVE_GAUGE_TO_LP_TOKEN
1✔
39
from .utils import get_decimals
1✔
40

41
logger = logging.getLogger(__name__)
1✔
42

43

44
@dataclass
class UnderlyingToken:
    """
    Component of a composed/pool token, as returned by `ComposedPriceOracle.get_underlying_tokens`
    """

    # Address of the underlying token
    address: ChecksumAddress
    # Quantity (share) of the underlying token inside the composed token
    quantity: float
1✔
48

49

50
class BaseOracle(ABC):
    """
    Common interface for every oracle defined in this module
    """

    @classmethod
    @abstractmethod
    def is_available(cls, ethereum_client: EthereumClient) -> bool:
        """
        Tell whether the oracle can be used with the given client

        :param ethereum_client:
        :return: `True` if Oracle is available for the EthereumClient provided, `False` otherwise
        """
        raise NotImplementedError
×
62

63

64
class PriceOracle(BaseOracle):
    """
    Oracle able to return a price for a token
    """

    @abstractmethod
    def get_price(self, *args) -> float:
        """
        :param args: Implementation dependent (token address/addresses)
        :return: Price for the token
        """
        raise NotImplementedError
×
68

69

70
class PricePoolOracle(BaseOracle):
    """
    Oracle able to return a price for a pool token
    """

    @abstractmethod
    def get_pool_token_price(self, pool_token_address: ChecksumAddress) -> float:
        """
        :param pool_token_address: Address of the pool token
        :return: Price for the pool token
        """
        raise NotImplementedError
×
74

75

76
class ComposedPriceOracle(BaseOracle):
    """
    Oracle able to return the underlying tokens for a composed token
    """

    @abstractmethod
    def get_underlying_tokens(self, *args) -> List[UnderlyingToken]:
        """
        :param args: Implementation dependent (composed token address)
        :return: Underlying tokens for the composed token
        """
        raise NotImplementedError
×
80

81

82
class UniswapOracle(PriceOracle):
    """
    Uniswap V1 Oracle

    https://docs.uniswap.org/protocol/V1/guides/connect-to-uniswap
    """

    # Uniswap V1 factory address for every supported network
    ADDRESSES = {
        EthereumNetwork.MAINNET: "0xc0a47dFe034B400B47bDaD5FecDa2621de6c4d95",
        EthereumNetwork.RINKEBY: "0xf5D915570BC477f9B8D6C0E980aA81757A3AaC36",
        EthereumNetwork.ROPSTEN: "0x9c83dCE8CA20E9aAF9D3efc003b2ea62aBC08351",
        EthereumNetwork.GOERLI: "0x6Ce570d02D73d4c384b46135E87f8C592A8c86dA",
    }

    def __init__(
        self,
        ethereum_client: EthereumClient,
        uniswap_factory_address: Optional[str] = None,
    ):
        """
        :param ethereum_client:
        :param uniswap_factory_address: https://docs.uniswap.io/frontend-integration/connect-to-uniswap#factory-contract
        """
        self.ethereum_client = ethereum_client
        self.w3 = ethereum_client.w3
        self._uniswap_factory_address = uniswap_factory_address

    @classmethod
    def is_available(
        cls,
        ethereum_client: EthereumClient,
    ) -> bool:
        """
        :param ethereum_client:
        :return: `True` if Oracle is available for the EthereumClient provided, `False` otherwise
        """
        return ethereum_client.get_network() in cls.ADDRESSES

    @cached_property
    def uniswap_factory_address(self):
        """
        :return: Factory address provided on `__init__` if any. Otherwise the address configured
            for the network of the client, falling back to the `MAINNET` one
        """
        if self._uniswap_factory_address:
            return self._uniswap_factory_address
        return self.ADDRESSES.get(
            self.ethereum_client.get_network(),
            self.ADDRESSES.get(EthereumNetwork.MAINNET),
        )

    @cached_property
    def uniswap_factory(self):
        """
        :return: Uniswap factory contract for `uniswap_factory_address`
        """
        return get_uniswap_factory_contract(self.w3, self.uniswap_factory_address)

    # NOTE: `lru_cache` on a method keys the cache on `(self, token_address)` and the cache
    # lives on the class, so every instance used stays referenced by the cache
    @functools.lru_cache(maxsize=2048)
    def get_uniswap_exchange(self, token_address: str) -> str:
        """
        :param token_address:
        :return: Result of calling `getExchange(token_address)` on the Uniswap factory
        """
        return self.uniswap_factory.functions.getExchange(token_address).call()

    def _get_balances_without_batching(
        self, uniswap_exchange_address: str, token_address: str
    ):
        """
        Retrieve the balances using one request per value

        :param uniswap_exchange_address:
        :param token_address:
        :return: Tuple with the ether balance of the exchange, the token decimals and
            the token balance of the exchange
        """
        uniswap_exchange_address_checksum = ChecksumAddress(
            HexAddress(HexStr(uniswap_exchange_address))
        )
        token_address_checksum = ChecksumAddress(HexAddress(HexStr(token_address)))
        balance = self.ethereum_client.get_balance(uniswap_exchange_address_checksum)
        token_decimals = self.ethereum_client.erc20.get_decimals(token_address_checksum)
        token_balance = self.ethereum_client.erc20.get_balance(
            uniswap_exchange_address_checksum, token_address_checksum
        )
        return balance, token_decimals, token_balance

    def _get_balances_using_batching(
        self, uniswap_exchange_address: str, token_address: str
    ):
        """
        Retrieve the balances using a single JSON-RPC batch request

        :param uniswap_exchange_address:
        :param token_address:
        :return: Tuple with the ether balance of the exchange, the token decimals and
            the token balance of the exchange
        :raises: CannotGetPriceFromOracle: If the node cannot be reached, answers with an HTTP
            error, returns a JSON-RPC error or does not answer every request of the batch
        """
        node_url = self.ethereum_client.ethereum_node_url
        payload_balance = {
            "jsonrpc": "2.0",
            "method": "eth_getBalance",
            "params": [uniswap_exchange_address, "latest"],
            "id": 0,
        }
        erc20 = get_erc20_contract(
            self.w3, ChecksumAddress(HexAddress(HexStr(token_address)))
        )
        params = get_empty_tx_params()
        decimals_data = erc20.functions.decimals().build_transaction(params)["data"]
        token_balance_data = erc20.functions.balanceOf(
            uniswap_exchange_address
        ).build_transaction(params)["data"]
        # ids: 0 -> ether balance, 1 -> token decimals, 2 -> token balance
        payload_calls = [
            {
                "id": i,
                "jsonrpc": "2.0",
                "method": "eth_call",
                "params": [{"to": token_address, "data": data}, "latest"],
            }
            for i, data in enumerate((decimals_data, token_balance_data), start=1)
        ]
        payloads = [payload_balance] + payload_calls
        try:
            r = requests.post(
                node_url,
                json=payloads,
                timeout=self.ethereum_client.timeout,
            )
        except requests.RequestException as e:
            # Connection errors/timeouts are reported the same way as HTTP errors below
            raise CannotGetPriceFromOracle(
                f"Error from node with url={node_url}"
            ) from e
        if not r.ok:
            raise CannotGetPriceFromOracle(f"Error from node with url={node_url}")

        responses = r.json()
        if not isinstance(responses, list):
            # A failure of the batch as a whole is answered with a single object, not a list
            error = (
                responses.get("error", responses)
                if isinstance(responses, dict)
                else responses
            )
            raise CannotGetPriceFromOracle(error)

        # JSON-RPC 2.0 allows the responses of a batch to come back in any order,
        # so match every response to its request using the `id`
        results = {}
        for response in responses:
            if "result" not in response:
                raise CannotGetPriceFromOracle(response.get("error", response))
            results[response.get("id")] = HexBytes(response["result"])

        try:
            balance_result, decimals_result, token_balance_result = (
                results[0],
                results[1],
                results[2],
            )
        except KeyError as e:
            raise CannotGetPriceFromOracle(
                f"Incomplete batch response from node with url={node_url}"
            ) from e

        balance = int(balance_result.hex(), 16)
        token_decimals = decode_abi(["uint8"], decimals_result)[0]
        token_balance = decode_abi(["uint256"], token_balance_result)[0]
        return balance, token_decimals, token_balance

    def get_price(self, token_address: str) -> float:
        """
        :param token_address:
        :return: Ether price for one unit of the token, calculated from the ether and token
            balances of the Uniswap exchange for the token
        :raises: CannotGetPriceFromOracle: If there is no exchange for the token, the exchange
            holds less than 2 ether or the balances cannot be retrieved
        :raises: InvalidPriceFromOracle: If the calculated price is not positive
        """
        try:
            uniswap_exchange_address = self.get_uniswap_exchange(token_address)
            if uniswap_exchange_address == NULL_ADDRESS:
                raise ValueError
        except (Web3Exception, DecodingError, ValueError) as e:
            message = f"Non existing uniswap exchange for token={token_address}"
            logger.debug(message)
            raise CannotGetPriceFromOracle(message) from e

        try:
            balance, token_decimals, token_balance = self._get_balances_using_batching(
                uniswap_exchange_address, token_address
            )
            # Check liquidity. Require at least 2 ether to be on the pool
            if balance / 1e18 < 2:
                raise CannotGetPriceFromOracle(
                    f"Not enough liquidity for token={token_address}"
                )

            price = balance / token_balance / 10 ** (18 - token_decimals)
            if price <= 0.0:
                message = (
                    f"price={price} <= 0 from uniswap-factory={uniswap_exchange_address} "
                    f"for token={token_address}"
                )
                logger.debug(message)
                raise InvalidPriceFromOracle(message)
            return price
        except (
            Web3Exception,
            DecodingError,
            ValueError,
            ZeroDivisionError,
        ) as e:
            message = f"Cannot get token balance for token={token_address}"
            logger.debug(message)
            raise CannotGetPriceFromOracle(message) from e
×
240

241

242
class UniswapV2Oracle(PricePoolOracle, PriceOracle):
    """
    Uniswap V2 Oracle. It can price tokens using the reserves of a pair and
    pool tokens using the value of the reserves of the pool
    """

    # Uniswap V2 router address for every supported network
    ROUTER_ADDRESSES = {
        EthereumNetwork.MAINNET: "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D",
    }

    # Pair init code is keccak(getCode(UniswapV2Pair))
    PAIR_INIT_CODE = HexBytes(
        "0x96e8ac4277198ff8b6f785478aa9a39f403cb768dd02cbee326c3e7da348845f"
    )

    def __init__(
        self, ethereum_client: EthereumClient, router_address: Optional[str] = None
    ):
        """
        :param ethereum_client:
        :param router_address: https://uniswap.org/docs/v2/smart-contracts/router02/
        """
        self.ethereum_client = ethereum_client
        self.w3 = ethereum_client.w3
        self.router_address: str = router_address or self.ROUTER_ADDRESSES.get(
            ethereum_client.get_network(),
            self.ROUTER_ADDRESSES[EthereumNetwork.MAINNET],
        )
        self.router = get_uniswap_v2_router_contract(
            ethereum_client.w3, ChecksumAddress(HexAddress(HexStr(self.router_address)))
        )

    @classmethod
    def is_available(
        cls,
        ethereum_client: EthereumClient,
    ) -> bool:
        """
        :param ethereum_client:
        :return: `True` if Oracle is available for the EthereumClient provided, `False` otherwise
        """
        # Router for the network of the client, falling back to the `MAINNET` one
        router_address = cls.ROUTER_ADDRESSES.get(
            ethereum_client.get_network(),
            cls.ROUTER_ADDRESSES[EthereumNetwork.MAINNET],
        )
        return ethereum_client.is_contract(
            ChecksumAddress(HexAddress(HexStr(router_address)))
        )

    @cached_property
    def factory(self):
        """
        :return: Uniswap V2 factory contract for `factory_address`
        """
        return get_uniswap_v2_factory_contract(
            self.ethereum_client.w3, self.factory_address
        )

    @cached_property
    def factory_address(self) -> str:
        """
        :return: Uniswap factory checksummed address
        :raises: Web3Exception: If router contract is not deployed
        """
        return self.router.functions.factory().call()

    @cached_property
    def weth_address(self) -> str:
        """
        :return: Wrapped ether checksummed address
        :raises: Web3Exception: If router contract is not deployed
        """
        return self.router.functions.WETH().call()

    @functools.lru_cache(maxsize=2048)
    def get_pair_address(
        self, token_address: str, token_address_2: str
    ) -> Optional[str]:
        """
        Get uniswap pair address. `token_address` and `token_address_2` are interchangeable.
        https://uniswap.org/docs/v2/smart-contracts/factory/

        :param token_address:
        :param token_address_2:
        :return: Address of the pair for `token_address` and `token_address_2`, if it has been created, else `None`.
        """
        # Token order does not matter for getting pair, just for creating or querying PairCreated event
        pair_address = self.factory.functions.getPair(
            token_address, token_address_2
        ).call()
        if pair_address == NULL_ADDRESS:
            return None
        return pair_address

    @functools.lru_cache(maxsize=2048)
    def calculate_pair_address(self, token_address: str, token_address_2: str):
        """
        Calculate pair address without querying blockchain.
        https://uniswap.org/docs/v2/smart-contract-integration/getting-pair-addresses/#docs-header

        :param token_address:
        :param token_address_2:
        :return: Checksummed address for token pair. It could be not created yet
        """
        # Tokens must be sorted before calculating the salt
        if token_address.lower() > token_address_2.lower():
            token_address, token_address_2 = token_address_2, token_address
        salt = fast_keccak(
            encode_packed(["address", "address"], [token_address, token_address_2])
        )
        address = fast_keccak(
            encode_packed(
                ["bytes", "address", "bytes", "bytes"],
                [HexBytes("ff"), self.factory_address, salt, self.PAIR_INIT_CODE],
            )
        )[-20:]
        return fast_bytes_to_checksum_address(address)

    def get_reserves(self, pair_address: str) -> Tuple[int, int]:
        """
        Returns the number of tokens in the pool. `getReserves()` also returns the block.timestamp (mod 2**32) of
        the last block during which an interaction occurred for the pair, but it's ignored.
        https://uniswap.org/docs/v2/smart-contracts/pair/

        :return: Reserves of `token_address` and `token_address_2` used to price trades and distribute liquidity.
        """
        pair_contract = get_uniswap_v2_pair_contract(
            self.ethereum_client.w3, ChecksumAddress(HexAddress(HexStr(pair_address)))
        )
        # Reserves return token_1 reserves, token_2 reserves and block timestamp (mod 2**32) of last interaction
        reserves_1, reserves_2, _ = pair_contract.functions.getReserves().call()
        return reserves_1, reserves_2

    def get_price(
        self, token_address: str, token_address_2: Optional[str] = None
    ) -> float:
        """
        :param token_address:
        :param token_address_2: Token used to express the price. Wrapped ether is used if not provided
        :return: Price of `token_address` expressed in `token_address_2`, calculated from
            the reserves of the pair. `1.0` if both tokens are the same
        :raises: CannotGetPriceFromOracle: If a side of the pair holds less than 2 units
            or the reserves/decimals cannot be retrieved
        """
        # NOTE: The pair address is calculated locally (`calculate_pair_address`) instead of being
        # queried (`get_pair_address`), so an address is always returned even if the pair is not
        # deployed. In that case retrieving the reserves fails and it's handled below
        try:
            token_address_2 = token_address_2 if token_address_2 else self.weth_address
            if token_address == token_address_2:
                return 1.0
            pair_address = self.calculate_pair_address(token_address, token_address_2)
            # Reserves are returned for the sorted tokens, so token_1 < token_2
            reserves_1, reserves_2 = self.get_reserves(pair_address)
            decimals_1 = get_decimals(token_address, self.ethereum_client)
            decimals_2 = get_decimals(token_address_2, self.ethereum_client)
            if token_address.lower() > token_address_2.lower():
                # Tokens were provided in reverse order, swap reserves to match them
                reserves_2, reserves_1 = reserves_1, reserves_2

            # Check liquidity. Require at least 2 units of every token to be on the pool
            if reserves_1 / 10**decimals_1 < 2 or reserves_2 / 10**decimals_2 < 2:
                raise CannotGetPriceFromOracle(
                    f"Not enough liquidity for pair token_1={token_address} "
                    f"token_2={token_address_2}"
                )
            decimals_normalized_reserves_1 = reserves_1 * 10**decimals_2
            decimals_normalized_reserves_2 = reserves_2 * 10**decimals_1

            return decimals_normalized_reserves_2 / decimals_normalized_reserves_1
        except (
            Web3Exception,
            DecodingError,
            ValueError,
            ZeroDivisionError,
        ) as e:
            message = (
                f"Cannot get uniswap v2 price for pair token_1={token_address} "
                f"token_2={token_address_2}"
            )
            logger.debug(message)
            raise CannotGetPriceFromOracle(message) from e

    def get_price_without_exception(
        self, token_address: str, token_address_2: Optional[str] = None
    ) -> float:
        """
        :param token_address:
        :param token_address_2:
        :return: Call `get_price`, return 0. instead on an exception if there's any issue
        """
        try:
            return self.get_price(token_address, token_address_2=token_address_2)
        except CannotGetPriceFromOracle:
            return 0.0

    def get_pool_token_price(self, pool_token_address: ChecksumAddress) -> float:
        """
        Estimate pool token price based on its components

        :param pool_token_address:
        :return: Pool token eth price per unit (total pool token supply / 1e18). `0.0` if
            none of the tokens of the pool can be priced
        :raises: CannotGetPriceFromOracle
        """
        try:
            pair_contract = get_uniswap_v2_pair_contract(
                self.ethereum_client.w3, pool_token_address
            )
            (
                token_reserves,
                token_address_1,
                token_address_2,
                total_supply_response,
            ) = self.ethereum_client.batch_call(
                [
                    pair_contract.functions.getReserves(),
                    pair_contract.functions.token0(),
                    pair_contract.functions.token1(),
                    pair_contract.functions.totalSupply(),
                ]
            )
            if token_reserves is not None:
                (reserves_1, reserves_2, _) = token_reserves
            else:
                reserves_1, reserves_2 = 0.0, 0.0

            total_supply = bytes_to_float(total_supply_response)
            decimals_1 = get_decimals(token_address_1, self.ethereum_client)
            decimals_2 = get_decimals(token_address_2, self.ethereum_client)

            # Total value for one token should be the same than total value for the other token
            # if pool is under active arbitrage. We use the price for the first token we find
            for token_address, decimals, reserves in zip(
                (token_address_1, token_address_2),
                (decimals_1, decimals_2),
                (reserves_1, reserves_2),
            ):
                try:
                    price = self.get_price(str(token_address))
                    # Reserves must be scaled using the decimals of the token being priced
                    total_value = (reserves / 10**decimals) * price
                    return (total_value * 2) / (total_supply / 1e18)
                except CannotGetPriceFromOracle:
                    continue
        except (
            Web3Exception,
            DecodingError,
            ValueError,
            ZeroDivisionError,
        ) as e:
            message = f"Cannot get uniswap v2 price for pool token={pool_token_address}"
            logger.debug(message)
            raise CannotGetPriceFromOracle(message) from e
        return 0.0
×
488

489

490
class AaveOracle(PriceOracle):
    """
    Oracle for Aave aTokens. The price of an aToken is the price of its underlying asset
    """

    def __init__(self, ethereum_client: EthereumClient, price_oracle: PriceOracle):
        """
        :param ethereum_client:
        :param price_oracle: Price oracle to get the price for the components of Aave Tokens, UniswapV2 is
        recommended
        """
        self.ethereum_client = ethereum_client
        self.w3 = ethereum_client.w3
        self.price_oracle = price_oracle

    @classmethod
    def is_available(
        cls,
        ethereum_client: EthereumClient,
    ) -> bool:
        """
        :param ethereum_client:
        :return: `True` if Oracle is available for the EthereumClient provided, `False` otherwise
        """
        return ethereum_client.get_network() == EthereumNetwork.MAINNET

    def get_price(self, token_address: str) -> float:
        """
        :param token_address: aToken address
        :return: Price of the underlying asset of the aToken, as returned by `price_oracle`
        :raises: CannotGetPriceFromOracle: If the underlying asset cannot be retrieved
        """
        if (
            token_address == "0x4da27a545c0c5B758a6BA100e3a049001de870f5"
        ):  # Staked Aave, priced using the token below
            return self.price_oracle.get_price(
                "0x7Fc66500c84A76Ad7e9c93437bFc5Ac33E2DDaE9"
            )
        try:
            token_address_checksum = ChecksumAddress(HexAddress(HexStr(token_address)))
            underlying_token = (
                self.w3.eth.contract(token_address_checksum, abi=AAVE_ATOKEN_ABI)
                .functions.UNDERLYING_ASSET_ADDRESS()
                .call()
            )
            return self.price_oracle.get_price(underlying_token)
        except (Web3Exception, DecodingError, ValueError) as e:
            # Keep the original error as the cause, as the Uniswap oracles do
            raise CannotGetPriceFromOracle(
                f"Cannot get price for {token_address}. It is not an Aave atoken"
            ) from e
531

532

533
class CreamOracle(PriceOracle):
    """
    Oracle for Cream cTokens. The price of a cToken is the price of its underlying token
    """

    def __init__(self, ethereum_client: EthereumClient, price_oracle: PriceOracle):
        """
        :param ethereum_client:
        :param price_oracle: Price oracle to get the price for the components of Cream Tokens, UniswapV2 is
        recommended
        """
        self.ethereum_client = ethereum_client
        self.w3 = ethereum_client.w3
        self.price_oracle = price_oracle

    @classmethod
    def is_available(
        cls,
        ethereum_client: EthereumClient,
    ) -> bool:
        """
        :param ethereum_client:
        :return: `True` if Oracle is available for the EthereumClient provided, `False` otherwise
        """
        return ethereum_client.get_network() == EthereumNetwork.MAINNET

    def get_price(self, token_address: str) -> float:
        """
        :param token_address: cToken address
        :return: Price of the underlying token of the cToken, as returned by `price_oracle`
        :raises: CannotGetPriceFromOracle: If the underlying token cannot be retrieved
        """
        try:
            address_checksum = ChecksumAddress(HexAddress(HexStr(token_address)))
            underlying_token = (
                self.w3.eth.contract(address_checksum, abi=cream_ctoken_abi)
                .functions.underlying()
                .call()
            )
            return self.price_oracle.get_price(underlying_token)
        except (Web3Exception, DecodingError, ValueError) as e:
            # Keep the original error as the cause, as the Uniswap oracles do
            raise CannotGetPriceFromOracle(
                f"Cannot get price for {token_address}. It is not a Cream cToken"
            ) from e
568

569

570
class ZerionComposedOracle(ComposedPriceOracle):
    """
    Oracle that uses a Zerion token adapter to get the components of a pool token
    """

    # Default adapter address, subclasses are expected to override it
    ZERION_ADAPTER_ADDRESS: Optional[str] = None

    def __init__(
        self,
        ethereum_client: EthereumClient,
        zerion_adapter_address: Optional[str] = None,
    ):
        """
        :param ethereum_client:
        :param zerion_adapter_address: Can be retrieved using the Zerion registry on
            0x06FE76B2f432fdfEcAEf1a7d4f6C3d41B5861672 . https://github.com/zeriontech/defi-sdk/wiki/Addresses is
            outdated
        :raises: ValueError: If no adapter address is provided and the class does not define one
        """

        self.zerion_adapter_address = (
            zerion_adapter_address or self.ZERION_ADAPTER_ADDRESS
        )
        if not self.zerion_adapter_address:
            raise ValueError("Expected a Zerion adapter address")
        self.ethereum_client = ethereum_client
        self.w3 = ethereum_client.w3

    @classmethod
    def is_available(
        cls,
        ethereum_client: EthereumClient,
    ) -> bool:
        """
        :param ethereum_client:
        :return: `True` if Oracle is available for the EthereumClient provided, `False` otherwise
        """
        return ethereum_client.get_network() == EthereumNetwork.MAINNET

    @cached_property
    def zerion_adapter_contract(self) -> Optional[Contract]:
        """
        :return: Zerion token adapter contract if there is a contract deployed on
            `zerion_adapter_address`, `None` otherwise
        """
        address = (
            ChecksumAddress(HexAddress(HexStr(self.zerion_adapter_address)))
            if self.zerion_adapter_address
            else None
        )
        if address and self.ethereum_client.is_contract(address):
            return self.w3.eth.contract(address, abi=ZERION_TOKEN_ADAPTER_ABI)
        return None

    def get_underlying_tokens(
        self, token_address: ChecksumAddress
    ) -> List[UnderlyingToken]:
        """
        Use Zerion Token adapter to return underlying components for pool

        :param token_address: Pool token address
        :return: Price per share and underlying token
        :raises: CannotGetPriceFromOracle
        """
        if not self.zerion_adapter_contract:
            raise CannotGetPriceFromOracle(
                f"{self.__class__.__name__}: Cannot get price for {token_address}. Cannot find Zerion adapter"
            )

        # Built before iterating the components so it always refers to the requested pool token
        error_message = (
            f"{self.__class__.__name__}: Cannot get price for {token_address}. "
            "It is not a Zerion supported pool token"
        )
        try:
            results = self.zerion_adapter_contract.functions.getComponents(
                token_address
            ).call()
            if results:
                # If there's just one component, quantity must be 100%
                single_component = len(results) == 1
                return [
                    UnderlyingToken(
                        component_address,
                        1.0
                        if single_component
                        else quantity / 10 ** len(str(quantity)),
                    )
                    for component_address, _, quantity in results
                ]
        except (ValueError, Web3Exception, DecodingError) as e:
            # Keep the original error as the cause instead of silently discarding it
            raise CannotGetPriceFromOracle(error_message) from e

        # Adapter returned no components for the token
        raise CannotGetPriceFromOracle(error_message)
656

657

658
class CurveOracle(ZerionComposedOracle):
    """
    Curve pool Oracle. More info on https://curve.fi/
    """

    # Mainnet address
    ZERION_ADAPTER_ADDRESS = "0x99b0bEadc3984eab9842AF81f9fad0C2219108cc"

    def get_underlying_tokens(
        self, token_address: ChecksumAddress
    ) -> List[UnderlyingToken]:
        """
        Curve gauge deposit tokens are replaced by their corresponding LP token address
        before asking the Zerion adapter for the underlying tokens.
        More info on https://resources.curve.fi/base-features/understanding-gauges

        :param token_address: Curve LP token or gauge deposit token address
        :return: Underlying tokens for the LP token
        """
        lp_token_address = CURVE_GAUGE_TO_LP_TOKEN.get(token_address)
        if lp_token_address:
            # It's a gauge, use the LP token instead
            token_address = ChecksumAddress(HexAddress(HexStr(lp_token_address)))
        return super().get_underlying_tokens(token_address)
1✔
682

683

684
class PoolTogetherOracle(ZerionComposedOracle):
    """
    Oracle for PoolTogether pools, https://pooltogether.com/
    """

    # Mainnet address
    ZERION_ADAPTER_ADDRESS = "0xb4E0E1672fFd9b128784dB9f3BE9158fac3f1DFc"
692

693

694
class EnzymeOracle(ZerionComposedOracle):
    """
    Oracle for Enzyme pools, https://enzyme.finance/
    """

    # Mainnet address
    ZERION_ADAPTER_ADDRESS = "0x9e71455D748C23566b19493D09435574097C7D67"
702

703

704
class YearnOracle(ComposedPriceOracle):
    """
    Oracle for Yearn tokens, built on top of two Zerion adapters.
    More info on https://docs.yearn.finance
    """

    def __init__(
        self,
        ethereum_client: EthereumClient,
        yearn_vault_token_adapter: Optional[
            str
        ] = "0xb460FcC1B6c1CBD7D03F47B6BD5F03994d286c75",
        iearn_token_adapter: Optional[
            str
        ] = "0x65B23774daE2a5be02dD275918DDF048d177a5B4",
    ):
        """
        :param ethereum_client:
        :param yearn_vault_token_adapter: Address of the Zerion adapter for yVaults
        :param iearn_token_adapter: Address of the Zerion adapter for yTokens
        """
        self.ethereum_client = ethereum_client
        self.w3 = ethereum_client.w3
        # Every adapter is wrapped on its own Zerion oracle
        self.yearn_vault_token_adapter = ZerionComposedOracle(
            ethereum_client, yearn_vault_token_adapter
        )
        self.iearn_token_adapter = ZerionComposedOracle(
            ethereum_client, iearn_token_adapter
        )

    @classmethod
    def is_available(cls, ethereum_client: EthereumClient) -> bool:
        """
        :param ethereum_client:
        :return: `True` if Oracle is available for the EthereumClient provided, `False` otherwise
        """
        network = ethereum_client.get_network()
        return network == EthereumNetwork.MAINNET

    def get_underlying_tokens(
        self, token_address: ChecksumAddress
    ) -> List[UnderlyingToken]:
        """
        Try the yVault adapter first and then the yToken one

        :param token_address:
        :return: Price per share and underlying token
        :raises: CannotGetPriceFromOracle
        """
        # Getting underlying token function is the same for both yVault and yToken
        adapters = (self.yearn_vault_token_adapter, self.iearn_token_adapter)
        for adapter in adapters:
            try:
                underlying_tokens = adapter.get_underlying_tokens(token_address)
            except CannotGetPriceFromOracle:
                continue
            else:
                return underlying_tokens

        raise CannotGetPriceFromOracle(
            f"Cannot get price for {token_address}. It is not a Yearn yToken/yVault"
        )
762

763

764
class BalancerOracle(PricePoolOracle):
    """
    Oracle for Balancer. More info on https://balancer.exchange
    """

    def __init__(self, ethereum_client: EthereumClient, price_oracle: PriceOracle):
        """
        :param ethereum_client: Client used to build the contracts and batch the calls
        :param price_oracle: Price oracle to get the price for the components of the Balancer Pool, UniswapV2 is
            recommended
        """
        self.ethereum_client = ethereum_client
        self.w3 = ethereum_client.w3
        self.price_oracle = price_oracle

    @classmethod
    def is_available(
        cls,
        ethereum_client: EthereumClient,
    ) -> bool:
        """
        :param ethereum_client:
        :return: `True` if Oracle is available for the EthereumClient provided, `False` otherwise
        """
        return ethereum_client.get_network() == EthereumNetwork.MAINNET

    def get_pool_token_price(self, pool_token_address: ChecksumAddress) -> float:
        """
        Estimate balancer pool token price based on its components

        The value of every token held by the pool (balance scaled by the token
        decimals, multiplied by the price given by `price_oracle`) is added up
        and divided by the pool total supply scaled by 1e18.

        :param pool_token_address: Balancer pool token address
        :return: Eth price for pool token
        :raises: CannotGetPriceFromOracle if the address does not behave like a
            balancer pool (call/decoding errors, no tokens, no supply). Errors
            raised by `price_oracle.get_price` for a component are propagated
            unchanged.
        """
        try:
            balancer_pool_contract = self.w3.eth.contract(
                pool_token_address, abi=balancer_pool_abi
            )
            current_tokens, total_supply_response = self.ethereum_client.batch_call(
                [
                    balancer_pool_contract.functions.getCurrentTokens(),
                    balancer_pool_contract.functions.totalSupply(),
                ]
            )
            total_supply = bytes_to_float(total_supply_response)
            if not current_tokens:
                raise ValueError("Pool does not hold any token")
            if not total_supply:
                # Without this guard the final division raises ZeroDivisionError,
                # which is not part of the documented contract
                raise ValueError("Pool total supply is zero")

            number_tokens = len(current_tokens)
            # Balances and decimals are requested in one batch: the first
            # `number_tokens` results are balances, the remaining ones decimals
            token_balances_and_decimals = self.ethereum_client.batch_call(
                [
                    balancer_pool_contract.functions.getBalance(token_address)
                    for token_address in current_tokens
                ]
                + [
                    get_erc20_contract(
                        self.w3, ChecksumAddress(HexAddress(HexStr(str(token_address))))
                    ).functions.decimals()
                    for token_address in current_tokens
                ]
            )
            token_balances = token_balances_and_decimals[:number_tokens]
            token_decimals = token_balances_and_decimals[number_tokens:]
            token_prices = [
                self.price_oracle.get_price(token_address)
                for token_address in current_tokens
            ]

            total_eth_value = 0.0
            for token_balance_bytes, token_decimal_bytes, token_price in zip(
                token_balances, token_decimals, token_prices
            ):
                token_balance = bytes_to_float(token_balance_bytes)
                token_decimal = bytes_to_float(token_decimal_bytes)
                total_eth_value += (token_balance / 10**token_decimal) * token_price
            return total_eth_value / (total_supply / 1e18)
        except (Web3Exception, DecodingError, ValueError) as exc:
            # Keep the original error as the cause to ease debugging
            raise CannotGetPriceFromOracle(
                f"Cannot get price for {pool_token_address}. "
                f"It is not a balancer pool token"
            ) from exc
847

848

849
class MooniswapOracle(BalancerOracle):
    """
    Oracle for Mooniswap pool tokens. Construction and availability check are
    inherited from `BalancerOracle`, only the pricing logic differs.
    """

    def get_pool_token_price(self, pool_token_address: ChecksumAddress) -> float:
        """
        Estimate mooniswap pool token price based on its components

        The value of a single side of the pool is doubled and divided by the
        total supply (presumably because both sides of a pool are expected to
        hold the same value - confirm against the Mooniswap contracts).

        :param pool_token_address: Mooniswap pool token address
        :return: Eth price for pool token
        :raises: CannotGetPriceFromOracle if the address does not behave like a
            mooniswap pool (call/decoding errors, no tokens, no supply) or if
            `price_oracle` cannot price any of the pool tokens
        """
        try:
            mooniswap_pool_contract = self.w3.eth.contract(
                pool_token_address, abi=mooniswap_abi
            )
            tokens, total_supply_response = self.ethereum_client.batch_call(
                [
                    mooniswap_pool_contract.functions.getTokens(),
                    mooniswap_pool_contract.functions.totalSupply(),
                ]
            )
            total_supply = bytes_to_float(total_supply_response)

            if not tokens:
                raise ValueError("Pool does not hold any token")
            if not total_supply:
                # Without this guard the divisions below raise ZeroDivisionError,
                # which is not part of the documented contract
                raise ValueError("Pool total supply is zero")

            if len(tokens) == 1 or any(
                token == NULL_ADDRESS for token in tokens
            ):  # One of the tokens is ether
                # No 1e18 scaling here: the balance returned by `get_balance`
                # is presumably in wei, so it cancels out with the supply scale
                ethereum_amount = self.ethereum_client.get_balance(pool_token_address)
                return ethereum_amount * 2 / total_supply

            # Use the first token the price oracle is able to price
            for token in tokens:
                try:
                    price = self.price_oracle.get_price(token)
                    token_contract = get_erc20_contract(
                        self.w3, ChecksumAddress(HexAddress(HexStr(str(token))))
                    )
                    (
                        token_balance_response,
                        token_decimals_response,
                    ) = self.ethereum_client.batch_call(
                        [
                            token_contract.functions.balanceOf(pool_token_address),
                            token_contract.functions.decimals(),
                        ]
                    )
                    token_balance = bytes_to_float(token_balance_response)
                    token_decimals = bytes_to_float(token_decimals_response)

                    total_value = (token_balance / 10**token_decimals) * price
                    return (total_value * 2) / (total_supply / 1e18)
                except CannotGetPriceFromOracle:
                    continue

            raise CannotGetPriceFromOracle(
                f"Cannot get price for {pool_token_address}. "
                f"It is not a mooniswap pool token"
            )

        except (Web3Exception, DecodingError, ValueError) as exc:
            # Keep the original error as the cause to ease debugging
            raise CannotGetPriceFromOracle(
                f"Cannot get price for {pool_token_address}. "
                f"It is not a mooniswap pool token"
            ) from exc
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc