• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

flowkeeper-org / fk-desktop / 12629598236

06 Jan 2025 09:03AM CUT coverage: 84.322%. Remained the same
12629598236

push

github

web-flow
Merge pull request #59 from bth/main

Fix typo in "Building for Linux and macOS" section

2915 of 3457 relevant lines covered (84.32%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.93
/src/fk/core/abstract_event_source.py
1
#  Flowkeeper - Pomodoro timer for power users and teams
2
#  Copyright (c) 2023 Constantine Kulak
3
#
4
#  This program is free software: you can redistribute it and/or modify
5
#  it under the terms of the GNU General Public License as published by
6
#  the Free Software Foundation; either version 3 of the License, or
7
#  (at your option) any later version.
8
#
9
#  This program is distributed in the hope that it will be useful,
10
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
11
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
#  GNU General Public License for more details.
13
#
14
#  You should have received a copy of the GNU General Public License
15
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
from __future__ import annotations
1✔
17

18
import datetime
1✔
19
import logging
1✔
20
from abc import ABC, abstractmethod
1✔
21
from typing import Iterable, Callable, TypeVar, Generic
1✔
22

23
from fk.core import events
1✔
24
from fk.core.abstract_cryptograph import AbstractCryptograph
1✔
25
from fk.core.abstract_event_emitter import AbstractEventEmitter
1✔
26
from fk.core.abstract_serializer import AbstractSerializer
1✔
27
from fk.core.abstract_settings import AbstractSettings
1✔
28
from fk.core.abstract_strategy import AbstractStrategy
1✔
29
from fk.core.auto_seal import auto_seal
1✔
30
from fk.core.backlog import Backlog
1✔
31
from fk.core.pomodoro import Pomodoro
1✔
32
from fk.core.tag import Tag
1✔
33
from fk.core.tenant import ADMIN_USER
1✔
34
from fk.core.user import User
1✔
35
from fk.core.user_strategies import CreateUserStrategy
1✔
36
from fk.core.workitem import Workitem
1✔
37

38
logger = logging.getLogger(__name__)
1✔
39
TRoot = TypeVar('TRoot')
1✔
40

41

42
class AbstractEventSource(AbstractEventEmitter, ABC, Generic[TRoot]):
1✔
43

44
    _serializer: AbstractSerializer
1✔
45
    _settings: AbstractSettings
1✔
46
    _cryptograph: AbstractCryptograph
1✔
47
    _last_seq: int
1✔
48
    _estimated_count: int
1✔
49
    _ignore_invalid_sequences: bool
1✔
50
    _ignore_errors: bool
1✔
51

52
    def __init__(self,
1✔
53
                 serializer: AbstractSerializer,
54
                 settings: AbstractSettings,
55
                 cryptograph: AbstractCryptograph):
56
        AbstractEventEmitter.__init__(self, [
1✔
57
            events.BeforeUserCreate,
58
            events.AfterUserCreate,
59
            events.BeforeUserDelete,
60
            events.AfterUserDelete,
61
            events.BeforeUserRename,
62
            events.AfterUserRename,
63
            events.BeforeBacklogCreate,
64
            events.AfterBacklogCreate,
65
            events.BeforeBacklogDelete,
66
            events.AfterBacklogDelete,
67
            events.BeforeBacklogRename,
68
            events.AfterBacklogRename,
69
            events.BeforeBacklogReorder,
70
            events.AfterBacklogReorder,
71
            events.BeforeWorkitemCreate,
72
            events.AfterWorkitemCreate,
73
            events.BeforeWorkitemComplete,
74
            events.AfterWorkitemComplete,
75
            events.BeforeWorkitemStart,
76
            events.AfterWorkitemStart,
77
            events.BeforeWorkitemDelete,
78
            events.AfterWorkitemDelete,
79
            events.BeforeWorkitemRename,
80
            events.AfterWorkitemRename,
81
            events.BeforeWorkitemReorder,
82
            events.AfterWorkitemReorder,
83
            events.BeforePomodoroAdd,
84
            events.AfterPomodoroAdd,
85
            events.BeforePomodoroRemove,
86
            events.AfterPomodoroRemove,
87
            events.BeforePomodoroWorkStart,
88
            events.AfterPomodoroWorkStart,
89
            events.BeforePomodoroRestStart,
90
            events.AfterPomodoroRestStart,
91
            events.BeforePomodoroComplete,
92
            events.AfterPomodoroComplete,
93
            events.TagCreated,
94
            events.TagDeleted,
95
            events.TagContentChanged,
96
            events.SourceMessagesRequested,
97
            events.SourceMessagesProcessed,
98
            events.BeforeMessageProcessed,
99
            events.AfterMessageProcessed,
100
            events.PongReceived,
101
        ], settings.invoke_callback)
102
        # TODO - Generate client uid for each connection. This will help us do master/slave for strategies.
103
        self._serializer = serializer
1✔
104
        self._settings = settings
1✔
105
        self._cryptograph = cryptograph
1✔
106
        self._last_seq = 0
1✔
107
        self._estimated_count = 0
1✔
108
        self._ignore_invalid_sequences = settings.get('Source.ignore_invalid_sequence') == 'True'
1✔
109
        self._ignore_errors = settings.get('Source.ignore_errors') == 'True'
1✔
110

111
    # Override
112
    @abstractmethod
1✔
113
    def get_data(self) -> TRoot:
1✔
114
        pass
×
115

116
    # Override
117
    @abstractmethod
1✔
118
    def get_name(self) -> str:
1✔
119
        pass
×
120

121
    def get_config_parameter(self, name: str) -> str:
1✔
122
        return self._settings.get(name)
1✔
123

124
    def set_config_parameters(self, values: dict[str, str]) -> None:
1✔
125
        self._settings.set(values)
×
126

127
    # Assuming those strategies have been already executed. We do not replay them here.
128
    # Override
129
    @abstractmethod
1✔
130
    def _append(self, strategies: list[AbstractStrategy[TRoot]]) -> None:
1✔
131
        pass
×
132

133
    # This will initiate connection, which will trigger replay
134
    @abstractmethod
1✔
135
    def start(self, mute_events: bool = True) -> None:
1✔
136
        pass
×
137

138
    def execute_prepared_strategy(self, strategy: AbstractStrategy[TRoot], auto: bool = False, persist: bool = False) -> None:
1✔
139
        params = {'strategy': strategy, 'auto': auto}
1✔
140
        self._emit(events.BeforeMessageProcessed, params)
1✔
141
        # UC-2: All executed strategies are wrapped in BeforeMessageProcessed / AfterMessageProcessed events
142
        res = strategy.execute(self._emit, self.get_data())
1✔
143
        self._emit(events.AfterMessageProcessed, params)
1✔
144
        if res is not None and res[0] == 'auto-seal':
1✔
145
            # A special case for auto-seal. Can be used for other unusual "retry" cases, too.
146
            # UC-3: Certain strategies may request the "auto-seal" BEFORE they execute
147
            self.auto_seal()
1✔
148
            res = strategy.execute(self._emit, self.get_data())
1✔
149
            if res is not None and res[0] == 'auto-seal':
1✔
150
                raise Exception(f'There is another running pomodoro in "{res[1].get_name()}"')
×
151
        self._estimated_count += 1
1✔
152
        if persist:
1✔
153
            self._append([strategy])
1✔
154
            # UC-2: Strategy sequence is incremented only after it is persisted
155
            self._last_seq = strategy.get_sequence()   # Only save it if all went well
1✔
156

157
    def execute(self,
1✔
158
                strategy_class: type[AbstractStrategy[TRoot]],
159
                params: list[str],
160
                persist: bool = True,
161
                when: datetime.datetime = None,
162
                auto: bool = False,
163
                carry: any = None) -> None:
164
        # This method is called when the user does something in the UI on THIS instance
165
        # TODO: Get username from the login provider instead
166
        if when is None:
1✔
167
            when = datetime.datetime.now(datetime.timezone.utc)
1✔
168
        s = strategy_class(
1✔
169
            self._last_seq + 1,
170
            when,
171
            self._settings.get_username(),  # UC-2: Strategy owner is taken from the source settings
172
            params,
173
            self._settings,
174
            carry)
175
        self.execute_prepared_strategy(s, auto, persist)
1✔
176

177
    def auto_seal(self, when: datetime.datetime | None = None) -> None:
1✔
178
        auto_seal(self.workitems(),
1✔
179
                  lambda strategy_class, params, persist, exec_when: self.execute(strategy_class,
180
                                                                                  params,
181
                                                                                  persist=persist,
182
                                                                                  when=exec_when,
183
                                                                                  auto=True),
184
                  when)
185

186
    def users(self) -> Iterable[User]:
1✔
187
        for user in self.get_data().values():
1✔
188
            yield user
1✔
189

190
    def backlogs(self) -> Iterable[Backlog]:
1✔
191
        for user in self.get_data().values():
1✔
192
            for backlog in user.values():
1✔
193
                yield backlog
1✔
194

195
    def tags(self) -> Iterable[Tag]:
1✔
196
        for user in self.get_data().values():
1✔
197
            for tag in user.get_tags().values():
1✔
198
                yield tag
1✔
199

200
    def workitems(self) -> Iterable[Workitem]:
1✔
201
        for backlog in self.backlogs():
1✔
202
            for workitem in backlog.values():
1✔
203
                yield workitem
1✔
204

205
    def find_workitem(self, uid: str) -> Workitem | None:
1✔
206
        for workitem in self.workitems():
1✔
207
            if workitem.get_uid() == uid:
1✔
208
                return workitem
1✔
209

210
    def find_backlog(self, uid: str) -> Backlog | None:
1✔
211
        for backlog in self.backlogs():
×
212
            if backlog.get_uid() == uid:
×
213
                return backlog
×
214

215
    def find_tag(self, uid: str) -> Tag | None:
1✔
216
        for tag in self.tags():
1✔
217
            if tag.get_uid() == uid:
1✔
218
                return tag
1✔
219

220
    def find_user(self, identity: str) -> User | None:
1✔
221
        for user in self.users():
1✔
222
            if user.get_identity() == identity:
1✔
223
                return user
1✔
224

225
    def pomodoros(self) -> Iterable[Pomodoro]:
1✔
226
        for workitem in self.workitems():
×
227
            for pomodoro in workitem.values():
×
228
                yield pomodoro
×
229

230
    @abstractmethod
1✔
231
    def clone(self, new_root: TRoot) -> AbstractEventSource[TRoot]:
1✔
232
        pass
×
233

234
    def _sequence_error(self, prev: int, next_: int) -> None:
1✔
235
        raise Exception(f"Strategies must go in sequence. "
×
236
                        f"Received {next_} after {prev}. "
237
                        f"To attempt a repair go to Settings > Connection > Repair.")
238

239
    @abstractmethod
1✔
240
    def disconnect(self):
1✔
241
        pass
×
242

243
    def get_settings(self) -> AbstractSettings:
1✔
244
        return self._settings
1✔
245

246
    @abstractmethod
1✔
247
    def send_ping(self) -> str | None:
1✔
248
        pass
×
249

250
    @abstractmethod
1✔
251
    def can_connect(self):
1✔
252
        pass
×
253

254
    @abstractmethod
1✔
255
    def repair(self) -> list[str] | None:
1✔
256
        pass
×
257

258
    def connect(self):
1✔
259
        raise Exception('Connect is not supported on this type of event source')
×
260

261
    def get_init_strategy(self, emit: Callable[[str, dict[str, any], any], None]) -> AbstractStrategy[AbstractEventSource[TRoot]]:
1✔
262
        return CreateUserStrategy(1,
1✔
263
                                  datetime.datetime.now(datetime.timezone.utc),
264
                                  ADMIN_USER,
265
                                  [self._settings.get_username(), self._settings.get_fullname()],
266
                                  self._settings)
267

268
    def get_last_sequence(self):
1✔
269
        return self._last_seq
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc