• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

thomwiggers / onebot / 20855313676

09 Jan 2026 02:37PM UTC coverage: 67.247% (+0.4%) from 66.802%
20855313676

push

github

thomwiggers
Apply suggestion from @thomwiggers

850 of 1264 relevant lines covered (67.25%)

3.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.69
/onebot/plugins/users.py
1
# -*- coding: utf8 -*-
2
"""
==============================================
:mod:`onebot.plugins.users` Users plugin
==============================================

Keeps track of the users in channels. Also provides an authorisation system.
This plugin uses WHOIS to figure out someone's NickServ account and then links
that to an automatically created, in-bot account.
"""
11

12
from __future__ import unicode_literals, print_function

import ast
import asyncio
import logging
import re
from typing import (
    Any,
    Awaitable,
    Callable,
    Dict,
    Iterable,
    Literal,
    Optional,
    Self,
    Set,
)

import irc3
from irc3.plugins.storage import Storage
from irc3.utils import IrcString
32

33

34
class User(object):
    """A user the bot currently tracks, together with the channels it is in.

    Settings are stored in ``database`` under the key produced by awaiting
    the ``id_`` callable, so the identification strategy is chosen by
    whoever constructs the object.
    """

    _log = logging.getLogger(__name__)

    def __init__(
        self,
        mask: IrcString,
        channels: Iterable[str],
        id_: Callable[[], Awaitable[str]],
        database=None,
    ):
        """Create a user from its mask.

        :param mask: mask of the user; ``mask.nick`` and ``mask.host`` are
            copied onto the instance.
        :param channels: iterable of channel names the user is in. A bare
            string is rejected, since iterating it would yield characters.
        :param id_: coroutine function returning the storage key for this
            user.
        :param database: storage backend used by the settings methods.
        :raises ValueError: if ``channels`` is a string or not iterable.
        """
        self.nick = mask.nick
        self.host = mask.host
        self.channels: Set[str] = set()
        self.id: Callable[[], Awaitable[str]] = id_
        self.database: Optional[Storage] = database
        # Strong references to in-flight background writes; the event loop
        # itself only keeps weak references to tasks.
        self._pending: Set[Any] = set()

        if isinstance(channels, str):
            raise ValueError("You must specify a list of channels!")
        try:
            self.channels.update(channels)
        except TypeError as err:
            raise ValueError(
                "You need to specify in which channel this user is!"
            ) from err

    @property
    def mask(self) -> IrcString:
        """Get the mask of this user, rebuilt from ``nick`` and ``host``."""
        return IrcString("{}!{}".format(self.nick, self.host))

    def _get_database(self) -> Storage:
        """Return the storage backend.

        :raises RuntimeError: if no database was configured.
        """
        if self.database is None:
            raise RuntimeError("No database set for this user.")
        return self.database

    def _spawn(self, coro: Awaitable[None]) -> None:
        """Schedule ``coro`` and keep it referenced until it has finished."""
        task = asyncio.ensure_future(coro)
        self._pending.add(task)
        task.add_done_callback(self._pending.discard)

    def set_settings(self, settings) -> None:
        """Replace the settings with the provided dictionary.

        The write is scheduled on the event loop and has not necessarily
        happened yet when this method returns.
        """

        async def store() -> None:
            id_ = await self.id()
            self._get_database()[id_] = settings

        self._spawn(store())

    def set_setting(self, setting: str, value: Any) -> None:
        """Set a single setting to a value.

        The write is scheduled on the event loop and has not necessarily
        happened yet when this method returns.
        """
        self._log.debug("Trying to set %s to %s", setting, value)

        async def store() -> None:
            id_ = await self.id()
            self._get_database().set(id_, **{setting: value})

        self._spawn(store())

    async def get_settings(self) -> Dict[str, Any]:
        """Get this user's settings (an empty dict if nothing is stored)."""
        id_ = await self.id()
        return self._get_database().get(id_, {})  # type: ignore

    async def get_setting(self, setting, default=None) -> Any:
        """Get one setting for this user. Can be any type.

        String values (including a string ``default``) are passed through
        :func:`ast.literal_eval`; if they parse as a Python literal the
        parsed value is returned, otherwise the string itself is returned.
        """
        settings = await self.get_settings()
        result = settings.get(setting, default)
        if not isinstance(result, str):
            return result
        try:
            return ast.literal_eval(result)
        except (ValueError, SyntaxError):
            # Not a Python literal: hand back the raw string.
            return result

    def join(self, channel) -> None:
        """Register that the user joined a channel"""
        self.channels.add(channel)

    def part(self, channel) -> None:
        """Register that the user parted a channel.

        Parting a channel the user was not recorded in is a no-op, so
        callers may part every tracked user from a channel without first
        checking membership.
        """
        self.channels.discard(channel)

    def still_in_channels(self) -> bool:
        """Is the user still in at least one channel?"""
        return bool(self.channels)

    def __eq__(self, other: object) -> bool:
        """Compare users by nick

        Since nicks are unique this works for exactly one irc server.
        Because no ``__hash__`` is defined alongside, instances are
        unhashable.
        """
        if not isinstance(other, self.__class__):
            # Let Python try the reflected comparison before falling back
            # to "not equal".
            return NotImplemented
        return self.nick == other.nick
124

125

126
def redact_nick(nick: str) -> str:
    """Insert a middle dot (``·``) after the first character of a nick.

    Nicks of length zero or one are returned unchanged, as there is no
    position between two characters to put the dot in.
    """
    if len(nick) <= 1:
        return nick
    head, tail = nick[0], nick[1:]
    return f"{head}·{tail}"
131

132

133
@irc3.plugin
class UsersPlugin(object):
    """User management plugin for OneBot

    Users are discovered through JOIN, PRIVMSG and WHO. NAMES replies only
    update the channel membership of users that are already known, because
    we can't get hosts through NAMES.

    Configuration settings:
        - ``identify_by``: the identification method

    Identification methods available:
        - ``mask``: Use the hostmask
        - ``whatcd``: Get the what.cd username from the host mask
        - ``nickserv``: Parse nickserv info from ``WHOIS``.
    """

    requires = ["irc3.plugins.storage", "irc3.plugins.asynchronious"]

    # Characters treated as part of a nick when looking for nick boundaries
    # in redact_nicks (contents of a regex character class).
    _NICK_CHARS = r"A-Za-z0-9_\\\[\]\{\}^`|-"

    def __init__(self, bot: irc3.IrcBot):
        """Initialises the plugin

        :raises Exception: if ``identify_by`` is not a known method.
        """
        self.bot = bot
        config = bot.config.get(__name__, {})
        method = config.get("identify_by", "mask")
        if method not in ("mask", "nickserv", "whatcd"):
            raise Exception(
                "Invalid configuration: UsersPlugin.identify_by must be one "
                "of mask, nickserv, whatcd (got %r)" % (method,)
            )
        self.identifying_method: Literal["mask", "nickserv", "whatcd"] = method
        self.log = bot.log.getChild(__name__)
        # Starts with empty channel and user administration.
        self.connection_lost()

    @irc3.extend
    def get_user(self, nick: str):
        """Return the tracked user for ``nick``, or ``None`` if unknown.

        A warning is logged when the user is not found.
        """
        user = self.active_users.get(nick)
        if not user:
            self.log.warning("Couldn't find %s!", nick)
        return user

    @irc3.extend
    def redact_nicks(self, message: str, target: Optional[str] = None) -> str:
        """Redacts all known nicks in the message.

        If target is provided, only redacts nicks that are in that channel.
        Matching is case-insensitive and only hits whole nicks, i.e. a nick
        that is not directly preceded or followed by another nick character.
        """
        if target:
            nicks = [n for n, u in self.active_users.items() if target in u.channels]
        else:
            nicks = list(self.active_users)

        # One-character nicks are left alone by redact_nick, so skip them.
        # Longest first, so the alternation prefers the longest possible nick.
        candidates = sorted(
            (n for n in nicks if len(n) > 1), key=len, reverse=True
        )
        if not candidates:
            return message

        nick_chars = self._NICK_CHARS
        alternatives = "|".join(re.escape(n) for n in candidates)
        pattern = re.compile(
            rf"(?<![{nick_chars}])({alternatives})(?![{nick_chars}])",
            re.IGNORECASE,
        )
        return pattern.sub(lambda match: redact_nick(match.group(0)), message)

    @irc3.event(irc3.rfc.JOIN_PART_QUIT)
    def on_join_part_quit(self, mask: IrcString, **kwargs):
        """Dispatch to :meth:`join`, :meth:`part` or :meth:`quit`.

        The handler is looked up by the lower-cased ``event`` name.
        """
        event = kwargs["event"]
        self.log.debug("%s %sed", mask.nick, event.lower())
        getattr(self, event.lower())(mask.nick, mask, **kwargs)

    @irc3.event(irc3.rfc.KICK)
    def on_kick(self, mask: IrcString, target: IrcString, **kwargs):
        """Treat a kick as the kicked user parting the channel."""
        self.log.debug("%s kicked %s", mask.nick, target.nick)
        self.part(target.nick, target, **kwargs)

    @irc3.event(irc3.rfc.NEW_NICK)
    def on_new_nick(self, nick: IrcString, new_nick: IrcString, **kwargs):
        """Move a tracked user to its new nick."""
        self.log.debug("%s renamed to %s", nick.nick, new_nick)
        old_nick = nick.nick
        if old_nick in self.active_users:
            user = self.active_users.pop(old_nick)
            user.nick = new_nick
            self.active_users[new_nick] = user

    @irc3.event(irc3.rfc.PRIVMSG)
    def on_privmsg(
        self,
        mask: IrcString,
        event: Literal["PRIVMSG", "NOTICE"],
        target: IrcString,
        data=None,
    ):
        """Track users that speak in a channel we are in."""
        if target not in self.channels:
            return
        user = self.active_users.get(mask.nick)
        if user is not None:
            user.join(target)
        elif mask.is_nick:
            self.log.debug("Found user %s via PRIVMSG", mask.nick)
            self.active_users[mask.nick] = self.create_user(mask, [target])
        # Otherwise the sender is neither tracked nor a nick; there is no
        # user to register, so the message is ignored.

    def connection_lost(self):
        """Forget all channels and users."""
        self.channels = set()
        self.active_users = dict()

    def join(self, nick: IrcString, mask: IrcString, channel: IrcString, **kwargs):
        """Register that ``nick`` joined ``channel``.

        When the bot itself joins, a ``WHO`` is sent for the channel.
        """
        self.log.debug("%s joined channel %s", nick, channel)
        # This can only be observed if we're in that channel
        self.channels.add(channel)
        if nick == self.bot.nick:
            self.bot.send("WHO {}".format(channel))

        if nick not in self.active_users:
            self.active_users[nick] = self.create_user(mask, [channel])

        self.active_users[nick].join(channel)

    def quit(self, nick, _mask, **kwargs):
        """Forget ``nick``; if the bot itself quit, forget everything."""
        if nick == self.bot.nick:
            self.connection_lost()

        self.active_users.pop(nick, None)

    def part(self, nick, mask, channel=None, **kwargs):
        """Register that ``nick`` left ``channel`` (by PART or KICK).

        When the bot itself leaves, every tracked user is removed from that
        channel and the channel is dropped from the administration. Users
        that are left without any channel are forgotten.
        """
        if nick == self.bot.nick:
            self.log.info("%s left %s by %s", nick, channel, kwargs.get("event"))
            # Iterate over a snapshot: entries are deleted while looping.
            for n, user in list(self.active_users.items()):
                # Not every tracked user is in the channel the bot left.
                if channel in user.channels:
                    user.part(channel)
                if not user.still_in_channels():
                    del self.active_users[n]
            # Remove channel from administration
            self.channels.discard(channel)
            # The bot's own entry was handled in the loop above.
            return

        user = self.active_users.get(nick)
        if user is None:
            return

        if channel in user.channels:
            user.part(channel)
        if not user.still_in_channels():
            self.log.debug("Lost %s out of sight", mask.nick)
            del self.active_users[nick]

    @irc3.event(irc3.rfc.RPL_NAMREPLY)
    def on_names(self, channel: IrcString, data: str, **kwargs):
        """Add ``channel`` to every already-known user listed in NAMES.

        Unknown nicks are skipped: NAMES carries no mask to build a user
        from.
        """
        # possible modes from server
        statusmsg = self.bot.server_config["STATUSMSG"]
        if channel not in self.channels:
            self.log.warning("I got NAMES for a channel I'm not in: %s", channel)
            return
        for item in data.split():
            nick = item.strip(statusmsg)
            user = self.active_users.get(nick)
            if user is None:
                # We don't have the mask here, so skip setting up the user
                continue
            user.join(channel)

    @irc3.event(irc3.rfc.RPL_WHOREPLY)
    def on_who(
        self,
        channel: IrcString,
        nick: IrcString,
        username=None,
        host=None,
        server=None,
        **kwargs,
    ):
        """Process a WHO reply since it could contain new information.

        Should only be processed for channels we are currently in!
        """
        if channel not in self.channels:
            self.log.debug("Got WHO for channel I'm not in: %s", channel)
            return

        self.log.debug("Got WHO for %s: %s (%s)", channel, nick, host)

        if nick not in self.active_users:
            mask = IrcString("{}!{}@{}".format(nick, username, host))
            self.active_users[nick] = self.create_user(mask, [channel])
        else:
            self.active_users[nick].join(channel)

    def create_user(self, mask: IrcString, channels: Iterable[str | IrcString]):
        """Return a User object

        The user's id function depends on the configured identification
        method:

        - ``mask``: the host part of ``mask``.
        - ``nickserv``: the account from WHOIS (cached on the user object
          after the first successful lookup), falling back to the host.
        - ``whatcd``: the name extracted from the host, falling back to
          the host.

        :raises ValueError: if no valid identification method is set.
        """
        if self.identifying_method == "mask":

            async def mask_id_func() -> str:
                assert mask.host is not None
                return mask.host

            return User(mask, channels, mask_id_func, self.bot.db)

        if self.identifying_method == "nickserv":

            async def get_account() -> str:
                assert mask.nick is not None
                user = self.get_user(mask.nick)
                if user is not None and hasattr(user, "account"):
                    return user.account
                result = await self.bot.async_cmds.whois(mask.nick)
                if result["success"] and "account" in result:
                    account = str(result["account"])
                    # The user may no longer be tracked under this nick;
                    # then there is nothing to cache the account on.
                    if user is not None:
                        user.account = account
                    return account
                assert mask.host is not None
                return mask.host

            return User(mask, channels, get_account, self.bot.db)

        if self.identifying_method == "whatcd":

            async def id_func():
                assert mask.host is not None
                match = re.match(r"^\d+@(.*)\.\w+\.what\.cd", mask.host.lower())
                if match:
                    return match.group(1)
                self.log.debug(
                    "Failed to extract what.cd user name from %s", mask
                )
                return mask.host

            return User(mask, channels, id_func, self.bot.db)

        raise ValueError(  # pragma: no cover
            "A valid identifying method should be configured"
        )

    @classmethod
    def reload(cls, old: Self) -> Self:  # pragma: no cover
        """Build a new plugin instance that takes over ``old``'s state.

        The tracked users are pointed at the new instance's bot database
        and carried over together with the channel set.
        """
        users = old.active_users
        newinstance = cls(old.bot)
        for user in users.values():
            user.database = newinstance.bot.db
        newinstance.channels = old.channels
        newinstance.active_users = users
        return newinstance
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc