• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

FIWARE / contract-management / #69

30 Oct 2025 07:38AM UTC coverage: 1.686% (+0.04%) from 1.651%
#69

Pull #12

wistefan
improve organization creation and cleanup
Pull Request #12: Support for central marketplace and policy creation

117 of 1238 new or added lines in 31 files covered. (9.45%)

5 existing lines in 2 files now uncovered.

587 of 34807 relevant lines covered (1.69%)

0.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.3
/src/main/java/org/fiware/iam/tmforum/notification/NotificationSubscriber.java
1
package org.fiware.iam.tmforum.notification;
2

3
import io.micronaut.context.annotation.Context;
4
import io.micronaut.context.annotation.Requires;
5
import io.micronaut.context.annotation.Value;
6
import io.micronaut.http.HttpMethod;
7
import io.micronaut.http.HttpRequest;
8
import io.micronaut.http.HttpStatus;
9
import io.micronaut.http.client.exceptions.HttpClientResponseException;
10
import io.micronaut.http.client.netty.DefaultHttpClient;
11
import io.micronaut.runtime.event.annotation.EventListener;
12
import io.micronaut.runtime.server.event.ServerStartupEvent;
13
import io.micronaut.scheduling.TaskScheduler;
14
import lombok.RequiredArgsConstructor;
15
import lombok.extern.slf4j.Slf4j;
16
import org.fiware.iam.configuration.GeneralProperties;
17
import org.fiware.iam.configuration.NotificationProperties;
18
import org.fiware.iam.tmforum.party.model.EventSubscriptionInputVO;
19
import org.fiware.iam.tmforum.party.model.EventSubscriptionVO;
20
import reactor.core.publisher.Hooks;
21
import reactor.core.publisher.Mono;
22

23
import java.time.Duration;
24
import java.util.List;
25
import java.util.Optional;
26

27
/**
28
 * Responsible for subscribing to all configured tmforum events.
29
 */
30
@Requires(condition = NotificationProperties.NotificationCondition.class)
31
@Context
32
@RequiredArgsConstructor
33
@Slf4j
1✔
34
public class NotificationSubscriber {
35

36
    private static final String QUERY_TEMPLATE = "eventType=%s%s";
37
    private static final String LISTENER_ADDRESS_TEMPLATE = "%s/hub";
38
    private static final String LISTENER_PATH = "/listener/event";
39

40
    private final SubscriptionHealthIndicator subscriptionHealthIndicator;
41
    private final NotificationProperties notificationProperties;
42
    private final GeneralProperties generalProperties;
43
    private final DefaultHttpClient httpClient;
44
    private final TaskScheduler taskScheduler;
45

46
    @Value("${micronaut.server.port:8080}")
47
    private String servicePort;
48

49
    @EventListener
50
    public void onApplicationEvent(ServerStartupEvent e) {
51
        notificationProperties.getEntities()
1✔
52
                .forEach(tmForumEntity ->
1✔
53
                        Optional.ofNullable(tmForumEntity.getEventTypes())
1✔
54
                                .orElse(List.of())
1✔
55
                                .forEach(eventType -> {
1✔
56
                                    subscriptionHealthIndicator.initiateSubscriptionInMap(tmForumEntity.getEntityType() + eventType.getValue());
1✔
57
                                    taskScheduler.scheduleAtFixedRate(Duration.ofSeconds(5), Duration.ofSeconds(10), () -> createSubscription(tmForumEntity.getEntityType(), eventType.getValue(), tmForumEntity.getApiAddress()));
1✔
58
                                }));
1✔
59
    }
1✔
60

61
    private static String removeTrailingSlash(String theString) {
NEW
62
        if (theString.endsWith("/")) {
×
NEW
63
            return theString.substring(0, theString.length() - 1);
×
64
        }
NEW
65
        return theString;
×
66
    }
67

68
    public void createSubscription(String entityType, String eventType, String apiAddress) {
NEW
69
        String callbackUrl = String.format("http://%s:%s%s%s", notificationProperties.getHost(), servicePort, removeTrailingSlash(generalProperties.getBasePath()), LISTENER_PATH);
×
NEW
70
        log.debug("Attempting to register subscription with callback {}", callbackUrl);
×
71

NEW
72
        EventSubscriptionInputVO subscription = new EventSubscriptionInputVO()
×
NEW
73
                .callback(callbackUrl)
×
NEW
74
                .query(String.format(QUERY_TEMPLATE, entityType, eventType));
×
75

NEW
76
        HttpRequest<?> request = HttpRequest.create(HttpMethod.POST, String.format(LISTENER_ADDRESS_TEMPLATE, apiAddress)).body(subscription);
×
77

NEW
78
        Mono.from(httpClient.exchange(request, EventSubscriptionVO.class))
×
NEW
79
                .doOnSuccess(res -> subscriptionHealthIndicator.setSubscriptionHealthy(entityType + eventType))
×
NEW
80
                .onErrorResume(t -> {
×
NEW
81
                    if (t instanceof HttpClientResponseException e) {
×
NEW
82
                        if (e.getStatus() == HttpStatus.CONFLICT) {
×
NEW
83
                            subscriptionHealthIndicator.setSubscriptionHealthy(entityType + eventType);
×
NEW
84
                            return Mono.empty();
×
85
                        }
NEW
86
                        if (e.getStatus() != HttpStatus.CONFLICT) {
×
NEW
87
                            log.info("Event registration failed for {} at {} - Cause: {} : {}", entityType, request.getUri(), e.getStatus(), e.getMessage());
×
88
                        }
NEW
89
                        return Mono.empty();
×
90
                    }
NEW
91
                    log.info("Could not create subscription for {} in TM Forum API", entityType, t);
×
NEW
92
                    return Mono.empty();
×
NEW
93
                }).subscribe();
×
94

NEW
95
    }
×
96
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc