sebvey / python-fp / 16287447979

15 Jul 2025 07:54AM UTC coverage: 92.5% (-0.08%) from 92.577%

push

github

jeliermann
fix: bugfix dict reduce

7 of 9 new or added lines in 4 files covered. (77.78%)

38 existing lines in 3 files now uncovered.

888 of 960 relevant lines covered (92.5%)

0.93 hits per line

Source File

/src/xfp/a/simple_async_handler.py (70.59% covered)
import asyncio  # 1 hit
from dataclasses import dataclass  # 1 hit
import functools  # 1 hit
from typing import override, Iterable  # 1 hit
from xfp.a.async_handler import AsyncHandler  # 1 hit

from xfp.functions import XF1, XFunc  # 1 hit


@dataclass  # 1 hit
class SimpleAsyncHandler(AsyncHandler):  # 1 hit
    @override  # 1 hit
    def ap[X, Y](  # 1 hit
        self, async_process: Iterable[XF1[[X], Y]]
    ) -> XFunc[[Iterable[X]], Iterable[Y]]:
        @XFunc.from_async  # NEW, uncovered
        async def h(iter: Iterable[X]) -> Iterable[Y]:  # uncovered
            return await asyncio.gather(  # uncovered
                *(map((lambda fel: XFunc(fel[0])(fel[1])), zip(async_process, iter)))
            )

        return h  # NEW, uncovered

    @override  # 1 hit
    def reduce[X](self, f: XF1[[X, X], X], xs: Iterable[X]) -> X:  # 1 hit
        return functools.reduce(XFunc(f).collect, xs)  # uncovered
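
The uncovered body of `ap` pairs each function with one input and awaits all the calls together, and the uncovered body of `reduce` folds a sequence with a binary function. Below is a minimal sketch of both patterns using only the standard library. It assumes plain `async def` functions and a plain lambda as stand-ins for the `XF1`/`XFunc` wrappers, whose behaviour is not shown on this page. The names `apply_pairwise`, `double`, and `increment` are hypothetical and are not part of `xfp`.

```python
import asyncio
import functools
from typing import Awaitable, Callable, Iterable, List, TypeVar

X = TypeVar("X")
Y = TypeVar("Y")


async def apply_pairwise(
    funcs: Iterable[Callable[[X], Awaitable[Y]]], values: Iterable[X]
) -> List[Y]:
    # Hypothetical stand-in for the inner coroutine `h` in the listing.
    # zip pairs the i-th function with the i-th value and stops at the
    # shorter of the two inputs. asyncio.gather awaits the coroutines
    # concurrently and returns results in the order they were passed.
    return await asyncio.gather(*(f(v) for f, v in zip(funcs, values)))


async def double(n: int) -> int:
    await asyncio.sleep(0)  # yield to the event loop once
    return n * 2


async def increment(n: int) -> int:
    await asyncio.sleep(0)
    return n + 1


# Pairwise application: double(10) and increment(20).
pairwise_result = asyncio.run(apply_pairwise([double, increment], [10, 20]))

# Left fold with a plain binary function, as functools.reduce does in `reduce`.
reduced = functools.reduce(lambda a, b: a + b, [1, 2, 3, 4])
```

A test that exercises the real `SimpleAsyncHandler.ap` and `SimpleAsyncHandler.reduce` along these lines would likely turn the uncovered lines green, though the exact call shape depends on how `XFunc` is invoked in the project.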