• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

freqtrade / freqtrade / 9394559170

26 Apr 2024 06:36AM UTC coverage: 94.656% (-0.02%) from 94.674%
9394559170

push

github

xmatthias
Loader should be passed as kwarg for clarity

20280 of 21425 relevant lines covered (94.66%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.0
/freqtrade/rpc/api_server/uvicorn_threaded.py
1
import contextlib
1✔
2
import threading
1✔
3
import time
1✔
4

5
import uvicorn
1✔
6

7

8
def asyncio_setup() -> None:  # pragma: no cover
    """
    Set the event loop for win32 setups.

    Reverts a change done in uvicorn 0.15.0 - which now sets the eventloop
    via policy. On any other platform (or on Python older than 3.8) this
    function does nothing.
    """
    import sys

    # Guard clause: only win32 on Python >= 3.8 needs the selector-based loop.
    if sys.platform != "win32" or sys.version_info < (3, 8):
        return

    import asyncio
    import selectors

    asyncio.set_event_loop(asyncio.SelectorEventLoop(selectors.SelectSelector()))
20

21

22
class UvicornServer(uvicorn.Server):
    """
    Multithreaded server - as found in https://github.com/encode/uvicorn/issues/742

    Removed install_signal_handlers() override based on changes from this commit:
        https://github.com/encode/uvicorn/commit/ce2ef45a9109df8eae038c0ec323eb63d644cbc6

    Cannot rely on asyncio.get_event_loop() to create new event loop because of this check:
        https://github.com/python/cpython/blob/4d7f11e05731f67fd2c07ec2972c6cb9861d52be/Lib/asyncio/events.py#L638

    Fix by overriding run() and forcing creation of new event loop if uvloop is available
    """

    def run(self, sockets=None):
        """
        Run self.serve() to completion on an event loop obtained by this method.

        Parent implementation calls self.config.setup_event_loop(),
            but we need to create uvloop event loop manually

        :param sockets: Passed through unchanged to self.serve().
        """
        import asyncio

        try:
            import uvloop  # noqa
        except ImportError:  # pragma: no cover
            asyncio_setup()
        else:
            asyncio.set_event_loop(uvloop.new_event_loop())

        created_here = False
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # When running in a thread, we'll not have an eventloop yet.
            # NOTE(review): asyncio.new_event_loop() builds a fresh loop, so the
            # loop installed via set_event_loop() above is not the one that
            # runs serve() - confirm whether that is intended.
            loop = asyncio.new_event_loop()
            created_here = True
        try:
            loop.run_until_complete(self.serve(sockets=sockets))
        finally:
            # Only close a loop this method created; a loop that was already
            # running belongs to someone else.
            if created_here:
                loop.close()

    @contextlib.contextmanager
    def run_in_thread(self):
        """
        Start run() in a background thread named 'FTUvicorn' and wait for startup.

        Blocks until self.started becomes truthy (attribute is not defined in
        this file; presumably maintained by uvicorn.Server) or until the
        thread has terminated, whichever happens first. Callers that need to
        know whether startup succeeded can inspect self.started afterwards.

        NOTE: this function contains no ``yield``, so despite the
        ``contextmanager`` decorator its body executes immediately when it is
        called and the returned object cannot be used in a ``with`` statement.
        The decorator is kept so the call-style interface stays unchanged.
        """
        self.thread = threading.Thread(target=self.run, name='FTUvicorn')
        self.thread.start()
        # Also stop polling once the thread is dead: if the server fails during
        # startup, self.started is never set and this would spin forever.
        while not self.started and self.thread.is_alive():
            time.sleep(1e-3)

    def cleanup(self):
        """
        Ask the server to exit and wait for the background thread to finish.

        Sets self.should_exit and joins the thread created by run_in_thread().
        If run_in_thread() was never called there is no thread to join, and
        only the exit flag is set.
        """
        self.should_exit = True
        worker = getattr(self, "thread", None)
        if worker is not None:
            worker.join()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc