• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

thomwiggers / onebot / 20855462621

09 Jan 2026 02:41PM UTC coverage: 66.802% (-2.3%) from 69.064%
20855462621

Pull #212

github

thomwiggers
ci: add workflow to build and publish python-sandbox image
Pull Request #212: ci: add workflow to build and publish python-sandbox image

823 of 1232 relevant lines covered (66.8%)

3.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.32
/onebot/plugins/users.py
1
# -*- coding: utf8 -*-
2
"""
3
==============================================
4
:mod:`onebot.plugins.users` Users plugin
5
==============================================
6

7
Keeps track of the users in channels. Also provides an authorisation system.
8
This plugin uses WHOIS to figure out someone's NickServ account and then links
9
that to an automatically created, in-bot account.
10
"""
11

12
from __future__ import unicode_literals, print_function
5✔
13

14
import ast
5✔
15
import asyncio
5✔
16
import re
5✔
17
from typing import (
5✔
18
    Any,
19
    Awaitable,
20
    Callable,
21
    Dict,
22
    Iterable,
23
    Literal,
24
    Optional,
25
    Self,
26
    Set,
27
)
28

29
import irc3
5✔
30
from irc3.plugins.storage import Storage
5✔
31
from irc3.utils import IrcString
5✔
32

33

34
class User(object):
    """A single IRC user as tracked by the bot.

    A user has a nick, a host, the set of channels it is known to be in, an
    asynchronous ``id`` callable that yields the key under which its settings
    are stored, and an optional storage backend for those settings.

    Users compare equal when their nicks are equal.  Because ``__eq__`` is
    defined without ``__hash__``, instances are not hashable.
    """

    def __init__(
        self,
        mask: IrcString,
        channels: Iterable[str],
        id_: Callable[[], Awaitable[str]],
        database=None,
    ):
        """Create a user.

        :param mask: object providing ``nick`` and ``host`` attributes
        :param channels: iterable of channel names the user is in; a bare
            string is rejected so it is not split into single characters
        :param id_: callable returning an awaitable that resolves to the
            storage key for this user
        :param database: storage backend used by the settings methods
        :raises ValueError: if ``channels`` is a string or is not iterable
        """
        self.nick = mask.nick
        self.host = mask.host
        self.channels: Set[str] = set()
        self.id: Callable[[], Awaitable[str]] = id_
        self.database: Optional[Storage] = database

        # A string is iterable too, but iterating it would register every
        # character as a channel, so refuse it explicitly.
        if isinstance(channels, str):
            raise ValueError("You must specify a list of channels!")
        try:
            self.channels.update(channels)
        except TypeError as err:
            # Keep the original TypeError attached as the cause.
            raise ValueError(
                "You need to specify in which channel this user is!"
            ) from err

    @property
    def mask(self) -> IrcString:
        """Get the mask of this user, built as ``nick!host``."""
        return IrcString("{}!{}".format(self.nick, self.host))

    def _get_database(self) -> Storage:
        """Return the storage backend.

        :raises RuntimeError: if no database was set for this user
        """
        if self.database is None:
            raise RuntimeError("No database set for this user.")
        return self.database

    def set_settings(self, settings) -> None:
        """Replace the stored settings with the provided dictionary.

        The write is scheduled with :func:`asyncio.ensure_future` and is not
        awaited here, so it has not necessarily happened when this returns.
        """

        async def wrapper() -> None:
            id_ = await self.id()
            self._get_database()[id_] = settings

        asyncio.ensure_future(wrapper())

    def set_setting(self, setting: str, value: Any) -> None:
        """Set a single setting to a value.

        Like :meth:`set_settings`, the write is scheduled and not awaited.
        """
        print("Trying to set %s to %s" % (setting, value))

        async def wrapper():
            id_ = await self.id()
            self._get_database().set(id_, **{setting: value})

        asyncio.ensure_future(wrapper())

    async def get_settings(self) -> Dict[str, Any]:
        """Get this user's settings; an empty dict if nothing is stored."""
        id_ = await self.id()
        return self._get_database().get(id_, dict())  # type: ignore

    async def get_setting(self, setting, default=None) -> Any:
        """Get one setting for the user. Can be any type.

        String values that parse as a Python literal (via
        :func:`ast.literal_eval`) are returned as the parsed object; any
        other value, including ``default``, is returned unchanged.
        """
        settings = await self.get_settings()
        result = settings.get(setting, default)
        if isinstance(result, str):
            try:
                return ast.literal_eval(result)
            except (ValueError, SyntaxError):
                # Not a literal: fall through and hand back the raw string.
                pass

        return result

    def join(self, channel) -> None:
        """Register that the user joined a channel."""
        self.channels.add(channel)

    def part(self, channel) -> None:
        """Register that the user parted a channel.

        Parting a channel the user is not recorded in is a no-op, so the same
        part may safely be registered more than once.
        """
        self.channels.discard(channel)

    def still_in_channels(self) -> bool:
        """Is the user still in at least one channel?"""
        return len(self.channels) > 0

    def __eq__(self, other: object) -> bool:
        """Compare users by nick.

        Since nicks are unique this works for exactly one irc server.
        Objects of another type are left to Python's default comparison,
        which makes ``==`` evaluate to ``False`` for them.
        """
        if not isinstance(other, self.__class__):
            return NotImplemented
        return self.nick == other.nick
124

125

126
@irc3.plugin
class UsersPlugin(object):
    """User management plugin for OneBot

    Doesn't do anything with NAMES because we can't get hosts through
    NAMES

    Configuration settings:
        - ``identify_by``: the identification method

    Identification methods available:
        - ``mask``: Use the hostmask
        - ``whatcd``: Get the what.cd username from the host mask
        - ``nickserv``: Parse nickserv info from ``WHOIS``.

    State kept by the plugin:
        - ``channels``: set of channels the bot has been seen joining
        - ``active_users``: mapping of nick to :class:`User`
    """

    requires = ["irc3.plugins.storage", "irc3.plugins.asynchronious"]

    def __init__(self, bot: irc3.IrcBot):
        """Initialises the plugin

        Reads ``identify_by`` from this module's config section (default
        ``mask``) and starts with empty channel and user tables.

        :raises Exception: if ``identify_by`` is not a known method
        """
        self.bot = bot
        config = bot.config.get(__name__, {})
        method = config.get("identify_by", "mask")
        if method not in ("mask", "nickserv", "whatcd"):
            raise Exception(
                "Invalid configuration: UsersPlugin.identifying_method invalid"
            )
        self.identifying_method: Literal["mask", "nickserv", "whatcd"] = method
        self.log = bot.log.getChild(__name__)
        # Reuse the reset routine to create the empty state tables.
        self.connection_lost()

    @irc3.extend
    def get_user(self, nick: str):
        """Return the tracked user for ``nick``, or ``None`` (with a warning)."""
        user = self.active_users.get(nick)
        if not user:
            self.log.warning("Couldn't find %s!", nick)
        return user

    @irc3.event(irc3.rfc.JOIN_PART_QUIT)
    def on_join_part_quit(self, mask: IrcString, **kwargs):
        """Dispatch to :meth:`join`, :meth:`part` or :meth:`quit`.

        The handler is looked up by the lower-cased ``event`` name.
        """
        event = kwargs["event"]
        self.log.debug("%s %sed", mask.nick, event.lower())
        getattr(self, event.lower())(mask.nick, mask, **kwargs)

    @irc3.event(irc3.rfc.KICK)
    def on_kick(self, mask: IrcString, target: IrcString, **kwargs):
        """Treat a kick as the kicked user parting."""
        self.log.debug("%s kicked %s", mask.nick, target.nick)
        self.part(target.nick, target, **kwargs)

    @irc3.event(irc3.rfc.NEW_NICK)
    def on_new_nick(self, nick: IrcString, new_nick: IrcString, **kwargs):
        """Re-key a tracked user under its new nick."""
        self.log.debug("%s renamed to %s", nick.nick, new_nick)
        if nick.nick in self.active_users:
            user = self.active_users.pop(nick.nick)
            user.nick = new_nick
            self.active_users[new_nick] = user

    @irc3.event(irc3.rfc.PRIVMSG)
    def on_privmsg(
        self,
        mask: IrcString,
        event: Literal["PRIVMSG", "NOTICE"],
        target: IrcString,
        data=None,
    ):
        """Learn about users from messages sent to channels we are in.

        Messages to targets that are not tracked channels are ignored.  A
        sender that is neither tracked nor a nick is ignored as well,
        instead of failing on the missing ``active_users`` entry.
        """
        if target not in self.channels:
            return
        user = self.active_users.get(mask.nick)
        if user is not None:
            user.join(target)
        elif mask.is_nick:
            self.log.debug("Found user %s via PRIVMSG", mask.nick)
            self.active_users[mask.nick] = self.create_user(mask, [target])

    def connection_lost(self):
        """Forget every tracked channel and user.

        Also used by ``__init__`` and when the bot itself quits.
        """
        self.channels = set()
        self.active_users = dict()

    def join(self, nick: IrcString, mask: IrcString, channel: IrcString, **kwargs):
        """Register that ``nick`` joined ``channel``.

        When the joining nick is the bot itself a ``WHO`` is sent for the
        channel so the other members can be picked up by :meth:`on_who`.
        """
        self.log.debug("%s joined channel %s", nick, channel)
        # This can only be observed if we're in that channel
        self.channels.add(channel)
        if nick == self.bot.nick:
            self.bot.send("WHO {}".format(channel))

        if nick not in self.active_users:
            self.active_users[nick] = self.create_user(mask, [channel])

        self.active_users[nick].join(channel)

    def quit(self, nick, _mask, **kwargs):
        """Drop a user that quit; reset everything if the bot itself quit."""
        if nick == self.bot.nick:
            self.connection_lost()

        if nick in self.active_users:
            del self.active_users[nick]

    def part(self, nick, mask, channel=None, **kwargs):
        """Register that ``nick`` left ``channel``.

        If the bot itself left, every tracked user that was in the channel
        is parted from it (the bot's own entry included) and the channel is
        removed from the administration.  Otherwise only ``nick`` is parted.
        Users with no channels left are dropped from ``active_users``.
        """
        if nick == self.bot.nick:
            self.log.info("%s left %s by %s", nick, channel, kwargs.get("event"))
            # Iterate over a snapshot: entries are deleted while looping.
            for n, user in list(self.active_users.items()):
                # Users we only know from other channels are unaffected.
                if channel not in user.channels:
                    continue
                user.part(channel)
                if not user.still_in_channels():
                    del self.active_users[n]
            # Remove channel from administration
            self.channels.discard(channel)
            # The loop above already handled the bot's own entry; parting
            # it a second time would act on a channel it no longer has.
            return

        user = self.active_users.get(nick)
        if user is None:
            return

        if channel in user.channels:
            user.part(channel)
        if not user.still_in_channels():
            self.log.debug("Lost %s out of sight", mask.nick)
            del self.active_users[nick]

    @irc3.event(irc3.rfc.RPL_NAMREPLY)
    def on_names(self, channel: IrcString, data: str, **kwargs):
        """Record channel membership for already-known users from NAMES.

        Nicks we have not seen before are skipped because NAMES carries no
        host mask to build a :class:`User` from.
        """
        # possible modes from server
        statusmsg = self.bot.server_config["STATUSMSG"]
        nicknames = data.split(" ")
        if channel not in self.channels:
            self.log.warning("I got NAMES for a channel I'm not in: %s", channel)
            return
        for item in nicknames:
            nick = item.strip(statusmsg)
            if nick not in self.active_users:
                # We don't have the mask here, so skip setting up the user
                continue
            self.active_users[nick].join(channel)

    @irc3.event(irc3.rfc.RPL_WHOREPLY)
    def on_who(
        self,
        channel: IrcString,
        nick: IrcString,
        username=None,
        host=None,
        server=None,
        **kwargs,
    ):
        """Process a WHO reply since it could contain new information.

        Should only be processed for channels we are currently in!
        Unknown nicks are created from ``nick!username@host``; known ones
        are simply marked as being in ``channel``.
        """
        if channel not in self.channels:
            self.log.debug("Got WHO for channel I'm not in: %s", channel)
            return

        self.log.debug("Got WHO for %s: %s (%s)", channel, nick, host)

        if nick not in self.active_users:
            mask = IrcString("{}!{}@{}".format(nick, username, host))
            self.active_users[nick] = self.create_user(mask, [channel])
        else:
            self.active_users[nick].join(channel)

    def create_user(self, mask: IrcString, channels: Iterable[str | IrcString]):
        """Return a User object

        The user's asynchronous id function depends on the configured
        identification method:

        - ``mask``: the host part of the mask
        - ``nickserv``: the account reported by WHOIS, cached on the user
          object; falls back to the host when WHOIS yields no account
        - ``whatcd``: the user name extracted from a what.cd host, falling
          back to the host when it does not match
        """
        if self.identifying_method == "mask":

            async def mask_id_func() -> str:
                assert mask.host is not None
                return mask.host

            return User(mask, channels, mask_id_func, self.bot.db)
        if self.identifying_method == "nickserv":

            async def get_account() -> str:
                assert mask.nick is not None
                user = self.get_user(mask.nick)
                if hasattr(user, "account"):
                    return user.account
                result = await self.bot.async_cmds.whois(mask.nick)
                if result["success"] and "account" in result:
                    account = str(result["account"])
                    # get_user() returns None when the nick is no longer
                    # tracked; then there is no object to cache on.
                    if user is not None:
                        user.account = account
                    return account
                else:
                    assert mask.host is not None
                    return mask.host

            return User(mask, channels, get_account, self.bot.db)
        if self.identifying_method == "whatcd":

            async def id_func():
                assert mask.host is not None
                match = re.match(r"^\d+@(.*)\.\w+\.what\.cd", mask.host.lower())
                if match:
                    return match.group(1)
                else:
                    self.log.debug(
                        "Failed to extract what.cd user name from %s", mask
                    )
                    return mask.host

            return User(mask, channels, id_func, self.bot.db)
        else:  # pragma: no cover
            raise ValueError("A valid identifying method should be configured")

    @classmethod
    def reload(cls, old: Self) -> Self:  # pragma: no cover
        """Build a fresh plugin instance that takes over ``old``'s state.

        Tracked users are re-pointed at the new instance's bot database and
        the channel and user tables are carried over.
        """
        users = old.active_users
        newinstance = cls(old.bot)
        for user in users.values():
            user.database = newinstance.bot.db
        newinstance.channels = old.channels
        newinstance.active_users = users
        return newinstance
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc