• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wistefan / tmforum-api / #68

24 Oct 2023 01:20PM UTC coverage: 67.773% (-0.4%) from 68.167%
#68

push

web-flow
Make dome working (#37)

* set defaults

* update mapping dep

* add redis as option

3287 of 4850 relevant lines covered (67.77%)

0.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/common/src/main/java/org/fiware/tmforum/common/notification/EventHandler.java
1
package org.fiware.tmforum.common.notification;
2

3
import io.github.wistefan.mapping.EntityVOMapper;
4
import io.micronaut.cache.annotation.Cacheable;
5
import io.micronaut.context.annotation.Bean;
6
import lombok.RequiredArgsConstructor;
7
import org.fiware.ngsi.model.EntityVO;
8
import org.fiware.tmforum.common.CommonConstants;
9
import org.fiware.tmforum.common.domain.subscription.Event;
10
import org.fiware.tmforum.common.domain.subscription.Subscription;
11
import org.fiware.tmforum.common.exception.EventHandlingException;
12
import org.fiware.tmforum.common.notification.command.*;
13
import org.fiware.tmforum.common.querying.QueryParser;
14
import org.fiware.tmforum.common.querying.SubscriptionQueryResolver;
15
import org.fiware.tmforum.common.repository.TmForumRepository;
16
import org.fiware.tmforum.common.util.StringUtils;
17
import reactor.core.publisher.Mono;
18

19
import java.lang.reflect.InvocationTargetException;
20
import java.time.Instant;
21
import java.util.*;
22

23
import static org.fiware.tmforum.common.CommonConstants.DEFAULT_OFFSET;
24
import static org.fiware.tmforum.common.notification.EventConstants.*;
25

26
@Bean
27
@RequiredArgsConstructor
×
28
public class EventHandler {
29

30
    private final QueryParser queryParser;
31
    private final TmForumRepository repository;
32
    private final NotificationSender notificationSender;
33
    private final SubscriptionQueryResolver subscriptionQueryResolver;
34
    private final EntityVOMapper entityVOMapper;
35

36
    @Cacheable(CommonConstants.SUBSCRIPTIONS_CACHE_NAME)
37
    public Mono<List<Subscription>> getSubscriptions(String entityType, String eventType) {
38
        return repository.findEntities(
×
39
                DEFAULT_OFFSET,
×
40
                100,
×
41
                Subscription.TYPE_SUBSCRIPTION,
42
                Subscription.class,
43
                queryParser.toNgsiLdQuery(Subscription.class,
×
44
                        String.format("entities=%s&eventTypes=%s", entityType, eventType))
×
45
        );
46
    }
47

48
    private <T> Mono<Void> handle(T entity, EventDetails<T> eventDetails, Command command) {
49
        return getSubscriptions(eventDetails.entityType, eventDetails.eventType)
×
50
                .doOnNext(subscriptions -> {
×
51
                    List<Notification> notifications = new ArrayList<>();
×
52
                    subscriptions.forEach(subscription -> {
×
53
                        if (command.execute(subscription.getQuery())) {
×
54
                            Event event = createEvent(eventDetails.eventType, applyFieldsFilter(entity, subscription.getFields()),
×
55
                                    eventDetails.payloadName);
56
                            notifications.add(new Notification(subscription.getCallback(), event));
×
57
                        }
58
                    });
×
59
                    notificationSender.sendNotifications(notifications);
×
60
                })
×
61
                .then();
×
62
    }
63

64
    public <T> Mono<Void> handleCreateEvent(T entity) {
65
        try {
66
            EventDetails<T> eventDetails = new EventDetails<>(entity, CREATE_EVENT_SUFFIX);
×
67
            Command command = new CreateEventCommand<>(subscriptionQueryResolver, entity, eventDetails.payloadName);
×
68
            return handle(entity, eventDetails, command);
×
69
        } catch (EventHandlingException e) {
×
70
            return Mono.empty();
×
71
        }
72
    }
73

74
    public <T> Mono<Void> handleUpdateEvent(T newState, T oldState) {
75
        try {
76
            EventDetails<T> eventDetails1 = new EventDetails<>(newState, ATTRIBUTE_VALUE_CHANGE_EVENT_SUFFIX);
×
77
            Command command1 = new AttributeValueChangeEventCommand<>(
×
78
                    subscriptionQueryResolver, newState, oldState, eventDetails1.payloadName);
79
            Mono<Void> attrUpdateMono = handle(newState, eventDetails1, command1);
×
80

81

82
            EventDetails<T> eventDetails2 = new EventDetails<>(newState, STATE_CHANGE_EVENT_SUFFIX);
×
83
            Command command2 = new StateChangeEventCommand<>(newState, oldState);
×
84
            Mono<Void> stateChangeMono = handle(newState, eventDetails2, command2);
×
85

86
            return attrUpdateMono.then(stateChangeMono);
×
87
        } catch (EventHandlingException e) {
×
88
            return Mono.empty();
×
89
        }
90
    }
91

92
    public Mono<Void> handleDeleteEvent(EntityVO entityVO) {
93
        try {
94
            EventDetails<Object> eventDetails = new EventDetails<>(entityVO, DELETE_EVENT_SUFFIX);
×
95
            Command command = new DeleteCommand();
×
96
            return handle(entityVO, eventDetails, command);
×
97
        } catch (EventHandlingException e) {
×
98
            return Mono.empty();
×
99
        }
100
    }
101

102
    private Event createEvent(String eventType, Map<String, Object> payload, String payloadName) {
103
        Event event = new Event();
×
104

105
        event.setEventType(eventType);
×
106
        event.setEventId(UUID.randomUUID().toString());
×
107
        event.setEvent(Map.of(payloadName, payload));
×
108
        event.setEventTime(Instant.now());
×
109

110
        return event;
×
111
    }
112

113
    private <T> Map<String, Object> applyFieldsFilter(T entity, List<String> fields) {
114
        Map<String, Object> entityMap = entityVOMapper.convertEntityToMap(entity);
×
115
        if (fields != null && !fields.isEmpty()) {
×
116
            Map<String, Object> filteredMap = new HashMap<>();
×
117
            fields.forEach(field -> {
×
118
                if (entityMap.containsKey(field)) {
×
119
                    filteredMap.put(field, entityMap.get(field));
×
120
                }
121
            });
×
122
            return filteredMap;
×
123
        } else {
124
            return entityMap;
×
125
        }
126
    }
127

128
    private static class EventDetails<T> {
129
        String entityType;
130
        String eventType;
131
        String payloadName;
132

133
        public EventDetails(T entity, String eventSuffix) {
×
134
            entityType = getEntityType(entity);
×
135
            init(eventSuffix);
×
136
        }
×
137

138
        public EventDetails(EntityVO entityVO, String eventSuffix) {
×
139
            entityType = entityVO.getType();
×
140
            init(eventSuffix);
×
141
        }
×
142

143
        private String getEntityType(T entity) {
144
            try {
145
                return entity.getClass().getMethod("getType").invoke(entity).toString();
×
146
            } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
×
147
                return "";
×
148
            }
149
        }
150

151
        private void init(String eventSuffix) {
152
            if (entityType.equals(Subscription.TYPE_SUBSCRIPTION)) {
×
153
                throw new EventHandlingException();
×
154
            }
155
            eventType = StringUtils.toCamelCase(entityType) + eventSuffix;
×
156
            payloadName = StringUtils.decapitalize(StringUtils.getEventGroupName(eventType));
×
157
        }
×
158
    }
159
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc