• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wistefan / tmforum-api / #49

29 Sep 2023 06:20AM UTC coverage: 67.488% (-4.3%) from 71.815%
#49

push

web-flow
Notifications (#23)

* Squashed commits

* Added cache invalidation for entity deletion

* Updated error message

618 of 618 new or added lines in 86 files covered. (100.0%)

2794 of 4140 relevant lines covered (67.49%)

0.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/common/src/main/java/org/fiware/tmforum/common/notification/EventHandler.java
1
package org.fiware.tmforum.common.notification;
2

3
import io.github.wistefan.mapping.EntityVOMapper;
4
import io.micronaut.cache.annotation.Cacheable;
5
import io.micronaut.context.annotation.Bean;
6
import lombok.RequiredArgsConstructor;
7
import org.fiware.ngsi.model.EntityVO;
8
import org.fiware.tmforum.common.domain.subscription.Event;
9
import org.fiware.tmforum.common.domain.subscription.Subscription;
10
import org.fiware.tmforum.common.exception.EventHandlingException;
11
import org.fiware.tmforum.common.notification.command.*;
12
import org.fiware.tmforum.common.querying.QueryParser;
13
import org.fiware.tmforum.common.querying.SubscriptionQueryResolver;
14
import org.fiware.tmforum.common.repository.TmForumRepository;
15
import org.fiware.tmforum.common.util.StringUtils;
16
import reactor.core.publisher.Mono;
17

18
import java.lang.reflect.InvocationTargetException;
19
import java.time.Instant;
20
import java.util.*;
21

22
import static org.fiware.tmforum.common.CommonConstants.DEFAULT_OFFSET;
23
import static org.fiware.tmforum.common.notification.EventConstants.*;
24

25
@Bean
26
@RequiredArgsConstructor
×
27
public class EventHandler {
28
    public static final String SUBSCRIPTIONS_CACHE_NAME = "subscriptions";
29

30
    private final TmForumRepository repository;
31
    private final NotificationSender notificationSender;
32
    private final SubscriptionQueryResolver subscriptionQueryResolver;
33
    private final EntityVOMapper entityVOMapper;
34

35
    @Cacheable(SUBSCRIPTIONS_CACHE_NAME)
36
    public Mono<List<Subscription>> getSubscriptions(String entityType, String eventType) {
37
        return repository.findEntities(
×
38
                DEFAULT_OFFSET,
×
39
                100,
×
40
                Subscription.TYPE_SUBSCRIPTION,
41
                Subscription.class,
42
                QueryParser.toNgsiLdQuery(Subscription.class,
×
43
                        String.format("entities=%s&eventTypes=%s", entityType, eventType))
×
44
        );
45
    }
46

47
    private <T> Mono<Void> handle(T entity, EventDetails<T> eventDetails, Command command) {
48
        return getSubscriptions(eventDetails.entityType, eventDetails.eventType)
×
49
                .doOnNext(subscriptions -> {
×
50
                    List<Notification> notifications = new ArrayList<>();
×
51
                    subscriptions.forEach(subscription -> {
×
52
                        if (command.execute(subscription.getQuery())) {
×
53
                            Event event = createEvent(eventDetails.eventType, applyFieldsFilter(entity, subscription.getFields()),
×
54
                                    eventDetails.payloadName);
55
                            notifications.add(new Notification(subscription.getCallback(), event));
×
56
                        }
57
                    });
×
58
                    notificationSender.sendNotifications(notifications);
×
59
                })
×
60
                .then();
×
61
    }
62

63
    public <T> Mono<Void> handleCreateEvent(T entity) {
64
        try {
65
            EventDetails<T> eventDetails = new EventDetails<>(entity, CREATE_EVENT_SUFFIX);
×
66
            Command command = new CreateEventCommand<>(subscriptionQueryResolver, entity, eventDetails.payloadName);
×
67
            return handle(entity, eventDetails, command);
×
68
        } catch (EventHandlingException e) {
×
69
            return Mono.empty();
×
70
        }
71
    }
72

73
    public <T> Mono<Void> handleUpdateEvent(T newState, T oldState) {
74
        try {
75
            EventDetails<T> eventDetails1 = new EventDetails<>(newState, ATTRIBUTE_VALUE_CHANGE_EVENT_SUFFIX);
×
76
            Command command1 = new AttributeValueChangeEventCommand<>(
×
77
                    subscriptionQueryResolver, newState, oldState, eventDetails1.payloadName);
78
            Mono<Void> attrUpdateMono = handle(newState, eventDetails1, command1);
×
79

80

81
            EventDetails<T> eventDetails2 = new EventDetails<>(newState, STATE_CHANGE_EVENT_SUFFIX);
×
82
            Command command2 = new StateChangeEventCommand<>(newState, oldState);
×
83
            Mono<Void> stateChangeMono = handle(newState, eventDetails2, command2);
×
84

85
            return attrUpdateMono.then(stateChangeMono);
×
86
        } catch (EventHandlingException e) {
×
87
            return Mono.empty();
×
88
        }
89
    }
90

91
    public Mono<Void> handleDeleteEvent(EntityVO entityVO) {
92
        try {
93
            EventDetails<Object> eventDetails = new EventDetails<>(entityVO, DELETE_EVENT_SUFFIX);
×
94
            Command command = new DeleteCommand();
×
95
            return handle(entityVO, eventDetails, command);
×
96
        } catch (EventHandlingException e) {
×
97
            return Mono.empty();
×
98
        }
99
    }
100

101
    private Event createEvent(String eventType, Map<String, Object> payload, String payloadName) {
102
        Event event = new Event();
×
103

104
        event.setEventType(eventType);
×
105
        event.setEventId(UUID.randomUUID().toString());
×
106
        event.setEvent(Map.of(payloadName, payload));
×
107
        event.setEventTime(Instant.now());
×
108

109
        return event;
×
110
    }
111

112
    private <T> Map<String, Object> applyFieldsFilter(T entity, List<String> fields) {
113
        Map<String, Object> entityMap = entityVOMapper.convertEntityToMap(entity);
×
114
        if (fields != null && !fields.isEmpty()) {
×
115
            Map<String, Object> filteredMap = new HashMap<>();
×
116
            fields.forEach(field -> {
×
117
                if (entityMap.containsKey(field)) {
×
118
                    filteredMap.put(field, entityMap.get(field));
×
119
                }
120
            });
×
121
            return filteredMap;
×
122
        } else {
123
            return entityMap;
×
124
        }
125
    }
126

127
    private static class EventDetails<T> {
128
        String entityType;
129
        String eventType;
130
        String payloadName;
131

132
        public EventDetails(T entity, String eventSuffix) {
×
133
            entityType = getEntityType(entity);
×
134
            init(eventSuffix);
×
135
        }
×
136

137
        public EventDetails(EntityVO entityVO, String eventSuffix) {
×
138
            entityType = entityVO.getType();
×
139
            init(eventSuffix);
×
140
        }
×
141

142
        private String getEntityType(T entity) {
143
            try {
144
                return entity.getClass().getMethod("getType").invoke(entity).toString();
×
145
            } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
×
146
                return "";
×
147
            }
148
        }
149

150
        private void init(String eventSuffix) {
151
            if (entityType.equals(Subscription.TYPE_SUBSCRIPTION)) {
×
152
                throw new EventHandlingException();
×
153
            }
154
            eventType = StringUtils.toCamelCase(entityType) + eventSuffix;
×
155
            payloadName = StringUtils.decapitalize(StringUtils.getEventGroupName(eventType));
×
156
        }
×
157
    }
158
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc