• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

thomst / django-async-actions / 6703463578

31 Oct 2023 07:40AM UTC coverage: 94.177% (-1.0%) from 95.17%
6703463578

push

github

thomst
[tests] some testapp cleanup

372 of 395 relevant lines covered (94.18%)

6.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.9
/async_actions/processor.py
1
import celery
7✔
2
from django.contrib.contenttypes.models import ContentType
7✔
3
from .models import ActionTaskState
7✔
4
from .utils import get_object_checksum
7✔
5
from .utils import get_task_verbose_name
7✔
6
from .tasks import get_locks
7✔
7
from .tasks import release_locks
7✔
8
from .tasks import release_locks_on_error
7✔
9

10

11
class Processor:
7✔
12
    """
7✔
13
        A processor builds a celery workflow for a specific
14
    :class:`~.task.ActionTask` and queryset and launches this workflow.
15

16
    :param _type_ queryset: _description_
17
    :param _type_ sig: _description_
18
    :param _type_ runtime_data: _description_, defaults to None
19
    :param bool inner_lock: _description_, defaults to True
20
    :param bool outer_lock: _description_, defaults to False
21
    """
22

23
    #: Task based locking.
24
    TASK_LOCK = 'tasklock'
7✔
25

26
    #: Chain based locking used for chains.
27
    CHAIN_LOCK = 'chainlock'
7✔
28

29
    #: Chord based locking used for groups.
30
    GROUP_LOCK = 'chordlock'
7✔
31

32
    #: Disabled locking.
33
    NO_LOCK = 'nolock'
7✔
34

35
    def __init__(self, queryset, sig, runtime_data=None, lock_mode=None):
7✔
36
        self._queryset = queryset
7✔
37
        self._sig = sig
7✔
38
        self._runtime_data = runtime_data or dict()
7✔
39
        self._lock_mode = lock_mode or self._get_lock_mode(sig)
7✔
40
        self._results = list()
7✔
41
        self._task_states = list()
7✔
42
        self._signatures = None
7✔
43
        self._workflow = None
7✔
44

45
    def _get_lock_mode(self, sig):
7✔
46
        if isinstance(sig, sig.TYPES['chain']):
7✔
47
            return self.CHAIN_LOCK
7✔
48
        elif isinstance(sig, sig.TYPES['group']):
7✔
49
            return self.GROUP_LOCK
×
50
        else:
51
            return self.TASK_LOCK
7✔
52

53
    def _get_lock_ids(self, obj):
7✔
54
        """
55
        _summary_
56

57
        :param _type_ obj: _description_
58
        """
59
        lock_id = get_object_checksum(obj)
7✔
60
        return [lock_id]
7✔
61

62
    def _get_task_state(self, obj, signature):
7✔
63
        """
64
        Create :class:`~.models.TaskState` instance for a signature or for all
65
        signatures of a :class:`~celery.canvas.chain`.
66

67
        :param obj: object to run the action task with
68
        :type obj: :class:`~django.db.models.Model`
69
        :param signature: signature or chain
70
        :type signature: :class:`~celery.canvas.Signature` or :class:`~celery.canvas.chain`
71
        :return list of :class:`~.models.TaskState`: list of TaskState instances
72
        """
73
        content_type = ContentType.objects.get_for_model(type(obj))
7✔
74
        params = dict(
7✔
75
            ctype=content_type,
76
            obj_id=obj.pk,
77
            task_id=signature.id,
78
            task_name=signature.task,
79
            verbose_name=get_task_verbose_name(signature),
80
            status=celery.states.PENDING
81
        )
82
        task_state = ActionTaskState(**params)
7✔
83
        task_state.save()
7✔
84
        return task_state
7✔
85

86
    def _get_signature(self, obj):
7✔
87
        """
88
        _summary_
89

90
        :return _type_: _description_
91
        """
92
        # Clone original signature and add runtime data as kwargs.
93
        # FIXME: sig.clone(kwargs=self._runtime_data) does not work for chained
94
        # tasks. See: https://github.com/celery/celery/issues/5193.
95
        sig = self._sig.clone(kwargs=self._runtime_data)
7✔
96

97
        # Pass the lock ids as headers and let the task handle the locks.
98
        if self._lock_mode == self.TASK_LOCK:
7✔
99
            lock_ids = self._get_lock_ids(obj)
7✔
100
            sig = sig.clone(headers={'lock_ids': lock_ids})
7✔
101

102
        # Let a get_locks task precede the original signature and equip the
103
        # chain with release_locks tasks as callback.
104
        elif self._lock_mode == self.CHAIN_LOCK:
7✔
105
            lock_ids = self._get_lock_ids(obj)
7✔
106
            # Make the signature immutable. Otherwise it would recieve a `None`
107
            # as positional argument from the get_locks task.
108
            sig.set_immutable(True)
7✔
109
            sig = get_locks.si(*lock_ids) | sig
7✔
110
            sig.set(link=release_locks.si(*lock_ids))
7✔
111
            sig.set(link_error=release_locks_on_error.s(*lock_ids))
7✔
112

113
        # Chain the get_locks with a chord of the original signature as header
114
        # and a release_locks callback as body of the chord.
115
        elif self._lock_mode == self.GROUP_LOCK:
×
116
            lock_ids = self._get_lock_ids(obj)
×
117
            callback = release_locks.si(*lock_ids)
×
118
            sig.set_immutable(True)
×
119
            sig = get_locks.si(*lock_ids) | (sig | callback)
×
120
            sig.set(link_error=release_locks_on_error.s(*lock_ids))
×
121

122
        return sig
7✔
123

124
    def _get_task_states(self, obj, signature):
7✔
125
        """
126
        _summary_
127

128
        :param _type_ obj: _description_
129
        :param _type_ signature: _description_
130
        """
131
        # TODO: Exlude release_locks tasks.
132
        task_states = list()
7✔
133
        if signature.name == release_locks.name:
7✔
134
            pass
×
135
        elif isinstance(signature, (signature.TYPES['chain'], signature.TYPES['group'])):
7✔
136
            for sig in signature.tasks:
7✔
137
                task_states.extend(self._get_task_states(obj, sig))
7✔
138
        elif isinstance(signature, signature.TYPES['chord']):
7✔
139
            task_states.extend(self._get_task_states(obj, signature.tasks))
×
140
            task_states.extend(self._get_task_states(obj, signature.body))
×
141
        elif signature.id:
7✔
142
            task_states.append(self._get_task_state(obj, signature))
7✔
143
        return task_states
7✔
144

145
    def _get_signatures(self):
7✔
146
        """
147
        _summary_
148

149
        :return _type_: _description_
150
        """
151
        signatures = list()
7✔
152
        for obj in self._queryset:
7✔
153
            signature = self._get_signature(obj)
7✔
154
            signature.freeze()
7✔
155
            signatures.append(signature)
7✔
156

157
            # For primitives we loop over the tasks attribute of the
158
            # signature. Otherwise we simply use the signature in a
159
            # one-item-list.
160
            self._task_states.extend(self._get_task_states(obj, signature))
7✔
161

162
        return signatures
7✔
163

164
    def _get_workflow(self):
7✔
165
        """
166
        Build a celery workflow. See also
167
        `https://docs.celeryq.dev/en/stable/userguide/canvas.html#the-primitives`_.
168
        By default we build a simple :class:`~celery.canvas.group` of all
169
        signatures we got. By overwriting this method it is possible to build
170
        more advanced workflows.
171

172
        :return :class:`~celery.canvas.group`: celery workflow
173
        """
174
        return celery.group(*self.signatures)
7✔
175

176
    # FIXME: Do we need results property?
177
    @property
7✔
178
    def results(self):
7✔
179
        """
180
        Results returned from the workflow.delay call. Populated by :meth:`.run`
181
        method.
182
        """
183
        return self._results
7✔
184

185
    @property
7✔
186
    def task_states(self):
7✔
187
        """
188
        Nested list of :class:`.models.ActionTaskState` instances. If not
189
        working with a :class:`~celery.canvas.chain` the inner lists have only a
190
        single item. Populated by :meth:`.run` method.
191
        """
192
        return self._task_states
7✔
193

194
    # FIXME: Do we need signatures and workflow properties?
195
    @property
7✔
196
    def signatures(self):
7✔
197
        """
198
        List of signatures for all objects that are not locked. Populated by
199
        :meth:`.run` method.
200
        """
201
        if self._signatures is None:
7✔
202
            self._signatures = self._get_signatures()
7✔
203
        return self._signatures
7✔
204

205
    @property
7✔
206
    def workflow(self):
7✔
207
        """
208
        List of signatures for all objects that are not locked. Set by
209
        :meth:`.run` method.
210
        """
211
        if self._workflow is None:
7✔
212
            self._workflow = self._get_workflow()
7✔
213
        return self._workflow
7✔
214

215
    def run(self):
7✔
216
        """
217
        Run the workflow build by :meth:`.get_workflow`. By default we do a
218
        simple :meth:`~celery.canvas.Signature.delay` call.
219

220
        :return :class:`~celery.result.AsyncResult: result object
221
        """
222
        # FIXME: Do we want to use runtime-data as positional argument for
223
        # better chain support:
224
        # args = [self._runtime_data] if self._runtime_data else []
225
        # self._results = self.workflow.delay(*args)
226
        self._results = self.workflow.delay()
7✔
227
        self._results.save()
7✔
228
        return self._results
7✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc