• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

DataBiosphere / consent / #5684

17 Apr 2025 07:36PM UTC coverage: 79.057% (+0.02%) from 79.034%
#5684

push

web-flow
DT-1537: Ensure that study info is populated when indexing dataset terms (#2483)

12 of 14 new or added lines in 5 files covered. (85.71%)

2 existing lines in 2 files now uncovered.

10264 of 12983 relevant lines covered (79.06%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.18
/src/main/java/org/broadinstitute/consent/http/service/ElasticSearchService.java
1
package org.broadinstitute.consent.http.service;
2

3
import com.google.api.client.http.HttpStatusCodes;
4
import com.google.gson.JsonArray;
5
import jakarta.ws.rs.HttpMethod;
6
import jakarta.ws.rs.core.Response;
7
import jakarta.ws.rs.core.Response.Status;
8
import jakarta.ws.rs.core.StreamingOutput;
9
import java.io.IOException;
10
import java.nio.charset.StandardCharsets;
11
import java.sql.SQLException;
12
import java.time.Instant;
13
import java.util.ArrayList;
14
import java.util.Collection;
15
import java.util.List;
16
import java.util.Map;
17
import java.util.Objects;
18
import java.util.Optional;
19
import org.apache.http.entity.ContentType;
20
import org.apache.http.nio.entity.NStringEntity;
21
import org.broadinstitute.consent.http.configurations.ElasticSearchConfiguration;
22
import org.broadinstitute.consent.http.db.DacDAO;
23
import org.broadinstitute.consent.http.db.DataAccessRequestDAO;
24
import org.broadinstitute.consent.http.db.DatasetDAO;
25
import org.broadinstitute.consent.http.db.InstitutionDAO;
26
import org.broadinstitute.consent.http.db.StudyDAO;
27
import org.broadinstitute.consent.http.db.UserDAO;
28
import org.broadinstitute.consent.http.models.Dac;
29
import org.broadinstitute.consent.http.models.DataAccessRequest;
30
import org.broadinstitute.consent.http.models.Dataset;
31
import org.broadinstitute.consent.http.models.DatasetProperty;
32
import org.broadinstitute.consent.http.models.Institution;
33
import org.broadinstitute.consent.http.models.Study;
34
import org.broadinstitute.consent.http.models.StudyProperty;
35
import org.broadinstitute.consent.http.models.User;
36
import org.broadinstitute.consent.http.models.elastic_search.DacTerm;
37
import org.broadinstitute.consent.http.models.elastic_search.DatasetTerm;
38
import org.broadinstitute.consent.http.models.elastic_search.ElasticSearchHits;
39
import org.broadinstitute.consent.http.models.elastic_search.InstitutionTerm;
40
import org.broadinstitute.consent.http.models.elastic_search.StudyTerm;
41
import org.broadinstitute.consent.http.models.elastic_search.UserTerm;
42
import org.broadinstitute.consent.http.models.ontology.DataUseSummary;
43
import org.broadinstitute.consent.http.service.dao.DatasetServiceDAO;
44
import org.broadinstitute.consent.http.util.ConsentLogger;
45
import org.broadinstitute.consent.http.util.gson.GsonUtil;
46
import org.elasticsearch.client.Request;
47
import org.elasticsearch.client.RestClient;
48

49
public class ElasticSearchService implements ConsentLogger {
50

51
  private final RestClient esClient;
52
  private final ElasticSearchConfiguration esConfig;
53
  private final DacDAO dacDAO;
54
  private final DataAccessRequestDAO dataAccessRequestDAO;
55
  private final UserDAO userDAO;
56
  private final OntologyService ontologyService;
57
  private final InstitutionDAO institutionDAO;
58
  private final DatasetDAO datasetDAO;
59
  private final DatasetServiceDAO datasetServiceDAO;
60
  private final StudyDAO studyDAO;
61

62
  public ElasticSearchService(
63
      RestClient esClient,
64
      ElasticSearchConfiguration esConfig,
65
      DacDAO dacDAO,
66
      DataAccessRequestDAO dataAccessRequestDAO,
67
      UserDAO userDao,
68
      OntologyService ontologyService,
69
      InstitutionDAO institutionDAO,
70
      DatasetDAO datasetDAO,
71
      DatasetServiceDAO datasetServiceDAO,
72
      StudyDAO studyDAO) {
1✔
73
    this.esClient = esClient;
1✔
74
    this.esConfig = esConfig;
1✔
75
    this.dacDAO = dacDAO;
1✔
76
    this.dataAccessRequestDAO = dataAccessRequestDAO;
1✔
77
    this.userDAO = userDao;
1✔
78
    this.ontologyService = ontologyService;
1✔
79
    this.institutionDAO = institutionDAO;
1✔
80
    this.datasetDAO = datasetDAO;
1✔
81
    this.datasetServiceDAO = datasetServiceDAO;
1✔
82
    this.studyDAO = studyDAO;
1✔
83
  }
1✔
84

85

86
  // Bulk-API action line emitted before each dataset document; %d is the dataset id.
  // NOTE(review): "_type" was deprecated in Elasticsearch 7 and removed in 8 — confirm the
  // target cluster version still accepts typed bulk actions.
  private static final String BULK_HEADER = """
      { "index": {"_type": "dataset", "_id": "%d"} }
      """;

  // _delete_by_query body matching a single dataset document by type and id; %d is the dataset id.
  private static final String DELETE_QUERY = """
      { "query": { "bool": { "must": [ { "match": { "_type": "dataset" } }, { "match": { "_id": "%d" } } ] } } }
      """;
93

94
  private Response performRequest(Request request) throws IOException {
95
    var response = esClient.performRequest(request);
1✔
96
    var status = response.getStatusLine().getStatusCode();
1✔
97
    if (status != 200) {
1✔
98
      throw new IOException("Invalid Elasticsearch query");
1✔
99
    }
100
    var body = new String(response.getEntity().getContent().readAllBytes(),
1✔
101
        StandardCharsets.UTF_8);
102
    return Response.status(status).entity(body).build();
1✔
103
  }
104

105
  public Response indexDatasetTerms(List<DatasetTerm> datasets, User user) throws IOException {
106
    List<String> bulkApiCall = new ArrayList<>();
1✔
107

108
    datasets.forEach(dsTerm -> {
1✔
109
      bulkApiCall.add(BULK_HEADER.formatted(dsTerm.getDatasetId()));
1✔
110
      bulkApiCall.add(GsonUtil.getInstance().toJson(dsTerm) + "\n");
1✔
111
      updateDatasetIndexDate(dsTerm.getDatasetId(), user.getUserId(), Instant.now());
1✔
112
    });
1✔
113

114
    Request bulkRequest = new Request(
1✔
115
        HttpMethod.PUT,
116
        "/" + esConfig.getDatasetIndexName() + "/_bulk");
1✔
117

118
    bulkRequest.setEntity(new NStringEntity(
1✔
119
        String.join("", bulkApiCall) + "\n",
1✔
120
        ContentType.APPLICATION_JSON));
121

122
    return performRequest(bulkRequest);
1✔
123
  }
124

125
  public Response deleteIndex(Integer datasetId, Integer userId) throws IOException {
126
    Request deleteRequest = new Request(
×
127
        HttpMethod.POST,
128
        "/" + esConfig.getDatasetIndexName() + "/_delete_by_query");
×
129
    deleteRequest.setEntity(new NStringEntity(
×
130
        DELETE_QUERY.formatted(datasetId),
×
131
        ContentType.APPLICATION_JSON));
132
    updateDatasetIndexDate(datasetId, userId, null);
×
133
    return performRequest(deleteRequest);
×
134
  }
135

136
  public boolean validateQuery(String query) throws IOException {
137
    // Remove `size` and `from` parameters from query, otherwise validation will fail
138
    var modifiedQuery = query
1✔
139
        .replaceAll("\"size\": ?\\d+,?", "")
1✔
140
        .replaceAll("\"from\": ?\\d+,?", "");
1✔
141

142
    Request validateRequest = new Request(
1✔
143
        HttpMethod.GET,
144
        "/" + esConfig.getDatasetIndexName() + "/_validate/query");
1✔
145
    validateRequest.setEntity(new NStringEntity(modifiedQuery, ContentType.APPLICATION_JSON));
1✔
146
    Response response = performRequest(validateRequest);
1✔
147

148
    var entity = response.getEntity().toString();
1✔
149
    var json = GsonUtil.getInstance().fromJson(entity, Map.class);
1✔
150

151
    return (boolean) json.get("valid");
1✔
152
  }
153

154
  public Response searchDatasets(String query) throws IOException {
155
    if (!validateQuery(query)) {
1✔
156
      throw new IOException("Invalid Elasticsearch query");
×
157
    }
158

159
    Request searchRequest = new Request(
1✔
160
        HttpMethod.GET,
161
        "/" + esConfig.getDatasetIndexName() + "/_search");
1✔
162
    searchRequest.setEntity(new NStringEntity(query, ContentType.APPLICATION_JSON));
1✔
163

164
    Response response = performRequest(searchRequest);
1✔
165

166
    var entity = response.getEntity().toString();
1✔
167
    var json = GsonUtil.getInstance().fromJson(entity, ElasticSearchHits.class);
1✔
168
    var hits = json.getHits();
1✔
169

170
    return Response.ok().entity(hits).build();
1✔
171
  }
172

173
  public StudyTerm toStudyTerm(Study study) {
174
    if (Objects.isNull(study)) {
1✔
175
      return null;
×
176
    }
177

178
    StudyTerm term = new StudyTerm();
1✔
179

180
    term.setDescription(study.getDescription());
1✔
181
    term.setStudyName(study.getName());
1✔
182
    term.setStudyId(study.getStudyId());
1✔
183
    term.setDataTypes(study.getDataTypes());
1✔
184
    term.setPiName(study.getPiName());
1✔
185
    term.setPublicVisibility(study.getPublicVisibility());
1✔
186

187
    findStudyProperty(
1✔
188
        study.getProperties(), "dbGaPPhsID"
1✔
189
    ).ifPresent(
1✔
190
        prop -> term.setPhsId(prop.getValue().toString())
1✔
191
    );
192

193
    findStudyProperty(
1✔
194
        study.getProperties(), "phenotypeIndication"
1✔
195
    ).ifPresent(
1✔
196
        prop -> term.setPhenotype(prop.getValue().toString())
1✔
197
    );
198

199
    findStudyProperty(
1✔
200
        study.getProperties(), "species"
1✔
201
    ).ifPresent(
1✔
202
        prop -> term.setSpecies(prop.getValue().toString())
1✔
203
    );
204

205
    findStudyProperty(
1✔
206
        study.getProperties(), "dataCustodianEmail"
1✔
207
    ).ifPresent(
1✔
208
        prop -> {
209
          JsonArray jsonArray = (JsonArray) prop.getValue();
1✔
210
          List<String> dataCustodianEmail = new ArrayList<>();
1✔
211
          jsonArray.forEach(email -> dataCustodianEmail.add(email.getAsString()));
1✔
212
          term.setDataCustodianEmail(dataCustodianEmail);
1✔
213
        }
1✔
214
    );
215

216
    if (Objects.nonNull(study.getCreateUserId())) {
1✔
217
      term.setDataSubmitterId(study.getCreateUserId());
1✔
218
      User user = userDAO.findUserById(study.getCreateUserId());
1✔
219
      if (Objects.nonNull(user)) {
1✔
220
        study.setCreateUserEmail(user.getEmail());
1✔
221
      }
222
    }
223

224
    if (Objects.nonNull(study.getCreateUserEmail())) {
1✔
225
      term.setDataSubmitterEmail(study.getCreateUserEmail());
1✔
226
    }
227

228
    return term;
1✔
229
  }
230

231
  public UserTerm toUserTerm(User user) {
232
    if (Objects.isNull(user)) {
1✔
233
      return null;
×
234
    }
235
    InstitutionTerm institution = (Objects.nonNull(user.getInstitutionId())) ?
1✔
236
        toInstitutionTerm(institutionDAO.findInstitutionById(user.getInstitutionId())) :
1✔
237
        null;
1✔
238
    return new UserTerm(user.getUserId(), user.getDisplayName(), institution);
1✔
239
  }
240

241
  public DacTerm toDacTerm(Dac dac) {
242
    if (Objects.isNull(dac)) {
1✔
243
      return null;
1✔
244
    }
245
    return new DacTerm(dac.getDacId(), dac.getName(), dac.getEmail());
1✔
246
  }
247

248
  public InstitutionTerm toInstitutionTerm(Institution institution) {
249
    if (Objects.isNull(institution)) {
1✔
250
      return null;
1✔
251
    }
252
    return new InstitutionTerm(institution.getId(), institution.getName());
1✔
253
  }
254

255
  /**
256
   * Synchronize the dataset in the ES index. This will only index the dataset if it has been
257
   * previously indexed, UNLESS the force argument is true which means it will index the dataset
258
   * and update the dataset's last indexed date value.
259
   *
260
   * @param dataset The Dataset
261
   * @param user    The User
262
   * @param force   Boolean to force the index update regardless of dataset's indexed date status.
263
   */
264
  public void synchronizeDatasetInESIndex(Dataset dataset, User user, boolean force) {
265
    if (force || dataset.getIndexedDate() != null) {
×
NEW
266
      try (var response = indexDataset(dataset.getDatasetId(), user)) {
×
267
        if (!HttpStatusCodes.isSuccess(response.getStatus())) {
×
268
          logWarn("Response error, unable to index dataset: %s".formatted(dataset.getDatasetId()));
×
269
        }
270
      } catch (IOException e) {
×
271
        logWarn("Exception, unable to index dataset: %s".formatted(dataset.getDatasetId()));
×
272
      }
×
273
    }
274
  }
×
275

276
  public Response indexDataset(Integer datasetId, User user) throws IOException {
277
    return indexDatasets(List.of(datasetId), user);
1✔
278
  }
279

280
  public Response indexDatasets(List<Integer> datasetIds, User user) throws IOException {
281
    // Datasets in list context may not have their study populated, so we need to ensure that is
282
    // true before trying to index them in ES.
283
    List<DatasetTerm> datasetTerms = datasetIds.stream()
1✔
284
        .map(datasetDAO::findDatasetById)
1✔
285
        .map(this::toDatasetTerm)
1✔
286
        .toList();
1✔
287
    return indexDatasetTerms(datasetTerms, user);
1✔
288
  }
289

290
  /**
291
   * Sequentially index datasets to ElasticSearch by ID list. Note that this is intended for large
292
   * lists of dataset ids. For small sets of datasets (i.e. <~25), it is efficient to index them in
293
   * bulk using the {@link #indexDatasets(List, User)} method.
294
   *
295
   * @param datasetIds List of Dataset IDs to index
296
   * @return StreamingOutput of ElasticSearch responses from indexing datasets
297
   */
298
  public StreamingOutput indexDatasetIds(List<Integer> datasetIds, User user) {
299
    Integer lastDatasetId = datasetIds.get(datasetIds.size() - 1);
1✔
300
    return output -> {
1✔
301
      output.write("[".getBytes());
1✔
302
      datasetIds.forEach(id -> {
1✔
303
        try (Response response = indexDataset(id, user)) {
1✔
304
          output.write(response.getEntity().toString().getBytes());
1✔
305
          if (!id.equals(lastDatasetId)) {
1✔
306
            output.write(",".getBytes());
×
307
          }
308
          output.write("\n".getBytes());
1✔
309
        } catch (IOException e) {
1✔
310
          logException("Error indexing dataset term for dataset id: %d ".formatted(id), e);
1✔
311
        }
1✔
312
      });
1✔
313
      output.write("]".getBytes());
1✔
314
    };
1✔
315
  }
316

317
  public Response indexStudy(Integer studyId, User user) {
318
    Study study = studyDAO.findStudyById(studyId);
1✔
319
    // The dao call above does not populate its datasets so we need to check for datasetIds
320
    if (study != null && !study.getDatasetIds().isEmpty()) {
1✔
NEW
321
      try (Response response = indexDatasets(study.getDatasetIds().stream().toList(), user)) {
×
UNCOV
322
        return response;
×
323
      } catch (Exception e) {
1✔
324
        logException(String.format("Failed to index datasets for study id: %d", studyId), e);
1✔
325
        return Response.status(Status.INTERNAL_SERVER_ERROR).build();
1✔
326
      }
327
    }
328
    return Response.status(Status.NOT_FOUND).build();
1✔
329
  }
330

331
  public DatasetTerm toDatasetTerm(Dataset dataset) {
332
    if (Objects.isNull(dataset)) {
1✔
333
      return null;
1✔
334
    }
335

336
    DatasetTerm term = new DatasetTerm();
1✔
337

338
    term.setDatasetId(dataset.getDatasetId());
1✔
339
    Optional.ofNullable(dataset.getCreateUserId()).ifPresent(userId -> {
1✔
340
      User user = userDAO.findUserById(dataset.getCreateUserId());
1✔
341
      term.setCreateUserId(dataset.getCreateUserId());
1✔
342
      term.setCreateUserDisplayName(user.getDisplayName());
1✔
343
      term.setSubmitter(toUserTerm(user));
1✔
344
    });
1✔
345
    Optional.ofNullable(dataset.getUpdateUserId())
1✔
346
        .map(userDAO::findUserById)
1✔
347
        .map(this::toUserTerm)
1✔
348
        .ifPresent(term::setUpdateUser);
1✔
349
    term.setDatasetIdentifier(dataset.getDatasetIdentifier());
1✔
350
    term.setDeletable(dataset.getDeletable());
1✔
351
    term.setDatasetName(dataset.getName());
1✔
352

353
    if (Objects.nonNull(dataset.getStudy())) {
1✔
354
      term.setStudy(toStudyTerm(dataset.getStudy()));
1✔
355
    }
356

357
    Optional.ofNullable(dataset.getDacId()).ifPresent(dacId -> {
1✔
358
      Dac dac = dacDAO.findById(dataset.getDacId());
1✔
359
      term.setDacId(dataset.getDacId());
1✔
360
      if (Objects.nonNull(dataset.getDacApproval())) {
1✔
361
        term.setDacApproval(dataset.getDacApproval());
1✔
362
      }
363
      term.setDac(toDacTerm(dac));
1✔
364
    });
1✔
365

366
    List<Integer> approvedUserIds = dataAccessRequestDAO
1✔
367
        .findApprovedDARsByDatasetId(dataset.getDatasetId())
1✔
368
        .stream()
1✔
369
        .map(DataAccessRequest::getUserId)
1✔
370
        .toList();
1✔
371

372
    if (!approvedUserIds.isEmpty()) {
1✔
373
      term.setApprovedUserIds(approvedUserIds);
1✔
374
    }
375

376
    if (Objects.nonNull(dataset.getDataUse())) {
1✔
377
      DataUseSummary summary = ontologyService.translateDataUseSummary(dataset.getDataUse());
1✔
378
      if (summary != null) {
1✔
379
        term.setDataUse(summary);
1✔
380
      } else {
381
        logWarn("No data use summary for dataset id: %d".formatted(dataset.getDatasetId()));
1✔
382
      }
383
    }
384

385
    findDatasetProperty(
1✔
386
        dataset.getProperties(), "accessManagement"
1✔
387
    ).ifPresent(
1✔
388
        datasetProperty -> term.setAccessManagement(datasetProperty.getPropertyValueAsString())
1✔
389
    );
390

391
    findFirstDatasetPropertyByName(
1✔
392
        dataset.getProperties(), "# of participants"
1✔
393
    ).ifPresent(
1✔
394
        datasetProperty -> {
395
          String value = datasetProperty.getPropertyValueAsString();
1✔
396
          try {
397
            term.setParticipantCount(Integer.valueOf(value));
1✔
398
          } catch (NumberFormatException e) {
1✔
399
            logWarn(String.format("Unable to coerce participant count to integer: %s for dataset: %s", value, dataset.getDatasetIdentifier()));
1✔
400
          }
1✔
401
        }
1✔
402
    );
403

404
    findDatasetProperty(
1✔
405
        dataset.getProperties(), "url"
1✔
406
    ).ifPresent(
1✔
407
        datasetProperty -> term.setUrl(datasetProperty.getPropertyValueAsString())
1✔
408
    );
409

410
    findDatasetProperty(
1✔
411
        dataset.getProperties(), "dataLocation"
1✔
412
    ).ifPresent(
1✔
413
        datasetProperty -> term.setDataLocation(datasetProperty.getPropertyValueAsString())
1✔
414
    );
415

416
    return term;
1✔
417
  }
418

419
  protected void updateDatasetIndexDate(Integer datasetId, Integer userId, Instant indexDate)
420
      {
421
    // It is possible that a dataset has been deleted. If so, we don't want to try and update it.
422
    Dataset dataset = datasetDAO.findDatasetById(datasetId);
1✔
423
    if (dataset != null) {
1✔
424
      try {
425
        datasetServiceDAO.updateDatasetIndex(datasetId, userId, indexDate);
1✔
426
      } catch (SQLException e) {
×
427
        // We don't want to send these to Sentry, but we do want to log them for follow up off cycle
428
        logWarn("Error updating dataset indexed date for dataset id: %d ".formatted(datasetId), e);
×
429
      }
1✔
430
    }
431
  }
1✔
432

433
  Optional<DatasetProperty> findDatasetProperty(Collection<DatasetProperty> props,
434
      String schemaProp) {
435
    return
1✔
436
        (props == null) ? Optional.empty() : props
1✔
437
            .stream()
1✔
438
            .filter(p -> Objects.nonNull(p.getSchemaProperty()))
1✔
439
            .filter(p -> p.getSchemaProperty().equals(schemaProp))
1✔
440
            .findFirst();
1✔
441
  }
442

443
  Optional<DatasetProperty> findFirstDatasetPropertyByName(Collection<DatasetProperty> props,
444
      String propertyName) {
445
    return
1✔
446
        (props == null) ? Optional.empty(): props
1✔
447
            .stream()
1✔
448
            .filter(p -> p.getPropertyName().equalsIgnoreCase(propertyName))
1✔
449
            .findFirst();
1✔
450
  }
451

452
  Optional<StudyProperty> findStudyProperty(Collection<StudyProperty> props, String key) {
453
    return
1✔
454
        (props == null) ? Optional.empty() : props
1✔
455
            .stream()
1✔
456
            .filter(p -> p.getKey().equals(key))
1✔
457
            .findFirst();
1✔
458
  }
459

460
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc