• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

iplweb / bpp / dedd10c6-fb94-4555-854a-16d9255205b4

24 Aug 2025 08:22PM UTC coverage: 43.704% (-3.2%) from 46.899%
dedd10c6-fb94-4555-854a-16d9255205b4

push

circleci

mpasternak
Merge branch 'release/v202508.1204'

4 of 4 new or added lines in 2 files covered. (100.0%)

739 existing lines in 33 files now uncovered.

16905 of 38681 relevant lines covered (43.7%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.0
src/pbn_api/models/sentdata.py
1
from django.core.exceptions import ObjectDoesNotExist
2✔
2
from django.db import models
2✔
3
from django.db.models import JSONField
2✔
4

5
from django.contrib.contenttypes.fields import GenericForeignKey
2✔
6
from django.contrib.contenttypes.models import ContentType
2✔
7

8
from bpp import const
2✔
9
from bpp.models import LinkDoPBNMixin
2✔
10

11

12
class SentDataManager(models.Manager):
2✔
13
    def get_for_rec(self, rec):
2✔
14
        return self.get(
×
15
            object_id=rec.pk, content_type=ContentType.objects.get_for_model(rec)
16
        )
17

18
    def check_if_needed(self, rec, data: dict):
2✔
19
        try:
×
20
            sd = self.get_for_rec(rec)
×
21
        except SentData.DoesNotExist:
×
22
            return True
×
23

24
        if sd.data_sent != data:
×
25
            return True
×
26

27
        if not sd.uploaded_okay:
×
28
            return True
×
29

30
        return False
×
31

32
    def updated(
2✔
33
        self, rec, data: dict, pbn_uid_id=None, uploaded_okay=True, exception=None
34
    ):
35
        try:
×
36
            sd = self.get_for_rec(rec)
×
37
        except SentData.DoesNotExist:
×
38
            self.create(
×
39
                object=rec,
40
                data_sent=data,
41
                uploaded_okay=uploaded_okay,
42
                pbn_uid_id=pbn_uid_id,
43
                exception=exception,
44
            )
45
            return
×
46

47
        sd.data_sent = data
×
48
        sd.uploaded_okay = uploaded_okay
×
49
        sd.exception = exception
×
50
        sd.pbn_uid_id = pbn_uid_id
×
51
        sd.save()
×
52

53
    def ids_for_model(self, model):
2✔
54
        return self.filter(content_type=ContentType.objects.get_for_model(model))
×
55

56
    def bad_uploads(self, model):
2✔
57
        return (
×
58
            self.ids_for_model(model)
59
            .filter(uploaded_okay=False)
60
            .values_list("object_id", flat=True)
61
            .distinct()
62
        )
63

64

65
class SentData(LinkDoPBNMixin, models.Model):
2✔
66
    url_do_pbn = const.LINK_PBN_DO_PUBLIKACJI
2✔
67
    content_type = models.ForeignKey(
2✔
68
        "contenttypes.ContentType", on_delete=models.CASCADE
69
    )
70
    object_id = models.PositiveIntegerField(db_index=True)
2✔
71

72
    object = GenericForeignKey()
2✔
73

74
    data_sent = JSONField("Wysłane dane")
2✔
75
    last_updated_on = models.DateTimeField("Data operacji", auto_now=True)
2✔
76

77
    uploaded_okay = models.BooleanField(
2✔
78
        "Wysłano poprawnie", default=True, db_index=True
79
    )
80
    exception = models.TextField("Kod błędu", max_length=65535, blank=True, null=True)
2✔
81

82
    pbn_uid = models.ForeignKey(
2✔
83
        "pbn_api.Publication",
84
        verbose_name="Publikacja z PBN",
85
        blank=True,
86
        null=True,
87
        on_delete=models.SET_NULL,
88
    )
89

90
    typ_rekordu = models.CharField(max_length=50, blank=True, null=True)
2✔
91

92
    objects = SentDataManager()
2✔
93

94
    class Meta:
2✔
95
        verbose_name = "Informacja o wysłanych danych"
2✔
96
        verbose_name_plural = "Informacje o wysłanych danych"
2✔
97

98
    object.verbose_name = "Rekord"
2✔
99

100
    def __str__(self):
2✔
101
        return (
×
102
            f"Informacja o wysłanych do PBN danych dla rekordu ({self.content_type_id},{self.object_id}) "
103
            f"z dnia {self.last_updated_on} (status: {'OK' if self.uploaded_okay else 'ERR'})"
104
        )
105

106
    def link_do_pbn_wartosc_id(self):
2✔
107
        return self.pbn_uid_id
×
108

109
    def rekord_w_bpp(self):
2✔
110
        try:
×
111
            return self.object
×
112
        except ObjectDoesNotExist:
×
113
            pass
×
114

115
    def save(
2✔
116
        self, force_insert=False, force_update=False, using=None, update_fields=None
117
    ):
118

UNCOV
119
        if update_fields and "data_sent" in update_fields:
×
120
            if self.typ_rekordu != self.data_sent.get("type"):
×
121
                update_fields.append("typ_rekordu")
×
122

UNCOV
123
        self.typ_rekordu = self.data_sent.get("type")
×
UNCOV
124
        return super().save(
×
125
            force_insert=force_insert,
126
            force_update=force_update,
127
            using=using,
128
            update_fields=update_fields,
129
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc