• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

DataBiosphere / consent / #5636

08 Apr 2025 02:19PM UTC coverage: 79.133% (-0.1%) from 79.273%
#5636

push

web-flow
DT-1424: Mark datasets as indexed/deindexed when indexing operations are called (#2470)

58 of 83 new or added lines in 10 files covered. (69.88%)

10 existing lines in 3 files now uncovered.

10277 of 12987 relevant lines covered (79.13%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.24
/src/main/java/org/broadinstitute/consent/http/service/ElasticSearchService.java
1
package org.broadinstitute.consent.http.service;
2

3
import com.google.api.client.http.HttpStatusCodes;
4
import com.google.gson.JsonArray;
5
import jakarta.ws.rs.HttpMethod;
6
import jakarta.ws.rs.core.Response;
7
import jakarta.ws.rs.core.Response.Status;
8
import jakarta.ws.rs.core.StreamingOutput;
9
import java.io.IOException;
10
import java.nio.charset.StandardCharsets;
11
import java.sql.SQLException;
12
import java.time.Instant;
13
import java.util.ArrayList;
14
import java.util.Collection;
15
import java.util.List;
16
import java.util.Map;
17
import java.util.Objects;
18
import java.util.Optional;
19
import org.apache.http.entity.ContentType;
20
import org.apache.http.nio.entity.NStringEntity;
21
import org.broadinstitute.consent.http.configurations.ElasticSearchConfiguration;
22
import org.broadinstitute.consent.http.db.DacDAO;
23
import org.broadinstitute.consent.http.db.DataAccessRequestDAO;
24
import org.broadinstitute.consent.http.db.DatasetDAO;
25
import org.broadinstitute.consent.http.db.InstitutionDAO;
26
import org.broadinstitute.consent.http.db.StudyDAO;
27
import org.broadinstitute.consent.http.db.UserDAO;
28
import org.broadinstitute.consent.http.models.Dac;
29
import org.broadinstitute.consent.http.models.DataAccessRequest;
30
import org.broadinstitute.consent.http.models.Dataset;
31
import org.broadinstitute.consent.http.models.DatasetProperty;
32
import org.broadinstitute.consent.http.models.Institution;
33
import org.broadinstitute.consent.http.models.Study;
34
import org.broadinstitute.consent.http.models.StudyProperty;
35
import org.broadinstitute.consent.http.models.User;
36
import org.broadinstitute.consent.http.models.elastic_search.DacTerm;
37
import org.broadinstitute.consent.http.models.elastic_search.DatasetTerm;
38
import org.broadinstitute.consent.http.models.elastic_search.ElasticSearchHits;
39
import org.broadinstitute.consent.http.models.elastic_search.InstitutionTerm;
40
import org.broadinstitute.consent.http.models.elastic_search.StudyTerm;
41
import org.broadinstitute.consent.http.models.elastic_search.UserTerm;
42
import org.broadinstitute.consent.http.models.ontology.DataUseSummary;
43
import org.broadinstitute.consent.http.service.dao.DatasetServiceDAO;
44
import org.broadinstitute.consent.http.util.ConsentLogger;
45
import org.broadinstitute.consent.http.util.gson.GsonUtil;
46
import org.elasticsearch.client.Request;
47
import org.elasticsearch.client.RestClient;
48

49
public class ElasticSearchService implements ConsentLogger {

  // Low-level REST client used to issue all requests to the Elasticsearch cluster.
  private final RestClient esClient;
  // Supplies index names and other Elasticsearch settings.
  private final ElasticSearchConfiguration esConfig;
  // Resolves DACs when building dataset terms.
  private final DacDAO dacDAO;
  // Finds approved DARs so approved user ids can be indexed with a dataset.
  private final DataAccessRequestDAO dataAccessRequestDAO;
  // Resolves create/update users referenced by datasets and studies.
  private final UserDAO userDAO;
  // Translates a dataset's data use into an indexable summary.
  private final OntologyService ontologyService;
  // Resolves user institutions for indexing.
  private final InstitutionDAO institutionDAO;
  // Primary source of dataset records to index.
  private final DatasetDAO datasetDAO;
  // Records dataset index/de-index timestamps.
  private final DatasetServiceDAO datasetServiceDAO;
  // Source of study records for study-level indexing.
  private final StudyDAO studyDAO;

62
  /**
   * Constructs the service with its Elasticsearch client, configuration, and the
   * DAOs/services used to assemble and persist index documents.
   */
  public ElasticSearchService(
      RestClient esClient,
      ElasticSearchConfiguration esConfig,
      DacDAO dacDAO,
      DataAccessRequestDAO dataAccessRequestDAO,
      UserDAO userDao,
      OntologyService ontologyService,
      InstitutionDAO institutionDAO,
      DatasetDAO datasetDAO,
      DatasetServiceDAO datasetServiceDAO,
      StudyDAO studyDAO) {
    this.esClient = esClient;
    this.esConfig = esConfig;
    this.dacDAO = dacDAO;
    this.dataAccessRequestDAO = dataAccessRequestDAO;
    this.userDAO = userDao;
    this.ontologyService = ontologyService;
    this.institutionDAO = institutionDAO;
    this.datasetDAO = datasetDAO;
    this.datasetServiceDAO = datasetServiceDAO;
    this.studyDAO = studyDAO;
  }
84

85

86
  // Bulk API action line emitted before each document; %d is the dataset id.
  // NOTE(review): documents are addressed via `_type: dataset` — mapping types were
  // removed in newer Elasticsearch versions; confirm the target cluster version.
  private static final String BULK_HEADER = """
      { "index": {"_type": "dataset", "_id": "%d"} }
      """;

  // _delete_by_query body matching a single dataset document by type and id; %d is the dataset id.
  private static final String DELETE_QUERY = """
      { "query": { "bool": { "must": [ { "match": { "_type": "dataset" } }, { "match": { "_id": "%d" } } ] } } }
      """;
93

94
  private Response performRequest(Request request) throws IOException {
95
    var response = esClient.performRequest(request);
1✔
96
    var status = response.getStatusLine().getStatusCode();
1✔
97
    if (status != 200) {
1✔
98
      throw new IOException("Invalid Elasticsearch query");
1✔
99
    }
100
    var body = new String(response.getEntity().getContent().readAllBytes(),
1✔
101
        StandardCharsets.UTF_8);
102
    return Response.status(status).entity(body).build();
1✔
103
  }
104

105
  /**
   * Bulk-indexes the given dataset terms and records each dataset's indexed date.
   *
   * @param datasets The dataset terms to index
   * @param user     The User performing the indexing (recorded with the index date)
   * @return The raw Elasticsearch bulk response
   * @throws IOException If the bulk request fails
   */
  public Response indexDatasetTerms(List<DatasetTerm> datasets, User user) throws IOException {
    List<String> bulkApiCall = new ArrayList<>();

    datasets.forEach(dsTerm -> {
      bulkApiCall.add(BULK_HEADER.formatted(dsTerm.getDatasetId()));
      bulkApiCall.add(GsonUtil.getInstance().toJson(dsTerm) + "\n");
      // NOTE(review): the indexed date is recorded before the bulk request below
      // executes, so datasets are marked indexed even if the request fails —
      // confirm this mark-on-attempt behavior is intended.
      updateDatasetIndexDate(dsTerm.getDatasetId(), user.getUserId(), Instant.now());
    });

    Request bulkRequest = new Request(
        HttpMethod.PUT,
        "/" + esConfig.getDatasetIndexName() + "/_bulk");

    // The bulk API expects newline-delimited JSON terminated by a final newline.
    bulkRequest.setEntity(new NStringEntity(
        String.join("", bulkApiCall) + "\n",
        ContentType.APPLICATION_JSON));

    return performRequest(bulkRequest);
  }
124

125
  /**
   * Removes a dataset's document from the index via _delete_by_query and clears
   * the dataset's indexed date.
   *
   * @param datasetId The dataset whose document should be removed
   * @param userId    The user performing the de-index (recorded with the date update)
   * @return The raw Elasticsearch delete-by-query response
   * @throws IOException If the delete request fails
   */
  public Response deleteIndex(Integer datasetId, Integer userId) throws IOException {
    Request deleteRequest = new Request(
        HttpMethod.POST,
        "/" + esConfig.getDatasetIndexName() + "/_delete_by_query");
    deleteRequest.setEntity(new NStringEntity(
        DELETE_QUERY.formatted(datasetId),
        ContentType.APPLICATION_JSON));
    // A null index date marks the dataset as de-indexed.
    // NOTE(review): the date is cleared before the delete request below executes,
    // so the dataset is marked de-indexed even if the request fails — confirm intended.
    updateDatasetIndexDate(datasetId, userId, null);
    return performRequest(deleteRequest);
  }
135

136
  public boolean validateQuery(String query) throws IOException {
137
    // Remove `size` and `from` parameters from query, otherwise validation will fail
138
    var modifiedQuery = query
1✔
139
        .replaceAll("\"size\": ?\\d+,?", "")
1✔
140
        .replaceAll("\"from\": ?\\d+,?", "");
1✔
141

142
    Request validateRequest = new Request(
1✔
143
        HttpMethod.GET,
144
        "/" + esConfig.getDatasetIndexName() + "/_validate/query");
1✔
145
    validateRequest.setEntity(new NStringEntity(modifiedQuery, ContentType.APPLICATION_JSON));
1✔
146
    Response response = performRequest(validateRequest);
1✔
147

148
    var entity = response.getEntity().toString();
1✔
149
    var json = GsonUtil.getInstance().fromJson(entity, Map.class);
1✔
150

151
    return (boolean) json.get("valid");
1✔
152
  }
153

154
  public Response searchDatasets(String query) throws IOException {
155
    if (!validateQuery(query)) {
1✔
156
      throw new IOException("Invalid Elasticsearch query");
×
157
    }
158

159
    Request searchRequest = new Request(
1✔
160
        HttpMethod.GET,
161
        "/" + esConfig.getDatasetIndexName() + "/_search");
1✔
162
    searchRequest.setEntity(new NStringEntity(query, ContentType.APPLICATION_JSON));
1✔
163

164
    Response response = performRequest(searchRequest);
1✔
165

166
    var entity = response.getEntity().toString();
1✔
167
    var json = GsonUtil.getInstance().fromJson(entity, ElasticSearchHits.class);
1✔
168
    var hits = json.getHits();
1✔
169

170
    return Response.ok().entity(hits).build();
1✔
171
  }
172

173
  /**
   * Builds the indexable StudyTerm for a study, copying core fields and selected
   * study properties, and resolving the data submitter email from the create user.
   *
   * @param study The Study to convert; may be null
   * @return The populated StudyTerm, or null when study is null
   */
  public StudyTerm toStudyTerm(Study study) {
    if (Objects.isNull(study)) {
      return null;
    }

    StudyTerm term = new StudyTerm();

    term.setDescription(study.getDescription());
    term.setStudyName(study.getName());
    term.setStudyId(study.getStudyId());
    term.setDataTypes(study.getDataTypes());
    term.setPiName(study.getPiName());
    term.setPublicVisibility(study.getPublicVisibility());

    // Copy selected study properties into dedicated term fields when present.
    findStudyProperty(
        study.getProperties(), "dbGaPPhsID"
    ).ifPresent(
        prop -> term.setPhsId(prop.getValue().toString())
    );

    findStudyProperty(
        study.getProperties(), "phenotypeIndication"
    ).ifPresent(
        prop -> term.setPhenotype(prop.getValue().toString())
    );

    findStudyProperty(
        study.getProperties(), "species"
    ).ifPresent(
        prop -> term.setSpecies(prop.getValue().toString())
    );

    // Custodian emails are stored as a JSON array; flatten to a list of strings.
    findStudyProperty(
        study.getProperties(), "dataCustodianEmail"
    ).ifPresent(
        prop -> {
          JsonArray jsonArray = (JsonArray) prop.getValue();
          List<String> dataCustodianEmail = new ArrayList<>();
          jsonArray.forEach(email -> dataCustodianEmail.add(email.getAsString()));
          term.setDataCustodianEmail(dataCustodianEmail);
        }
    );

    if (Objects.nonNull(study.getCreateUserId())) {
      term.setDataSubmitterId(study.getCreateUserId());
      User user = userDAO.findUserById(study.getCreateUserId());
      if (Objects.nonNull(user)) {
        // NOTE(review): this mutates the passed-in Study so the email check below
        // (and any caller still holding the Study) sees the resolved email.
        study.setCreateUserEmail(user.getEmail());
      }
    }

    if (Objects.nonNull(study.getCreateUserEmail())) {
      term.setDataSubmitterEmail(study.getCreateUserEmail());
    }

    return term;
  }
230

231
  public UserTerm toUserTerm(User user) {
232
    if (Objects.isNull(user)) {
1✔
233
      return null;
×
234
    }
235
    InstitutionTerm institution = (Objects.nonNull(user.getInstitutionId())) ?
1✔
236
        toInstitutionTerm(institutionDAO.findInstitutionById(user.getInstitutionId())) :
1✔
237
        null;
1✔
238
    return new UserTerm(user.getUserId(), user.getDisplayName(), institution);
1✔
239
  }
240

241
  public DacTerm toDacTerm(Dac dac) {
242
    if (Objects.isNull(dac)) {
1✔
243
      return null;
×
244
    }
245
    return new DacTerm(dac.getDacId(), dac.getName(), dac.getEmail());
1✔
246
  }
247

248
  public InstitutionTerm toInstitutionTerm(Institution institution) {
249
    if (Objects.isNull(institution)) {
1✔
250
      return null;
1✔
251
    }
252
    return new InstitutionTerm(institution.getId(), institution.getName());
1✔
253
  }
254

255
  /**
256
   * Synchronize the dataset in the ES index. This will only index the dataset if it has been
257
   * previously indexed, UNLESS the force argument is true which means it will index the dataset
258
   * and update the dataset's last indexed date value.
259
   *
260
   * @param dataset The Dataset
261
   * @param user    The User
262
   * @param force   Boolean to force the index update regardless of dataset's indexed date status.
263
   */
264
  public void synchronizeDatasetInESIndex(Dataset dataset, User user, boolean force) {
NEW
265
    if (force || dataset.getIndexedDate() != null) {
×
NEW
266
      try (var response = indexDataset(dataset, user)) {
×
NEW
267
        if (!HttpStatusCodes.isSuccess(response.getStatus())) {
×
NEW
268
          logWarn("Response error, unable to index dataset: %s".formatted(dataset.getDatasetId()));
×
269
        }
NEW
270
      } catch (IOException e) {
×
NEW
271
        logWarn("Exception, unable to index dataset: %s".formatted(dataset.getDatasetId()));
×
NEW
272
      }
×
273
    }
UNCOV
274
  }
×
275

276
  /**
   * Indexes a single dataset by delegating to the bulk path.
   *
   * @param dataset The Dataset to index
   * @param user    The User performing the indexing
   * @return The Elasticsearch bulk response
   * @throws IOException If the index request fails
   */
  public Response indexDataset(Dataset dataset, User user) throws IOException {
    return indexDatasets(List.of(dataset), user);
  }
279

280
  public Response indexDatasets(List<Dataset> datasets, User user) throws IOException {
281
    List<DatasetTerm> datasetTerms = datasets.stream().map(this::toDatasetTerm).toList();
1✔
282
    return indexDatasetTerms(datasetTerms, user);
1✔
283
  }
284

285
  /**
286
   * Sequentially index datasets to ElasticSearch by ID list. Note that this is intended for large
287
   * lists of dataset ids. For small sets of datasets (i.e. <~25), it is efficient to index them in
288
   * bulk using the {@link #indexDatasets(List, User)} method.
289
   *
290
   * @param datasetIds List of Dataset IDs to index
291
   * @return StreamingOutput of ElasticSearch responses from indexing datasets
292
   */
293
  public StreamingOutput indexDatasetIds(List<Integer> datasetIds, User user) {
294
    Integer lastDatasetId = datasetIds.get(datasetIds.size() - 1);
1✔
295
    return output -> {
1✔
296
      output.write("[".getBytes());
1✔
297
      datasetIds.forEach(id -> {
1✔
298
        Dataset dataset = datasetDAO.findDatasetById(id);
1✔
299
        try (Response response = indexDataset(dataset, user)) {
1✔
300
          output.write(response.getEntity().toString().getBytes());
1✔
301
          if (!id.equals(lastDatasetId)) {
1✔
302
            output.write(",".getBytes());
×
303
          }
304
          output.write("\n".getBytes());
1✔
305
        } catch (IOException e) {
1✔
306
          logException("Error indexing dataset term for dataset id: %d ".formatted(dataset.getDatasetId()), e);
1✔
307
        }
1✔
308
      });
1✔
309
      output.write("]".getBytes());
1✔
310
    };
1✔
311
  }
312

313
  /**
   * Indexes all datasets belonging to a study.
   *
   * @param studyId The Study ID
   * @param user    The User performing the indexing
   * @return The bulk index response; 404 when the study is missing or has no
   *         dataset ids, 500 when indexing fails
   */
  public Response indexStudy(Integer studyId, User user) {
    Study study = studyDAO.findStudyById(studyId);
    // The dao call above does not populate its datasets so we need to check for datasetIds
    if (study != null && study.getDatasetIds() != null && !study.getDatasetIds().isEmpty()) {
      List<Dataset> datasets = datasetDAO.findDatasetsByIdList(study.getDatasetIds().stream().toList());
      // NOTE(review): returning the Response from inside try-with-resources closes it
      // before the caller receives it — confirm callers only read status/buffered entity.
      try (Response response = indexDatasets(datasets, user)) {
        return response;
      } catch (Exception e) {
        logException(String.format("Failed to index datasets for study id: %d", studyId), e);
        return Response.status(Status.INTERNAL_SERVER_ERROR).build();
      }
    }
    return Response.status(Status.NOT_FOUND).build();
  }
327

328
  public DatasetTerm toDatasetTerm(Dataset dataset) {
329
    if (Objects.isNull(dataset)) {
1✔
330
      return null;
×
331
    }
332

333
    DatasetTerm term = new DatasetTerm();
1✔
334

335
    term.setDatasetId(dataset.getDatasetId());
1✔
336
    Optional.ofNullable(dataset.getCreateUserId()).ifPresent(userId -> {
1✔
337
      User user = userDAO.findUserById(dataset.getCreateUserId());
1✔
338
      term.setCreateUserId(dataset.getCreateUserId());
1✔
339
      term.setCreateUserDisplayName(user.getDisplayName());
1✔
340
      term.setSubmitter(toUserTerm(user));
1✔
341
    });
1✔
342
    Optional.ofNullable(dataset.getUpdateUserId())
1✔
343
        .map(userDAO::findUserById)
1✔
344
        .map(this::toUserTerm)
1✔
345
        .ifPresent(term::setUpdateUser);
1✔
346
    term.setDatasetIdentifier(dataset.getDatasetIdentifier());
1✔
347
    term.setDeletable(dataset.getDeletable());
1✔
348
    term.setDatasetName(dataset.getName());
1✔
349

350
    if (Objects.nonNull(dataset.getStudy())) {
1✔
351
      term.setStudy(toStudyTerm(dataset.getStudy()));
1✔
352
    }
353

354
    Optional.ofNullable(dataset.getDacId()).ifPresent(dacId -> {
1✔
355
      Dac dac = dacDAO.findById(dataset.getDacId());
1✔
356
      term.setDacId(dataset.getDacId());
1✔
357
      if (Objects.nonNull(dataset.getDacApproval())) {
1✔
358
        term.setDacApproval(dataset.getDacApproval());
1✔
359
      }
360
      term.setDac(toDacTerm(dac));
1✔
361
    });
1✔
362

363
    List<Integer> approvedUserIds = dataAccessRequestDAO
1✔
364
        .findApprovedDARsByDatasetId(dataset.getDatasetId())
1✔
365
        .stream()
1✔
366
        .map(DataAccessRequest::getUserId)
1✔
367
        .toList();
1✔
368

369
    if (!approvedUserIds.isEmpty()) {
1✔
370
      term.setApprovedUserIds(approvedUserIds);
1✔
371
    }
372

373
    if (Objects.nonNull(dataset.getDataUse())) {
1✔
374
      DataUseSummary summary = ontologyService.translateDataUseSummary(dataset.getDataUse());
1✔
375
      if (summary != null) {
1✔
376
        term.setDataUse(summary);
1✔
377
      } else {
378
        logWarn("No data use summary for dataset id: %d".formatted(dataset.getDatasetId()));
1✔
379
      }
380
    }
381

382
    findDatasetProperty(
1✔
383
        dataset.getProperties(), "accessManagement"
1✔
384
    ).ifPresent(
1✔
385
        datasetProperty -> term.setAccessManagement(datasetProperty.getPropertyValueAsString())
1✔
386
    );
387

388
    findFirstDatasetPropertyByName(
1✔
389
        dataset.getProperties(), "# of participants"
1✔
390
    ).ifPresent(
1✔
391
        datasetProperty -> {
392
          String value = datasetProperty.getPropertyValueAsString();
1✔
393
          try {
394
            term.setParticipantCount(Integer.valueOf(value));
1✔
395
          } catch (NumberFormatException e) {
1✔
396
            logWarn(String.format("Unable to coerce participant count to integer: %s for dataset: %s", value, dataset.getDatasetIdentifier()));
1✔
397
          }
1✔
398
        }
1✔
399
    );
400

401
    findDatasetProperty(
1✔
402
        dataset.getProperties(), "url"
1✔
403
    ).ifPresent(
1✔
404
        datasetProperty -> term.setUrl(datasetProperty.getPropertyValueAsString())
1✔
405
    );
406

407
    findDatasetProperty(
1✔
408
        dataset.getProperties(), "dataLocation"
1✔
409
    ).ifPresent(
1✔
410
        datasetProperty -> term.setDataLocation(datasetProperty.getPropertyValueAsString())
1✔
411
    );
412

413
    return term;
1✔
414
  }
415

416
  protected void updateDatasetIndexDate(Integer datasetId, Integer userId, Instant indexDate)
417
      {
418
    // It is possible that a dataset has been deleted. If so, we don't want to try and update it.
419
    Dataset dataset = datasetDAO.findDatasetById(datasetId);
1✔
420
    if (dataset != null) {
1✔
421
      try {
422
        datasetServiceDAO.updateDatasetIndex(datasetId, userId, indexDate);
1✔
NEW
423
      } catch (SQLException e) {
×
424
        // We don't want to send these to Sentry, but we do want to log them for follow up off cycle
NEW
425
        logWarn("Error updating dataset indexed date for dataset id: %d ".formatted(datasetId), e);
×
426
      }
1✔
427
    }
428
  }
1✔
429

430
  Optional<DatasetProperty> findDatasetProperty(Collection<DatasetProperty> props,
431
      String schemaProp) {
432
    return
1✔
433
        (props == null) ? Optional.empty() : props
1✔
434
            .stream()
1✔
435
            .filter(p -> Objects.nonNull(p.getSchemaProperty()))
1✔
436
            .filter(p -> p.getSchemaProperty().equals(schemaProp))
1✔
437
            .findFirst();
1✔
438
  }
439

440
  Optional<DatasetProperty> findFirstDatasetPropertyByName(Collection<DatasetProperty> props,
441
      String propertyName) {
442
    return
1✔
443
        (props == null) ? Optional.empty(): props
1✔
444
            .stream()
1✔
445
            .filter(p -> p.getPropertyName().equalsIgnoreCase(propertyName))
1✔
446
            .findFirst();
1✔
447
  }
448

449
  Optional<StudyProperty> findStudyProperty(Collection<StudyProperty> props, String key) {
450
    return
1✔
451
        (props == null) ? Optional.empty() : props
1✔
452
            .stream()
1✔
453
            .filter(p -> p.getKey().equals(key))
1✔
454
            .findFirst();
1✔
455
  }
456

457
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc