• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ChristianTremblay / BAC0 / 10756798981

08 Sep 2024 03:41AM UTC coverage: 38.92% (-1.3%) from 40.19%
10756798981

push

github

ChristianTremblay
forgot to commit the new lookfordependency

19 of 22 new or added lines in 1 file covered. (86.36%)

1357 existing lines in 25 files now uncovered.

2069 of 5316 relevant lines covered (38.92%)

0.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.0
/BAC0/tasks/RecurringTask.py
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
5
# Licensed under LGPLv3, see file LICENSE in this source tree.
6
#
7
"""
8
RecurringTask.py - execute a recurring task
9
"""
10
import asyncio
1✔
11
from typing import Any, Callable, Tuple, Union, Coroutine
1✔
12

13
from ..core.utils.notes import note_and_log
1✔
14
from .TaskManager import Task
1✔
15

16

17
@note_and_log
1✔
18
class RecurringTask(Task):
1✔
19
    """
20
    Start a recurring task (a function passed)
21
    """
22

23
    def __init__(
1✔
24
        self,
25
        fnc: Union[Tuple[Callable, Any], Callable, Coroutine],
26
        delay: int = 60,
27
        name: str = "recurring",
28
    ) -> None:
29
        """
30
        :param fnc: a function or a tuple (function, args)
31
        :param delay: (int) Delay between reads executions
32

33
        :returns: Nothing
34
        """
35
        self.fnc_args = None
1✔
36
        self.delay = delay
1✔
37
        if isinstance(fnc, tuple):
1✔
38
            self.func, self.fnc_args = fnc
×
39
        elif hasattr(fnc, "__call__"):
1✔
40
            self.func = fnc
1✔
41
        else:
42
            raise ValueError(
×
43
                "You must pass a function or a tuple (function,args) to this..."
44
            )
45
        Task.__init__(self, name=name, delay=delay)
1✔
46

47
    async def task(self) -> None:
1✔
48
        if self.fnc_args:
1✔
49
            if asyncio.iscoroutinefunction(self.func):
×
50
                await self.func(self.fnc_args)
×
51
            else:
52
                self.func(self.fnc_args)
×
53
        else:
54
            if asyncio.iscoroutinefunction(self.func):
1✔
55
                await self.func()
1✔
56
            else:
UNCOV
57
                self.func()
×
58
        await asyncio.sleep(self.delay)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc