• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snarfed / bridgy-fed / 5abbabff-b03a-44c5-afeb-5abadbd2cd32

22 Oct 2025 09:31PM UTC coverage: 92.892%. Remained the same
5abbabff-b03a-44c5-afeb-5abadbd2cd32

push

circleci

snarfed
memcache: set ndb global cache strict_read/write to False

makes us ignore transient memcache errors in ndb ops

0 of 1 new or added line in 1 file covered. (0.0%)

5972 of 6429 relevant lines covered (92.89%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.74
/memcache.py
1
"""Utilities for caching data in memcache."""
2
from datetime import datetime, timedelta, timezone
1✔
3
import functools
1✔
4
import logging
1✔
5
import os
1✔
6

7
import config
1✔
8
from google.cloud.ndb._cache import global_cache_key
1✔
9
from google.cloud.ndb.global_cache import _InProcessGlobalCache, MemcacheCache
1✔
10
from oauth_dropins.webutil import appengine_info, util
1✔
11
from pymemcache.client.base import PooledClient
1✔
12
from pymemcache.serde import PickleSerde
1✔
13
from pymemcache.test.utils import MockMemcacheClient
1✔
14

15
# module-level logger, named after this module
logger = logging.getLogger(__name__)

# key() truncates encoded memcache keys to this many bytes.
# https://github.com/memcached/memcached/wiki/Commands#standard-protocol
KEY_MAX_LEN = 250

# default version number that memoize() embeds in its cache keys. changing it
# changes every key generated with the default, so values stored under the old
# version are no longer looked up.
MEMOIZE_VERSION = 2

# per-user rates for running tasks. rate limits and spreads out tasks for bursty
# users. https://github.com/snarfed/bridgy-fed/issues/1788
#
# maps task queue name to the delay that task_eta() adds between consecutive
# tasks for the same user. queues that aren't listed here get no ETA.
PER_USER_TASK_RATES = {
    'receive': timedelta(seconds=5),
}
27

28
# keyword arguments shared by every pymemcache client constructed below.
# https://pymemcache.readthedocs.io/en/latest/apidoc/pymemcache.client.base.html#pymemcache.client.base.Client.__init__
kwargs = {
    'server': os.environ.get('MEMCACHE_HOST', 'localhost'),
    'allow_unicode_keys': True,
    'default_noreply': False,
    'timeout': 10,   # seconds
    'connect_timeout': 10,   # seconds
}

# three module-level handles are set up here:
# * memcache: plain client, used by task_eta()
# * pickle_memcache: client with pickle serialization, used by memoize()
# * global_cache: ndb global cache, used by evict()
if appengine_info.DEBUG or appengine_info.LOCAL_SERVER:
    logger.info('Using in memory mock memcache')
    memcache = PooledClient(max_pool_size=1, **kwargs)
    pickle_memcache = PooledClient(max_pool_size=1, serde=PickleSerde(), **kwargs)
    # have both pools create pymemcache's mock client (from pymemcache.test.utils)
    # instead of the real client class
    memcache.client_class = pickle_memcache.client_class = MockMemcacheClient
    global_cache = _InProcessGlobalCache()
else:
    logger.info('Using production Memorystore memcache')
    memcache = PooledClient(**kwargs)
    pickle_memcache = PooledClient(serde=PickleSerde(), **kwargs)
    # strict_read/strict_write=False makes us ignore transient memcache errors in
    # ndb ops
    global_cache = MemcacheCache(memcache, strict_read=False, strict_write=False)
×
48

49

50
def key(key):
    """Normalizes a string into a memcache key.

    Replaces each space with ``%20``, encodes the result (UTF-8, the
    :meth:`str.encode` default), and cuts the encoded bytes off at
    :const:`KEY_MAX_LEN`. Truncation is by byte, not by character, so the cut
    may land in the middle of a multi-byte character.

    https://pymemcache.readthedocs.io/en/latest/apidoc/pymemcache.client.base.html
    https://github.com/memcached/memcached/wiki/Commands#standard-protocol

    Args:
      key (str)

    Returns:
      bytes: at most :const:`KEY_MAX_LEN` bytes long
    """
    assert isinstance(key, str), repr(key)

    escaped = key.replace(' ', '%20')
    encoded = escaped.encode()
    return encoded[:KEY_MAX_LEN]
1✔
67

68

69
def memoize_key(fn, *args, _version=MEMOIZE_VERSION, **kwargs):
    """Generates the memcache key that :func:`memoize` uses for a given call.

    The key is built from the function's qualified name, the version number,
    and the ``repr`` of the positional and keyword arguments, then passed
    through :func:`key`.

    Args:
      fn (callable): the memoized function
      args: positional arguments of the call
      _version (int): version number to embed in the key
      kwargs: keyword arguments of the call

    Returns:
      bytes: the cache key
    """
    raw = f'{fn.__qualname__}-{_version}-{args!r}-{kwargs!r}'
    return key(raw)
1✔
71

72

73
# stored in memcache in place of None, since a memcache miss also reads as None.
# note that a memoized function that returns () itself is indistinguishable from
# this sentinel on a cache hit.
NONE = ()  # empty tuple

def memoize(expire=None, key=None, write=True, version=MEMOIZE_VERSION):
    """Decorator that memoizes a function's return value in memcache.

    Values are read from and written to ``pickle_memcache``. ``None`` return
    values are stored as :const:`NONE` and converted back to ``None`` on a
    cache hit.

    Args:
      expire (datetime.timedelta): optional, expiration. If unset, 0 is passed
        to memcache as the expiration.
      key (callable): function that takes the function's ``(*args, **kwargs)``
        and returns the value to build the cache key from. If it returns a
        falsy value, memcache isn't used for that call.
      write (bool or callable): whether to write to memcache. If this is a
        callable, it will be called with the function's ``(*args, **kwargs)``
        and should return True or False.
      version (int): overrides our default version number in the memcache key.
        Bumping this version can have the same effect as clearing the cache for
        just the affected function.

    Returns:
      callable: the decorator
    """
    expire_secs = int(expire.total_seconds()) if expire else 0

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # determine the cache key, or None to bypass memcache for this call
            if not key:
                cache_key = memoize_key(fn, *args, _version=version, **kwargs)
            elif key_val := key(*args, **kwargs):
                cache_key = memoize_key(fn, key_val, _version=version)
            else:
                cache_key = None

            # read
            if pickle_memcache and cache_key:
                cached = pickle_memcache.get(cache_key)
                if cached is None:
                    logger.debug(f'cache miss {cache_key}')
                else:
                    logger.debug(f'cache hit {cache_key} {repr(cached)[:100]}')
                    return None if cached == NONE else cached

            result = fn(*args, **kwargs)

            # write
            if pickle_memcache and cache_key:
                if isinstance(write, bool):
                    should_write = write
                else:
                    should_write = write(*args, **kwargs)

                if should_write:
                    logger.debug(f'cache set {cache_key} {repr(result)[:100]}')
                    to_store = NONE if result is None else result
                    pickle_memcache.set(cache_key, to_store, expire=expire_secs)

            return result

        return wrapper

    return decorator
1✔
126

127

128
def evict(entity_key):
    """Evicts a datastore entity from the ndb global cache.

    If the entity exists and has a ``copies`` attribute, first calls its
    ``clear_get_original_cache`` method with the ``uri`` of each copy. The
    entity's global cache entry is then deleted whether or not the entity
    exists.

    Args:
      entity_key (google.cloud.ndb.Key)
    """
    entity = entity_key.get()
    if entity:
        for copy in getattr(entity, 'copies', []):
            entity.clear_get_original_cache(copy.uri)

    cache_key = global_cache_key(entity_key._key)
    global_cache.delete([cache_key])
1✔
143

144

145
def remote_evict(entity_key):
    """Asks production Bridgy Fed to evict an entity from memcache.

    POSTs the key's URL-safe form to ``/admin/memcache-evict`` on
    ``common.PRIMARY_DOMAIN``, with ``config.SECRET_KEY`` in the
    ``Authorization`` header.

    Args:
      entity_key (google.cloud.ndb.Key)

    Returns:
      requests.Response: the return value of ``util.requests_post``
    """
    # imported here rather than at module level
    from common import PRIMARY_DOMAIN

    url = f'https://{PRIMARY_DOMAIN}/admin/memcache-evict'
    auth_headers = {'Authorization': config.SECRET_KEY}
    form = {'key': entity_key.urlsafe()}
    return util.requests_post(url, headers=auth_headers, data=form)
159

160

161
def task_eta(queue, user_id):
    """Returns the ETA to use for a given user's task in a given queue.

    Task rate limit delays are per user. Each one is stored in memcache under a
    key based on ``queue`` and ``user_id``, as an integer POSIX timestamp (UTC)
    in seconds. Every call pushes that timestamp forward by the queue's delay
    from :attr:`PER_USER_TASK_RATES`; if the result is still in the future, it's
    the ETA. Otherwise the stored timestamp is reset to now.

    Only generates ETAs for task queues in :attr:`PER_USER_TASK_RATES`.

    Background: https://github.com/snarfed/bridgy-fed/issues/1788

    Args:
      queue (str)
      user_id (str)

    Returns:
      datetime.datetime: the ETA for this task, which may be the current time,
      or ``None`` if ``queue`` has no rate in :attr:`PER_USER_TASK_RATES`
    """
    delay = PER_USER_TASK_RATES.get(queue)
    if not delay:
        return None

    cache_key = key(f'task-delay-{queue}-{user_id}')
    now = util.now()

    next_ts = memcache.incr(cache_key, int(delay.total_seconds()))
    if next_ts:
        eta = datetime.fromtimestamp(next_ts, timezone.utc)
        if eta > now:
            return eta

    # either incr returned nothing (eg the key doesn't exist yet) or the stored
    # timestamp is in the past, so restart the clock at now.
    #
    # this isn't synchronized: concurrent callers may race and both get now as
    # their ETA. that's fine, this is only best-effort rate limiting, nothing
    # depends on it for correctness.
    memcache.set(cache_key, int(now.timestamp()))
    return now
1✔
198

199

200
###########################################
201

202
# https://github.com/googleapis/python-ndb/issues/743#issuecomment-2067590945
203
#
204
# fixes "RuntimeError: Key has already been set in this batch" errors due to
205
# tasklets in pages.serve_feed
206
from logging import error as log_error
1✔
207
from sys import modules
1✔
208

209
from google.cloud.datastore_v1.types.entity import Key
1✔
210
from google.cloud.ndb._cache import (
1✔
211
    _GlobalCacheSetBatch,
212
    global_compare_and_swap,
213
    global_set_if_not_exists,
214
    global_watch,
215
)
216
from google.cloud.ndb.tasklets import Future, Return, tasklet
1✔
217

218
# private constants read from ndb's _cache module, for use by
# custom_global_lock_for_read below.
GLOBAL_CACHE_KEY_PREFIX: bytes = modules["google.cloud.ndb._cache"]._PREFIX
# value written to the global cache to mark a key as locked for read
LOCKED_FOR_READ: bytes = modules["google.cloud.ndb._cache"]._LOCKED_FOR_READ
# passed as expires= when setting a lock.
# NOTE(review): annotated int rather than bytes since ndb defines _LOCK_TIME as
# a number of seconds; confirm against the installed ndb version.
LOCK_TIME: int = modules["google.cloud.ndb._cache"]._LOCK_TIME
1✔
221

222

223
@tasklet
def custom_global_lock_for_read(key: str, value: str):
    """Replacement for ndb's ``_cache.global_lock_for_read``.

    Tries to put a read lock on ``key`` in the ndb global cache by writing
    :const:`LOCKED_FOR_READ` with an expiration of :const:`LOCK_TIME`.

    NOTE(review): the ``str`` annotations look inaccurate. ``value`` may be
    ``None``, and ``LOCKED_FOR_READ`` is annotated ``bytes``, so cache keys
    and values are presumably ``bytes`` too. Confirm against ndb before
    changing them.

    Args:
      key: global cache key to lock
      value: presumably the value currently cached for ``key``, or ``None`` if
        there is none; verify against ndb's callers

    Returns:
      :const:`LOCKED_FOR_READ` (via ``Return``) if the lock was acquired.
      Otherwise the tasklet finishes without returning a value.
    """
    if value is not None:
        # there's an existing value: watch it, then only swap in the lock if
        # the compare-and-swap succeeds
        yield global_watch(key, value)
        lock_acquired = yield global_compare_and_swap(
            key, LOCKED_FOR_READ, expires=LOCK_TIME
        )
    else:
        # no existing value: only set the lock if the key is still unset
        lock_acquired = yield global_set_if_not_exists(
            key, LOCKED_FOR_READ, expires=LOCK_TIME
        )

    if lock_acquired:
        raise Return(LOCKED_FOR_READ)

# monkey patch: install our version over ndb's on the _cache module
modules["google.cloud.ndb._cache"].global_lock_for_read = custom_global_lock_for_read
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc