• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

saritasa-nest / django-import-export-extensions / 7177800123

12 Dec 2023 06:33AM UTC coverage: 79.507% (+0.3%) from 79.244%
7177800123

push

github

web-flow
Migrate to poetry

- Migrate all invokes to `saritasa-invocations`
- Drop support of python 3.9
- Add support for python 3.12
- Add django 5.0 to ci matrix
- Migrate from `pip-tools` to `poetry`
- Add configuration for  `coverage` tool
- Add separate invoke for `coverage` tests run
- Remove makefiles

3 of 4 new or added lines in 1 file covered. (75.0%)

1 existing line in 1 file now uncovered.

1129 of 1420 relevant lines covered (79.51%)

7.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.3
/import_export_extensions/widgets.py
1
import mimetypes
10✔
2
import typing
10✔
3
from urllib.parse import urlparse
10✔
4

5
from django.conf import settings
10✔
6
from django.core.exceptions import SuspiciousFileOperation
10✔
7
from django.core.files import File
10✔
8
from django.core.files.storage import default_storage
10✔
9
from django.db.models import Model, Q, QuerySet
10✔
10
from django.forms import ValidationError
10✔
11
from django.utils.encoding import smart_str
10✔
12

13
from import_export.exceptions import ImportExportError
10✔
14
from import_export.widgets import CharWidget, ManyToManyWidget
10✔
15

16
from . import utils
10✔
17

18
DEFAULT_SYSTEM_STORAGE = "django.core.files.storage.FileSystemStorage"
10✔
19

20

21
class IntermediateManyToManyWidget(ManyToManyWidget):
10✔
22
    """Widget for M2M field with custom ``through`` model.
23

24
    Default M2M widget store just IDs of related objects.
25
    With intermediate model additional data may be stored.
26

27
    This widget (and subclasses) should be used with
28
    ``IntermediateManyToManyField``. This field expects that widget will return
29
    plain data for intermediate model as a dict.
30

31
    Example models from ``IntermediateManyToManyField`` docstring will be used.
32

33
    So this widget's ``clean`` method should return dict with Membership info,
34
    i.e::
35

36
        {
37
            'band': <Some band>,
38
            'date_joined': '2015-01-18'
39
        }
40

41
    And ``render`` method should return string that will contain required data.
42

43
    """
44

45
    def __init__(
10✔
46
        self,
47
        rem_model: typing.Type[Model],
48
        instance_separator: str = ",",
49
        prop_separator: typing.Optional[str] = None,
50
        rem_field: str = "pk",
51
        rem_field_lookup: typing.Optional[str] = None,
52
        extra_fields: typing.Optional[list[str]] = None,
53
        render_empty: bool = False,
54
        *args,
55
        **kwargs,
56
    ):
57
        """Init widget.
58

59
        Args:
60
            rem_model(models.Model): remote model (i.e. Band)
61
            instance_separator(str): separator for instances
62
            prop_separator(str): separator for instance properties
63
            rem_field(str): name of remote field to dump remote model (pk
64
                by default, may be used more human-readable names)
65
            extra_fields(list[str]): extra fields that should be dumped
66
                (will be dumped in most dummy way)
67
            render_empty (bool): defines if render empty values or not
68
        """
69
        if prop_separator == ";" or prop_separator is None:
10✔
70
            prop_separator = ":"
10✔
71
        self.rem_model = rem_model
10✔
72
        self.instance_separator = instance_separator
10✔
73
        self.prop_separator = prop_separator
10✔
74
        self.rem_field = rem_field
10✔
75
        self.rem_field_lookup = rem_field_lookup
10✔
76
        self.extra_fields = extra_fields or []
10✔
77
        self.render_empty = render_empty
10✔
78

79
    def render(self, value: typing.Iterable[Model], *args, **kwargs) -> str:
10✔
80
        """Return an export representation of a intermediate instances.
81

82
        For atrists example should be returned something like
83
            "5:1990-12-12;19:2005-08-16"
84
            where 5 is band id
85

86
        Args:
87
            value(QuerySet): instances of intermediate model
88

89
        """
90
        instances = []
10✔
91
        for i in value:
10✔
92
            instances.append(
10✔
93
                self.render_instance(i, self._get_related_instance(i)),
94
            )
95
        # Clean empty instances
96
        if not self.render_empty:
10✔
97
            instances = list(filter(None, instances))
10✔
98

99
        return self.instance_separator.join(instances)
10✔
100

101
    def render_instance(self, instance: Model, related_object: Model) -> str:
10✔
102
        """Return export representation of one intermediate instance.
103

104
        Should take related object PK and extra fields from Intermediate model,
105
        i.e. Artist.pk and Membership.date_joined
106

107
        Args:
108
            instance: object of intermediate model
109
            related_object: associated object (i.e. Band)
110

111
        """
112
        # get related object (i.e. Band)
113
        props = [
10✔
114
            smart_str(self._get_field_value(related_object, self.rem_field)),
115
        ]
116

117
        for attr in self.extra_fields:
10✔
118
            props.append(smart_str(self._get_field_value(instance, attr)))
10✔
119

120
        return self.prop_separator.join(props)
10✔
121

122
    def _get_field_value(self, obj: Model, field: str) -> typing.Any:
10✔
123
        """Get `field` value from an object.
124

125
        Support chained fields like `field1__field2__field3` which allows to
126
        get values from obj.field1.field2.field3
127

128
        """
129
        fields_chain = field.split("__")
10✔
130
        value = obj
10✔
131
        for s in fields_chain:
10✔
132
            value = getattr(value, s)
10✔
133

134
        return value
10✔
135

136
    def _get_related_instance(self, instance: Model) -> Model:
10✔
137
        """Get related instance based on IntermediateModel instance.
138

139
        i.e. get Band based on Membership instance
140

141
        Args:
142
            instance: instance of intermediate model
143

144
        """
145
        for field in instance._meta.get_fields():
10✔
146
            if field.related_model == self.rem_model:
10✔
147
                return getattr(instance, field.name)
10✔
148

149
    def clean(
10✔
150
        self,
151
        value: str,
152
        *args,
153
        **kwargs,
154
    ) -> list[dict[str, typing.Any]]:
155
        """Restore data from dump.
156

157
        In ``value`` we have data that saved using ``render`` method. We should
158
        restore it
159

160
        Args:
161
            value(str): rendered data about instance of intermediate model
162
            instance:
163

164
        Returns:
165
            list[dict]: parsed data
166

167
        Example:
168
            [{'band': <Band object>, 'date_joined': '1998-12-21'}]
169

170
        """
171
        if not value:
10✔
NEW
172
            return []  # pragma: no cover
×
173

174
        # if value is one integer number
175
        value = str(value)
10✔
176

177
        # in some cases if click `enter` values `\n\r` inserted
178
        if self.instance_separator == "\n":  # pragma: no cover
10✔
179
            value = value.replace("\r", "")
×
180

181
        raw_instances = utils.clean_sequence_of_string_values(
10✔
182
            value.split(self.instance_separator),
183
        )
184

185
        result = []
10✔
186
        invalid_instances = []
10✔
187
        restored_objects_ids = []
10✔
188
        for raw_instance in raw_instances:
10✔
189
            try:
10✔
190
                for item in self.clean_instance(raw_instance):
10✔
191
                    if item["object"].pk not in restored_objects_ids:
10✔
192
                        restored_objects_ids.append(item["object"].pk)
10✔
193
                        result.append(item)
10✔
194
            except ValueError:
10✔
195
                invalid_instances.append(raw_instance)
10✔
196

197
        # if there are entries in `invalid_instances`
198
        if invalid_instances:
10✔
199
            raise ValueError(
10✔
200
                "You are trying import invalid values: {0}".format(
201
                    str(invalid_instances),
202
                ),
203
            )
204

205
        return result
10✔
206

207
    def clean_instance(self, raw_instance: str) -> list[dict[str, typing.Any]]:
10✔
208
        """Restore info about one instance of intermediate model.
209

210
        If there are few instances in DB with same
211
        `self.rem_field`=`rem_field_value` then it returns few items
212

213
        Args:
214
            raw_instance(str): info about one instance that saved using
215
                ``render_instance`` method
216

217
        Returns:
218
            list: list of dicts with restored info about one intermediate
219
                  instance. If there are few instances in DB with same
220
                  `self.rem_field`=`rem_field_value` then it returns few items
221

222
        Example::
223

224
            {
225
                'object': <Band object>,
226
                'properties': {'date_joined': '2011-10-12'}
227
            }
228

229
        """
230
        props = raw_instance.split(self.prop_separator)
10✔
231

232
        if len(props) > len(self.extra_fields) + 1:
10✔
233
            # +1 is for `self.rem_field`
234
            raise ImportExportError(
10✔
235
                "Too many property separators '{0}' in '{1}'".format(
236
                    self.prop_separator, raw_instance,
237
                ),
238
            )
239

240
        # rem instance now contains value used to identify related object,
241
        # i.e. PK of Band
242
        # props contain other saved properties of intermediate model
243
        # i.e. `date_joined`
244
        rem_field_value, *props = utils.clean_sequence_of_string_values(
10✔
245
            raw_instance.split(self.prop_separator), ignore_empty=False,
246
        )
247

248
        # get related objects
249
        qs = self.filter_instances(rem_field_value)
10✔
250

251
        # if we tries import nonexistent instance
252
        if not qs.exists():
10✔
253
            raise ValueError("Invalid instance {0}".format(raw_instance))
10✔
254

255
        # build dict with other properties. Ignore extra fields which has
256
        # empty strings values
257
        other_props = {
10✔
258
            key: value for key, value in zip(self.extra_fields, props) if value
259
        }
260
        return [
10✔
261
            {"object": rem_object, "properties": other_props}
262
            for rem_object in qs
263
        ]
264

265
    def filter_instances(self, rem_field_value: str) -> QuerySet:
10✔
266
        """Shortcut to filter corresponding instances."""
267
        if self.rem_field_lookup:
10✔
268
            if self.rem_field_lookup == "regex":
×
269
                instance_filter = utils.get_clear_q_filter(
×
270
                    rem_field_value, self.rem_field,
271
                )
272
            else:
273
                lookup = f"{self.rem_field}__{self.rem_field_lookup}"
×
274
                instance_filter = Q(**{lookup: rem_field_value})
×
275
        else:
276
            instance_filter = Q(**{self.rem_field: rem_field_value})
10✔
277

278
        return self.rem_model.objects.filter(instance_filter)
10✔
279

280

281
class FileWidget(CharWidget):
10✔
282
    """Widget for working with File fields."""
283

284
    def __init__(self, filename: str):
10✔
285
        self.filename = filename
10✔
286

287
    def render(
10✔
288
        self,
289
        value: typing.Optional[Model],
290
        *args,
291
        **kwargs,
292
    ) -> typing.Optional[str]:
293
        """Convert DB value to URL to file."""
294
        if not value:
10✔
295
            return None
10✔
296

297
        if self._get_default_storage() == DEFAULT_SYSTEM_STORAGE:
10✔
298
            return f"http://localhost:8000{value.url}"
10✔
299

300
        return value.url
×
301

302
    def clean(
10✔
303
        self,
304
        value: typing.Optional[str],
305
        *args,
306
        **kwargs,
307
    ) -> typing.Optional[str]:
308
        """Get the file and check for exists."""
309
        if not value:
10✔
310
            return None
10✔
311

312
        internal_url = utils.url_to_internal_value(urlparse(value).path)
10✔
313

314
        if not internal_url:
10✔
315
            raise ValidationError("Invalid image path")
×
316

317
        try:
10✔
318
            if default_storage.exists(internal_url):
10✔
319
                return internal_url
10✔
320
        except SuspiciousFileOperation:
×
321
            pass
322

323
        return self._get_file(value)
10✔
324

325
    def _get_file(self, url: str) -> File:
10✔
326
        """Download file from the external resource."""
327
        file = utils.download_file(url)
10✔
328
        ext = mimetypes.guess_extension(file.content_type)
10✔
329
        filename = f"{self.filename}.{ext}" if ext else self.filename
10✔
330

331
        return File(file, filename)
10✔
332

333
    def _get_default_storage(self) -> str:
10✔
334
        """Return default system storage used in project.
335

336
        Use the value from `STORAGES` if it's available,
337
        otherwise use `DEFAULT_FILE_STORAGE`.
338

339
        `STORAGES` variable replaced `DEFAULT_FILE_STORAGE`, in django 4.2
340
        https://docs.djangoproject.com/en/4.2/ref/settings/#default-file-storage
341

342
        """
343
        if hasattr(settings, "STORAGES"):
10✔
344
            return settings.STORAGES["default"]["BACKEND"]
10✔
UNCOV
345
        return settings.DEFAULT_FILE_STORAGE
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc