• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

safe-global / safe-eth-py / 10793540350

10 Sep 2024 01:31PM UTC coverage: 93.551% (-0.3%) from 93.892%
10793540350

push

github

falvaradorodriguez
Fix cowswap test

8777 of 9382 relevant lines covered (93.55%)

3.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.94
/safe_eth/eth/oracles/uniswap_v3.py
1
import functools
4✔
2
import logging
4✔
3
from functools import cached_property
4✔
4
from typing import Optional
4✔
5

6
from eth_abi.exceptions import DecodingError
4✔
7
from eth_typing import ChecksumAddress, HexAddress, HexStr
4✔
8
from web3.contract import Contract
4✔
9
from web3.exceptions import Web3Exception
4✔
10

11
from .. import EthereumClient, EthereumNetwork
4✔
12
from ..constants import NULL_ADDRESS
4✔
13
from ..contracts import get_erc20_contract
4✔
14
from .abis.uniswap_v3 import (
4✔
15
    uniswap_v3_factory_abi,
16
    uniswap_v3_pool_abi,
17
    uniswap_v3_router_abi,
18
)
19
from .exceptions import CannotGetPriceFromOracle
4✔
20
from .oracles import PriceOracle
4✔
21
from .utils import get_decimals
4✔
22

23
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
4✔
24

25

26
class UniswapV3Oracle(PriceOracle):
    """
    Price oracle that reads token prices from Uniswap V3 pools.

    The pool factory and the wrapped ether address are both read from the
    configured router contract.
    """

    # https://docs.uniswap.org/protocol/reference/deployments
    DEFAULT_ROUTER_ADDRESS = "0x68b3465833fb72A70ecDF485E0e4C7bD8665Fc45"
    ROUTER_ADDRESSES = {
        # SwapRouter02
        EthereumNetwork.MAINNET: DEFAULT_ROUTER_ADDRESS,
        EthereumNetwork.CELO_MAINNET: "0x5615CDAb10dc425a742d643d949a7F474C01abc4",
    }

    # Cache to optimize calculation: https://docs.uniswap.org/sdk/guides/fetching-prices#understanding-sqrtprice
    PRICE_CONVERSION_CONSTANT = 2**192

    def __init__(
        self,
        ethereum_client: EthereumClient,
        uniswap_v3_router_address: Optional[ChecksumAddress] = None,
    ):
        """
        :param ethereum_client:
        :param uniswap_v3_router_address: Provide a custom `SwapRouter02` address. If not
            provided, the address configured in `ROUTER_ADDRESSES` for the client network
            is used, falling back to `DEFAULT_ROUTER_ADDRESS`
        :raises ValueError: if the factory cannot be read from the router (see `get_factory`)
        """
        self.ethereum_client = ethereum_client
        self.w3 = ethereum_client.w3

        self.router_address = uniswap_v3_router_address or self.ROUTER_ADDRESSES.get(
            self.ethereum_client.get_network(), self.DEFAULT_ROUTER_ADDRESS
        )
        # Performs a call to the router, so building the oracle fails fast if the
        # router cannot be queried
        self.factory = self.get_factory()

    @classmethod
    def is_available(
        cls,
        ethereum_client: EthereumClient,
        uniswap_v3_router_address: Optional[ChecksumAddress] = None,
    ) -> bool:
        """
        :param ethereum_client:
        :param uniswap_v3_router_address: Provide a custom `SwapRouter02` address
        :return: `True` if Uniswap V3 is available for the EthereumClient provided, `False` otherwise
        """
        router_address = ChecksumAddress(
            HexAddress(
                HexStr(
                    uniswap_v3_router_address
                    or cls.ROUTER_ADDRESSES.get(
                        ethereum_client.get_network(), cls.DEFAULT_ROUTER_ADDRESS
                    )
                )
            )
        )
        return ethereum_client.is_contract(router_address)

    def get_factory(self) -> Contract:
        """
        Factory contract creates the pools for token pairs

        :return: Uniswap V3 Factory Contract
        :raises ValueError: if calling `factory()` on the router fails with a `Web3Exception`
        """
        try:
            factory_address = self.router.functions.factory().call()
        except Web3Exception as exc:
            # Chain the original error so the underlying web3 failure is not lost
            raise ValueError(
                f"Uniswap V3 Router Contract {self.router_address} does not exist"
            ) from exc
        return self.w3.eth.contract(factory_address, abi=uniswap_v3_factory_abi)

    @cached_property
    def router(self) -> Contract:
        """
        Router knows about the `Uniswap Factory` and `Wrapped Eth` addresses for the network

        :return: Uniswap V3 Router Contract
        """
        router_address = ChecksumAddress(HexAddress(HexStr(self.router_address)))
        return self.w3.eth.contract(address=router_address, abi=uniswap_v3_router_abi)

    @cached_property
    def weth_address(self) -> ChecksumAddress:
        """
        :return: Wrapped ether checksummed address, as returned by `WETH9()` on the router
        """
        return self.router.functions.WETH9().call()

    # NOTE: `lru_cache` on a method uses `self` as part of the cache key and the cache
    # lives on the class, so instances stay referenced until their entries are evicted.
    # `None` results (pool not found) are cached too. Kept as is so `cache_clear()` /
    # `cache_info()` remain available to callers.
    @functools.lru_cache(maxsize=512)
    def get_pool_address(
        self, token_address: str, token_address_2: str, fee: Optional[int] = 3000
    ) -> Optional[ChecksumAddress]:
        """
        Get pool address for tokens with a given fee (by default, 0.3%)

        :param token_address:
        :param token_address_2:
        :param fee: Fee tier passed to the factory `getPool` call. `3000` (0.3%) is
            used by default
        :return: Pool address, `None` if the factory returns the null address
        """

        pool_address = self.factory.functions.getPool(
            token_address, token_address_2, fee
        ).call()
        if pool_address == NULL_ADDRESS:
            return None

        return pool_address

    def get_price(
        self, token_address: str, token_address_2: Optional[str] = None
    ) -> float:
        """
        :param token_address:
        :param token_address_2:
        :return: price for `token_address` related to `token_address_2`. If `token_address_2` is not
            provided, `Wrapped Eth` address will be used
        :raises CannotGetPriceFromOracle: if there is no pool for the pair, the pool cannot be
            queried, the pool holds less than 2 units of any of the tokens or the pool
            does not report a price
        """
        token_address_2 = token_address_2 or self.weth_address
        if token_address == token_address_2:
            return 1.0

        is_reversed = token_address.lower() > token_address_2.lower()

        # Make it cache friendly as order does not matter
        sorted_pair = (
            (token_address_2, token_address)
            if is_reversed
            else (token_address, token_address_2)
        )
        pool_address = self.get_pool_address(*sorted_pair)

        if not pool_address:
            raise CannotGetPriceFromOracle(
                f"Uniswap V3 pool does not exist for {token_address} and {token_address_2}"
            )

        # Decimals needs to be adjusted
        token_decimals = get_decimals(token_address, self.ethereum_client)
        token_2_decimals = get_decimals(token_address_2, self.ethereum_client)

        # `token_address_2` is always set at this point (it falls back to `weth_address`)
        token_address_checksum = ChecksumAddress(HexAddress(HexStr(token_address)))
        token_address_2_checksum = ChecksumAddress(
            HexAddress(HexStr(token_address_2))
        )

        error_message = (
            f"Cannot get uniswap v3 price for pair token_1={token_address} "
            f"token_2={token_address_2}"
        )

        pool_contract = self.w3.eth.contract(pool_address, abi=uniswap_v3_pool_abi)
        try:
            (
                token_balance,
                token_2_balance,
                slot0,
            ) = self.ethereum_client.batch_call(
                [
                    get_erc20_contract(
                        self.ethereum_client.w3, token_address_checksum
                    ).functions.balanceOf(pool_address),
                    get_erc20_contract(
                        self.ethereum_client.w3, token_address_2_checksum
                    ).functions.balanceOf(pool_address),
                    pool_contract.functions.slot0(),
                ]
            )
            if slot0 is not None:
                # Only the first `slot0` field (sqrt price) is needed
                (sqrt_price_x96, _, _, _, _, _, _) = slot0
            else:
                sqrt_price_x96 = 0
        except (
            Web3Exception,
            DecodingError,
            ValueError,
        ) as e:
            logger.debug(error_message)
            raise CannotGetPriceFromOracle(error_message) from e

        # A missing balance cannot be used on the liquidity check below (it would
        # raise `TypeError`), so report it as an oracle error instead
        if token_balance is None or token_2_balance is None:
            logger.debug(error_message)
            raise CannotGetPriceFromOracle(error_message)

        if (token_balance / 10**token_decimals) < 2 or (
            token_2_balance / 10**token_2_decimals
        ) < 2:
            message = (
                f"Not enough liquidity on uniswap v3 for pair token_1={token_address} "
                f"token_2={token_address_2}, at least 2 units of each token are required"
            )
            logger.debug(message)
            raise CannotGetPriceFromOracle(message)

        # Without a sqrt price there is no price to return: using `0` would yield a
        # `0.0` price or a `ZeroDivisionError` when the pair is reversed
        if not sqrt_price_x96:
            message = (
                f"Uniswap v3 pool {pool_address} does not provide a price for pair "
                f"token_1={token_address} token_2={token_address_2}"
            )
            logger.debug(message)
            raise CannotGetPriceFromOracle(message)

        # https://docs.uniswap.org/sdk/guides/fetching-prices
        # Multiplying by itself is way faster than exponential
        squared_sqrt_price = sqrt_price_x96 * sqrt_price_x96
        if is_reversed:
            # Pool was queried with the tokens swapped, so the price must be inverted
            price = self.PRICE_CONVERSION_CONSTANT / squared_sqrt_price
        else:
            price = squared_sqrt_price / self.PRICE_CONVERSION_CONSTANT

        return price * 10 ** (token_decimals - token_2_decimals)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc