safe-global / safe-transaction-service · build 13588281340

28 Feb 2025 12:37PM UTC coverage: 94.407% (+0.001%) from 94.406%

Pull Request #2443: Add flag to disable download collectibles
Merge 412836604 into 3fa777aac

36 of 36 new or added lines in 4 files covered. (100.0%)
3 existing lines in 1 file now uncovered.
16355 of 17324 relevant lines covered (94.41%)
0.94 hits per line

Source File: /safe_transaction_service/history/services/collectibles_service.py (file coverage: 90.85%)
import base64
import dataclasses
import json
import logging
import operator
import random
from typing import Any, Dict, List, Optional, Sequence, Tuple
from urllib.parse import urljoin

from django.conf import settings
from django.core.cache import cache as django_cache

import gevent
import requests
from cache_memoize import cache_memoize
from cachetools import TTLCache, cachedmethod
from eth_typing import ChecksumAddress
from redis import Redis
from safe_eth.eth import EthereumClient, EthereumNetwork, get_auto_ethereum_client
from safe_eth.eth.clients import EnsClient

from safe_transaction_service.tokens.constants import (
    CRYPTO_KITTIES_CONTRACT_ADDRESSES,
    ENS_CONTRACTS_WITH_TLD,
)
from safe_transaction_service.tokens.models import Token
from safe_transaction_service.utils.redis import get_redis
from safe_transaction_service.utils.utils import chunks

from ..exceptions import NodeConnectionException
from ..models import ERC721Transfer

logger = logging.getLogger(__name__)


class CollectiblesServiceException(Exception):
    pass


class MetadataRetrievalException(CollectiblesServiceException):
    pass


class MetadataRetrievalExceptionTimeout(CollectiblesServiceException):
    pass


def ipfs_to_http(uri: Optional[str]) -> Optional[str]:
    if uri and uri.startswith("ipfs://"):
        uri = uri.replace("ipfs://ipfs/", "ipfs://")
        return urljoin(
            settings.IPFS_GATEWAY, uri.replace("ipfs://", "", 1)
        )  # Use ipfs gateway
    return uri


@dataclasses.dataclass
class Erc721InfoWithLogo:
    """
    ERC721 info from Blockchain
    """

    address: str
    name: str
    symbol: str
    logo_uri: str

    @classmethod
    def from_token(cls, token: Token):
        return cls(
            token.address,
            token.name,
            token.symbol,
            token.get_full_logo_uri(),
        )


@dataclasses.dataclass
class Collectible:
    """
    Collectible built from ERC721InfoWithLogo
    """

    token_name: str
    token_symbol: str
    logo_uri: str
    address: str
    id: int
    uri: str


@dataclasses.dataclass
class CollectibleWithMetadata(Collectible):
    """
    Collectible with metadata parsed if possible
    """

    metadata: Dict[str, Any]
    name: Optional[str] = dataclasses.field(init=False)
    description: Optional[str] = dataclasses.field(init=False)
    image_uri: Optional[str] = dataclasses.field(init=False)

    def get_name(self) -> Optional[str]:
        if self.metadata:
            for key in ("name",):
                if key in self.metadata:
                    return self.metadata[key]

    def get_description(self) -> Optional[str]:
        if self.metadata:
            for key in ("description",):
                if key in self.metadata:
                    return self.metadata[key]

    def get_metadata_image(self) -> Optional[str]:
        if not self.metadata:
            return None

        for key in ("image", "image_url", "image_uri", "imageUri", "imageUrl"):
            if key in self.metadata:
                return self.metadata[key]

        for key, value in self.metadata.items():
            if (
                key.lower().startswith("image")
                and isinstance(value, str)
                and value.startswith("http")
            ):
                return value

    def __post_init__(self):
        self.name = self.get_name()
        self.description = self.get_description()
        self.image_uri = ipfs_to_http(self.get_metadata_image())


class CollectiblesServiceProvider:
    def __new__(cls):
        if not hasattr(cls, "instance"):
            cls.instance = CollectiblesService(get_auto_ethereum_client(), get_redis())

        return cls.instance

    @classmethod
    def del_singleton(cls):
        if hasattr(cls, "instance"):
            del cls.instance


class CollectiblesService:
    METADATA_MAX_CONTENT_LENGTH = int(
        0.2 * 1024 * 1024
    )  # 0.2 MB is the maximum metadata size allowed
    COLLECTIBLE_EXPIRATION = int(
        60 * 60 * 24 * 2
    )  # Keep collectibles in cache for 2 days
    TOKEN_EXPIRATION = int(60 * 60)

    def __init__(self, ethereum_client: EthereumClient, redis: Redis):
        self.ethereum_client = ethereum_client
        self.ethereum_network = ethereum_client.get_network()
        self.redis = redis

        base_url = settings.ENS_SUBGRAPH_URL
        api_key = settings.ENS_SUBGRAPH_API_KEY
        subgraph_id = settings.ENS_SUBGRAPH_ID

        # If the ENS subgraph is configured, always use it
        if base_url and api_key and subgraph_id:
            config = EnsClient.SubgraphConfig(
                base_url=base_url,
                api_key=api_key,
                subgraph_id=subgraph_id,
            )
        # Else, provide fallback for Sepolia, Holesky or empty configuration.
        else:
            logger.warning(
                "Using fallback EnsClient configuration. This configuration is not suitable for production and it is "
                "recommended to set up a Subgraph API key. Mandatory for networks other than Sepolia or Holesky. "
                "See https://docs.ens.domains/web/subgraph"
            )
            config = self.fallback_ens_client()

        self.ens_service: EnsClient = EnsClient(config=config)

        self.cache_token_info: TTLCache[ChecksumAddress, Erc721InfoWithLogo] = TTLCache(
            maxsize=4096, ttl=self.TOKEN_EXPIRATION
        )
        self.ens_image_url = settings.TOKENS_ENS_IMAGE_URL

    def fallback_ens_client(self) -> EnsClient.Config:
        if self.ethereum_network == EthereumNetwork.SEPOLIA:
            return EnsClient.Config(
                "https://api.studio.thegraph.com/query/49574/enssepolia/version/latest",
            )
        elif self.ethereum_network == EthereumNetwork.HOLESKY:
            return EnsClient.Config(
                "https://api.studio.thegraph.com/query/49574/ensholesky/version/latest",
            )
        else:
            logger.warning(
                "No fallback Ens Client configuration for network=%s available",
                self.ethereum_network,
            )
            return EnsClient.Config("")

    def get_metadata_cache_key(self, address: str, token_id: int):
        return f"metadata:{address}:{token_id}"

    def _decode_base64_uri(self, uri: str) -> Optional[Dict[str, Any]]:
        """
        Decodes data:application/json;base64 uris

        :param uri: Base64 uri
        :return: `Dict` if the base64 JSON is valid, `None` otherwise
        """
        pattern = "data:application/json;base64,"

        if uri and uri.startswith(pattern):
            try:
                return json.loads(base64.b64decode(uri[len(pattern) :]))
            except ValueError:
                # b64decode can raise ValueError and binascii.Error (inherits from ValueError)
                # json.loads can raise a JSONDecodeError (inherits from ValueError)
                return None

    def _retrieve_metadata_from_uri(self, uri: str) -> Any:
        """
        Get metadata from URI. IPFS, HTTP/S, and BASE64/JSON are supported

        :param uri: Metadata URI, like http://example.org/token/3 or ipfs://<keccak256>
        :return: Metadata as a decoded json
        """
        if not uri:
            raise MetadataRetrievalException("Empty URI")

        # Check base64
        if base_64_decoded := self._decode_base64_uri(uri):
            return base_64_decoded

        uri = ipfs_to_http(uri)

        if not uri.startswith("http"):
            raise MetadataRetrievalException(uri)

        try:
            logger.debug("Getting metadata for uri=%s", uri)
            with requests.get(uri, timeout=10, stream=True) as response:
                if not response.ok:
                    logger.debug("Cannot get metadata for uri=%s", uri)
                    raise MetadataRetrievalException(uri)

                content_length = response.headers.get("content-length", 0)
                content_type = response.headers.get("content-type", "")
                if int(content_length) > self.METADATA_MAX_CONTENT_LENGTH:
                    raise MetadataRetrievalException(
                        f"Content-length={content_length} for uri={uri} is too big"
                    )

                if "application/json" not in content_type:
                    raise MetadataRetrievalException(
                        f"Content-type={content_type} for uri={uri} is not valid, "
                        f'expected "application/json"'
                    )

                logger.debug("Got metadata for uri=%s", uri)

                # Some requests don't provide `Content-Length` on the headers
                if len(response.content) > self.METADATA_MAX_CONTENT_LENGTH:
                    raise MetadataRetrievalException(
                        f"Retrieved content for uri={uri} is too big"
                    )

                return response.json()
        except (IOError, ValueError) as e:
            raise MetadataRetrievalExceptionTimeout(uri) from e

    def build_collectible(
        self,
        token_info: Optional[Erc721InfoWithLogo],
        token_address: ChecksumAddress,
        token_id: int,
        token_metadata_uri: Optional[str],
    ) -> Collectible:
        """
        Build a collectible from the input parameters

        :param token_info: information of collectible like name, symbol...
        :param token_address:
        :param token_id:
        :param token_metadata_uri:
        """
        if not token_metadata_uri:
            if token_address in CRYPTO_KITTIES_CONTRACT_ADDRESSES:
                token_metadata_uri = f"https://api.cryptokitties.co/kitties/{token_id}"
            else:
                logger.info(
                    "No token_uri available to retrieve metadata for ERC721 token=%s with token-id=%d",
                    token_address,
                    token_id,
                )
        name = token_info.name if token_info else ""
        symbol = token_info.symbol if token_info else ""
        logo_uri = token_info.logo_uri if token_info else ""
        return Collectible(
            name, symbol, logo_uri, token_address, token_id, token_metadata_uri
        )

    def get_metadata(self, collectible: Collectible | CollectibleWithMetadata) -> Any:
        """
        Return metadata for a collectible

        :param collectible:
        """
        if not settings.COLLECTIBLES_ENABLE_DOWNLOAD_METADATA:
            logger.warning("Downloading collectibles metadata is disabled")
            return None
        if tld := ENS_CONTRACTS_WITH_TLD.get(
            collectible.address
        ):  # Special case for ENS
            label_name = self.ens_service.query_by_domain_hash(collectible.id)
            return {
                "name": f"{label_name}.{tld}" if label_name else f".{tld}",
                "description": ("" if label_name else "Unknown ")
                + f".{tld} ENS Domain",
                "image": self.ens_image_url,
            }

        return self._retrieve_metadata_from_uri(collectible.uri)

    def get_collectibles(
        self,
        safe_address: ChecksumAddress,
        only_trusted: bool = False,
        exclude_spam: bool = False,
        limit: Optional[int] = None,
        offset: int = 0,
    ) -> Tuple[List[Collectible], int]:
        """
        :param safe_address:
        :param only_trusted: If True, return balance only for trusted tokens
        :param exclude_spam: If True, exclude spam tokens
        :param limit: page size
        :param offset: page position
        :return: Collectibles (using the owner, addresses and the token_ids) and count (total of collectibles)
        """

        # Cache based on the number of erc721 events
        number_erc721_events = ERC721Transfer.objects.to_or_from(safe_address).count()

        if number_erc721_events == 0:
            # No need for further DB/Cache calls
            return [], 0

        cache_key = f"collectibles:{safe_address}:{only_trusted}:{exclude_spam}:{limit}:{offset}:{number_erc721_events}"
        cache_key_count = (
            f"collectibles_count:{safe_address}:{only_trusted}:{exclude_spam}"
        )
        if collectibles := django_cache.get(cache_key):
            count = django_cache.get(cache_key_count)
            return collectibles, count
        else:
            collectibles, count = self._get_collectibles(
                safe_address,
                only_trusted,
                exclude_spam,
                limit=limit,
                offset=offset,
            )
            django_cache.set(cache_key, collectibles, 60 * 10)  # 10 minutes cache
            django_cache.set(cache_key_count, count, 60 * 10)  # 10 minutes cache
            return collectibles, count

    def _get_collectibles(
        self,
        safe_address: ChecksumAddress,
        only_trusted: bool = False,
        exclude_spam: bool = False,
        limit: Optional[int] = None,
        offset: int = 0,
    ) -> Tuple[List[Collectible], int]:
        """
        :param safe_address:
        :param only_trusted: If True, return balance only for trusted tokens
        :param exclude_spam: If True, exclude spam tokens
        :param limit: page size
        :param offset: page position
        :return: Collectibles (using the owner, addresses and the token_ids) and count (total of collectibles)
        """
        addresses_with_token_ids = ERC721Transfer.objects.erc721_owned_by(
            safe_address, only_trusted=only_trusted, exclude_spam=exclude_spam
        )
        if not addresses_with_token_ids:
            return [], 0

        count = len(addresses_with_token_ids)
        # TODO Paginate on DB
        if limit is not None:
            addresses_with_token_ids = addresses_with_token_ids[offset : offset + limit]

        for address, _ in addresses_with_token_ids:
            # Store tokens in database if not present
            self.get_token_info(address)  # This is cached

        logger.debug("Getting token_uris for %s", addresses_with_token_ids)
        # Chunk token uris to prevent stressing the node
        token_uris = []

        for addresses_with_token_ids_chunk in chunks(addresses_with_token_ids, 25):
            token_uris.extend(self.get_token_uris(addresses_with_token_ids_chunk))
        logger.debug("Got token_uris for %s", addresses_with_token_ids)
        collectibles = []
        for (token_address, token_id), token_uri in zip(
            addresses_with_token_ids, token_uris
        ):
            token_info = self.get_token_info(token_address)
            collectible = self.build_collectible(
                token_info, token_address, token_id, token_uri
            )
            collectibles.append(collectible)

        return collectibles, count

    def _get_collectibles_with_metadata(
        self,
        safe_address: ChecksumAddress,
        only_trusted: bool = False,
        exclude_spam: bool = False,
        limit: Optional[int] = None,
        offset: int = 0,
    ) -> Tuple[List[CollectibleWithMetadata], int]:
        """
        Get collectibles using the owner, addresses and the token_ids

        :param safe_address:
        :param only_trusted: If True, return balance only for trusted tokens
        :param exclude_spam: If True, exclude spam tokens
        :param limit: page size
        :param offset: page position
        :return: collectibles and count
        """

        # Async retry for getting metadata if fetching fails
        from ..tasks import retry_get_metadata_task

        collectibles_with_metadata: List[CollectibleWithMetadata] = []
        collectibles, count = self.get_collectibles(
            safe_address,
            only_trusted=only_trusted,
            exclude_spam=exclude_spam,
            limit=limit,
            offset=offset,
        )
        metadata_cache_keys = [
            self.get_metadata_cache_key(collectible.address, collectible.id)
            for collectible in collectibles
        ]
        cached_results = self.redis.mget(metadata_cache_keys)

        collectibles_not_cached = []
        jobs = []
        for cached, collectible in zip(cached_results, collectibles):
            if cached:
                collectible_cache = json.loads(cached)
                collectibles_with_metadata.append(
                    CollectibleWithMetadata(
                        collectible_cache["token_name"],
                        collectible_cache["token_symbol"],
                        collectible_cache["logo_uri"],
                        collectible_cache["address"],
                        collectible_cache["id"],
                        collectible_cache["uri"],
                        collectible_cache["metadata"],
                    )
                )
            else:
                collectibles_not_cached.append(collectible)
                jobs.append(gevent.spawn(self.get_metadata, collectible))
                collectibles_with_metadata.append(None)  # Keeps the order

        _ = gevent.joinall(jobs)
        collectibles_with_metadata_not_cached = []
        redis_pipe = self.redis.pipeline()
        for collectible, job in zip(collectibles_not_cached, jobs):
            try:
                metadata = job.get()
                if not isinstance(metadata, dict):
                    metadata = {}
                    logger.warning(
                        "A dictionary metadata was expected on token-uri=%s for token-address=%s",
                        collectible.uri,
                        collectible.address,
                    )
            except MetadataRetrievalException:
                metadata = {}
                logger.warning(
                    "Cannot retrieve metadata on token-uri=%s for token-address=%s",
                    collectible.uri,
                    collectible.address,
                )
            except MetadataRetrievalExceptionTimeout:
                metadata = {}
                logger.warning(
                    "Timeout retrieving metadata on token-uri=%s for token-address=%s, retrying asynchronously",
                    collectible.uri,
                    collectible.address,
                )
                retry_get_metadata_task.apply_async(
                    (collectible.address, collectible.id),
                    countdown=random.randint(0, 60),  # Don't retry all at once
                )

            collectible_with_metadata = CollectibleWithMetadata(
                collectible.token_name,
                collectible.token_symbol,
                collectible.logo_uri,
                collectible.address,
                collectible.id,
                collectible.uri,
                metadata,
            )
            collectibles_with_metadata_not_cached.append(collectible_with_metadata)
            redis_pipe.set(
                self.get_metadata_cache_key(collectible.address, collectible.id),
                json.dumps(dataclasses.asdict(collectible_with_metadata)),
                self.COLLECTIBLE_EXPIRATION,
            )
        redis_pipe.execute()

        # Fill the slots reserved for non-cached collectibles, keeping the initial order
        for collectible_metadata_cached_index in range(len(collectibles_with_metadata)):
            if collectibles_with_metadata[collectible_metadata_cached_index] is None:
                collectibles_with_metadata[collectible_metadata_cached_index] = (
                    collectibles_with_metadata_not_cached.pop(0)
                )

        return collectibles_with_metadata, count

    def get_collectibles_with_metadata_paginated(
        self,
        safe_address: ChecksumAddress,
        only_trusted: bool = False,
        exclude_spam: bool = False,
        limit: int = 10,
        offset: int = 0,
    ) -> Tuple[List[CollectibleWithMetadata], int]:
        """
        Get collectibles paginated

        :param safe_address:
        :param only_trusted: If True, return balance only for trusted tokens
        :param exclude_spam: If True, exclude spam tokens
        :param limit: page size
        :param offset: page position
        :return: collectibles and count
        """
        return self._get_collectibles_with_metadata(
            safe_address, only_trusted, exclude_spam, limit=limit, offset=offset
        )

    @cachedmethod(cache=operator.attrgetter("cache_token_info"))
    @cache_memoize(TOKEN_EXPIRATION, prefix="collectibles-get_token_info")  # 1 hour
    def get_token_info(
        self, token_address: ChecksumAddress
    ) -> Optional[Erc721InfoWithLogo]:
        """
        :param token_address:
        :return: Erc721 name and symbol. If it cannot be found, `name=''` and `symbol=''`
        """
        try:
            token = Token.objects.get(address=token_address)
            return Erc721InfoWithLogo.from_token(token)
        except Token.DoesNotExist:
            if token := Token.objects.create_from_blockchain(token_address):
                return Erc721InfoWithLogo.from_token(token)

    def get_token_uris(
        self, addresses_with_token_ids: Sequence[Tuple[ChecksumAddress, int]]
    ) -> List[Optional[str]]:
        """
        Cache token_uris, as they shouldn't change

        :param addresses_with_token_ids:
        :return: List of token_uris in the same order that `addresses_with_token_ids` were provided
        """

        def get_redis_key(address_with_token_id: Tuple[ChecksumAddress, int]) -> str:
            token_address, token_id = address_with_token_id
            return f"token-uri:{token_address}:{token_id}"

        # Try to find token uris in Redis first
        redis_token_uris = self.redis.mget(
            get_redis_key(address_with_token_id)
            for address_with_token_id in addresses_with_token_ids
        )
        # Redis does not allow `None`, so empty string is used for uris searched but not found
        found_uris: Dict[Tuple[ChecksumAddress, int], Optional[str]] = {}
        not_found_uris: List[Tuple[ChecksumAddress, int]] = []

        for address_with_token_id, token_uri in zip(
            addresses_with_token_ids, redis_token_uris
        ):
            if token_uri is None:
                not_found_uris.append(address_with_token_id)
            else:
                found_uris[address_with_token_id] = (
                    token_uri.decode() if token_uri else None
                )

        try:
            # Find missing token uris in blockchain
            logger.debug(
                "Getting token uris from blockchain for %d addresses with tokenIds",
                len(not_found_uris),
            )
            blockchain_token_uris = {
                address_with_token_id: token_uri if token_uri else None
                for address_with_token_id, token_uri in zip(
                    not_found_uris,
                    self.ethereum_client.erc721.get_token_uris(not_found_uris),
                )
            }
            logger.debug("Got token uris from blockchain")
        except (IOError, ValueError):
            logger.warning(
                "Problem when getting token uris from blockchain, trying individually",
                exc_info=True,
            )
            blockchain_token_uris = {}
            for not_found_uri in not_found_uris:
                try:
                    token_uri = self.ethereum_client.erc721.get_token_uris(
                        [not_found_uri]
                    )[0]
                    blockchain_token_uris[not_found_uri] = (
                        token_uri if token_uri else None
                    )
                except ValueError:
                    blockchain_token_uris[not_found_uri] = None
                    logger.warning(
                        "ValueError when getting token uri from blockchain for token and tokenId %s",
                        not_found_uri,
                        exc_info=True,
                    )
                except IOError as exc:
                    raise NodeConnectionException from exc

        if blockchain_token_uris:
            pipe = self.redis.pipeline()
            redis_map_to_store = {
                get_redis_key(address_with_token_id): (
                    token_uri if token_uri is not None else ""
                )
                for address_with_token_id, token_uri in blockchain_token_uris.items()
            }
            pipe.mset(redis_map_to_store)
            for key in redis_map_to_store.keys():
                pipe.expire(key, self.COLLECTIBLE_EXPIRATION)
            pipe.execute()
            found_uris.update(blockchain_token_uris)

        return [
            found_uris[address_with_token_id]
            for address_with_token_id in addresses_with_token_ids
        ]
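
The flag referenced in the pull request title corresponds to the `settings.COLLECTIBLES_ENABLE_DOWNLOAD_METADATA` check in `get_metadata` above. The snippet below is a minimal usage sketch, not part of the covered file: it assumes a fully configured Django environment (database, Redis and Ethereum RPC settings), and the Safe address is a placeholder for illustration only.

# Minimal sketch (not part of collectibles_service.py): exercising the service
# with the new COLLECTIBLES_ENABLE_DOWNLOAD_METADATA flag disabled.
# Assumes a configured Django settings module, database, Redis and RPC node.
from django.test import override_settings

from safe_transaction_service.history.services.collectibles_service import (
    CollectiblesServiceProvider,
)

service = CollectiblesServiceProvider()  # Singleton built from the auto Ethereum client and Redis

safe_address = "0x0000000000000000000000000000000000000001"  # Placeholder Safe address

with override_settings(COLLECTIBLES_ENABLE_DOWNLOAD_METADATA=False):
    # get_metadata() returns None when the flag is disabled, so every collectible
    # comes back with empty metadata instead of triggering HTTP/IPFS downloads.
    collectibles, count = service.get_collectibles_with_metadata_paginated(
        safe_address, limit=10, offset=0
    )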