• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VedaWebProject / Tekst / 15961202044

30 Jun 2025 12:41AM UTC coverage: 100.0%. First build
15961202044

Pull #1050

github

web-flow
Merge 707a7465b into 154e7a1d1
Pull Request #1050: Update axllent/mailpit Docker tag to v1.27

4256 of 4256 relevant lines covered (100.0%)

1.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

100.0
/Tekst-API/tekst/auth.py
1
import contextlib
1✔
2
import re
1✔
3

4
from collections.abc import Callable
1✔
5
from typing import Annotated, Any
1✔
6

7
import fastapi_users.models as fapi_users_models
1✔
8

9
from beanie import Document, PydanticObjectId
1✔
10
from beanie.operators import In, Pull
1✔
11
from fastapi import (
1✔
12
    APIRouter,
13
    Depends,
14
    FastAPI,
15
    HTTPException,
16
    Request,
17
    Response,
18
    status,
19
)
20
from fastapi_users import (
1✔
21
    BaseUserManager,
22
    FastAPIUsers,
23
    InvalidPasswordException,
24
)
25
from fastapi_users.authentication import (
1✔
26
    AuthenticationBackend,
27
    BearerTransport,
28
    CookieTransport,
29
    JWTStrategy,
30
)
31
from fastapi_users.authentication.strategy.db import (
1✔
32
    AccessTokenDatabase,
33
    DatabaseStrategy,
34
)
35
from fastapi_users_db_beanie import (
1✔
36
    UP_BEANIE,
37
    BeanieUserDatabase,
38
    ObjectIDIDMixin,
39
)
40
from fastapi_users_db_beanie.access_token import (
1✔
41
    BeanieAccessTokenDatabase,
42
    BeanieBaseAccessToken,
43
)
44
from humps import decamelize
1✔
45

46
from tekst.config import TekstConfig, get_config
1✔
47
from tekst.logs import log
1✔
48
from tekst.models.content import ContentBaseDocument
1✔
49
from tekst.models.message import UserMessageDocument
1✔
50
from tekst.models.resource import ResourceBaseDocument
1✔
51
from tekst.models.user import UserCreate, UserDocument, UserRead, UserUpdate
1✔
52
from tekst.notifications import (
1✔
53
    TemplateIdentifier,
54
    broadcast_admin_notification,
55
    send_notification,
56
)
57

58

59
_cfg: TekstConfig = get_config()
1✔
60

61

62
class AccessTokenDocument(BeanieBaseAccessToken, Document):
1✔
63
    class Settings(BeanieBaseAccessToken.Settings):
1✔
64
        name = "tokens"
1✔
65

66

67
_cookie_transport = CookieTransport(
1✔
68
    cookie_name=_cfg.security.auth_cookie_name,
69
    cookie_max_age=_cfg.security.auth_cookie_lifetime or None,
70
    cookie_domain=_cfg.security.auth_cookie_domain or None,
71
    cookie_path=_cfg.api_path or "/",
72
    cookie_secure=not _cfg.dev_mode,
73
    cookie_httponly=True,
74
    cookie_samesite="Lax",
75
)
76

77
_bearer_transport = BearerTransport(tokenUrl="auth/jwt/login")
1✔
78

79

80
class CustomBeanieUserDatabase(BeanieUserDatabase):
1✔
81
    # This class is necessary to make our model logic work with FastAPI-Users :(
82

83
    async def create(self, create_dict: dict[str, Any]) -> UP_BEANIE:
1✔
84
        """Create a user."""
85
        return await super().create(decamelize(create_dict))
1✔
86

87
    async def update(self, user: UP_BEANIE, update_dict: dict[str, Any]) -> UP_BEANIE:
1✔
88
        """Update a user."""
89
        return await super().update(user, decamelize(update_dict))
1✔
90

91

92
async def get_user_db():
1✔
93
    yield CustomBeanieUserDatabase(UserDocument)
1✔
94

95

96
async def get_access_token_db():
1✔
97
    yield BeanieAccessTokenDatabase(AccessTokenDocument)
1✔
98

99

100
def _get_database_strategy(
1✔
101
    access_token_db: AccessTokenDatabase[AccessTokenDocument] = Depends(
102
        get_access_token_db
103
    ),
104
) -> DatabaseStrategy:
105
    return DatabaseStrategy(
1✔
106
        access_token_db,
107
        lifetime_seconds=_cfg.security.access_token_lifetime,
108
    )
109

110

111
def _get_jwt_strategy() -> JWTStrategy:
1✔
112
    return JWTStrategy(
1✔
113
        secret=_cfg.security.secret,
114
        lifetime_seconds=_cfg.security.auth_jwt_lifetime,
115
        token_audience="tekst:jwt",
116
    )
117

118

119
_auth_backend_cookie = AuthenticationBackend(
1✔
120
    name="cookie",
121
    transport=_cookie_transport,
122
    get_strategy=_get_database_strategy,
123
)
124

125
_auth_backend_jwt = AuthenticationBackend(
1✔
126
    name="jwt",
127
    transport=_bearer_transport,
128
    get_strategy=_get_jwt_strategy,
129
)
130

131

132
async def _get_enabled_backends() -> list[AuthenticationBackend]:
1✔
133
    """Returns the enabled backends following custom logic"""
134
    enabled_backends = []
1✔
135
    if _cfg.security.enable_cookie_auth:
1✔
136
        enabled_backends.append(_auth_backend_cookie)
1✔
137
    if _cfg.security.enable_jwt_auth:
1✔
138
        enabled_backends.append(_auth_backend_jwt)
1✔
139
    return enabled_backends
1✔
140

141

142
def _validate_required_password_chars(password: str):
1✔
143
    return (
1✔
144
        re.search(r"[a-z]", password)
145
        and re.search(r"[A-Z]", password)
146
        and re.search(r"[0-9]", password)
147
    )
148

149

150
class UserManager(ObjectIDIDMixin, BaseUserManager[UserDocument, PydanticObjectId]):
1✔
151
    reset_password_token_secret = _cfg.security.secret
1✔
152
    verification_token_secret = _cfg.security.secret
1✔
153
    reset_password_token_lifetime_seconds = _cfg.security.reset_pw_token_lifetime
1✔
154
    verification_token_lifetime_seconds = _cfg.security.verification_token_lifetime
1✔
155
    reset_password_token_audience = "tekst:reset"
1✔
156
    verification_token_audience = "tekst:verify"
1✔
157

158
    async def on_after_register(
1✔
159
        self, user: UserDocument, request: Request | None = None
160
    ):
161
        if not _cfg.security.users_active_by_default and not user.is_active:
1✔
162
            await broadcast_admin_notification(
1✔
163
                TemplateIdentifier.EMAIL_USER_AWAITS_ACTIVATION,
164
                username=user.username,
165
                name=user.name,
166
                affiliation=user.affiliation,
167
            )
168

169
    async def on_after_update(
1✔
170
        self,
171
        user: UserDocument,
172
        update_dict: dict[str, Any],
173
        request: Request | None = None,
174
    ):
175
        if "is_active" in update_dict:
1✔
176
            if update_dict.get("is_active"):
1✔
177
                await send_notification(user, TemplateIdentifier.EMAIL_ACTIVATED)
1✔
178
            else:
179
                await send_notification(user, TemplateIdentifier.EMAIL_DEACTIVATED)
1✔
180
        if "is_superuser" in update_dict:
1✔
181
            if update_dict.get("is_superuser"):
1✔
182
                await send_notification(user, TemplateIdentifier.EMAIL_SUPERUSER_SET)
1✔
183
            else:
184
                await send_notification(user, TemplateIdentifier.EMAIL_SUPERUSER_UNSET)
1✔
185
        if "password" in update_dict:
1✔
186
            await send_notification(
1✔
187
                user,
188
                TemplateIdentifier.EMAIL_PASSWORD_RESET,
189
            )
190

191
    async def on_after_login(
1✔
192
        self,
193
        user: UserDocument,
194
        request: Request | None = None,
195
        response: Response | None = None,
196
    ):
197
        if not user.seen:
1✔
198
            if user.seen is None:
1✔
199
                user.seen = False
1✔
200
            else:
201
                user.seen = True
1✔
202
            await user.replace()
1✔
203

204
    async def on_after_request_verify(
1✔
205
        self, user: UserDocument, token: str, request: Request | None = None
206
    ):
207
        await send_notification(  # pragma: no cover
208
            user,
209
            TemplateIdentifier.EMAIL_VERIFY,
210
            token=token,
211
            token_lifetime_hours=int(
212
                _cfg.security.verification_token_lifetime / 60 / 60
213
            ),
214
        )
215

216
    async def on_after_verify(self, user: UserDocument, request: Request | None = None):
1✔
217
        await send_notification(
218
            user, TemplateIdentifier.EMAIL_VERIFIED
219
        )  # pragma: no cover
220

221
    async def on_after_forgot_password(
1✔
222
        self, user: UserDocument, token: str, request: Request | None = None
223
    ):
224
        await send_notification(
1✔
225
            user,
226
            TemplateIdentifier.EMAIL_PASSWORD_FORGOT,
227
            token=token,
228
            token_lifetime_hours=int(_cfg.security.reset_pw_token_lifetime / 60 / 60),
229
        )
230

231
    async def on_after_reset_password(
1✔
232
        self, user: UserDocument, request: Request | None = None
233
    ):
234
        await send_notification(  # pragma: no cover
235
            user,
236
            TemplateIdentifier.EMAIL_PASSWORD_RESET,
237
        )
238

239
    async def on_before_delete(
1✔
240
        self, user: UserDocument, request: Request | None = None
241
    ):
242
        # find owned resources
243
        resources_docs = await ResourceBaseDocument.find(
1✔
244
            ResourceBaseDocument.owner_id == user.id,
245
            with_children=True,
246
        ).to_list()
247
        owned_resources_ids = [resource.id for resource in resources_docs]
1✔
248

249
        # delete contents of owned resources
250
        await ContentBaseDocument.find(
1✔
251
            In(ContentBaseDocument.resource_id, owned_resources_ids),
252
            with_children=True,
253
        ).delete()
254

255
        # delete owned resources
256
        await ResourceBaseDocument.find_one(
1✔
257
            In(ResourceBaseDocument.id, owned_resources_ids),
258
            with_children=True,
259
        ).delete()
260

261
        # remove user ID from resource shares
262
        await ResourceBaseDocument.find(
1✔
263
            ResourceBaseDocument.shared_read == user.id,
264
            with_children=True,
265
        ).update(
266
            Pull(ResourceBaseDocument.shared_read == user.id),
267
        )
268
        await ResourceBaseDocument.find(
1✔
269
            ResourceBaseDocument.shared_write == user.id,
270
            with_children=True,
271
        ).update(
272
            Pull(ResourceBaseDocument.shared_write == user.id),
273
        )
274

275
        # delete user messages sent by user
276
        await UserMessageDocument.find(UserMessageDocument.sender == user.id).delete()
1✔
277

278
    async def on_after_delete(self, user: UserDocument, request: Request | None = None):
1✔
279
        await send_notification(
1✔
280
            user,
281
            TemplateIdentifier.EMAIL_DELETED,
282
        )
283
        pass
1✔
284

285
    async def validate_password(
1✔
286
        self,
287
        password: str,
288
        user: UserCreate | UserDocument,
289
    ) -> None:
290
        # validate length
291
        if len(password) < 8:
1✔
292
            raise InvalidPasswordException(
1✔
293
                reason="Password should be at least 8 characters long"
294
            )
295
        # validate characters
296
        if not _validate_required_password_chars(password):
1✔
297
            raise InvalidPasswordException(
1✔
298
                reason="Password must contain at least one of each a-z, A-Z, 0-9"
299
            )
300
        # check if password contains email
301
        if user.email.lower() in password.lower():
1✔
302
            raise InvalidPasswordException(
1✔
303
                reason="Password should not contain e-mail address"
304
            )
305

306
    async def create(self, user_create, **kwargs) -> fapi_users_models.UP:
1✔
307
        """
308
        Overrides FastAPI-User's BaseUserManager's create method to check if the
309
        username already exists and respond with a meaningful HTTP exception.
310
        """
311
        if await UserDocument.find_one(
1✔
312
            UserDocument.username == user_create.username
313
        ).exists():
314
            # We're not using the prepared errors from tekst.errors because
315
            # the auth-related endpoints from fastapi-users have a different error model
316
            # that we want to follow here.
317
            raise HTTPException(
1✔
318
                status_code=status.HTTP_400_BAD_REQUEST,
319
                detail="REGISTER_USERNAME_ALREADY_EXISTS",
320
            )
321
        return await super().create(user_create, **kwargs)
1✔
322

323

324
async def get_user_manager(user_db=Depends(get_user_db)):
1✔
325
    yield UserManager(user_db)
1✔
326

327

328
_fastapi_users = FastAPIUsers[UserDocument, PydanticObjectId](
1✔
329
    get_user_manager,
330
    [
331
        _auth_backend_cookie,
332
        _auth_backend_jwt,
333
    ],
334
)
335

336

337
def setup_auth_routes(app: FastAPI) -> list[APIRouter]:
1✔
338
    # cookie auth
339
    if _cfg.security.enable_cookie_auth:
1✔
340
        app.include_router(
1✔
341
            _fastapi_users.get_auth_router(
342
                _auth_backend_cookie,
343
                requires_verification=not _cfg.security.closed_mode,
344
            ),
345
            prefix="/auth/cookie",
346
            tags=["auth"],
347
        )
348
    # jwt auth
349
    if _cfg.security.enable_jwt_auth:
1✔
350
        app.include_router(
1✔
351
            _fastapi_users.get_auth_router(
352
                _auth_backend_jwt,
353
                requires_verification=not _cfg.security.closed_mode,
354
            ),
355
            prefix="/auth/jwt",
356
            tags=["auth"],
357
        )
358
    # register
359
    app.include_router(
1✔
360
        _fastapi_users.get_register_router(UserRead, UserCreate),
361
        prefix="/auth",
362
        tags=["auth"],
363
        dependencies=[Depends(get_current_superuser)]
364
        if _cfg.security.closed_mode
365
        else [],
366
    )
367
    # verify
368
    if not _cfg.security.closed_mode:
1✔
369
        app.include_router(
1✔
370
            _fastapi_users.get_verify_router(UserRead),
371
            prefix="/auth",
372
            tags=["auth"],
373
        )
374
    # reset pw
375
    app.include_router(
1✔
376
        _fastapi_users.get_reset_password_router(),
377
        prefix="/auth",
378
        tags=["auth"],
379
    )
380
    # users
381
    app.include_router(
1✔
382
        _fastapi_users.get_users_router(
383
            UserRead,
384
            UserUpdate,
385
            requires_verification=not _cfg.security.closed_mode,
386
        ),
387
        prefix="/users",
388
        tags=["users"],
389
    )
390

391

392
def _current_user(**kwargs) -> Callable:
1✔
393
    """Returns auth dependencies for API routes (optional auth in dev mode)"""
394
    return _fastapi_users.current_user(
1✔
395
        get_enabled_backends=_get_enabled_backends,
396
        **kwargs,
397
    )
398

399

400
# auth dependencies for API routes
401
get_current_user = _current_user(
1✔
402
    verified=not _cfg.security.closed_mode,
403
    active=True,
404
)
405
get_current_superuser = _current_user(
1✔
406
    verified=not _cfg.security.closed_mode,
407
    active=True,
408
    superuser=True,
409
)
410
get_current_optional_user = _current_user(
1✔
411
    optional=True,
412
    verified=not _cfg.security.closed_mode,
413
    active=True,
414
)
415
UserDep = Annotated[UserRead, Depends(get_current_user)]
1✔
416
SuperuserDep = Annotated[UserRead, Depends(get_current_superuser)]
1✔
417
OptionalUserDep = Annotated[UserRead | None, Depends(get_current_optional_user)]
1✔
418

419

420
async def _create_user(user: UserCreate) -> UserRead:
1✔
421
    """
422
    Creates/registers a new user programmatically
423
    """
424
    get_user_db_context = contextlib.asynccontextmanager(get_user_db)
1✔
425
    get_user_manager_context = contextlib.asynccontextmanager(get_user_manager)
1✔
426
    async with get_user_db_context() as user_db:  # noqa: SIM117
1✔
427
        async with get_user_manager_context(user_db) as user_manager:
1✔
428
            return await user_manager.create(user, safe=False)
1✔
429

430

431
async def create_initial_superuser(cfg: TekstConfig = _cfg):
1✔
432
    if cfg.dev_mode:
1✔
433
        return
1✔
434
    log.info("Creating initial superuser account...")
1✔
435
    # check if user collection contains users, abort if so
436
    if await UserDocument.find_one().exists():  # pragma: no cover
437
        log.warning(
438
            "User collection already contains users. "
439
            f"Skipping creation of inital admin {cfg.security.init_admin_email}."
440
        )
441
        return
442
    # check if initial admin account is properly configured
443
    if (
444
        not cfg.security.init_admin_email or not cfg.security.init_admin_password
445
    ):  # pragma: no cover
446
        log.warning("No initial admin account configured, skipping creation.")
447
        return
448
    # create inital admin account
449
    user = UserCreate(
1✔
450
        email=cfg.security.init_admin_email,
451
        password=cfg.security.init_admin_password,
452
        username="admin",
453
        name="Admin Admin",
454
        affiliation="Admin",
455
    )
456
    user.is_active = True
1✔
457
    user.is_verified = True
1✔
458
    user.is_superuser = True
1✔
459
    await _create_user(user)
1✔
460
    log.warning(
1✔
461
        f"Created initial admin account for email {cfg.security.init_admin_email}. "
462
        "PLEASE CHANGE ITS PASSWORD ASAP!"
463
    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc