• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

ChristianTremblay / BAC0 / 10761719058

08 Sep 2024 05:13PM UTC coverage: 38.955% (+0.04%) from 38.92%
10761719058

push

github

ChristianTremblay
Merge branch 'async' of https://github.com/ChristianTremblay/BAC0 into async

90 of 171 new or added lines in 18 files covered. (52.63%)

1008 existing lines in 18 files now uncovered.

2072 of 5319 relevant lines covered (38.95%)

0.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.78
/BAC0/tasks/RecurringTask.py
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
5
# Licensed under LGPLv3, see file LICENSE in this source tree.
6
#
7
"""
8
RecurringTask.py - execute a recurring task
9
"""
10
import asyncio
1✔
11
from typing import Any, Callable, Tuple, Union, Coroutine
1✔
12
from concurrent.futures import ThreadPoolExecutor
1✔
13

14
from ..core.utils.notes import note_and_log
1✔
15
from .TaskManager import Task
1✔
16

17

18
@note_and_log
1✔
19
class RecurringTask(Task):
1✔
20
    """
21
    Start a recurring task (a function passed)
22
    """
23

24
    def __init__(
1✔
25
        self,
26
        fnc: Union[Tuple[Callable, Any], Callable, Coroutine],
27
        delay: int = 60,
28
        name: str = "recurring",
29
    ) -> None:
30
        """
31
        :param fnc: a function or a tuple (function, args)
32
        :param delay: (int) Delay between reads executions
33

34
        :returns: Nothing
35
        """
36
        self.fnc_args = None
1✔
37
        self.delay = delay
1✔
38
        if isinstance(fnc, tuple):
1✔
UNCOV
39
            self.func, self.fnc_args = fnc
×
40
        elif hasattr(fnc, "__call__"):
1✔
41
            self.func = fnc
1✔
42
        else:
UNCOV
43
            raise ValueError(
×
44
                "You must pass a function or a tuple (function,args) to this..."
45
            )
46
        Task.__init__(self, name=name, delay=delay)
1✔
47

48
    async def task(self) -> None:
1✔
49
        loop = asyncio.get_event_loop()
1✔
50
        executor = ThreadPoolExecutor()
1✔
51
        if self.fnc_args:
1✔
52
            if asyncio.iscoroutinefunction(self.func):
×
UNCOV
53
                await self.func(self.fnc_args)
×
54
            else:
UNCOV
55
                await loop.run_in_executor(executor, self.func, self.fnc_args)
×
56
                #self.func(self.fnc_args)
57
        else:
58
            if asyncio.iscoroutinefunction(self.func):
1✔
59
                await self.func()
1✔
60
            else:
UNCOV
61
                await loop.run_in_executor(executor, self.func)
×
62
                #self.func()
63
        await asyncio.sleep(self.delay)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc