• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

FIWARE / contract-management / #91

22 Apr 2026 06:29AM UTC coverage: 1.776% (+0.1%) from 1.681%
#91

push

web-flow
Merge pull request #23 from FIWARE/fix/proxy

fix(proxy): do not require proxyHost and proxyPort when proxy is disabled

20 of 136 new or added lines in 11 files covered. (14.71%)

1 existing line in 1 file now uncovered.

621 of 34976 relevant lines covered (1.78%)

0.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.5
/src/main/java/org/fiware/iam/tmforum/notification/NotificationSubscriber.java
1
package org.fiware.iam.tmforum.notification;
2

3
import io.micronaut.context.annotation.Context;
4
import io.micronaut.context.annotation.Requires;
5
import io.micronaut.context.annotation.Value;
6
import io.micronaut.http.HttpMethod;
7
import io.micronaut.http.HttpRequest;
8
import io.micronaut.http.HttpStatus;
9
import io.micronaut.http.client.exceptions.HttpClientResponseException;
10
import io.micronaut.http.client.netty.DefaultHttpClient;
11
import io.micronaut.runtime.event.annotation.EventListener;
12
import io.micronaut.runtime.server.event.ServerStartupEvent;
13
import io.micronaut.scheduling.TaskScheduler;
14
import lombok.RequiredArgsConstructor;
15
import lombok.extern.slf4j.Slf4j;
16
import org.fiware.iam.configuration.GeneralProperties;
17
import org.fiware.iam.configuration.NotificationProperties;
18
import org.fiware.iam.tmforum.party.model.EventSubscriptionInputVO;
19
import org.fiware.iam.tmforum.party.model.EventSubscriptionVO;
20
import reactor.core.publisher.Mono;
21

22
import java.time.Duration;
23
import java.util.List;
24
import java.util.Optional;
25

26
/**
27
 * Responsible for subscribing to all configured tmforum events.
28
 */
29
@Requires(condition = NotificationProperties.NotificationCondition.class)
30
@Context
31
@RequiredArgsConstructor
32
@Slf4j
1✔
33
public class NotificationSubscriber {
34

35
    private static final String QUERY_TEMPLATE = "eventType=%s%s";
36
    private static final String LISTENER_ADDRESS_TEMPLATE = "%s/hub";
37
    private static final String LISTENER_PATH = "/listener/event";
38

39
    private final SubscriptionHealthIndicator subscriptionHealthIndicator;
40
    private final NotificationProperties notificationProperties;
41
    private final GeneralProperties generalProperties;
42
    private final DefaultHttpClient httpClient;
43
    private final TaskScheduler taskScheduler;
44

45
    @Value("${micronaut.server.port:8080}")
46
    private String servicePort;
47

48
    private static String removeTrailingSlash(String path) {
49
        if (path.endsWith("/")) {
1✔
50
            return path.substring(0, path.length() - 1);
1✔
51
        }
NEW
52
        return path;
×
53
    }
54

55
    @EventListener
56
    public void onApplicationEvent(ServerStartupEvent e) {
57
        notificationProperties.getEntities()
1✔
58
                .forEach(tmForumEntity ->
1✔
59
                        Optional.ofNullable(tmForumEntity.getEventTypes())
1✔
60
                                .orElse(List.of())
1✔
61
                                .forEach(eventType -> {
1✔
62
                                    subscriptionHealthIndicator.initiateSubscriptionInMap(tmForumEntity.getEntityType() + eventType.getValue());
1✔
63
                                    scheduleSubscription(notificationProperties.getSubscriptionInitialDelay(), tmForumEntity.getEntityType(), eventType.getValue(), tmForumEntity.getApiAddress());
1✔
64
                                }));
1✔
65
    }
1✔
66

67
    private void scheduleSubscription(long delaySeconds, String entityType, String eventType, String apiAddress) {
68
        taskScheduler.schedule(Duration.ofSeconds(delaySeconds), () -> createSubscription(entityType, eventType, apiAddress));
1✔
69
    }
1✔
70

71
    public void createSubscription(String entityType, String eventType, String apiAddress) {
72
        String callbackUrl = String.format("http://%s:%s%s%s", notificationProperties.getHost(), servicePort, removeTrailingSlash(generalProperties.getBasePath()), LISTENER_PATH);
1✔
73
        log.debug("Attempting to register subscription for {} {} events at {}", entityType, eventType, String.format(LISTENER_ADDRESS_TEMPLATE, apiAddress));
1✔
74

75
        EventSubscriptionInputVO subscription = new EventSubscriptionInputVO()
1✔
76
                .callback(callbackUrl)
1✔
77
                .query(String.format(QUERY_TEMPLATE, entityType, eventType));
1✔
78

79
        HttpRequest<?> request = HttpRequest.create(HttpMethod.POST, String.format(LISTENER_ADDRESS_TEMPLATE, apiAddress)).body(subscription);
1✔
80

81
        Mono.from(httpClient.exchange(request, EventSubscriptionVO.class))
1✔
82
                .doOnSuccess(res -> {
1✔
NEW
83
                    subscriptionHealthIndicator.setSubscriptionHealthy(entityType + eventType);
×
NEW
84
                    log.info("Successfully subscribed to {} {} events at {}", entityType, eventType, request.getUri());
×
NEW
85
                })
×
86
                .onErrorResume(t -> {
1✔
87
                    if (t instanceof HttpClientResponseException e) {
×
88
                        if (e.getStatus() == HttpStatus.CONFLICT) {
×
89
                            subscriptionHealthIndicator.setSubscriptionHealthy(entityType + eventType);
×
NEW
90
                            log.info("Subscription for {} {} already exists at {}", entityType, eventType, request.getUri());
×
91
                        } else {
NEW
92
                            String body = e.getResponse().getBody(String.class).orElse("<no body>");
×
NEW
93
                            log.warn("Event registration failed for {} at {} - Status: {} | Message: {} | Body: {} - retrying in {}s", entityType, request.getUri(), e.getStatus(), e.getMessage(), body, notificationProperties.getSubscriptionRetryInterval());
×
NEW
94
                            scheduleSubscription(notificationProperties.getSubscriptionRetryInterval(), entityType, eventType, apiAddress);
×
95
                        }
96
                        return Mono.empty();
×
97
                    }
NEW
98
                    log.warn("Could not create subscription for {} in TM Forum API - retrying in {}s", entityType, notificationProperties.getSubscriptionRetryInterval(), t);
×
NEW
99
                    scheduleSubscription(notificationProperties.getSubscriptionRetryInterval(), entityType, eventType, apiAddress);
×
100
                    return Mono.empty();
×
101
                }).subscribe();
1✔
102

103
    }
1✔
104
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc