• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snarfed / bridgy-fed / 1de5d93a-d11b-4fc5-ab0e-f7b468137a48

21 May 2025 05:54PM UTC coverage: 92.423% (-0.008%) from 92.431%
1de5d93a-d11b-4fc5-ab0e-f7b468137a48

push

circleci

snarfed
login/settings: fix /pages tests for #1939

5038 of 5451 relevant lines covered (92.42%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.0
/memcache.py
1
"""Utilities for caching data in memcache."""
2
from datetime import timedelta
1✔
3
import functools
1✔
4
import logging
1✔
5
import os
1✔
6

7
from google.cloud.ndb.global_cache import _InProcessGlobalCache, MemcacheCache
1✔
8
from granary import as1
1✔
9
from oauth_dropins.webutil import appengine_info, util
1✔
10
from pymemcache.client.base import PooledClient
1✔
11
from pymemcache.serde import PickleSerde
1✔
12

13
from mock_memcache import CasMockMemcacheClient
1✔
14

15
logger = logging.getLogger(__name__)
1✔
16

17
# https://github.com/memcached/memcached/wiki/Commands#standard-protocol
18
KEY_MAX_LEN = 250
1✔
19

20
MEMOIZE_VERSION = 2
1✔
21

22
NOTIFY_TASK_FREQ = timedelta(hours=1)
1✔
23

24

25
if appengine_info.DEBUG or appengine_info.LOCAL_SERVER:
1✔
26
    logger.info('Using in memory mock memcache')
1✔
27
    memcache = CasMockMemcacheClient(allow_unicode_keys=True)
1✔
28
    pickle_memcache = CasMockMemcacheClient(allow_unicode_keys=True,
1✔
29
                                            serde=PickleSerde())
30
    global_cache = _InProcessGlobalCache()
1✔
31
else:
32
    logger.info('Using production Memorystore memcache')
×
33
    memcache = PooledClient(os.environ['MEMCACHE_HOST'], allow_unicode_keys=True,
×
34
                            timeout=10, connect_timeout=10) # seconds
35
    pickle_memcache = PooledClient(os.environ['MEMCACHE_HOST'],
×
36
                                   serde=PickleSerde(), allow_unicode_keys=True,
37
                                   timeout=10, connect_timeout=10)  # seconds
38
    global_cache = MemcacheCache(memcache)
×
39

40

41
def key(key):
1✔
42
    """Preprocesses a memcache key. Right now just truncates it to 250 chars.
43

44
    https://pymemcache.readthedocs.io/en/latest/apidoc/pymemcache.client.base.html
45
    https://github.com/memcached/memcached/wiki/Commands#standard-protocol
46

47
    TODO: truncate to 250 *UTF-8* chars, to handle Unicode chars in URLs. Related:
48
    pymemcache Client's allow_unicode_keys constructor kwarg.
49

50
    Args:
51
      key (str)
52

53
    Returns:
54
      bytes:
55
    """
56
    assert isinstance(key, str), repr(key)
1✔
57
    return key.replace(' ', '%20').encode()[:KEY_MAX_LEN]
1✔
58

59

60
def memoize_key(fn, *args, _version=MEMOIZE_VERSION, **kwargs):
1✔
61
    return key(f'{fn.__qualname__}-{_version}-{repr(args)}-{repr(kwargs)}')
1✔
62

63

64
NONE = ()  # empty tuple
1✔
65

66
def memoize(expire=None, key=None, write=True, version=MEMOIZE_VERSION):
1✔
67
    """Memoize function decorator that stores the cached value in memcache.
68

69
    Args:
70
      expire (timedelta): optional, expiration
71
      key (callable): function that takes the function's ``(*args, **kwargs)``
72
        and returns the cache key to use. If it returns None, memcache won't be
73
        used.
74
      write (bool or callable): whether to write to memcache. If this is a
75
        callable, it will be called with the function's ``(*args, **kwargs)``
76
        and should return True or False.
77
      version (int): overrides our default version number in the memcache key.
78
        Bumping this version can have the same effect as clearing the cache for
79
        just the affected function.
80
    """
81
    if expire:
1✔
82
        expire = int(expire.total_seconds())
1✔
83

84
    def decorator(fn):
1✔
85
        @functools.wraps(fn)
1✔
86
        def wrapped(*args, **kwargs):
1✔
87
            cache_key = None
1✔
88
            if key:
1✔
89
                key_val = key(*args, **kwargs)
1✔
90
                if key_val:
1✔
91
                    cache_key = memoize_key(fn, key_val, _version=version)
1✔
92
            else:
93
                cache_key = memoize_key(fn, *args, _version=version, **kwargs)
1✔
94

95
            if cache_key:
1✔
96
                val = pickle_memcache.get(cache_key)
1✔
97
                if val is not None:
1✔
98
                    logger.debug(f'cache hit {cache_key} {repr(val)[:100]}')
1✔
99
                    return None if val == NONE else val
1✔
100
                else:
101
                    logger.debug(f'cache miss {cache_key}')
1✔
102

103
            val = fn(*args, **kwargs)
1✔
104

105
            if cache_key:
1✔
106
                write_cache = (write if isinstance(write, bool)
1✔
107
                               else write(*args, **kwargs))
108
                if write_cache:
1✔
109
                    logger.debug(f'cache set {cache_key} {repr(val)[:100]}')
1✔
110
                    pickle_memcache.set(cache_key, NONE if val is None else val,
1✔
111
                                        expire=expire)
112

113
            return val
1✔
114

115
        return wrapped
1✔
116

117
    return decorator
1✔
118

119

120
def notification_key(user):
1✔
121
    return key(f'notifs-{user.key.id()}')
1✔
122

123

124
def add_notification(user, obj):
1✔
125
    """Adds a notification for a given user.
126

127
    The memcache key is ``notifs-{user id}``. The value is a space-separated list of
128
    object URLs to notify the user of.
129

130
    Uses gets/cas to create the cache entry if it doesn't exist.
131

132
    Args:
133
      user (models.User): the user to notify
134
      obj (models.Object): the object to notify about
135
    """
136
    import common
1✔
137

138
    key = notification_key(user)
1✔
139
    obj_url = as1.get_url(obj.as1) or obj.key.id()
1✔
140
    assert obj_url
1✔
141

142
    # TODO: remove to launch
143
    if (user.key.id() not in common.BETA_USER_IDS
1✔
144
            and not (appengine_info.DEBUG or appengine_info.LOCAL_SERVER)):
145
        return
×
146

147
    if not util.is_web(obj_url):
1✔
148
        logger.info(f'Dropping non-URL notif {obj_url} for {user.key.id()}')
1✔
149
        return
1✔
150

151
    logger.info(f'Adding notif {obj_url} for {user.key.id()}')
1✔
152

153
    notifs, cas_token = memcache.gets(key)
1✔
154

155
    if notifs is None:
1✔
156
        if memcache.cas(key, obj_url.encode(), cas_token) in (True, None):
1✔
157
            common.create_task(queue='notify', delay=NOTIFY_TASK_FREQ,
1✔
158
                               user_id=user.key.id(), protocol=user.LABEL)
159
            return
1✔
160

161
        # ...otherwise, if cas returned False, that means a notification was added
162
        # between our gets and our cas, so append to it, below
163

164
    elif notifs and obj_url in notifs.decode().split():
1✔
165
        # this notif URL has already been added
166
        return
1✔
167

168
    memcache.append(key, (' ' + obj_url).encode())
1✔
169

170

171
def get_notifications(user, clear=False):
1✔
172
    """Gets enqueued notifications for a given user.
173

174
    The memcache key is ``notifs-{user id}``.
175

176
    Args:
177
      user (models.User)
178
      clear (bool): clear notifications from memcache after fetching them
179

180
    Returns:
181
      list of str: URLs to notify the user of; possibly empty
182
    """
183
    key = notification_key(user)
1✔
184
    notifs = memcache.get(key, default=b'').decode().strip().split()
1✔
185

186
    if notifs and clear:
1✔
187
        memcache.delete(key)
1✔
188

189
    return notifs
1✔
190

191

192
###########################################
193

194
# https://github.com/googleapis/python-ndb/issues/743#issuecomment-2067590945
195
#
196
# fixes "RuntimeError: Key has already been set in this batch" errors due to
197
# tasklets in pages.serve_feed
198
from logging import error as log_error
1✔
199
from sys import modules
1✔
200

201
from google.cloud.datastore_v1.types.entity import Key
1✔
202
from google.cloud.ndb._cache import (
1✔
203
    _GlobalCacheSetBatch,
204
    global_compare_and_swap,
205
    global_set_if_not_exists,
206
    global_watch,
207
)
208
from google.cloud.ndb.tasklets import Future, Return, tasklet
1✔
209

210
GLOBAL_CACHE_KEY_PREFIX: bytes = modules["google.cloud.ndb._cache"]._PREFIX
1✔
211
LOCKED_FOR_READ: bytes = modules["google.cloud.ndb._cache"]._LOCKED_FOR_READ
1✔
212
LOCK_TIME: bytes = modules["google.cloud.ndb._cache"]._LOCK_TIME
1✔
213

214

215
@tasklet
1✔
216
def custom_global_lock_for_read(key: str, value: str):
1✔
217
    if value is not None:
1✔
218
        yield global_watch(key, value)
1✔
219
        lock_acquired = yield global_compare_and_swap(
1✔
220
            key, LOCKED_FOR_READ, expires=LOCK_TIME
221
        )
222
    else:
223
        lock_acquired = yield global_set_if_not_exists(
1✔
224
            key, LOCKED_FOR_READ, expires=LOCK_TIME
225
        )
226

227
    if lock_acquired:
1✔
228
        raise Return(LOCKED_FOR_READ)
1✔
229

230
modules["google.cloud.ndb._cache"].global_lock_for_read = custom_global_lock_for_read
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc