• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snarfed / bridgy-fed / 4008196f-fd56-4e0b-95aa-774a12f506c6

08 Mar 2026 11:09PM UTC coverage: 0.095% (-93.9%) from 94.002%
4008196f-fd56-4e0b-95aa-774a12f506c6

push

circleci

snarfed
circle bug fix: use existing oauth-dropins static symlinks, clone for router deploy too

7 of 7343 relevant lines covered (0.1%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/memcache.py
1
"""Utilities for caching data in memcache.
2

3
TODO: move most or all of this to webutil?
4
"""
5
from datetime import datetime, timedelta, timezone
×
6
import functools
×
7
import logging
×
8
import os
×
9
import re
×
10
import string
×
11

12
import config
×
13
from google.cloud.ndb._cache import global_cache_key
×
14
from google.cloud.ndb.global_cache import _InProcessGlobalCache, MemcacheCache
×
15
from oauth_dropins.webutil import appengine_info, util
×
16
from pymemcache.client.base import PooledClient
×
17
from pymemcache.serde import PickleSerde
×
18
from pymemcache.test.utils import MockMemcacheClient
×
19

20
from domains import PRIMARY_DOMAIN
×
21

22
logger = logging.getLogger(__name__)

# maximum length of a memcached key, per the standard protocol:
# https://github.com/memcached/memcached/wiki/Commands#standard-protocol
KEY_MAX_LEN = 250

# default version number embedded in memoize cache keys
MEMOIZE_VERSION = 2

# per-user rates for running tasks. rate limits and spreads out tasks for bursty
# users. outer keys are task queue names; inner dicts map protocol label to the
# delay between that user's tasks. a None protocol label means all protocols.
# https://github.com/snarfed/bridgy-fed/issues/1788
PER_USER_TASK_RATES = {
    'receive': {None: timedelta(seconds=5)},  # None: all protocols
    'send': {'atproto': timedelta(seconds=10)},
}

# matches any single character in string.whitespace
WHITESPACE_RE = re.compile('[' + string.whitespace + ']')
×
42

43
# constructor kwargs shared by every pymemcache client created below:
# https://pymemcache.readthedocs.io/en/latest/apidoc/pymemcache.client.base.html#pymemcache.client.base.Client.__init__
kwargs = dict(
    server=os.environ.get('MEMCACHE_HOST', 'localhost'),
    allow_unicode_keys=True,
    default_noreply=False,
    timeout=10,  # seconds
    connect_timeout=10,  # seconds
)

# memcache stores raw values; pickle_memcache pickles and unpickles them, so it
# should only ever talk to a trusted memcache server.
if appengine_info.DEBUG or appengine_info.LOCAL_SERVER:
    # local/debug: swap in pymemcache's in-memory mock client, and use ndb's
    # in-process global cache
    logger.info(f'Using in memory mock memcache: {kwargs}')
    memcache = PooledClient(max_pool_size=1, **kwargs)
    pickle_memcache = PooledClient(max_pool_size=1, serde=PickleSerde(), **kwargs)
    memcache.client_class = MockMemcacheClient
    pickle_memcache.client_class = MockMemcacheClient
    global_cache = _InProcessGlobalCache()
else:
    # production: real clients, and ndb's global cache backed by memcache
    logger.info(f'Using production Memorystore memcache: {kwargs}')
    memcache = PooledClient(**kwargs)
    pickle_memcache = PooledClient(serde=PickleSerde(), **kwargs)
    global_cache = MemcacheCache(memcache, strict_read=False, strict_write=False)
×
63

64

65
def key(key):
    """Preprocesses a string into bytes suitable for use as a memcache key.

    Replaces each whitespace character with ``_``, encodes the result with
    ``str.encode()``'s default encoding (UTF-8), then truncates it to
    :const:`KEY_MAX_LEN` *bytes*.

    https://pymemcache.readthedocs.io/en/latest/apidoc/pymemcache.client.base.html
    https://github.com/memcached/memcached/wiki/Commands#standard-protocol

    TODO: truncate to 250 *UTF-8* chars, to handle Unicode chars in URLs. Right
    now the byte-level truncation may cut a multi-byte character in half.
    Related: pymemcache Client's allow_unicode_keys constructor kwarg.

    Args:
      key (str)

    Returns:
      bytes: at most :const:`KEY_MAX_LEN` bytes long
    """
    assert isinstance(key, str), repr(key)

    sanitized = WHITESPACE_RE.sub('_', key)
    encoded = sanitized.encode()
    return encoded[:KEY_MAX_LEN]
×
82

83

84
def memoize_key(fn, *args, _version=MEMOIZE_VERSION, **kwargs):
    """Generates the memcache key for a memoized call to ``fn``.

    The key is composed of ``fn``'s qualified name, the version, and the reprs
    of the positional and keyword args, joined with ``-`` and then passed
    through :func:`key`.

    Args:
      fn (callable): the memoized function
      args: positional args of the call
      _version (int): version number to embed in the key
      kwargs: keyword args of the call

    Returns:
      bytes: the preprocessed memcache key
    """
    raw_key = f'{fn.__qualname__}-{_version}-{args!r}-{kwargs!r}'
    return key(raw_key)
×
86

87

88
# empty tuple. stored in memcache in place of None, since a memcache get that
# returns None means the key wasn't found.
NONE = ()


def memoize(expire=None, key=None, write=True, version=MEMOIZE_VERSION):
    """Memoize function decorator that stores the cached value in memcache.

    Values are stored with ``pickle_memcache``, ie pickled. Only use this with a
    memcache server you trust, since cached values are unpickled when read.

    ``None`` return values are cached as the :const:`NONE` sentinel, an empty
    tuple. Known limitation: a function that returns an empty tuple itself will
    get ``None`` back on cache hits.

    Args:
      expire (datetime.timedelta): optional, expiration. Truncated to whole
        seconds. A positive expiration under one second is rounded up to one
        second, since memcache interprets 0 as "never expire." If unset (or
        zero), the cached value never expires.
      key (callable): function that takes the function's ``(*args, **kwargs)``
        and returns the cache key to use. If it returns None, memcache won't be
        used.
      write (bool or callable): whether to write to memcache. If this is a
        callable, it will be called with the function's ``(*args, **kwargs)``
        and should return True or False.
      version (int): overrides our default version number in the memcache key.
        Bumping this version can have the same effect as clearing the cache for
        just the affected function.

    Returns:
      callable: decorator that wraps a function with memcache-backed memoization
    """
    expire_secs = 0  # memcache: 0 means never expire
    if expire:
        total_secs = expire.total_seconds()
        expire_secs = int(total_secs)
        if expire_secs == 0 and total_secs > 0:
            # int() truncated a positive sub-second expiration to 0, which
            # would silently make the cached value permanent. round up instead.
            expire_secs = 1

    def decorator(fn):
        @functools.wraps(fn)
        def wrapped(*args, **kwargs):
            # determine the cache key. None means skip memcache for this call.
            cache_key = None
            if key:
                key_val = key(*args, **kwargs)
                if key_val:
                    cache_key = memoize_key(fn, key_val, _version=version)
            else:
                cache_key = memoize_key(fn, *args, _version=version, **kwargs)

            if pickle_memcache and cache_key:
                val = pickle_memcache.get(cache_key)
                if val is not None:
                    logger.debug(f'cache hit {cache_key} {repr(val)[:100]}')
                    # translate the NONE sentinel back to a real None
                    return None if val == NONE else val
                else:
                    logger.debug(f'cache miss {cache_key}')

            val = fn(*args, **kwargs)

            if pickle_memcache and cache_key:
                write_cache = (write if isinstance(write, bool)
                               else write(*args, **kwargs))
                if write_cache:
                    logger.debug(f'cache set {cache_key} {repr(val)[:100]}')
                    pickle_memcache.set(cache_key, NONE if val is None else val,
                                        expire=expire_secs)

            return val

        return wrapped

    return decorator
×
141

142

143
def evict(entity_key):
    """Evict a datastore entity from memcache.

    If the entity exists and has a ``copies`` attribute, as
    :class:`models.User` and :class:`models.Object` entities do, this first
    calls the entity's ``clear_get_original_cache`` with each copy's URI, to
    clear it from the :func:`models.get_original_user_key` and
    :func:`models.get_original_object_key` memoize caches. Then deletes the
    entity's own key from the ndb global cache.

    Args:
      entity_key (google.cloud.ndb.Key)
    """
    entity = entity_key.get()
    if entity:
        copies = getattr(entity, 'copies', [])
        for copy in copies:
            entity.clear_get_original_cache(copy.uri)

    cache_key = global_cache_key(entity_key._key)
    global_cache.delete([cache_key])
×
158

159

160
def evict_raw(key):
    """Evict a single raw key from memcache.

    The key is passed to the memcache client as is. It is *not* preprocessed,
    eg truncated or whitespace-escaped, first.

    Args:
      key (str)

    Returns:
      bool: whether the key existed and was deleted
    """
    deleted = memcache.delete(key)
    return deleted
×
170

171

172
def remote_evict(entity_key):
    """Ask production Bridgy Fed to evict an entity from its memcache.

    POSTs the entity key's URL-safe form to the ``/admin/memcache/evict``
    endpoint on :const:`PRIMARY_DOMAIN`, authorized with ``config.SECRET_KEY``.

    Args:
      entity_key (google.cloud.ndb.Key)

    Returns:
      requests.Response:
    """
    url = f'https://{PRIMARY_DOMAIN}/admin/memcache/evict'
    auth_headers = {'Authorization': config.SECRET_KEY}
    form = {'key': entity_key.urlsafe()}
    return util.requests_post(url, headers=auth_headers, data=form)
184

185

186
def task_eta(queue, user_id, protocol=None):
    """Get the ETA to use for a given user's task in a given queue.

    Task rate limit delays are per user, stored in memcache with a key based on
    ``queue`` and ``user_id`` and an integer value of POSIX timestamp (UTC) in
    seconds. Each call pushes the stored timestamp forward by the configured
    delay, so a burst of tasks for one user gets spread out over time.

    Only generates ETAs for task queues in :attr:`PER_USER_TASK_RATES`. Calls
    for other queues, or for protocols with no delay configured in the queue,
    always return ``None``.

    Background: https://github.com/snarfed/bridgy-fed/issues/1788

    Args:
      queue (str)
      user_id (str)
      protocol (str): optional protocol label to look up protocol-specific delay

    Returns:
      datetime.datetime: the ETA for this task. This is the current time if the
      user has no stored ETA yet, or if their stored ETA is in the past.
      ``None`` if no rate limit is configured for this queue and protocol.
    """
    delays = PER_USER_TASK_RATES.get(queue)
    if not delays:
        return None

    # look up delay for protocol, fall back to None (all protocols)
    delay = delays.get(protocol) or delays.get(None)
    if not delay:
        return None

    cache_key = key(f'task-delay-{queue}-{user_id}')
    now = util.now()

    # incr returns the new value, or a falsy value if the key doesn't exist
    eta_s = memcache.incr(cache_key, int(delay.total_seconds()))
    if eta_s:
        eta = datetime.fromtimestamp(eta_s, timezone.utc)
        if eta > now:
            return eta

    # incr failed (key doesn't exist) or timestamp is in the past, set it to now
    #
    # note that this isn't synchronized; multiple callers may race and both get
    # now as the returned ETA. that's ok, we don't depend on this for correctness
    # in any way, just best-effort rate limiting.
    memcache.set(cache_key, int(now.timestamp()))
    return now
×
228

229

230
###########################################
231

232
# https://github.com/googleapis/python-ndb/issues/743#issuecomment-2067590945
233
#
234
# fixes "RuntimeError: Key has already been set in this batch" errors due to
235
# tasklets in pages.serve_feed
236
from logging import error as log_error
×
237
from sys import modules
×
238

239
from google.cloud.datastore_v1.types.entity import Key
×
240
from google.cloud.ndb._cache import (
×
241
    _GlobalCacheSetBatch,
242
    global_compare_and_swap,
243
    global_set_if_not_exists,
244
    global_watch,
245
)
246
from google.cloud.ndb.tasklets import Future, Return, tasklet
×
247

248
# private constants borrowed from ndb's global cache implementation, for use by
# the global_lock_for_read replacement below
GLOBAL_CACHE_KEY_PREFIX: bytes = modules["google.cloud.ndb._cache"]._PREFIX
LOCKED_FOR_READ: bytes = modules["google.cloud.ndb._cache"]._LOCKED_FOR_READ
# lock expiration, passed as expires=... to the global cache calls below.
# NOTE(review): ndb defines this as an integer number of seconds, not bytes;
# confirm against the installed google-cloud-ndb version.
LOCK_TIME: int = modules["google.cloud.ndb._cache"]._LOCK_TIME
×
251

252

253
@tasklet
def custom_global_lock_for_read(key: str, value: str):
    """Replacement for ndb's ``global_lock_for_read`` that uses a constant lock.

    Tries to mark ``key`` as locked for read in the global cache by writing
    :const:`LOCKED_FOR_READ`, expiring after :const:`LOCK_TIME`:

    * if ``value`` is ``None``, sets the lock only if ``key`` doesn't exist yet
    * otherwise, watches ``key`` with ``value`` and then compare-and-swaps the
      lock in

    NOTE(review): the parameters are annotated ``str``, but ndb global cache
    keys and values look like ``bytes``; confirm before relying on the
    annotations.

    Args:
      key (str): global cache key
      value (str): the key's current value in the global cache, or ``None``

    Returns:
      tasklet future that resolves to :const:`LOCKED_FOR_READ` if the lock was
      acquired, otherwise ``None``
    """
    if value is None:
        acquired = yield global_set_if_not_exists(
            key, LOCKED_FOR_READ, expires=LOCK_TIME
        )
    else:
        yield global_watch(key, value)
        acquired = yield global_compare_and_swap(
            key, LOCKED_FOR_READ, expires=LOCK_TIME
        )

    if acquired:
        raise Return(LOCKED_FOR_READ)


# monkey patch ndb to use our replacement
modules["google.cloud.ndb._cache"].global_lock_for_read = custom_global_lock_for_read
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc