DataBiosphere / consent · build #6333 (push · web-flow)

15 Aug 2025 12:17PM UTC · coverage: 83.37% (+0.03% from 83.338%)

DT-2091: Update DAC Approve Dataset performance (#2636)

Co-authored-by: otchet-broad <111771148+otchet-broad@users.noreply.github.com>

83 of 105 new or added lines in 5 files covered (79.05%).

4 existing lines in 1 file now uncovered.

10949 of 13133 relevant lines covered (83.37%).

0.83 hits per line.

Source file: /src/main/java/org/broadinstitute/consent/http/service/ElasticSearchService.java (89.47% covered)

package org.broadinstitute.consent.http.service;

import com.google.api.client.http.HttpStatusCodes;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.JsonArray;
import jakarta.ws.rs.HttpMethod;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;
import jakarta.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.broadinstitute.consent.http.configurations.ElasticSearchConfiguration;
import org.broadinstitute.consent.http.db.DacDAO;
import org.broadinstitute.consent.http.db.DataAccessRequestDAO;
import org.broadinstitute.consent.http.db.DatasetDAO;
import org.broadinstitute.consent.http.db.InstitutionDAO;
import org.broadinstitute.consent.http.db.LibraryCardDAO;
import org.broadinstitute.consent.http.db.StudyDAO;
import org.broadinstitute.consent.http.db.UserDAO;
import org.broadinstitute.consent.http.models.Dac;
import org.broadinstitute.consent.http.models.DataAccessRequest;
import org.broadinstitute.consent.http.models.Dataset;
import org.broadinstitute.consent.http.models.DatasetProperty;
import org.broadinstitute.consent.http.models.Institution;
import org.broadinstitute.consent.http.models.LibraryCard;
import org.broadinstitute.consent.http.models.Study;
import org.broadinstitute.consent.http.models.StudyProperty;
import org.broadinstitute.consent.http.models.User;
import org.broadinstitute.consent.http.models.elastic_search.DacTerm;
import org.broadinstitute.consent.http.models.elastic_search.DatasetTerm;
import org.broadinstitute.consent.http.models.elastic_search.ElasticSearchHits;
import org.broadinstitute.consent.http.models.elastic_search.InstitutionTerm;
import org.broadinstitute.consent.http.models.elastic_search.StudyTerm;
import org.broadinstitute.consent.http.models.elastic_search.UserTerm;
import org.broadinstitute.consent.http.models.ontology.DataUseSummary;
import org.broadinstitute.consent.http.service.dao.DatasetServiceDAO;
import org.broadinstitute.consent.http.util.ConsentLogger;
import org.broadinstitute.consent.http.util.ThreadUtils;
import org.broadinstitute.consent.http.util.gson.GsonUtil;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;

public class ElasticSearchService implements ConsentLogger {

  private final ExecutorService executorService = new ThreadUtils().getExecutorService(
      ElasticSearchService.class);
  private final RestClient esClient;
  private final ElasticSearchConfiguration esConfig;
  private final DacDAO dacDAO;
  private final DataAccessRequestDAO dataAccessRequestDAO;
  private final UserDAO userDAO;
  private final OntologyService ontologyService;
  private final InstitutionDAO institutionDAO;
  private final DatasetDAO datasetDAO;
  private final DatasetServiceDAO datasetServiceDAO;
  private final StudyDAO studyDAO;
  private final LibraryCardDAO libraryCardDAO;

  public ElasticSearchService(
      RestClient esClient,
      ElasticSearchConfiguration esConfig,
      DacDAO dacDAO,
      DataAccessRequestDAO dataAccessRequestDAO,
      UserDAO userDao,
      OntologyService ontologyService,
      InstitutionDAO institutionDAO,
      DatasetDAO datasetDAO,
      DatasetServiceDAO datasetServiceDAO,
      StudyDAO studyDAO,
      LibraryCardDAO libraryCardDAO) {
    this.esClient = esClient;
    this.esConfig = esConfig;
    this.dacDAO = dacDAO;
    this.dataAccessRequestDAO = dataAccessRequestDAO;
    this.userDAO = userDao;
    this.ontologyService = ontologyService;
    this.institutionDAO = institutionDAO;
    this.datasetDAO = datasetDAO;
    this.datasetServiceDAO = datasetServiceDAO;
    this.studyDAO = studyDAO;
    this.libraryCardDAO = libraryCardDAO;
  }


  private static final String BULK_HEADER = """
      { "index": {"_type": "dataset", "_id": "%d"} }
      """;

  private static final String DELETE_QUERY = """
      { "query": { "bool": { "must": [ { "match": { "_type": "dataset" } }, { "match": { "_id": "%d" } } ] } } }
      """;

  private Response performRequest(Request request) throws IOException {
    var response = esClient.performRequest(request);
    var status = response.getStatusLine().getStatusCode();
    if (status != 200) {
      throw new IOException("Invalid Elasticsearch query");
    }
    var body = new String(response.getEntity().getContent().readAllBytes(),
        StandardCharsets.UTF_8);
    return Response.status(status).entity(body).build();
  }

  public Response indexDatasetTerms(List<DatasetTerm> datasets, User user) throws IOException {
    List<String> bulkApiCall = new ArrayList<>();

    datasets.forEach(dsTerm -> {
      bulkApiCall.add(BULK_HEADER.formatted(dsTerm.getDatasetId()));
      bulkApiCall.add(GsonUtil.getInstance().toJson(dsTerm) + "\n");
      updateDatasetIndexDate(dsTerm.getDatasetId(), user.getUserId(), Instant.now());
    });

    Request bulkRequest = new Request(
        HttpMethod.PUT,
        "/" + esConfig.getDatasetIndexName() + "/_bulk");

    bulkRequest.setEntity(new NStringEntity(
        String.join("", bulkApiCall) + "\n",
        ContentType.APPLICATION_JSON));

    return performRequest(bulkRequest);
  }
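
  // Illustrative sketch of the request body built above (serialized DatasetTerm field names are
  // approximate): for two terms with dataset ids 1 and 2, the newline-delimited payload sent to
  // PUT /<dataset-index>/_bulk would look roughly like:
  //   { "index": {"_type": "dataset", "_id": "1"} }
  //   {"datasetId":1,"datasetName":"..."}
  //   { "index": {"_type": "dataset", "_id": "2"} }
  //   {"datasetId":2,"datasetName":"..."}
  // i.e. one action line from BULK_HEADER followed by one document line per dataset.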

  public Response deleteIndex(Integer datasetId, Integer userId) throws IOException {
    Request deleteRequest = new Request(
        HttpMethod.POST,
        "/" + esConfig.getDatasetIndexName() + "/_delete_by_query");
    deleteRequest.setEntity(new NStringEntity(
        DELETE_QUERY.formatted(datasetId),
        ContentType.APPLICATION_JSON));
    updateDatasetIndexDate(datasetId, userId, null);
    return performRequest(deleteRequest);
  }
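
  // Illustrative sketch: for datasetId 42, DELETE_QUERY.formatted(42) renders roughly as
  //   { "query": { "bool": { "must": [ { "match": { "_type": "dataset" } }, { "match": { "_id": "42" } } ] } } }
  // and is POSTed to the index's _delete_by_query endpoint.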

  public boolean validateQuery(String query) throws IOException {
    // Remove `size` and `from` parameters from query, otherwise validation will fail
    var modifiedQuery = query
        .replaceAll("\"size\": ?\\d+,?", "")
        .replaceAll("\"from\": ?\\d+,?", "");

    Request validateRequest = new Request(
        HttpMethod.GET,
        "/" + esConfig.getDatasetIndexName() + "/_validate/query");
    validateRequest.setEntity(new NStringEntity(modifiedQuery, ContentType.APPLICATION_JSON));
    Response response = performRequest(validateRequest);

    var entity = response.getEntity().toString();
    var json = GsonUtil.getInstance().fromJson(entity, Map.class);

    return (boolean) json.get("valid");
  }
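
  // Illustrative sketch of the rewrite above: a paged search body such as
  //   {"from": 20, "size": 10, "query": {"match_all": {}}}
  // becomes roughly
  //   { "query": {"match_all": {}}}
  // before it is sent to _validate/query, which (per the comment above) fails validation when
  // paging parameters are present.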

  public Response searchDatasets(String query) throws IOException {
    if (!validateQuery(query)) {
      throw new IOException("Invalid Elasticsearch query");
    }

    Request searchRequest = new Request(
        HttpMethod.GET,
        "/" + esConfig.getDatasetIndexName() + "/_search");
    searchRequest.setEntity(new NStringEntity(query, ContentType.APPLICATION_JSON));

    Response response = performRequest(searchRequest);

    var entity = response.getEntity().toString();
    var json = GsonUtil.getInstance().fromJson(entity, ElasticSearchHits.class);
    var hits = json.getHits();

    return Response.ok().entity(hits).build();
  }

  public StudyTerm toStudyTerm(Study study) {
    if (Objects.isNull(study)) {
      return null;
    }

    StudyTerm term = new StudyTerm();

    term.setDescription(study.getDescription());
    term.setStudyName(study.getName());
    term.setStudyId(study.getStudyId());
    term.setDataTypes(study.getDataTypes());
    term.setPiName(study.getPiName());
    term.setPublicVisibility(study.getPublicVisibility());

    findStudyProperty(
        study.getProperties(), "dbGaPPhsID"
    ).ifPresent(
        prop -> term.setPhsId(prop.getValue().toString())
    );

    findStudyProperty(
        study.getProperties(), "phenotypeIndication"
    ).ifPresent(
        prop -> term.setPhenotype(prop.getValue().toString())
    );

    findStudyProperty(
        study.getProperties(), "species"
    ).ifPresent(
        prop -> term.setSpecies(prop.getValue().toString())
    );

    findStudyProperty(
        study.getProperties(), "dataCustodianEmail"
    ).ifPresent(
        prop -> {
          JsonArray jsonArray = (JsonArray) prop.getValue();
          List<String> dataCustodianEmail = new ArrayList<>();
          jsonArray.forEach(email -> dataCustodianEmail.add(email.getAsString()));
          term.setDataCustodianEmail(dataCustodianEmail);
        }
    );

    if (Objects.nonNull(study.getCreateUserId())) {
      term.setDataSubmitterId(study.getCreateUserId());
      User user = userDAO.findUserById(study.getCreateUserId());
      if (Objects.nonNull(user)) {
        study.setCreateUserEmail(user.getEmail());
      }
    }

    if (Objects.nonNull(study.getCreateUserEmail())) {
      term.setDataSubmitterEmail(study.getCreateUserEmail());
    }

    return term;
  }

  public UserTerm toUserTerm(User user) {
    if (Objects.isNull(user)) {
      return null;
    }
    InstitutionTerm institution = (Objects.nonNull(user.getInstitutionId())) ?
        toInstitutionTerm(institutionDAO.findInstitutionById(user.getInstitutionId())) :
        null;
    return new UserTerm(user.getUserId(), user.getDisplayName(), institution);
  }

  public DacTerm toDacTerm(Dac dac) {
    if (Objects.isNull(dac)) {
      return null;
    }
    return new DacTerm(dac.getDacId(), dac.getName(), dac.getEmail());
  }

  public InstitutionTerm toInstitutionTerm(Institution institution) {
    if (Objects.isNull(institution)) {
      return null;
    }
    return new InstitutionTerm(institution.getId(), institution.getName());
  }

  public void asyncDatasetInESIndex(Integer datasetId, User user, boolean force) {
    ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(
        executorService);
    ListenableFuture<Dataset> syncFuture =
        listeningExecutorService.submit(() -> {
          Dataset dataset = datasetDAO.findDatasetById(datasetId);
          synchronizeDatasetInESIndex(dataset, user, force);
          return dataset;
        });
    Futures.addCallback(
        syncFuture,
        new FutureCallback<>() {
          @Override
          public void onSuccess(Dataset d) {
            logInfo("Successfully synchronized dataset in ES index: %s".formatted(
                d.getDatasetIdentifier()));
          }

          @Override
          public void onFailure(Throwable t) {
            logWarn("Failed to synchronize dataset in ES index: %s".formatted(datasetId) + ": "
                + t.getMessage());
          }
        },
        listeningExecutorService
    );
  }
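
  // Illustrative usage sketch (hypothetical caller): after a DAC approves a dataset, a resource
  // might call
  //   elasticSearchService.asyncDatasetInESIndex(dataset.getDatasetId(), adminUser, true);
  // The submit/addCallback pair above runs the lookup and re-index off the request thread and
  // only logs the outcome; failures are reported via the onFailure log line, not to the caller.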

  /**
   * Synchronize the dataset in the ES index. This will only index the dataset if it has been
   * previously indexed, UNLESS the force argument is true, in which case it will index the
   * dataset and update the dataset's last indexed date value.
   *
   * @param dataset The Dataset
   * @param user    The User
   * @param force   Boolean to force the index update regardless of the dataset's indexed date
   *                status.
   */
  public void synchronizeDatasetInESIndex(Dataset dataset, User user, boolean force) {
    if (force || dataset.getIndexedDate() != null) {
      try (var response = indexDataset(dataset.getDatasetId(), user)) {
        if (!HttpStatusCodes.isSuccess(response.getStatus())) {
          logWarn("Response error, unable to index dataset: %s".formatted(dataset.getDatasetId()));
        }
      } catch (IOException e) {
        logWarn("Exception, unable to index dataset: %s".formatted(dataset.getDatasetId()));
      }
    }
  }
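
  // Illustrative summary of the force semantics documented above:
  //   indexedDate == null && !force -> skipped (dataset has never been indexed)
  //   indexedDate == null && force  -> indexed now, and the indexed date gets set
  //   indexedDate != null           -> re-indexed regardless of force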

  public Response indexDataset(Integer datasetId, User user) throws IOException {
    return indexDatasets(List.of(datasetId), user);
  }

  public Response indexDatasets(List<Integer> datasetIds, User user) throws IOException {
    // Datasets in list context may not have their study populated, so we need to ensure that is
    // true before trying to index them in ES.
    List<DatasetTerm> datasetTerms = datasetIds.stream()
        .map(datasetDAO::findDatasetById)
        .map(this::toDatasetTerm)
        .toList();
    return indexDatasetTerms(datasetTerms, user);
  }

  /**
   * Sequentially index datasets to ElasticSearch by ID list. Note that this is intended for large
   * lists of dataset IDs. For small sets of datasets (i.e. <~25), it is more efficient to index
   * them in bulk using the {@link #indexDatasets(List, User)} method.
   *
   * @param datasetIds List of Dataset IDs to index
   * @param user       The User
   * @return StreamingOutput of ElasticSearch responses from indexing datasets
   */
  public StreamingOutput indexDatasetIds(List<Integer> datasetIds, User user) {
    Integer lastDatasetId = datasetIds.get(datasetIds.size() - 1);
    return output -> {
      output.write("[".getBytes());
      datasetIds.forEach(id -> {
        try (Response response = indexDataset(id, user)) {
          output.write(response.getEntity().toString().getBytes());
          if (!id.equals(lastDatasetId)) {
            output.write(",".getBytes());
          }
          output.write("\n".getBytes());
        } catch (IOException e) {
          logException("Error indexing dataset term for dataset id: %d ".formatted(id), e);
        }
      });
      output.write("]".getBytes());
    };
  }
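
  // Illustrative sketch of the streamed body for three dataset IDs, with each per-dataset
  // Elasticsearch response elided as {...}:
  //   [{...},
  //   {...},
  //   {...}
  //   ]
  // i.e. a hand-assembled JSON array with a comma after every entry except the last.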

  public Response indexStudy(Integer studyId, User user) {
    Study study = studyDAO.findStudyById(studyId);
    // The DAO call above does not populate the study's datasets, so we need to check for datasetIds
    if (study != null && !study.getDatasetIds().isEmpty()) {
      try (Response response = indexDatasets(study.getDatasetIds().stream().toList(), user)) {
        return response;
      } catch (Exception e) {
        logException(String.format("Failed to index datasets for study id: %d", studyId), e);
        return Response.status(Status.INTERNAL_SERVER_ERROR).build();
      }
    }
    return Response.status(Status.NOT_FOUND).build();
  }

  public DatasetTerm toDatasetTerm(Dataset dataset) {
    if (Objects.isNull(dataset)) {
      return null;
    }

    DatasetTerm term = new DatasetTerm();

    term.setDatasetId(dataset.getDatasetId());
    Optional.ofNullable(dataset.getCreateUserId()).ifPresent(userId -> {
      User user = userDAO.findUserById(dataset.getCreateUserId());
      term.setCreateUserId(dataset.getCreateUserId());
      term.setCreateUserDisplayName(user.getDisplayName());
      term.setSubmitter(toUserTerm(user));
    });
    Optional.ofNullable(dataset.getUpdateUserId())
        .map(userDAO::findUserById)
        .map(this::toUserTerm)
        .ifPresent(term::setUpdateUser);
    term.setDatasetIdentifier(dataset.getDatasetIdentifier());
    term.setDeletable(dataset.getDeletable());
    term.setDatasetName(dataset.getName());

    if (Objects.nonNull(dataset.getStudy())) {
      term.setStudy(toStudyTerm(dataset.getStudy()));
    }

    Optional.ofNullable(dataset.getDacId()).ifPresent(dacId -> {
      Dac dac = dacDAO.findById(dataset.getDacId());
      term.setDacId(dataset.getDacId());
      if (Objects.nonNull(dataset.getDacApproval())) {
        term.setDacApproval(dataset.getDacApproval());
      }
      term.setDac(toDacTerm(dac));
    });

    List<Integer> approvedUserIds = dataAccessRequestDAO
        .findApprovedDARsByDatasetId(dataset.getDatasetId())
        .stream()
        .map(DataAccessRequest::getUserId)
        .toList();

    if (!approvedUserIds.isEmpty()) {
      List<Integer> approvedLCUserIds = libraryCardDAO.findLibraryCardsByUserIds(approvedUserIds)
          .stream()
          .map(LibraryCard::getUserId)
          .toList();
      term.setApprovedUserIds(approvedLCUserIds);
    }

    if (Objects.nonNull(dataset.getDataUse())) {
      DataUseSummary summary = ontologyService.translateDataUseSummary(dataset.getDataUse());
      if (summary != null) {
        term.setDataUse(summary);
      } else {
        logWarn("No data use summary for dataset id: %d".formatted(dataset.getDatasetId()));
      }
    }

    findDatasetProperty(
        dataset.getProperties(), "accessManagement"
    ).ifPresent(
        datasetProperty -> term.setAccessManagement(datasetProperty.getPropertyValueAsString())
    );

    findFirstDatasetPropertyByName(
        dataset.getProperties(), "# of participants"
    ).ifPresent(
        datasetProperty -> {
          String value = datasetProperty.getPropertyValueAsString();
          try {
            term.setParticipantCount(Integer.valueOf(value));
          } catch (NumberFormatException e) {
            logWarn(
                String.format("Unable to coerce participant count to integer: %s for dataset: %s",
                    value, dataset.getDatasetIdentifier()));
          }
        }
    );

    findDatasetProperty(
        dataset.getProperties(), "url"
    ).ifPresent(
        datasetProperty -> term.setUrl(datasetProperty.getPropertyValueAsString())
    );

    findDatasetProperty(
        dataset.getProperties(), "dataLocation"
    ).ifPresent(
        datasetProperty -> term.setDataLocation(datasetProperty.getPropertyValueAsString())
    );

    return term;
  }

  protected void updateDatasetIndexDate(Integer datasetId, Integer userId, Instant indexDate) {
    // It is possible that a dataset has been deleted. If so, we don't want to try to update it.
    Dataset dataset = datasetDAO.findDatasetById(datasetId);
    if (dataset != null) {
      try {
        datasetServiceDAO.updateDatasetIndex(datasetId, userId, indexDate);
      } catch (SQLException e) {
        // We don't want to send these to Sentry, but we do want to log them for off-cycle follow-up
        logWarn("Error updating dataset indexed date for dataset id: %d ".formatted(datasetId), e);
      }
    }
  }

  Optional<DatasetProperty> findDatasetProperty(Collection<DatasetProperty> props,
      String schemaProp) {
    return
        (props == null) ? Optional.empty() : props
            .stream()
            .filter(p -> Objects.nonNull(p.getSchemaProperty()))
            .filter(p -> p.getSchemaProperty().equals(schemaProp))
            .findFirst();
  }

  Optional<DatasetProperty> findFirstDatasetPropertyByName(Collection<DatasetProperty> props,
      String propertyName) {
    return
        (props == null) ? Optional.empty() : props
            .stream()
            .filter(p -> p.getPropertyName().equalsIgnoreCase(propertyName))
            .findFirst();
  }

  Optional<StudyProperty> findStudyProperty(Collection<StudyProperty> props, String key) {
    return
        (props == null) ? Optional.empty() : props
            .stream()
            .filter(p -> p.getKey().equals(key))
            .findFirst();
  }
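
  // Note on the three lookups above: findDatasetProperty matches a dataset property's schema key
  // (e.g. "accessManagement", "url", "dataLocation"), findFirstDatasetPropertyByName matches the
  // display name case-insensitively (e.g. "# of participants"), and findStudyProperty matches a
  // study property key (e.g. "dbGaPPhsID", "species"). Each returns Optional.empty() when props
  // is null or nothing matches.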

}