• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

FIWARE / contract-management / #36

09 Apr 2025 12:52PM UTC coverage: 1.745% (+0.8%) from 0.982%
#36

Pull #5

wistefan
fix tmf version
Pull Request #5: integrate contract negotiation

25 of 340 new or added lines in 12 files covered. (7.35%)

16673 existing lines in 365 files now uncovered.

588 of 33695 relevant lines covered (1.75%)

0.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/org/fiware/iam/tmforum/handlers/ProductOfferingEventHandler.java
1
package org.fiware.iam.tmforum.handlers;
2

3
import com.fasterxml.jackson.databind.ObjectMapper;
4
import io.micronaut.http.HttpResponse;
5
import io.micronaut.http.HttpStatus;
6
import jakarta.inject.Singleton;
7
import lombok.RequiredArgsConstructor;
8
import lombok.extern.slf4j.Slf4j;
9
import org.fiware.iam.tmforum.ProductOfferingConstants;
10
import org.fiware.iam.tmforum.productcatalog.api.ProductSpecificationApiClient;
11
import org.fiware.iam.tmforum.productcatalog.model.*;
12
import org.fiware.rainbow.api.CatalogApiClient;
13
import org.fiware.rainbow.model.CatalogVO;
14
import org.fiware.rainbow.model.DataServiceVO;
15
import org.fiware.rainbow.model.NewDataserviceVO;
16
import reactor.core.publisher.Mono;
17

18
import java.util.*;
19
import java.util.function.Function;
20

21
/**
22
 * Handle all incoming events in connection to ProductOfferings
23
 */
24
@RequiredArgsConstructor
25
@Singleton
26
@Slf4j
×
27
public class ProductOfferingEventHandler implements EventHandler {
28

29

30
        private static final String CREATE_EVENT = "ProductOfferingCreateEvent";
31
        private static final String DELETE_EVENT = "ProductOfferingDeleteEvent";
32
        private static final String STATE_CHANGE_EVENT = "ProductOfferingStateChangeEvent";
33

34
        private static final List<String> SUPPORTED_EVENT_TYPES = List.of(CREATE_EVENT, DELETE_EVENT, STATE_CHANGE_EVENT);
×
35

36
        public static final String EMPTY_STRING_MARKER = "Empty";
37
        public static final String OWNER_ROLE = "Owner";
38

39
        private final CatalogApiClient rainbowCatalogApiClient;
40
        private final ProductSpecificationApiClient productSpecificationApiClient;
41
        private final org.fiware.iam.tmforum.productcatalog.api.CatalogApiClient catalogApiClient;
42
        private final ObjectMapper objectMapper;
43

44
        @Override
45
        public boolean isEventTypeSupported(String eventType) {
46
                return SUPPORTED_EVENT_TYPES.contains(eventType);
×
47
        }
48

49
        @Override
50
        public Mono<HttpResponse<?>> handleEvent(String eventType, Map<String, Object> event) {
51
                return switch (eventType) {
×
52
                        case CREATE_EVENT -> handleOfferingCreation(event);
×
53
                        case STATE_CHANGE_EVENT -> handleOfferingStateChange(event);
×
54
                        case DELETE_EVENT -> handleOfferingDeletion(event);
×
55
                        default -> throw new IllegalArgumentException("Even type %s is not supported.".formatted(eventType));
×
56
                };
57
        }
58

59
        private Mono<HttpResponse<?>> handleOfferingCreation(Map<String, Object> event) {
60
                ProductOfferingCreateEventVO productOfferingCreateEventVO = objectMapper.convertValue(event, ProductOfferingCreateEventVO.class);
×
61
                ProductOfferingVO productOfferingVO = Optional.ofNullable(productOfferingCreateEventVO.getEvent())
×
62
                                .map(ProductOfferingCreateEventPayloadVO::getProductOffering)
×
63
                                .orElseThrow(() -> new IllegalArgumentException("The event does not contain a product offering."));
×
64

65
                if (productOfferingVO.getCategory() == null || productOfferingVO.getCategory().isEmpty()) {
×
66
                        throw new IllegalArgumentException("Product offering does not have a category.");
×
67
                }
68

69
                Mono<NewDataserviceVO> dataserviceVOMono = prepareNewDataservice(productOfferingVO);
×
70
                Mono<List<String>> catalogsMono = getCatalogsForProductOffering(productOfferingVO);
×
71
                return Mono.zip(dataserviceVOMono, catalogsMono, this::createDataservice).
×
72
                                flatMap(Function.identity());
×
73
        }
74

75
        private Mono<HttpResponse<?>> handleOfferingStateChange(Map<String, Object> event) {
76
                ProductOfferingStateChangeEventVO productOfferingStateChangeEventVO = objectMapper.convertValue(event, ProductOfferingStateChangeEventVO.class);
×
77
                ProductOfferingVO productOfferingVO = Optional.ofNullable(productOfferingStateChangeEventVO.getEvent())
×
78
                                .map(ProductOfferingStateChangeEventPayloadVO::getProductOffering)
×
79
                                .orElseThrow(() -> new IllegalArgumentException("The event does not contain a product offering."));
×
80

81
                if (productOfferingVO.getCategory() == null || productOfferingVO.getCategory().isEmpty()) {
×
82
                        // if no category is included, the offering is not part of a catalog anymore
83
                        return deleteOffering(productOfferingVO);
×
84
                }
85
                // find catalogs that the offering should be in
86
                Mono<List<String>> targetCatalogs = getCatalogsForProductOffering(productOfferingVO);
×
87

88
                // (rainbow) catalogs that the offering is currently included
89
                Mono<List<String>> currentCatalogs = rainbowCatalogApiClient.getCatalogs()
×
90
                                .map(HttpResponse::body)
×
91
                                .map(catalogVOS -> catalogVOS.stream().map(CatalogVO::getAtId).toList());
×
92

93
                return Mono.zipDelayError(targetCatalogs, currentCatalogs)
×
94
                                .flatMap(tuple -> handleCatalogEntries(tuple.getT1(), tuple.getT2(), productOfferingVO));
×
95
        }
96

97
        private Mono<HttpResponse<?>> handleCatalogEntries(List<String> targetCatalogs, List<String> currentCatalogs, ProductOfferingVO productOfferingVO) {
98
                List<String> newCatalogs = new ArrayList<>();
×
99
                List<String> updateCatalogs = new ArrayList<>();
×
100
                targetCatalogs.stream().forEach(targetCatalog -> {
×
101
                        if (currentCatalogs.contains(targetCatalog)) {
×
102
                                updateCatalogs.add(targetCatalog);
×
103
                        } else {
104
                                newCatalogs.add(targetCatalog);
×
105
                        }
106
                });
×
107
                List<String> deleteCatalogs = currentCatalogs.stream().filter(currentCatalog -> !targetCatalogs.contains(currentCatalog)).toList();
×
108

109
                List<Mono<HttpResponse<?>>> offeringMonos = new ArrayList<>();
×
110
                offeringMonos.add(
×
111
                                Mono.zipDelayError(deleteCatalogs.stream()
×
112
                                                .map(catalogId -> rainbowCatalogApiClient.deleteDataserviceInCatalog(catalogId, productOfferingVO.getId()))
×
113
                                                .toList(), r -> HttpResponse.accepted()));
×
114

115
                if (!newCatalogs.isEmpty() || !updateCatalogs.isEmpty()) {
×
116
                        Mono<NewDataserviceVO> newDataservice = prepareNewDataservice(productOfferingVO);
×
117
                        offeringMonos.add(newDataservice.flatMap(dataserviceVO -> {
×
118
                                List<Mono<HttpResponse<?>>> rainbowResponses = new ArrayList<>();
×
119
                                if (!newCatalogs.isEmpty()) {
×
120
                                        rainbowResponses.add(createDataservice(dataserviceVO, newCatalogs));
×
121
                                }
122
                                if (!updateCatalogs.isEmpty()) {
×
123
                                        rainbowResponses.add(updateDataservice(dataserviceVO, updateCatalogs));
×
124
                                }
125
                                return Mono.zipDelayError(rainbowResponses, r -> HttpResponse.accepted());
×
126
                        }));
127
                }
128
                return Mono.zipDelayError(offeringMonos, r -> HttpResponse.accepted());
×
129
        }
130

131
        private Mono<ProductSpecificationVO> getSpecForOffering(ProductOfferingVO productOfferingVO) {
132
                return productSpecificationApiClient.retrieveProductSpecification(productOfferingVO.getProductSpecification().getId(), null)
×
133
                                .map(HttpResponse::body);
×
134
        }
135

136
        private Mono<HttpResponse<?>> createDataservice(NewDataserviceVO dataserviceVO, List<String> catalogs) {
137
                return Mono.zip(catalogs.stream()
×
138
                                                .map(id -> rainbowCatalogApiClient
×
139
                                                                .createDataserviceInCatalog(id, dataserviceVO)
×
140
                                                                .onErrorMap(t ->
×
141
                                                                                new IllegalArgumentException("Was not able to create dataservice %s".formatted(dataserviceVO), t)))
×
142
                                                .toList(),
×
143
                                responses -> Arrays.stream(responses)
×
144
                                                .map(HttpResponse.class::cast)
×
145
                                                .filter(resp -> resp.getStatus() != HttpStatus.CREATED)
×
146
                                                .findAny()
×
147
                                                .map(r -> HttpResponse.status(HttpStatus.BAD_GATEWAY))
×
148
                                                .orElse(HttpResponse.ok()));
×
149

150
        }
151

152
        private Mono<HttpResponse<?>> updateDataservice(NewDataserviceVO dataserviceVO, List<String> catalogs) {
153
                return Mono.zip(catalogs.stream()
×
154
                                                .map(id -> rainbowCatalogApiClient
×
155
                                                                .updateDataserviceInCatalog(id, dataserviceVO.getAtId(), dataserviceVO)
×
156
                                                                .onErrorMap(t ->
×
157
                                                                                new IllegalArgumentException("Was not able to update dataservice %s in %s".formatted(dataserviceVO, id), t)))
×
158
                                                .toList(),
×
159
                                responses -> Arrays.stream(responses)
×
160
                                                .map(HttpResponse.class::cast)
×
161
                                                .filter(resp -> resp.getStatus() != HttpStatus.ACCEPTED)
×
162
                                                .findAny()
×
163
                                                .map(r -> HttpResponse.status(HttpStatus.BAD_GATEWAY))
×
164
                                                .orElse(HttpResponse.ok()));
×
165

166
        }
167

168
        private Mono<List<String>> getCatalogsForProductOffering(ProductOfferingVO productOfferingVO) {
169
                List<String> categoryIds = productOfferingVO.getCategory().stream().map(CategoryRefVO::getId).toList();
×
170

171
                return rainbowCatalogApiClient.getCatalogs()
×
172
                                .map(HttpResponse::body)
×
173
                                .flatMap(catalogList ->
×
174
                                                Mono.zip(catalogList.stream()
×
175
                                                                                .map(CatalogVO::getAtId)
×
176
                                                                                .map(id -> catalogApiClient.retrieveCatalog(id, null)
×
177
                                                                                                .map(HttpResponse::body)
×
178
                                                                                                .filter(cvo -> cvo.getCategory().stream()
×
179
                                                                                                                .map(CategoryRefVO::getId)
×
180
                                                                                                                .anyMatch(categoryIds::contains)
×
181
                                                                                                )
182
                                                                                                .map(org.fiware.iam.tmforum.productcatalog.model.CatalogVO::getId)
×
183
                                                                                                // prevent empty monos, because they will terminate the zip
184
                                                                                                .defaultIfEmpty(EMPTY_STRING_MARKER)
×
185
                                                                                )
186
                                                                                .toList(),
×
187
                                                                ids -> Arrays.stream(ids)
×
188
                                                                                .filter(String.class::isInstance)
×
189
                                                                                .map(String.class::cast)
×
190
                                                                                .filter(id -> !id.equals(EMPTY_STRING_MARKER))
×
191
                                                                                .toList())
×
192
                                );
193

194
        }
195

196
        private static void setEndpoint(ProductSpecificationVO spec, NewDataserviceVO newDataserviceVO) {
197
                if (spec.getProductSpecCharacteristic() != null && !spec.getProductSpecCharacteristic().isEmpty()) {
×
198
                        spec.getProductSpecCharacteristic().forEach(psc -> {
×
NEW
199
                                if (psc.getValueType().equals(ProductOfferingConstants.ENDPOINT_URL_TYPE)) {
×
200
                                        getCharValue(psc.getProductSpecCharacteristicValue())
×
201
                                                        .ifPresent(newDataserviceVO::dcatColonEndpointURL);
×
202

NEW
203
                                } else if (psc.getValueType().equals(ProductOfferingConstants.ENDPOINT_DESCRIPTION_TYPE)) {
×
204
                                        getCharValue(psc.getProductSpecCharacteristicValue())
×
205
                                                        .ifPresent(newDataserviceVO::dcatColonEndpointDescription);
×
206
                                }
207
                        });
×
208
                }
209
        }
×
210

211
        private static Optional<String> getCharValue(List<CharacteristicValueSpecificationVO> specs) {
212
                if (specs == null || specs.isEmpty()) {
×
213
                        return Optional.empty();
×
214
                }
215
                if (specs.size() == 1 && specs.get(0).getValue() instanceof String stringValue) {
×
216
                        return Optional.of(stringValue);
×
217
                }
218
                return specs.stream()
×
219
                                .filter(cvs -> Optional.ofNullable(cvs.getIsDefault()).orElse(false))
×
220
                                .findFirst()
×
221
                                .map(CharacteristicValueSpecificationVO::getValue)
×
222
                                .filter(String.class::isInstance)
×
223
                                .map(String.class::cast);
×
224
        }
225

226
        private static void setRelatedParty(ProductSpecificationVO spec, NewDataserviceVO newDataserviceVO) {
227
                if (spec.getRelatedParty() != null && !spec.getRelatedParty().isEmpty()) {
×
228
                        spec.getRelatedParty().stream()
×
229
                                        .filter(rp -> rp.getRole().equals(OWNER_ROLE))
×
230
                                        .map(RelatedPartyVO::getId)
×
231
                                        .findFirst()
×
232
                                        .ifPresent(newDataserviceVO::dctColonCreator);
×
233
                }
234
        }
×
235

236

237
        private Mono<HttpResponse<?>> handleOfferingDeletion(Map<String, Object> event) {
238

239
                ProductOfferingDeleteEventVO productOfferingDeleteEventVO = objectMapper.convertValue(event, ProductOfferingDeleteEventVO.class);
×
240
                ProductOfferingVO productOfferingVO = Optional.ofNullable(productOfferingDeleteEventVO.getEvent())
×
241
                                .map(ProductOfferingDeleteEventPayloadVO::getProductOffering)
×
242
                                .orElseThrow(() -> new IllegalArgumentException("The event does not contain a product offering."));
×
243

244
                return deleteOffering(productOfferingVO);
×
245

246
        }
247

248
        private Mono<HttpResponse<?>> deleteOffering(ProductOfferingVO productOfferingVO) {
249
                return rainbowCatalogApiClient.getCatalogs()
×
250
                                .map(HttpResponse::body)
×
251
                                .flatMap(catalogVOS ->
×
252
                                                Mono.zipDelayError(
×
253
                                                                catalogVOS.stream()
×
254
                                                                                .filter(catalogVO ->
×
255
                                                                                                catalogVO.getDcatColonService()
×
256
                                                                                                                .stream()
×
257
                                                                                                                .map(DataServiceVO::getAtId)
×
258
                                                                                                                .anyMatch(id -> id.equals(productOfferingVO.getId()))
×
259
                                                                                )
260
                                                                                .map(catalogVO -> rainbowCatalogApiClient
×
261
                                                                                                .deleteDataserviceInCatalog(catalogVO.getAtId(), productOfferingVO.getId())
×
262

263
                                                                                ).toList(),
×
264
                                                                responses -> HttpResponse.noContent())
×
265
                                );
266
        }
267

268
        private Mono<NewDataserviceVO> prepareNewDataservice(ProductOfferingVO productOfferingVO) {
269
                NewDataserviceVO newDataserviceVO = new NewDataserviceVO().atId(productOfferingVO.getId());
×
270
                return getSpecForOffering(productOfferingVO)
×
271
                                .map(spec -> {
×
272
                                        newDataserviceVO.dctColonTitle(spec.getName());
×
273
                                        setRelatedParty(spec, newDataserviceVO);
×
274
                                        setEndpoint(spec, newDataserviceVO);
×
275
                                        return newDataserviceVO;
×
276
                                });
277
        }
278

279
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc