• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

safe-global / safe-eth-py / 10793540350

10 Sep 2024 01:31PM UTC coverage: 93.551% (-0.3%) from 93.892%
10793540350

push

github

falvaradorodriguez
Fix cowswap test

8777 of 9382 relevant lines covered (93.55%)

3.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

36.59
/safe_eth/eth/oracles/cowswap.py
1
import logging
4✔
2
from typing import Optional, Union, cast
4✔
3

4
from eth_typing import ChecksumAddress, HexAddress, HexStr
4✔
5

6
from safe_eth.eth.clients.cowswap import CowSwapAPI, OrderKind
4✔
7
from safe_eth.eth.clients.cowswap.cow_swap_api import AmountResponse, ErrorResponse
4✔
8

9
from .. import EthereumClient, EthereumNetworkNotSupported
4✔
10
from .exceptions import CannotGetPriceFromOracle
4✔
11
from .oracles import PriceOracle
4✔
12
from .utils import get_decimals
4✔
13

14
logger = logging.getLogger(__name__)
4✔
15

16

17
class CowswapOracle(PriceOracle):
4✔
18
    """
19
    CowSwap Oracle implementation
20

21
    https://docs.cow.fi/off-chain-services/api
22
    """
23

24
    def __init__(
4✔
25
        self,
26
        ethereum_client: EthereumClient,
27
    ):
28
        """
29
        :param ethereum_client:
30
        """
31
        self.ethereum_client = ethereum_client
×
32
        self.w3 = ethereum_client.w3
×
33
        self.api = CowSwapAPI(ethereum_client.get_network())
×
34

35
    @classmethod
4✔
36
    def is_available(
4✔
37
        cls,
38
        ethereum_client: EthereumClient,
39
    ) -> bool:
40
        """
41
        :param ethereum_client:
42
        :return: `True` if CowSwap is available for the EthereumClient provided, `False` otherwise
43
        """
44
        try:
×
45
            CowSwapAPI(ethereum_client.get_network())
×
46
            return True
×
47
        except EthereumNetworkNotSupported:
×
48
            return False
×
49

50
    def get_price(
4✔
51
        self, token_address_1: str, token_address_2: Optional[str] = None
52
    ) -> float:
53
        token_address_2 = token_address_2 or self.api.weth_address
×
54
        if token_address_1 == token_address_2:
×
55
            return 1.0
×
56
        token_address_1_checksum = ChecksumAddress(HexAddress(HexStr(token_address_1)))
×
57
        token_address_2_checksum = ChecksumAddress(HexAddress(HexStr(token_address_2)))
×
58
        token_1_decimals = get_decimals(token_address_1_checksum, self.ethereum_client)
×
59
        try:
×
60
            result: Union[
×
61
                AmountResponse, ErrorResponse
62
            ] = self.api.get_estimated_amount(
63
                token_address_1_checksum,
64
                token_address_2_checksum,
65
                OrderKind.SELL,
66
                10**token_1_decimals,
67
            )
68
            if "buyAmount" in result and "sellAmount" in result:
×
69
                result = cast(AmountResponse, result)
×
70
                # Decimals needs to be adjusted
71
                token_2_decimals = get_decimals(
×
72
                    token_address_2_checksum, self.ethereum_client
73
                )
74
                return (
×
75
                    float(result["buyAmount"])
76
                    / result["sellAmount"]
77
                    * 10 ** (token_1_decimals - token_2_decimals)
78
                )
79

80
            exception = None
×
81
        except IOError as exc:
×
82
            exception = exc
×
83

84
        message = (
×
85
            f"Cannot get price from CowSwap {{}} "
86
            f"for token-1={token_address_1} to token-2={token_address_2}"
87
        )
88
        logger.debug(message)
×
89
        raise CannotGetPriceFromOracle(message) from exception
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc