• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

FEniCS / ufl / 18629405325

19 Oct 2025 10:56AM UTC coverage: 77.06% (+0.4%) from 76.622%
18629405325

Pull #401

github

schnellerhase
Ruff
Pull Request #401: Removal of custom type system

494 of 533 new or added lines in 41 files covered. (92.68%)

6 existing lines in 2 files now uncovered.

9325 of 12101 relevant lines covered (77.06%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.41
/ufl/algorithms/domain_analysis.py
1
"""Algorithms for building canonical data structure for integrals over subdomains."""
2

3
# Copyright (C) 2009-2016 Anders Logg and Martin Sandve Alnæs
4
#
5
# This file is part of UFL (https://www.fenicsproject.org)
6
#
7
# SPDX-License-Identifier:    LGPL-3.0-or-later
8

9
from __future__ import annotations
1✔
10

11
import numbers
1✔
12
from collections import defaultdict
1✔
13
from typing import Literal
1✔
14

15
import ufl
1✔
16
from ufl.algorithms.coordinate_derivative_helpers import (
1✔
17
    attach_coordinate_derivatives,
18
    strip_coordinate_derivatives,
19
)
20
from ufl.algorithms.renumbering import renumber_indices
1✔
21
from ufl.core.compute_expr_hash import compute_expr_hash
1✔
22
from ufl.domain import Mesh, sort_domains
1✔
23
from ufl.form import Form
1✔
24
from ufl.integral import Integral
1✔
25
from ufl.protocols import id_or_none
1✔
26
from ufl.sorting import cmp_expr, sorted_expr
1✔
27
from ufl.utils.sorting import canonicalize_metadata, sorted_by_key
1✔
28

29

30
class IntegralData:
1✔
31
    """Utility class.
32

33
    This class has members (domain, integral_type, subdomain_id,
34
    integrals, metadata), where metadata is an empty dictionary that may
35
    be used for associating metadata with each object.
36
    """
37

38
    __slots__ = (
1✔
39
        "domain",
40
        "domain_integral_type_map",
41
        "enabled_coefficients",
42
        "integral_coefficients",
43
        "integral_type",
44
        "integrals",
45
        "metadata",
46
        "subdomain_id",
47
    )
48

49
    def __init__(
1✔
50
        self,
51
        domain: Mesh,
52
        integral_type: str,
53
        subdomain_id: Literal["everywhere"] | numbers.Integral | tuple[numbers.Integral, ...],
54
        integrals: list[Integral],
55
        metadata: dict,
56
        domain_integral_type_map: dict[Mesh, str],
57
    ):
58
        """Initialise.
59

60
        Args:
61
            domain: primal domain
62
            integral_type: integral type on the primal domain
63
            subdomain_id: subdmain id on the primal domain
64
            integrals: integrals
65
            metadata: metadata
66
            domain_integral_type_map: map from domains to the integral types on those domains.
67
                First domain must be the primal domain, and the other domains must be sorted.
68
                Only significant in multi-domain problems
69

70
        """
71
        if 1 != len(set(itg.ufl_domain() for itg in integrals)):
1✔
72
            raise ValueError("Multiple domains mismatch in integral data.")
×
73
        if not all(integral_type == itg.integral_type() for itg in integrals):
1✔
74
            raise ValueError("Integral type mismatch in integral data.")
×
75
        if not all(subdomain_id == itg.subdomain_id() for itg in integrals):
1✔
76
            raise ValueError("Subdomain id mismatch in integral data.")
×
77

78
        self.domain = domain
1✔
79
        self.integral_type = integral_type
1✔
80
        self.subdomain_id = subdomain_id
1✔
81

82
        self.integrals = integrals
1✔
83

84
        # This is populated in preprocess using data not available at
85
        # this stage:
86
        self.integral_coefficients = None
1✔
87
        self.enabled_coefficients = None
1✔
88
        non_primal_domains = tuple(domain_integral_type_map.keys())[1:]
1✔
89
        if sort_domains(non_primal_domains) != non_primal_domains:
1✔
90
            raise ValueError("domain_integral_type_map must have been sorted by domains")
×
91
        self.domain_integral_type_map = domain_integral_type_map
1✔
92

93
        # TODO: I think we can get rid of this with some refactoring
94
        # in ffc:
95
        self.metadata = metadata
1✔
96

97
    def __lt__(self, other):
1✔
98
        """Check if self is less than other."""
99
        # To preserve behaviour of extract_integral_data:
100
        return (
×
101
            self.integral_type,
102
            self.subdomain_id,
103
            self.integrals,
104
            self.metadata,
105
            tuple(
106
                (d._ufl_sort_key_(), itype) for d, itype in self.domain_integral_type_map.items()
107
            ),
108
        ) < (
109
            other.integral_type,
110
            other.subdomain_id,
111
            other.integrals,
112
            other.metadata,
113
            tuple(
114
                (d._ufl_sort_key_(), itype) for d, itype in other.domain_integral_type_map.items()
115
            ),
116
        )
117

118
    def __eq__(self, other):
1✔
119
        """Check for equality."""
120
        # Currently only used for tests:
121
        return (
×
122
            self.integral_type == other.integral_type
123
            and self.subdomain_id == other.subdomain_id
124
            and self.integrals == other.integrals
125
            and self.metadata == other.metadata
126
            and self.domain_integral_type_map == other.domain_integral_type_map
127
        )
128

129
    def __hash__(self):
1✔
130
        """Return hash."""
NEW
131
        return compute_expr_hash(self)
×
132

133
    def __str__(self):
1✔
134
        """Format as a string."""
135
        s = f"IntegralData over domain({self.integral_type}, {self.subdomain_id})"
×
136
        s += " with integrals:\n"
×
137
        s += "\n\n".join(map(str, self.integrals))
×
138
        s += "\nand metadata:\n{metadata}"
×
139
        return s
×
140

141

142
class ExprTupleKey:
1✔
143
    """Tuple comparison helper."""
144

145
    __slots__ = ("x",)
1✔
146

147
    def __init__(self, x):
1✔
148
        """Initialise."""
149
        self.x = x
1✔
150

151
    def __lt__(self, other):
1✔
152
        """Check if self is less than other."""
153
        # Comparing expression first
154
        c = cmp_expr(self.x[0], other.x[0])
×
155
        if c < 0:
×
156
            return True
×
157
        elif c > 0:
×
158
            return False
×
159
        else:
160
            # Comparing form compiler data
161
            mds = canonicalize_metadata(self.x[1])
×
162
            mdo = canonicalize_metadata(other.x[1])
×
163
            return mds < mdo
×
164

165

166
def group_integrals_by_domain_and_type(integrals, domains):
1✔
167
    """Group integrals by domain and type.
168

169
    Args:
170
        integrals: list of Integral objects
171
        domains: list of AbstractDomain objects from the parent Form
172

173
    Returns:
174
        Dictionary mapping (domain, integral_type) to a
175
        dictionary mapping (extra_domain_integral_type_map, list(Integral)).
176
    """
177
    integrals_by_domain_and_type = defaultdict(lambda: defaultdict(list))
1✔
178
    for itg in integrals:
1✔
179
        if itg.ufl_domain() is None:
1✔
180
            raise ValueError("Integral has no domain.")
×
181
        key0 = (itg.ufl_domain(), itg.integral_type())
1✔
182
        key1 = tuple(itg.extra_domain_integral_type_map().items())
1✔
183
        # Append integral to list of integrals with shared key
184
        integrals_by_domain_and_type[key0][key1].append(itg)
1✔
185

186
    return integrals_by_domain_and_type
1✔
187

188

189
def integral_subdomain_ids(integral):
1✔
190
    """Get a tuple of integer subdomains or a valid string subdomain from integral."""
191
    did = integral.subdomain_id()
1✔
192
    if isinstance(did, numbers.Integral):
1✔
193
        return (did,)
1✔
194
    elif isinstance(did, tuple):
1✔
195
        if not all(isinstance(d, numbers.Integral) for d in did):
×
196
            raise ValueError("Expecting only integer subdomains in tuple.")
×
197
        return did
×
198
    elif did in ("everywhere", "otherwise"):
1✔
199
        # TODO: Define list of valid strings somewhere more central
200
        return did
1✔
201
    else:
202
        raise ValueError(f"Invalid domain id {did}.")
×
203

204

205
def rearrange_integrals_by_single_subdomains(
1✔
206
    integrals: list[Integral], do_append_everywhere_integrals: bool
207
) -> dict[int, list[Integral]]:
208
    """Rearrange integrals over multiple subdomains to single subdomain integrals.
209

210
    Args:
211
        integrals:
212
            List of integrals
213
        do_append_everywhere_integrals:
214
            Boolean indicating if integrals defined on the whole domain should just be restricted to
215
            the set of input subdomain ids.
216

217
    Returns:
218
        The integrals reconstructed with single subdomain_id
219
    """
220
    # Split integrals into lists of everywhere and subdomain integrals
221
    everywhere_integrals = []
1✔
222
    subdomain_integrals = []
1✔
223
    for itg in integrals:
1✔
224
        dids = integral_subdomain_ids(itg)
1✔
225
        if dids == "otherwise":
1✔
226
            raise ValueError("'otherwise' integrals should never occur before preprocessing.")
×
227
        elif dids == "everywhere":
1✔
228
            everywhere_integrals.append(itg)
1✔
229
        else:
230
            subdomain_integrals.append((dids, itg))
1✔
231

232
    # Fill single_subdomain_integrals with lists of integrals from
233
    # subdomain_integrals, but split and restricted to single
234
    # subdomain ids
235
    single_subdomain_integrals = defaultdict(list)
1✔
236
    for dids, itg in subdomain_integrals:
1✔
237
        # Region or single subdomain id
238
        for did in dids:
1✔
239
            # Restrict integral to this subdomain!
240
            single_subdomain_integrals[did].append(itg.reconstruct(subdomain_id=did))
1✔
241

242
    # Add everywhere integrals to each single subdomain id integral
243
    # list
244
    otherwise_integrals = []
1✔
245
    for ev_itg in everywhere_integrals:
1✔
246
        # Restrict everywhere integral to 'otherwise'
247
        otherwise_integrals.append(ev_itg.reconstruct(subdomain_id="otherwise"))
1✔
248

249
        # Restrict everywhere integral to each subdomain
250
        # and append to each integral list
251
        if do_append_everywhere_integrals:
1✔
252
            for subdomain_id in sorted(single_subdomain_integrals.keys()):
1✔
253
                single_subdomain_integrals[subdomain_id].append(
×
254
                    ev_itg.reconstruct(subdomain_id=subdomain_id)
255
                )
256

257
    if otherwise_integrals:
1✔
258
        single_subdomain_integrals["otherwise"] = otherwise_integrals
1✔
259

260
    return single_subdomain_integrals
1✔
261

262

263
def accumulate_integrands_with_same_metadata(integrals):
1✔
264
    """Accumulate integrands with the same metedata.
265

266
    Args:
267
        integrals: a list of integrals
268

269
    Returns:
270
        A list of the form [(integrand0, metadata0), (integrand1,
271
        metadata1), ...] where integrand0 < integrand1 by the canonical
272
        ufl expression ordering criteria.
273
    """
274
    # Group integrals by compiler data hash
275
    by_cdid = {}
1✔
276
    for itg in integrals:
1✔
277
        cd = itg.metadata()
1✔
278
        cdid = hash(canonicalize_metadata(cd))
1✔
279
        if cdid not in by_cdid:
1✔
280
            by_cdid[cdid] = ([], cd)
1✔
281
        by_cdid[cdid][0].append(itg)
1✔
282

283
    # Accumulate integrands separately for each compiler data object
284
    # id
285
    for cdid in by_cdid:
1✔
286
        integrals, cd = by_cdid[cdid]
1✔
287
        # Ensure canonical sorting of more than two integrands
288
        integrands = sorted_expr(itg.integrand() for itg in integrals)
1✔
289
        integrands_sum = sum(integrands[1:], integrands[0])
1✔
290
        by_cdid[cdid] = (integrands_sum, cd)
1✔
291

292
    # Sort integrands canonically by integrand first then compiler
293
    # data
294
    return sorted(by_cdid.values(), key=ExprTupleKey)
1✔
295

296

297
def build_integral_data(integrals):
1✔
298
    """Build integral data given a list of integrals.
299

300
    The integrals you pass in here must have been rearranged and
301
    gathered (removing the "everywhere" subdomain_id). To do this, you
302
    should call group_form_integrals.
303

304
    Args:
305
        integrals: An iterable of Integral objects.
306

307
    Returns:
308
        A tuple of IntegralData objects.
309
    """
310
    itgs = defaultdict(list)
1✔
311

312
    # --- Merge integral data that has the same integrals,
313
    for integral in integrals:
1✔
314
        integral_type = integral.integral_type()
1✔
315
        ufl_domain = integral.ufl_domain()
1✔
316
        subdomain_ids = integral.subdomain_id()
1✔
317
        extra_domain_integral_type_tuple = tuple(integral.extra_domain_integral_type_map().items())
1✔
318
        if "everywhere" in subdomain_ids:
1✔
319
            raise ValueError(
×
320
                "'everywhere' not a valid subdomain id. "
321
                "Did you forget to call group_form_integrals?"
322
            )
323

324
        # Group for integral data (One integral data object for all
325
        # integrals with same domain, itype, (but possibly different metadata).
326
        itgs[(ufl_domain, integral_type, subdomain_ids, extra_domain_integral_type_tuple)].append(
1✔
327
            integral
328
        )
329

330
    # Build list with canonical ordering, iteration over dicts
331
    # is not deterministic across python versions
332
    def keyfunc(item):
1✔
333
        (d, itype, sid, extra_d_itype_tuple), _integrals = item
1✔
334
        sid_int = tuple(-1 if i == "otherwise" else i for i in sid)
1✔
335
        return (
1✔
336
            d._ufl_sort_key_(),
337
            itype,
338
            (type(sid).__name__,),
339
            sid_int,
340
            tuple((d_._ufl_sort_key_(), itype_) for d_, itype_ in extra_d_itype_tuple),
341
        )
342

343
    integral_datas = []
1✔
344
    for (d, itype, sid, extra_d_itype_tuple), integrals in sorted(itgs.items(), key=keyfunc):
1✔
345
        d_itype_tuple = ((d, itype),) + extra_d_itype_tuple
1✔
346
        integral_datas.append(
1✔
347
            IntegralData(
348
                d,
349
                itype,
350
                sid,
351
                integrals,
352
                {},
353
                dict(d_itype_tuple),
354
            )
355
        )
356
    return integral_datas
1✔
357

358

359
def group_form_integrals(form, domains, do_append_everywhere_integrals=True):
1✔
360
    """Group integrals by domain and type, performing canonical simplification.
361

362
    Args:
363
        form:
364
            the Form to group the integrals of.
365
        domains:
366
            an iterable of Domains.
367
        do_append_everywhere_integrals:
368
            Boolean indicating if integrals defined on the whole domain should just be restricted to
369
            the set of input subdomain ids.
370

371
    Returns:
372
        A new Form with gathered integrands.
373
    """
374
    # Group integrals by domain and type
375
    integrals_by_domain_and_type = group_integrals_by_domain_and_type(form.integrals(), domains)
1✔
376

377
    integrals = []
1✔
378
    for domain in domains:
1✔
379
        for integral_type in ufl.measure.integral_types():
1✔
380
            # Get integrals with this domain and type
381
            ddt_integrals_map = integrals_by_domain_and_type.get((domain, integral_type))
1✔
382
            if ddt_integrals_map is None:
1✔
383
                continue
1✔
384

385
            for extra_domain_integral_type_tuple, ddt_integrals in ddt_integrals_map.items():
1✔
386
                # Group integrals by subdomain id, after splitting e.g.
387
                #   f*dx((1,2)) + g*dx((2,3)) -> f*dx(1) + (f+g)*dx(2) + g*dx(3)
388
                # (note: before this call, 'everywhere' is a valid subdomain_id,
389
                # and after this call, 'otherwise' is a valid subdomain_id)
390
                single_subdomain_integrals = rearrange_integrals_by_single_subdomains(
1✔
391
                    ddt_integrals, do_append_everywhere_integrals
392
                )
393

394
                for subdomain_id, ss_integrals in sorted_by_key(single_subdomain_integrals):
1✔
395
                    # strip the coordinate derivatives from all integrals
396
                    # this yields a list of the form [(coordinate derivative, integral), ...]
397
                    stripped_integrals_and_coordderivs = strip_coordinate_derivatives(ss_integrals)
1✔
398

399
                    # now group the integrals by the coordinate derivative
400
                    def calc_hash(cd):
1✔
401
                        return sum(
1✔
402
                            sum(tuple_elem._ufl_compute_hash_() for tuple_elem in tuple_)
403
                            for tuple_ in cd
404
                        )
405

406
                    coordderiv_integrals_dict = {}
1✔
407
                    for integral, coordderiv in stripped_integrals_and_coordderivs:
1✔
408
                        coordderivhash = calc_hash(coordderiv)
1✔
409
                        if coordderivhash in coordderiv_integrals_dict:
1✔
410
                            coordderiv_integrals_dict[coordderivhash][1].append(integral)
×
411
                        else:
412
                            coordderiv_integrals_dict[coordderivhash] = (coordderiv, [integral])
1✔
413

414
                    # cd_integrals_dict is now a dict of the form
415
                    # { hash: (CoordinateDerivative, [integral, integral, ...]), ... }
416
                    # we can now put the integrals back together and then afterwards
417
                    # apply the CoordinateDerivative again
418

419
                    for cdhash, samecd_integrals in sorted_by_key(coordderiv_integrals_dict):
1✔
420
                        # Accumulate integrands of integrals that share the
421
                        # same compiler data
422
                        integrands_and_cds = accumulate_integrands_with_same_metadata(
1✔
423
                            samecd_integrals[1]
424
                        )
425

426
                        for integrand, metadata in integrands_and_cds:
1✔
427
                            integral = Integral(
1✔
428
                                integrand,
429
                                integral_type,
430
                                domain,
431
                                subdomain_id,
432
                                metadata,
433
                                None,
434
                                extra_domain_integral_type_map=dict(
435
                                    extra_domain_integral_type_tuple
436
                                ),
437
                            )
438
                            integral = attach_coordinate_derivatives(integral, samecd_integrals[0])
1✔
439
                            integrals.append(integral)
1✔
440

441
    # Group integrals by common integrand
442
    # u.dx(0)*dx(1) + u.dx(0)*dx(2) -> u.dx(0)*dx((1,2))
443
    # to avoid duplicate kernels generated after geometry lowering
444
    unique_integrals = defaultdict(tuple)
1✔
445
    metadata_table = defaultdict(dict)
1✔
446
    for integral in integrals:
1✔
447
        integral_type = integral.integral_type()
1✔
448
        ufl_domain = integral.ufl_domain()
1✔
449
        metadata = integral.metadata()
1✔
450
        meta_hash = hash(canonicalize_metadata(metadata))
1✔
451
        subdomain_id = integral.subdomain_id()
1✔
452
        subdomain_data = id_or_none(integral.subdomain_data())
1✔
453
        extra_domain_integral_type_tuple = tuple(integral.extra_domain_integral_type_map().items())
1✔
454
        integrand = renumber_indices(integral.integrand())
1✔
455
        key = (
1✔
456
            integral_type,
457
            ufl_domain,
458
            meta_hash,
459
            integrand,
460
            subdomain_data,
461
            extra_domain_integral_type_tuple,
462
        )
463
        unique_integrals[key] += (subdomain_id,)
1✔
464
        metadata_table[key] = metadata
1✔
465

466
    grouped_integrals = []
1✔
467
    for integral_data, subdomain_ids in unique_integrals.items():
1✔
468
        (
1✔
469
            integral_type,
470
            ufl_domain,
471
            metadata,
472
            integrand,
473
            subdomain_data,
474
            extra_domain_integral_type_tuple,
475
        ) = integral_data
476
        integral = Integral(
1✔
477
            integrand,
478
            integral_type,
479
            ufl_domain,
480
            subdomain_ids,
481
            metadata_table[integral_data],
482
            subdomain_data,
483
            extra_domain_integral_type_map=dict(extra_domain_integral_type_tuple),
484
        )
485
        grouped_integrals.append(integral)
1✔
486

487
    return Form(grouped_integrals)
1✔
488

489

490
def reconstruct_form_from_integral_data(integral_data):
1✔
491
    """Reconstruct a form from integral data."""
492
    integrals = []
1✔
493
    for ida in integral_data:
1✔
494
        integrals.extend(ida.integrals)
1✔
495
    return Form(integrals)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc