• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snarfed / bridgy-fed / 8461c87c-a479-4c9d-8aa5-7e205561d701

21 May 2025 03:17AM UTC coverage: 93.061% (+0.03%) from 93.028%
8461c87c-a479-4c9d-8aa5-7e205561d701

push

circleci

snarfed
memcache.add_notification: add notify task if necessary

for #1200

4 of 4 new or added lines in 1 file covered. (100.0%)

12 existing lines in 3 files now uncovered.

4868 of 5231 relevant lines covered (93.06%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.92
/memcache.py
1
"""Utilities for caching data in memcache."""
2
from datetime import timedelta
1✔
3
import functools
1✔
4
import logging
1✔
5
import os
1✔
6

7
from google.cloud.ndb.global_cache import _InProcessGlobalCache, MemcacheCache
1✔
8
from granary import as1
1✔
9
from oauth_dropins.webutil import appengine_info, util
1✔
10
from pymemcache.client.base import PooledClient
1✔
11
from pymemcache.serde import PickleSerde
1✔
12

13
from tests.mock_memcache import CasMockMemcacheClient
1✔
14

15
logger = logging.getLogger(__name__)

# Maximum key length that key() truncates to. See:
# https://github.com/memcached/memcached/wiki/Commands#standard-protocol
KEY_MAX_LEN = 250

# Default version number embedded in the cache keys that memoize() generates.
MEMOIZE_VERSION = 2

# Delay used for the notify task that add_notification() enqueues.
NOTIFY_TASK_FREQ = timedelta(hours=1)


# Pick the memcache backends once, at import time: real pooled clients when
# deployed, in-memory mocks when running in debug mode or on a local server.
if not (appengine_info.DEBUG or appengine_info.LOCAL_SERVER):
    logger.info('Using production Memorystore memcache')
    # raw client: values are stored as is
    memcache = PooledClient(
        os.environ['MEMCACHE_HOST'],
        allow_unicode_keys=True,
        timeout=10,  # seconds
        connect_timeout=10,  # seconds
    )
    # same server, but values are pickled/unpickled transparently
    pickle_memcache = PooledClient(
        os.environ['MEMCACHE_HOST'],
        serde=PickleSerde(),
        allow_unicode_keys=True,
        timeout=10,  # seconds
        connect_timeout=10,  # seconds
    )
    global_cache = MemcacheCache(memcache)
else:
    logger.info('Using in memory mock memcache')
    memcache = CasMockMemcacheClient(allow_unicode_keys=True)
    pickle_memcache = CasMockMemcacheClient(
        allow_unicode_keys=True,
        serde=PickleSerde(),
    )
    global_cache = _InProcessGlobalCache()
×
39

40

41
def key(key):
    """Preprocesses a memcache key.

    Replaces spaces with ``%20``, UTF-8 encodes the result, and truncates it to
    at most ``KEY_MAX_LEN`` (250) bytes. If the cut would land in the middle of
    a multi-byte UTF-8 character, eg in a URL with Unicode characters, that
    partial character is dropped so that the returned key is always valid UTF-8.

    https://pymemcache.readthedocs.io/en/latest/apidoc/pymemcache.client.base.html
    https://github.com/memcached/memcached/wiki/Commands#standard-protocol

    Related: pymemcache Client's ``allow_unicode_keys`` constructor kwarg.

    Args:
      key (str)

    Returns:
      bytes: the processed key, at most ``KEY_MAX_LEN`` bytes long

    Raises:
      AssertionError: if ``key`` is not a str
    """
    assert isinstance(key, str), repr(key)

    truncated = key.replace(' ', '%20').encode()[:KEY_MAX_LEN]

    # The bytes came from a valid str, so the only possibly invalid sequence is
    # a character split by the slice at the very end. errors='ignore' drops
    # just that partial character and leaves everything else untouched.
    return truncated.decode(errors='ignore').encode()
1✔
58

59

60
def memoize_key(fn, *args, _version=MEMOIZE_VERSION, **kwargs):
    """Generates the memcache key for a memoized call to ``fn``.

    The key is composed of the function's qualified name, the version number,
    and the ``repr`` of the positional and keyword arguments, joined with
    dashes and then passed through :func:`key`.

    Args:
      fn (callable): the function being memoized
      args: positional arguments of the call
      _version (int): version number to embed in the key
      kwargs: keyword arguments of the call

    Returns:
      bytes: the memcache key
    """
    raw_key = '{}-{}-{!r}-{!r}'.format(fn.__qualname__, _version, args, kwargs)
    return key(raw_key)
1✔
62

63

64
# Sentinel stored in memcache in place of None, so that a cached None return
# value can be told apart from a cache miss. It's an empty tuple.
NONE = ()


def memoize(expire=None, key=None, write=True, version=MEMOIZE_VERSION):
    """Decorator that memoizes a function's return value in memcache.

    Values are read from and written to ``pickle_memcache``. A return value of
    None is stored as the ``NONE`` sentinel and converted back on cache hits.

    Args:
      expire (timedelta): optional, how long cached values should live
      key (callable): optional. Called with the function's ``(*args, **kwargs)``;
        returns the value to build the cache key from. If it returns a falsy
        value, eg None, memcache is skipped for that call.
      write (bool or callable): whether to write to memcache. If this is a
        callable, it's called with the function's ``(*args, **kwargs)`` and
        should return True or False.
      version (int): overrides the default version number in the memcache key.
        Bumping it can have the same effect as clearing the cache for just the
        decorated function.

    Returns:
      callable: the decorator
    """
    expire_secs = int(expire.total_seconds()) if expire else expire

    def decorator(fn):
        def cache_key_for(args, kwargs):
            """Returns the memcache key for this call, or None to skip memcache."""
            if not key:
                return memoize_key(fn, *args, _version=version, **kwargs)

            custom = key(*args, **kwargs)
            if not custom:
                return None
            return memoize_key(fn, custom, _version=version)

        @functools.wraps(fn)
        def wrapped(*args, **kwargs):
            cache_key = cache_key_for(args, kwargs)
            if not cache_key:
                # no usable key, so bypass memcache entirely
                return fn(*args, **kwargs)

            cached = pickle_memcache.get(cache_key)
            if cached is not None:
                logger.debug(f'cache hit {cache_key} {repr(cached)[:100]}')
                return None if cached == NONE else cached

            logger.debug(f'cache miss {cache_key}')
            result = fn(*args, **kwargs)

            if isinstance(write, bool):
                should_write = write
            else:
                should_write = write(*args, **kwargs)

            if should_write:
                logger.debug(f'cache set {cache_key} {repr(result)[:100]}')
                to_store = NONE if result is None else result
                pickle_memcache.set(cache_key, to_store, expire=expire_secs)

            return result

        return wrapped

    return decorator
1✔
118

119

120
def notification_key(user):
    """Returns the memcache key for a user's pending notifications.

    Args:
      user (models.User)

    Returns:
      bytes: ``notifs-{user id}``, preprocessed by :func:`key`
    """
    user_id = user.key.id()
    return key(f'notifs-{user_id}')
1✔
122

123

124
def add_notification(user, obj):
    """Adds a notification for a given user.

    The memcache key is ``notifs-{user id}``. The value is a space-separated list of
    object URLs to notify the user of.

    If the user has no pending notifications yet, creates the cache entry with
    memcache's ``add`` command, which only stores the value if the key doesn't
    exist, and enqueues a ``notify`` task to run after ``NOTIFY_TASK_FREQ``.
    Otherwise, appends the URL to the existing entry, unless it's already there.

    Objects whose URL (or id, as a fallback) isn't a web URL are dropped.

    Args:
      user (models.User): the user to notify
      obj (models.Object): the object to notify about
    """
    key = notification_key(user)
    obj_url = as1.get_url(obj.as1) or obj.key.id()
    assert obj_url

    user_id = user.key.id()
    if not util.is_web(obj_url):
        logger.info(f'Dropping non-URL notif {obj_url} for {user_id}')
        return

    logger.info(f'Adding notif {obj_url} for {user_id}')

    notifs = memcache.get(key)

    if notifs is None:
        # Use add, not cas, to create the entry: cas on a key that doesn't
        # exist stores nothing. noreply=False is needed to get the real result
        # back; with noreply, pymemcache always returns True.
        if memcache.add(key, obj_url.encode(), noreply=False):
            # imported here, not at module level; presumably to avoid a
            # circular import - TODO confirm
            import common
            common.create_task(queue='notify', delay=NOTIFY_TASK_FREQ,
                               user_id=user_id, protocol=user.LABEL)
            return

        # ...otherwise, if add returned False, that means a notification was
        # added between our get and our add, so append to it, below

    elif obj_url in notifs.decode().split():
        # this notif URL has already been added
        return

    memcache.append(key, (' ' + obj_url).encode())
1✔
163

164

165
def get_notifications(user, clear=False):
    """Fetches the notifications currently enqueued for a user.

    They're stored in memcache under the key ``notifs-{user id}`` as a
    whitespace-separated list of URLs.

    Args:
      user (models.User)
      clear (bool): if True, delete the notifications from memcache after
        fetching them

    Returns:
      list of str: URLs to notify the user of; possibly empty
    """
    cache_key = notification_key(user)

    raw = memcache.get(cache_key, default=b'')
    urls = raw.decode().strip().split()

    # only issue the delete if there was something stored to clear
    if clear and urls:
        memcache.delete(cache_key)

    return urls
1✔
184

185

186
###########################################
187

188
# https://github.com/googleapis/python-ndb/issues/743#issuecomment-2067590945
189
#
190
# fixes "RuntimeError: Key has already been set in this batch" errors due to
191
# tasklets in pages.serve_feed
192
from logging import error as log_error
1✔
193
from sys import modules
1✔
194

195
from google.cloud.datastore_v1.types.entity import Key
1✔
196
from google.cloud.ndb._cache import (
1✔
197
    _GlobalCacheSetBatch,
198
    global_compare_and_swap,
199
    global_set_if_not_exists,
200
    global_watch,
201
)
202
from google.cloud.ndb.tasklets import Future, Return, tasklet
1✔
203

204
# Private constants pulled out of ndb's _cache module, which the replacement
# lock function below needs.
# NOTE(review): LOCK_TIME is annotated as bytes but is passed as ``expires``,
# so it's presumably a number of seconds - confirm against ndb.
GLOBAL_CACHE_KEY_PREFIX: bytes = modules["google.cloud.ndb._cache"]._PREFIX
LOCKED_FOR_READ: bytes = modules["google.cloud.ndb._cache"]._LOCKED_FOR_READ
LOCK_TIME: bytes = modules["google.cloud.ndb._cache"]._LOCK_TIME


@tasklet
def custom_global_lock_for_read(key: str, value: str):
    """Replacement for ndb's ``global_lock_for_read`` that tries to take a read lock.

    If there's no existing value, the lock is written only if the key isn't
    set yet. Otherwise, the key is watched with its existing value and the
    lock is written with compare-and-swap.

    NOTE(review): ``key`` and ``value`` are annotated as str, but ``value`` may
    be None and both are presumably bytes in ndb - confirm.

    Args:
      key: the global cache key to lock
      value: the existing cached value, or None if there isn't one

    Returns:
      tasklets.Future: resolves to ``LOCKED_FOR_READ`` if the lock was
      acquired, otherwise None
    """
    if value is None:
        lock_acquired = yield global_set_if_not_exists(
            key, LOCKED_FOR_READ, expires=LOCK_TIME
        )
    else:
        yield global_watch(key, value)
        lock_acquired = yield global_compare_and_swap(
            key, LOCKED_FOR_READ, expires=LOCK_TIME
        )

    if lock_acquired:
        raise Return(LOCKED_FOR_READ)


# Monkey patch ndb to use the replacement above.
modules["google.cloud.ndb._cache"].global_lock_for_read = custom_global_lock_for_read
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc