• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

inventree / InvenTree / 4268417976

pending completion
4268417976

push

github

GitHub
Bugfix for auto-backup task (#4406) (#4414)

14 of 14 new or added lines in 5 files covered. (100.0%)

24989 of 28483 relevant lines covered (87.73%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.18
/InvenTree/InvenTree/tasks.py
1
"""Functions for tasks and a few general async tasks."""
2

3
import json
1✔
4
import logging
1✔
5
import os
1✔
6
import random
1✔
7
import re
1✔
8
import time
1✔
9
import warnings
1✔
10
from dataclasses import dataclass
1✔
11
from datetime import datetime, timedelta
1✔
12
from typing import Callable, List
1✔
13

14
from django.conf import settings
1✔
15
from django.core import mail as django_mail
1✔
16
from django.core.exceptions import AppRegistryNotReady
1✔
17
from django.core.management import call_command
1✔
18
from django.db.utils import OperationalError, ProgrammingError
1✔
19
from django.utils import timezone
1✔
20

21
import requests
1✔
22

23
logger = logging.getLogger("inventree")
1✔
24

25

26
def schedule_task(taskname, **kwargs):
    """Make sure a schedule entry exists for the given task.

    If an entry for the same function is already present it is not
    duplicated; its scheduling parameters are updated instead.

    Arguments:
        taskname: Value stored as both 'name' and 'func' of a new entry,
            and used to look up an existing one by 'func'.
        **kwargs: Scheduling parameters passed through to the Schedule
            model. 'repeats' is set to -1 when the caller does not supply it.
    """
    # If unspecified, repeat indefinitely
    kwargs.setdefault('repeats', -1)

    try:
        from django_q.models import Schedule
    except AppRegistryNotReady:  # pragma: no cover
        logger.info("Could not start background tasks - App registry not ready")
        return

    try:
        matching = Schedule.objects.filter(func=taskname)

        if not matching.exists():
            logger.info(f"Creating scheduled task '{taskname}'")

            Schedule.objects.create(name=taskname, func=taskname, **kwargs)
        else:
            # Already scheduled: refresh the parameters rather than add a second entry
            logger.debug(f"Scheduled task '{taskname}' already exists - updating!")

            matching.update(**kwargs)
    except (OperationalError, ProgrammingError):  # pragma: no cover
        # Required if the DB is not ready yet
        pass
59

60

61
def raise_warning(msg):
    """Log msg at warning level and, when running under test, also issue a Python warning."""
    logger.warning(msg)

    if not settings.TESTING:
        return

    # Under test, emit a real warning so that it can be asserted
    warnings.warn(msg)
68

69

70
def offload_task(taskname, *args, force_async=False, force_sync=False, **kwargs):
    """Create an AsyncTask if workers are running. This is different to a 'scheduled' task, in that it only runs once!

    If workers are not running or the force_sync flag
    is set then the task is run synchronously.

    Arguments:
        taskname: Either a callable, or a dotted path of exactly three
            parts ('app.module.function') naming the function to run.
        *args: Positional arguments forwarded to the task.
        force_async: Hand the task to an AsyncTask even if no worker is running.
        force_sync: Run the task in the current process even if a worker
            is running (ignored when force_async is set).
        **kwargs: Keyword arguments forwarded to the task.

    Problems (registry or database not ready, malformed path, unknown
    module or function) are reported through the logger / raise_warning
    and the task is not started.
    """
    try:
        import importlib

        from django_q.tasks import AsyncTask

        from InvenTree.status import is_worker_running
    except AppRegistryNotReady:  # pragma: no cover
        logger.warning(f"Could not offload task '{taskname}' - app registry not ready")
        return
    except (OperationalError, ProgrammingError):  # pragma: no cover
        raise_warning(f"Could not offload task '{taskname}' - database not ready")
        # The imports above did not complete, so the names used below may
        # be unbound - stop here instead of failing with a NameError
        return

    if force_async or (is_worker_running() and not force_sync):
        # Running as asynchronous task
        try:
            task = AsyncTask(taskname, *args, **kwargs)
            task.run()
        except ImportError:
            raise_warning(f"WARNING: '{taskname}' not started - Function not found")
    else:

        if callable(taskname):
            # function was passed - use that
            _func = taskname
        else:
            # Split path
            try:
                app, mod, func = taskname.split('.')
                app_mod = app + '.' + mod
            except ValueError:
                raise_warning(f"WARNING: '{taskname}' not started - Malformed function path")
                return

            # Import module from app
            try:
                _mod = importlib.import_module(app_mod)
            except ModuleNotFoundError:
                raise_warning(f"WARNING: '{taskname}' not started - No module named '{app_mod}'")
                return

            # Retrieve function
            try:
                _func = getattr(_mod, func)
            except AttributeError:  # pragma: no cover
                # getattr does not work for local import
                _func = None

            try:
                if not _func:
                    # NOTE(review): eval() executes whatever expression it is
                    # given - 'func' must never originate from untrusted input
                    _func = eval(func)  # pragma: no cover
            except NameError:
                raise_warning(f"WARNING: '{taskname}' not started - No function named '{func}'")
                return

        # Workers are not running: run it as synchronous task
        _func(*args, **kwargs)
132

133

134
@dataclass
class ScheduledTask:
    """Record describing a function to be run on a schedule.

    Fields:
    - func: The function to be run
    - interval: The interval at which the task should be run
      (the interval codes are defined as class constants below)
    - minutes: The number of minutes between task runs
    """

    func: Callable
    interval: str
    minutes: int = None

    # Interval codes
    MINUTES = "I"
    HOURLY = "H"
    DAILY = "D"
    WEEKLY = "W"
    MONTHLY = "M"
    QUARTERLY = "Q"
    YEARLY = "Y"

    # All interval codes, in the order declared above
    TYPE = [MINUTES, HOURLY, DAILY, WEEKLY, MONTHLY, QUARTERLY, YEARLY]
155

156

157
class TaskRegister:
    """Registry for periodic tasks.

    Every register instance keeps its own list of ScheduledTask entries
    in 'task_list'.
    """

    task_list: List[ScheduledTask]

    def __init__(self):
        """Create an empty register."""
        # The list is created per instance: a class-level list would be
        # shared by every TaskRegister, mixing their registrations together
        self.task_list = []

    def register(self, task, schedule, minutes: int = None):
        """Register a task with the queue.

        Arguments:
            task: The function to be run
            schedule: The interval at which the task should be run
            minutes: The number of minutes between task runs
        """
        self.task_list.append(ScheduledTask(task, schedule, minutes))


# Module-level register instance
tasks = TaskRegister()
167

168

169
def scheduled_task(interval: str, minutes: int = None, tasklist: TaskRegister = None):
    """Register the given task as a scheduled task.

    Example:
    ```python
    @scheduled_task(ScheduledTask.DAILY)
    def my_custom_function():
        ...
    ```

    Args:
        interval (str): The interval at which the task should be run
        minutes (int, optional): The number of minutes between task runs. Defaults to None.
        tasklist (TaskRegister, optional): The list the tasks should be registered to.
            Defaults to None, in which case the module-level 'tasks' register is used.

    Raises:
        ValueError: If decorated object is not callable
        ValueError: If interval is not valid

    Returns:
        Callable: A decorator which registers the decorated function and
        returns it unchanged.
    """

    def _task_wrapper(func):
        if not callable(func):
            raise ValueError('Wrapped object must be a function')

        if interval not in ScheduledTask.TYPE:
            raise ValueError(f'Invalid interval. Must be one of {ScheduledTask.TYPE}')

        # Test against None explicitly, so that a register which happens
        # to evaluate as falsy is still honoured
        _tasks = tasks if tasklist is None else tasklist
        _tasks.register(func, interval, minutes=minutes)

        return func
    return _task_wrapper
204

205

206
@scheduled_task(ScheduledTask.MINUTES, 5)
def heartbeat():
    """Simple task which runs at 5 minute intervals, so we can determine that the background worker is actually running.

    Each run also removes the success records of earlier heartbeat runs
    which started more than 30 minutes ago.

    (There is probably a less "hacky" way of achieving this)?
    """
    try:
        from django_q.models import Success
    except AppRegistryNotReady:  # pragma: no cover
        logger.info("Could not perform heartbeat task - App registry not ready")
        return

    # Heartbeat results more than half an hour old just create extra noise
    cutoff = timezone.now() - timedelta(minutes=30)

    Success.objects.filter(
        func='InvenTree.tasks.heartbeat',
        started__lte=cutoff,
    ).delete()
228

229

230
@scheduled_task(ScheduledTask.DAILY)
def delete_successful_tasks():
    """Delete successful task logs which are older than a specified period.

    The period (in days) is read from the 'INVENTREE_DELETE_TASKS_DAYS'
    setting; 30 is passed to get_setting as its second argument
    (presumably the fallback value - confirm against InvenTreeSetting).
    """
    try:
        from django_q.models import Success

        from common.models import InvenTreeSetting

        days = InvenTreeSetting.get_setting('INVENTREE_DELETE_TASKS_DAYS', 30)
        threshold = timezone.now() - timedelta(days=days)

        # Delete successful tasks
        results = Success.objects.filter(
            started__lte=threshold
        )

        # Count once, so that the number tested and the number logged agree
        # and only a single COUNT query is issued
        n_results = results.count()

        if n_results > 0:
            logger.info(f"Deleting {n_results} successful task records")
            results.delete()

    except AppRegistryNotReady:  # pragma: no cover
        logger.info("Could not perform 'delete_successful_tasks' - App registry not ready")
252

253

254
@scheduled_task(ScheduledTask.DAILY)
def delete_failed_tasks():
    """Delete failed task logs which are older than a specified period.

    The period (in days) is read from the 'INVENTREE_DELETE_TASKS_DAYS'
    setting; 30 is passed to get_setting as its second argument
    (presumably the fallback value - confirm against InvenTreeSetting).
    """
    try:
        from django_q.models import Failure

        from common.models import InvenTreeSetting

        days = InvenTreeSetting.get_setting('INVENTREE_DELETE_TASKS_DAYS', 30)
        threshold = timezone.now() - timedelta(days=days)

        # Delete failed tasks
        results = Failure.objects.filter(
            started__lte=threshold
        )

        # Count once, so that the number tested and the number logged agree
        # and only a single COUNT query is issued
        n_results = results.count()

        if n_results > 0:
            logger.info(f"Deleting {n_results} failed task records")
            results.delete()

    except AppRegistryNotReady:  # pragma: no cover
        logger.info("Could not perform 'delete_failed_tasks' - App registry not ready")
277

278

279
@scheduled_task(ScheduledTask.DAILY)
def delete_old_error_logs():
    """Delete old error logs from the server.

    Error records older than the number of days given by the
    'INVENTREE_DELETE_ERRORS_DAYS' setting are removed; 30 is passed to
    get_setting as its second argument (presumably the fallback value -
    confirm against InvenTreeSetting).
    """
    try:
        from error_report.models import Error

        from common.models import InvenTreeSetting

        days = InvenTreeSetting.get_setting('INVENTREE_DELETE_ERRORS_DAYS', 30)
        threshold = timezone.now() - timedelta(days=days)

        errors = Error.objects.filter(
            when__lte=threshold,
        )

        # Count once, so that the number tested and the number logged agree
        # and only a single COUNT query is issued
        n_errors = errors.count()

        if n_errors > 0:
            logger.info(f"Deleting {n_errors} old error logs")
            errors.delete()

    except AppRegistryNotReady:  # pragma: no cover
        # Apps not yet loaded
        logger.info("Could not perform 'delete_old_error_logs' - App registry not ready")
301

302

303
@scheduled_task(ScheduledTask.DAILY)
def delete_old_notifications():
    """Delete old notification logs.

    Both NotificationEntry and NotificationMessage records older than the
    number of days given by the 'INVENTREE_DELETE_NOTIFICATIONS_DAYS'
    setting are removed; 30 is passed to get_setting as its second
    argument (presumably the fallback value - confirm against
    InvenTreeSetting).
    """
    try:
        from common.models import (InvenTreeSetting, NotificationEntry,
                                   NotificationMessage)

        days = InvenTreeSetting.get_setting('INVENTREE_DELETE_NOTIFICATIONS_DAYS', 30)
        threshold = timezone.now() - timedelta(days=days)

        items = NotificationEntry.objects.filter(
            updated__lte=threshold
        )

        # Count once per queryset, so that the number tested and the number
        # logged agree and only a single COUNT query is issued
        n_entries = items.count()

        if n_entries > 0:
            logger.info(f"Deleted {n_entries} old notification entries")
            items.delete()

        items = NotificationMessage.objects.filter(
            creation__lte=threshold
        )

        n_messages = items.count()

        if n_messages > 0:
            logger.info(f"Deleted {n_messages} old notification messages")
            items.delete()

    except AppRegistryNotReady:  # pragma: no cover
        logger.info("Could not perform 'delete_old_notifications' - App registry not ready")
332

333

334
@scheduled_task(ScheduledTask.DAILY)
def check_for_updates():
    """Check if there is an update for InvenTree.

    Asks the GitHub API for the latest release of inventree/inventree and
    saves its tag name to the '_INVENTREE_LATEST_VERSION' setting. A tag
    which does not contain an 'x.y.z' version number is logged as a
    warning and not saved.

    Raises:
        ValueError: If the GitHub API responds with a status code other than 200
        ValueError: If the response does not contain a 'tag_name'
    """
    try:
        import common.models
    except AppRegistryNotReady:  # pragma: no cover
        # Apps not yet loaded!
        logger.info("Could not perform 'check_for_updates' - App registry not ready")
        return

    headers = {}

    # If running within github actions, use authentication token
    if settings.TESTING:
        token = os.getenv('GITHUB_TOKEN', None)

        if token:
            headers['Authorization'] = f"Bearer {token}"

    response = requests.get(
        'https://api.github.com/repos/inventree/inventree/releases/latest',
        headers=headers,
        # Without a timeout an unresponsive server would block this task indefinitely
        timeout=30,
    )

    if response.status_code != 200:
        raise ValueError(f'Unexpected status code from GitHub API: {response.status_code}')  # pragma: no cover

    data = json.loads(response.text)

    tag = data.get('tag_name', None)

    if not tag:
        raise ValueError("'tag_name' missing from GitHub response")  # pragma: no cover

    # The pattern is only used to validate the tag: the tag itself is what gets saved
    match = re.match(r"^.*(\d+)\.(\d+)\.(\d+).*$", tag)

    # re.match() returns None (not an empty match) when the tag does not fit
    if match is None:  # pragma: no cover
        logger.warning(f"Version '{tag}' did not match expected pattern")
        return

    logger.info(f"Latest InvenTree version: '{tag}'")

    # Save the version to the database
    common.models.InvenTreeSetting.set_setting(
        '_INVENTREE_LATEST_VERSION',
        tag,
        None
    )
384

385

386
@scheduled_task(ScheduledTask.DAILY)
def update_exchange_rates():
    """Refresh the stored currency exchange rates.

    Rates are updated through the InvenTreeExchange backend using the
    default currency as base; rates for currencies which are not in the
    configured currency codes are then removed. Errors during the update
    are logged, not raised.
    """
    try:
        from djmoney.contrib.exchange.models import ExchangeBackend, Rate

        from common.settings import currency_code_default, currency_codes
        from InvenTree.exchange import InvenTreeExchange
    except AppRegistryNotReady:  # pragma: no cover
        # Apps not yet loaded!
        logger.info("Could not perform 'update_exchange_rates' - App registry not ready")
        return
    except Exception:  # pragma: no cover
        # Other error?
        return

    # Probe the database to see if it is ready yet: a missing backend
    # record is fine, any other failure means we stop here
    try:
        ExchangeBackend.objects.get(name='InvenTreeExchange')
    except ExchangeBackend.DoesNotExist:
        pass
    except Exception:  # pragma: no cover
        # Some other error
        logger.warning("update_exchange_rates: Database not ready")
        return

    exchange = InvenTreeExchange()
    logger.info(f"Updating exchange rates from {exchange.url}")

    base_currency = currency_code_default()
    logger.info(f"Using base currency '{base_currency}'")

    try:
        exchange.update_rates(base_currency=base_currency)

        # Remove any exchange rates which are not in the provided currencies
        stale_rates = Rate.objects.filter(backend="InvenTreeExchange").exclude(currency__in=currency_codes())
        stale_rates.delete()
    except Exception as exc:  # pragma: no cover
        logger.error(f"Error updating exchange rates: {exc}")
426

427

428
@scheduled_task(ScheduledTask.DAILY)
def run_backup():
    """Run the backup commands ('dbbackup' and 'mediabackup').

    Nothing is done unless the 'INVENTREE_BACKUP_ENABLE' setting is set.
    The backup is also skipped when:

    - the last recorded attempt was within the past 12 hours
    - no previous attempt has been recorded (the attempt is recorded now,
      so that a later run can proceed)
    - the last recorded success is more recent than the number of days
      given by the '_INVENTREE_BACKUP_DAYS' setting (1 if unreadable)
    """
    from common.models import InvenTreeSetting

    def _parse_timestamp(value):
        """Convert a stored ISO timestamp; empty values pass through, malformed ones become None."""
        if not value:
            return value

        try:
            return datetime.fromisoformat(value)
        except ValueError:
            return None

    if not InvenTreeSetting.get_setting('INVENTREE_BACKUP_ENABLE', False, cache=False):
        # Backups are not enabled - exit early
        return

    logger.info("Performing automated database backup task")

    # Sleep a random number of seconds to prevent worker conflict
    time.sleep(random.randint(1, 5))

    # Check for records of previous backup attempts
    last_attempt = InvenTreeSetting.get_setting('_INVENTREE_BACKUP_ATTEMPT', '', cache=False)
    last_success = InvenTreeSetting.get_setting('_INVENTREE_BACKUP_SUCCESS', '', cache=False)

    try:
        backup_n_days = int(InvenTreeSetting.get_setting('_INVENTREE_BACKUP_DAYS', 1, cache=False))
    except Exception:
        backup_n_days = 1

    last_attempt = _parse_timestamp(last_attempt)

    # Do not attempt if the 'last attempt' at backup was within 12 hours
    if last_attempt and last_attempt > datetime.now() - timedelta(hours=12):
        logger.info('Last backup attempt was too recent - skipping backup operation')
        return

    # Record the timestamp of most recent backup attempt
    InvenTreeSetting.set_setting('_INVENTREE_BACKUP_ATTEMPT', datetime.now().isoformat(), None)

    if not last_attempt:
        # If there is no record of a previous attempt, exit quickly
        # This prevents the backup operation from happening when the server first launches, for example
        logger.info("No previous backup attempts recorded - waiting until tomorrow")
        return

    last_success = _parse_timestamp(last_success)

    # Exit early if the backup was successful within the number of required days
    if last_success and last_success > datetime.now() - timedelta(days=backup_n_days):
        logger.info('Last successful backup was too recent - skipping backup operation')
        return

    for command in ("dbbackup", "mediabackup"):
        call_command(command, noinput=True, clean=True, compress=True, interactive=False)

    # Record the timestamp of most recent backup success
    InvenTreeSetting.set_setting('_INVENTREE_BACKUP_SUCCESS', datetime.now().isoformat(), None)
493

494

495
def send_email(subject, body, recipients, from_email=None, html_message=None):
    """Send an email with the specified subject and body, to the specified recipients list.

    The message is passed to django's send_mail via offload_task, with
    fail_silently=False.

    Arguments:
        subject: Subject line of the email
        body: Body of the email
        recipients: A list of recipients, or a single recipient given as a string
        from_email: Passed to send_mail as the sender (defaults to None)
        html_message: Passed to send_mail as the HTML message (defaults to None)
    """
    # A single recipient may be given as a plain string (or str subclass)
    if isinstance(recipients, str):
        recipients = [recipients]

    offload_task(
        django_mail.send_mail,
        subject,
        body,
        from_email,
        recipients,
        fail_silently=False,
        html_message=html_message
    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc