• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snarfed / bridgy-fed / 4c1ad163-32f0-404b-944a-68bd86aa19ad

18 Jan 2026 05:17PM UTC coverage: 93.26% (+0.2%) from 93.028%
4c1ad163-32f0-404b-944a-68bd86aa19ad

push

circleci

snarfed
circle: use dummy encrypted_property_key file for tests

6655 of 7136 relevant lines covered (93.26%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.83
/memcache.py
1
"""Utilities for caching data in memcache.
2

3
TODO: move most or all of this to webutil?
4
"""
5
from datetime import datetime, timedelta, timezone
1✔
6
import functools
1✔
7
import logging
1✔
8
import os
1✔
9

10
import config
1✔
11
from google.cloud.ndb._cache import global_cache_key
1✔
12
from google.cloud.ndb.global_cache import _InProcessGlobalCache, MemcacheCache
1✔
13
from oauth_dropins.webutil import appengine_info, util
1✔
14
from pymemcache.client.base import PooledClient
1✔
15
from pymemcache.serde import PickleSerde
1✔
16
from pymemcache.test.utils import MockMemcacheClient
1✔
17

18
from domains import PRIMARY_DOMAIN
1✔
19

20
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Maximum key length that key() truncates to. See
# https://github.com/memcached/memcached/wiki/Commands#standard-protocol
KEY_MAX_LEN = 250

# Default version number embedded in every memoize() cache key. Bumping it
# changes all of those keys at once.
MEMOIZE_VERSION = 2

# per-user rates for running tasks, keyed by task queue name. rate limits and
# spreads out tasks for bursty users.
# https://github.com/snarfed/bridgy-fed/issues/1788
PER_USER_TASK_RATES = {'receive': timedelta(seconds=5)}

# Constructor kwargs shared by every pymemcache client created in this module.
# Insertion order is kept as is because the whole dict is logged at startup.
# https://pymemcache.readthedocs.io/en/latest/apidoc/pymemcache.client.base.html#pymemcache.client.base.Client.__init__
kwargs = dict(
    server=os.environ.get('MEMCACHE_HOST', 'localhost'),
    allow_unicode_keys=True,
    default_noreply=False,
    timeout=10,  # seconds
    connect_timeout=10,  # seconds
)
41

42
# Create the module's three cache handles:
#   memcache: plain pymemcache client
#   pickle_memcache: pymemcache client that serializes values with PickleSerde
#   global_cache: ndb global cache implementation
if not (appengine_info.DEBUG or appengine_info.LOCAL_SERVER):
    logger.info(f'Using production Memorystore memcache: {kwargs}')
    memcache = PooledClient(**kwargs)
    pickle_memcache = PooledClient(serde=PickleSerde(), **kwargs)
    # ndb's global cache shares the plain client, with non-strict reads/writes
    global_cache = MemcacheCache(memcache, strict_read=False, strict_write=False)
else:
    # debug / local server: swap in pymemcache's mock client class so that
    # nothing connects to a real memcached
    logger.info(f'Using in memory mock memcache: {kwargs}')
    memcache = PooledClient(max_pool_size=1, **kwargs)
    pickle_memcache = PooledClient(max_pool_size=1, serde=PickleSerde(), **kwargs)
    memcache.client_class = MockMemcacheClient
    pickle_memcache.client_class = MockMemcacheClient
    global_cache = _InProcessGlobalCache()
×
53

54

55
def key(key):
    """Converts a string into a memcache key.

    Spaces are replaced with ``%20``, the string is encoded to bytes with
    ``str.encode``'s default encoding, and the result is cut to at most
    :const:`KEY_MAX_LEN` bytes.

    The cut happens on the encoded bytes, so it can land in the middle of a
    multi-byte character, and two long inputs that share their first
    :const:`KEY_MAX_LEN` encoded bytes produce the same key.

    https://pymemcache.readthedocs.io/en/latest/apidoc/pymemcache.client.base.html
    https://github.com/memcached/memcached/wiki/Commands#standard-protocol

    TODO: truncate to 250 *UTF-8* chars, to handle Unicode chars in URLs. Related:
    pymemcache Client's allow_unicode_keys constructor kwarg.

    Args:
      key (str)

    Returns:
      bytes:
    """
    assert isinstance(key, str), repr(key)

    escaped = key.replace(' ', '%20')
    encoded = escaped.encode()
    return encoded[:KEY_MAX_LEN]
1✔
72

73

74
def memoize_key(fn, *args, _version=MEMOIZE_VERSION, **kwargs):
    """Builds the memcache key for a memoized call to ``fn``.

    The key is ``fn``'s qualified name, the version, and the reprs of the
    positional and keyword args, joined with ``-`` and then passed through
    :func:`key`.

    Args:
      fn (callable)
      args: positional args of the call
      _version (int): version number to embed in the key
      kwargs: keyword args of the call

    Returns:
      bytes:
    """
    parts = (f'{fn.__qualname__}', f'{_version}', repr(args), repr(kwargs))
    return key('-'.join(parts))
1✔
76

77

78
# Sentinel (empty tuple) stored in memcache in place of None, since a None
# result from get is what memoize treats as a cache miss.
NONE = ()


def memoize(expire=None, key=None, write=True, version=MEMOIZE_VERSION):
    """Memoize function decorator that stores the cached value in memcache.

    Values are read and written through ``pickle_memcache``. A ``None`` return
    value is stored as :const:`NONE` and translated back to ``None`` on a cache
    hit. Note that any cached value that compares equal to :const:`NONE`, ie
    an empty tuple, is also returned as ``None``.

    Args:
      expire (datetime.timedelta): optional, expiration. Truncated to whole
        seconds. If unset or falsy, 0 is passed as the expiration.
      key (callable): function that takes the function's ``(*args, **kwargs)``
        and returns the cache key to use. If it returns None (or anything
        falsy), memcache won't be used for that call.
      write (bool or callable): whether to write to memcache. If this is a
        callable, it will be called with the function's ``(*args, **kwargs)``
        and should return True or False.
      version (int): overrides our default version number in the memcache key.
        Bumping this version can have the same effect as clearing the cache for
        just the affected function.

    Returns:
      callable: decorator to apply to the function to memoize
    """
    expire_secs = int(expire.total_seconds()) if expire else 0

    def decorator(fn):
        @functools.wraps(fn)
        def wrapped(*args, **kwargs):
            # determine the cache key, if any, for this call
            if not key:
                cache_key = memoize_key(fn, *args, _version=version, **kwargs)
            elif key_val := key(*args, **kwargs):
                cache_key = memoize_key(fn, key_val, _version=version)
            else:
                cache_key = None

            # read
            if pickle_memcache and cache_key:
                cached = pickle_memcache.get(cache_key)
                if cached is None:
                    logger.debug(f'cache miss {cache_key}')
                else:
                    logger.debug(f'cache hit {cache_key} {repr(cached)[:100]}')
                    return None if cached == NONE else cached

            val = fn(*args, **kwargs)

            # write
            if pickle_memcache and cache_key:
                if isinstance(write, bool):
                    should_write = write
                else:
                    should_write = write(*args, **kwargs)

                if should_write:
                    logger.debug(f'cache set {cache_key} {repr(val)[:100]}')
                    to_store = NONE if val is None else val
                    pickle_memcache.set(cache_key, to_store, expire=expire_secs)

            return val

        return wrapped

    return decorator
1✔
131

132

133
def evict(entity_key):
    """Evict a datastore entity from memcache.

    If the entity exists and has a ``copies`` attribute, first calls its
    ``clear_get_original_cache`` method with each copy's ``uri``. For
    :class:`models.User` and :class:`models.Object` entities, this clears their
    copies from the :func:`models.get_original_user_key` and
    :func:`models.get_original_object_key` memoize caches.

    The entity's key is then deleted from the ndb global cache, whether or not
    the entity itself was found.

    Args:
      entity_key (google.cloud.ndb.Key)
    """
    entity = entity_key.get()
    if entity:
        for copy in getattr(entity, 'copies', []):
            entity.clear_get_original_cache(copy.uri)

    cache_key = global_cache_key(entity_key._key)
    global_cache.delete([cache_key])
1✔
148

149

150
def evict_raw(key):
    """Evict a key from memcache.

    The key is handed to the memcache client's ``delete`` as is; it is *not*
    preprocessed with this module's :func:`key` function, which the ``key``
    parameter shadows here.

    Args:
      key (str)

    Returns:
      bool: whether the key existed and was deleted
    """
    deleted = memcache.delete(key)
    return deleted
1✔
160

161

162
def remote_evict(entity_key):
    """Send a request to production Bridgy Fed to evict an entity from memcache.

    POSTs the entity key's URL-safe form as the ``key`` form field to
    ``/admin/memcache/evict`` on :const:`PRIMARY_DOMAIN`, with
    ``config.SECRET_KEY`` in the ``Authorization`` header.

    Args:
      entity_key (google.cloud.ndb.Key)

    Returns:
      requests.Response:
    """
    url = f'https://{PRIMARY_DOMAIN}/admin/memcache/evict'
    auth_headers = {'Authorization': config.SECRET_KEY}
    form = {'key': entity_key.urlsafe()}
    return util.requests_post(url, headers=auth_headers, data=form)
174

175

176
def task_eta(queue, user_id):
    """Get the ETA to use for a given user's task in a given queue.

    Task rate limit delays are per user, stored in memcache with a key based on
    ``queue`` and ``user_id`` and an integer value of POSIX timestamp (UTC) in
    seconds. Each call advances the stored timestamp by the queue's delay from
    :attr:`PER_USER_TASK_RATES`. If the result is still in the future, it's
    returned as the ETA. Otherwise the stored timestamp is reset to now, and
    now is returned.

    Only generates ETAs for task queues in :attr:`PER_USER_TASK_RATES`. Calls for
    other queues always return ``None``.

    Background: https://github.com/snarfed/bridgy-fed/issues/1788

    Args:
      queue (str)
      user_id (str)

    Returns:
      datetime.datetime: the ETA for this task, which is the current time if
      no later ETA is stored, or ``None`` if ``queue`` has no per-user rate
    """
    delay = PER_USER_TASK_RATES.get(queue)
    if not delay:
        return None

    cache_key = key(f'task-delay-{queue}-{user_id}')
    now = util.now()

    # incr returns the new stored value, or a falsy value if it failed, eg
    # because the key doesn't exist yet
    eta_s = memcache.incr(cache_key, int(delay.total_seconds()))
    if eta_s:
        eta = datetime.fromtimestamp(eta_s, timezone.utc)
        if eta > now:
            return eta

    # incr failed (key doesn't exist) or timestamp is in the past, set it to now
    #
    # note that this isn't synchronized; multiple callers may race and both get now
    # as the returned ETA. that's ok, we don't depend on this for correctness in any
    # way, just best-effort rate limiting.
    memcache.set(cache_key, int(now.timestamp()))
    return now
1✔
213

214

215
###########################################
216

217
# https://github.com/googleapis/python-ndb/issues/743#issuecomment-2067590945
218
#
219
# fixes "RuntimeError: Key has already been set in this batch" errors due to
220
# tasklets in pages.serve_feed
221
from logging import error as log_error
1✔
222
from sys import modules
1✔
223

224
from google.cloud.datastore_v1.types.entity import Key
1✔
225
from google.cloud.ndb._cache import (
1✔
226
    _GlobalCacheSetBatch,
227
    global_compare_and_swap,
228
    global_set_if_not_exists,
229
    global_watch,
230
)
231
from google.cloud.ndb.tasklets import Future, Return, tasklet
1✔
232

233
# Private constants borrowed from ndb's global cache module, looked up via
# sys.modules since they aren't part of its public API.
GLOBAL_CACHE_KEY_PREFIX: bytes = modules["google.cloud.ndb._cache"]._PREFIX
LOCKED_FOR_READ: bytes = modules["google.cloud.ndb._cache"]._LOCKED_FOR_READ
# NOTE(review): this is passed as the ``expires=`` value for lock entries, and
# ndb defines it as an integer number of seconds, not bytes. confirm against
# the installed ndb version.
LOCK_TIME: int = modules["google.cloud.ndb._cache"]._LOCK_TIME
1✔
236

237

238
@tasklet
def custom_global_lock_for_read(key: str, value: str):
    """Tries to take the ndb global cache's read lock for ``key``.

    Installed below in place of ndb's own ``global_lock_for_read``. Always
    writes the bare :const:`LOCKED_FOR_READ` marker as the lock value.

    Works around "RuntimeError: Key has already been set in this batch" errors:
    https://github.com/googleapis/python-ndb/issues/743#issuecomment-2067590945

    NOTE(review): the parameters are annotated ``str``, but the lock value
    written is a ``bytes`` constant; confirm what ndb actually passes in.

    Args:
      key: global cache key to lock
      value: the value currently cached for ``key``, or ``None`` if there isn't
        one

    Returns:
      :const:`LOCKED_FOR_READ` (via ``Return``) if the lock was acquired,
      otherwise ``None``
    """
    if value is None:
        # nothing cached yet: only take the lock if the key is still unset
        lock_acquired = yield global_set_if_not_exists(
            key, LOCKED_FOR_READ, expires=LOCK_TIME
        )
    else:
        # watch the existing value, then swap in the lock
        yield global_watch(key, value)
        lock_acquired = yield global_compare_and_swap(
            key, LOCKED_FOR_READ, expires=LOCK_TIME
        )

    if lock_acquired:
        raise Return(LOCKED_FOR_READ)


# monkey patch ndb to use our replacement
modules["google.cloud.ndb._cache"].global_lock_for_read = custom_global_lock_for_read
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc