• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

divviup / divviup-api / 13020718101

28 Jan 2025 10:27PM UTC coverage: 55.794% (+0.005%) from 55.789%
13020718101

push

github

web-flow
Bump validator from 0.19.0 to 0.20.0 (#1533)

* Bump validator from 0.19.0 to 0.20.0

Bumps [validator](https://github.com/Keats/validator) from 0.19.0 to 0.20.0.
- [Changelog](https://github.com/Keats/validator/blob/master/CHANGELOG.md)
- [Commits](https://github.com/Keats/validator/commits/v0.20.0)

---
updated-dependencies:
- dependency-name: validator
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Wrap the vdaf in Cow::Borrowed

* cargo fmt

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: J.C. Jones <jc@insufficient.coffee>

0 of 7 new or added lines in 1 file covered. (0.0%)

1 existing line in 1 file now uncovered.

3881 of 6956 relevant lines covered (55.79%)

86.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.09
/src/entity/task/new_task.rs
1
use super::*;
2
use crate::{
3
    clients::aggregator_client::api_types::{AggregatorVdaf, QueryType},
4
    entity::{
5
        aggregator::{Feature, Role},
6
        Account, CollectorCredential, Protocol,
7
    },
8
    handler::Error,
9
};
10
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
11
use rand::Rng;
12
use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter};
13
use sha2::{Digest, Sha256};
14
use validator::{ValidationErrors, ValidationErrorsKind};
15
use vdaf::{DpStrategy, DpStrategyKind, SumVec};
16

17
/// Request payload for creating a task.
///
/// Every field is an `Option` so that deserialization does not fail on the
/// first missing field; presence is enforced afterwards through the
/// `#[validate(required)]` attributes and the additional checks performed in
/// `NewTask::normalize_and_validate`.
#[derive(Deserialize, Validate, Debug, Clone, Default)]
pub struct NewTask {
    /// Task name; required and at least one character long.
    #[validate(required, length(min = 1))]
    pub name: Option<String>,

    /// Id of the leader aggregator, as a string; required. It is parsed as a
    /// UUID when the aggregator is loaded.
    #[validate(required)]
    pub leader_aggregator_id: Option<String>,

    /// Id of the helper aggregator, as a string; required. It is parsed as a
    /// UUID when the aggregator is loaded.
    #[validate(required)]
    pub helper_aggregator_id: Option<String>,

    /// VDAF configuration; required, and validated recursively (`nested`).
    #[validate(required, nested)]
    pub vdaf: Option<Vdaf>,

    /// Minimum batch size; required and at least 100.
    #[validate(required, range(min = 100))]
    pub min_batch_size: Option<u64>,

    /// Optional maximum batch size. When present, `query_type` yields
    /// `QueryType::FixedSize`; when absent, `QueryType::TimeInterval`.
    // The `min = 0` bound is always satisfied by a `u64`.
    #[validate(range(min = 0))]
    pub max_batch_size: Option<u64>,

    /// Optional batch time window size, in seconds. When present,
    /// `max_batch_size` must also be set and the window must be a multiple
    /// of `time_precision_seconds` (see `validate_batch_time_window_size`).
    // The `min = 0` bound is always satisfied by a `u64`.
    #[validate(range(min = 0))]
    pub batch_time_window_size_seconds: Option<u64>,

    /// Time precision, in seconds; required, between 60 and 2592000.
    // NOTE(review): 2592000 seconds is 30 days, while the message below says
    // "4 weeks" (2419200 seconds) -- confirm which bound is intended.
    #[validate(
        required,
        range(
            min = 60,
            max = 2592000,
            message = "must be between 1 minute and 4 weeks"
        )
    )]
    pub time_precision_seconds: Option<u64>,

    /// Id of the collector credential, as a string; required. It is parsed
    /// as a UUID and looked up within the requesting account.
    #[validate(required)]
    pub collector_credential_id: Option<String>,
}
53

54
async fn load_aggregator(
52✔
55
    account: &Account,
52✔
56
    id: Option<&str>,
52✔
57
    db: &impl ConnectionTrait,
52✔
58
) -> Result<Option<Aggregator>, Error> {
52✔
59
    let Some(id) = id.map(Uuid::parse_str).transpose()? else {
52✔
60
        return Ok(None);
6✔
61
    };
62

63
    let aggregator = Aggregators::find_by_id(id)
46✔
64
        .filter(AggregatorColumn::DeletedAt.is_null())
46✔
65
        .one(db)
46✔
66
        .await?;
46✔
67

68
    let Some(aggregator) = aggregator else {
46✔
69
        return Ok(None);
2✔
70
    };
71

72
    if aggregator.account_id.is_none() || aggregator.account_id == Some(account.id) {
44✔
73
        Ok(Some(aggregator))
44✔
74
    } else {
75
        Ok(None)
×
76
    }
77
}
52✔
78

79
const VDAF_BYTES: usize = 16;
80
fn generate_vdaf_verify_key_and_expected_task_id() -> (String, String) {
7✔
81
    let mut verify_key = [0; VDAF_BYTES];
7✔
82
    rand::thread_rng().fill(&mut verify_key);
7✔
83
    (
7✔
84
        URL_SAFE_NO_PAD.encode(verify_key),
7✔
85
        URL_SAFE_NO_PAD.encode(Sha256::digest(verify_key)),
7✔
86
    )
7✔
87
}
7✔
88

89
impl NewTask {
90
    fn validate_min_lte_max(&self, errors: &mut ValidationErrors) {
26✔
91
        let min = self.min_batch_size;
26✔
92
        let max = self.max_batch_size;
26✔
93
        if matches!((min, max), (Some(min), Some(max)) if min > max) {
26✔
94
            let error = ValidationError::new("min_greater_than_max");
2✔
95
            errors.add("min_batch_size", error.clone());
2✔
96
            errors.add("max_batch_size", error);
2✔
97
        }
24✔
98
    }
26✔
99

100
    fn validate_batch_time_window_size(&self, errors: &mut ValidationErrors) {
26✔
101
        let window = self.batch_time_window_size_seconds;
26✔
102
        if let Some(window) = window {
26✔
103
            if self.max_batch_size.is_none() {
5✔
104
                errors.add(
1✔
105
                    "batch_time_window_size_seconds",
1✔
106
                    ValidationError::new("missing-max-batch-size"),
1✔
107
                );
1✔
108
            }
4✔
109
            if let Some(precision) = self.time_precision_seconds {
5✔
110
                if window % precision != 0 {
5✔
111
                    errors.add(
2✔
112
                        "batch_time_window_size_seconds",
2✔
113
                        ValidationError::new("not-multiple-of-time-precision"),
2✔
114
                    );
2✔
115
                }
3✔
116
            }
×
117
        }
21✔
118
    }
26✔
119

120
    async fn load_collector_credential(
26✔
121
        &self,
26✔
122
        account: &Account,
26✔
123
        db: &impl ConnectionTrait,
26✔
124
    ) -> Option<CollectorCredential> {
26✔
125
        let id = Uuid::parse_str(self.collector_credential_id.as_deref()?).ok()?;
26✔
126
        CollectorCredentials::find_by_id(id)
12✔
127
            .filter(CollectorCredentialColumn::AccountId.eq(account.id))
12✔
128
            .one(db)
12✔
129
            .await
12✔
130
            .ok()
12✔
131
            .flatten()
12✔
132
    }
26✔
133

134
    async fn validate_collector_credential(
26✔
135
        &self,
26✔
136
        account: &Account,
26✔
137
        leader: Option<&Aggregator>,
26✔
138
        db: &impl ConnectionTrait,
26✔
139
        errors: &mut ValidationErrors,
26✔
140
    ) -> Option<CollectorCredential> {
26✔
141
        match self.load_collector_credential(account, db).await {
26✔
142
            None => {
143
                errors.add("collector_credential_id", ValidationError::new("required"));
14✔
144
                None
14✔
145
            }
146

147
            Some(collector_credential) => {
12✔
148
                let leader_needs_token_hash =
12✔
149
                    leader.is_some_and(|leader| leader.features.token_hash_enabled());
12✔
150

12✔
151
                if leader_needs_token_hash && collector_credential.token_hash.is_none() {
12✔
152
                    errors.add(
×
153
                        "collector_credential_id",
×
154
                        ValidationError::new("missing-token-hash"),
×
155
                    );
×
156
                    None
×
157
                } else {
158
                    Some(collector_credential)
12✔
159
                }
160
            }
161
        }
162
    }
26✔
163

164
    async fn validate_aggregators(
26✔
165
        &self,
26✔
166
        account: &Account,
26✔
167
        db: &impl ConnectionTrait,
26✔
168
        errors: &mut ValidationErrors,
26✔
169
    ) -> Option<(Aggregator, Aggregator, Protocol)> {
26✔
170
        let leader = load_aggregator(account, self.leader_aggregator_id.as_deref(), db)
26✔
171
            .await
26✔
172
            .ok()
26✔
173
            .flatten();
26✔
174
        if leader.is_none() {
26✔
175
            errors.add("leader_aggregator_id", ValidationError::new("required"));
4✔
176
        }
22✔
177

178
        let helper = load_aggregator(account, self.helper_aggregator_id.as_deref(), db)
26✔
179
            .await
26✔
180
            .ok()
26✔
181
            .flatten();
26✔
182
        if helper.is_none() {
26✔
183
            errors.add("helper_aggregator_id", ValidationError::new("required"));
4✔
184
        }
22✔
185

186
        let (Some(leader), Some(helper)) = (leader, helper) else {
26✔
187
            return None;
5✔
188
        };
189

190
        if leader == helper {
21✔
191
            errors.add("leader_aggregator_id", ValidationError::new("same"));
×
192
            errors.add("helper_aggregator_id", ValidationError::new("same"));
×
193
        }
21✔
194

195
        if !leader.is_first_party && !helper.is_first_party {
21✔
196
            errors.add(
×
197
                "leader_aggregator_id",
×
198
                ValidationError::new("no-first-party"),
×
199
            );
×
200
            errors.add(
×
201
                "helper_aggregator_id",
×
202
                ValidationError::new("no-first-party"),
×
203
            );
×
204
        }
21✔
205

206
        let resolved_protocol = if leader.protocol == helper.protocol {
21✔
207
            leader.protocol
21✔
208
        } else {
209
            errors.add("leader_aggregator_id", ValidationError::new("protocol"));
×
210
            errors.add("helper_aggregator_id", ValidationError::new("protocol"));
×
211
            return None;
×
212
        };
213

214
        if leader.role == Role::Helper {
21✔
215
            errors.add("leader_aggregator_id", ValidationError::new("role"))
1✔
216
        }
20✔
217

218
        if helper.role == Role::Leader {
21✔
219
            errors.add("helper_aggregator_id", ValidationError::new("role"))
1✔
220
        }
20✔
221

222
        if self.batch_time_window_size_seconds.is_some()
21✔
223
            && !leader.features.contains(&Feature::TimeBucketedFixedSize)
5✔
224
        {
225
            errors.add(
1✔
226
                "leader_aggregator_id",
1✔
227
                ValidationError::new("time-bucketed-fixed-size-unsupported"),
1✔
228
            )
1✔
229
        }
20✔
230

231
        let uses_pure_dp_discrete_laplace = match &self.vdaf {
21✔
232
            Some(Vdaf::SumVec(SumVec {
233
                dp_strategy:
234
                    DpStrategy {
235
                        dp_strategy: DpStrategyKind::PureDpDiscreteLaplace,
236
                        ..
237
                    },
238
                ..
239
            })) => true,
×
240
            Some(Vdaf::Histogram(histogram)) => matches!(
6✔
241
                histogram.dp_strategy().dp_strategy,
6✔
242
                DpStrategyKind::PureDpDiscreteLaplace
243
            ),
244
            _ => false,
15✔
245
        };
246
        if uses_pure_dp_discrete_laplace
21✔
247
            && !leader.features.contains(&Feature::PureDpDiscreteLaplace)
5✔
248
        {
1✔
249
            errors.add(
1✔
250
                "leader_aggregator_id",
1✔
251
                ValidationError::new("pure-dp-discrete-laplace-unsupported"),
1✔
252
            );
1✔
253
        }
20✔
254
        if uses_pure_dp_discrete_laplace
21✔
255
            && !helper.features.contains(&Feature::PureDpDiscreteLaplace)
5✔
256
        {
1✔
257
            errors.add(
1✔
258
                "helper_aggregator_id",
1✔
259
                ValidationError::new("pure-dp-discrete-laplace-unsupported"),
1✔
260
            );
1✔
261
        }
20✔
262

263
        if errors.is_empty() {
21✔
264
            Some((leader, helper, resolved_protocol))
7✔
265
        } else {
266
            None
14✔
267
        }
268
    }
26✔
269

270
    fn validate_vdaf_is_supported(
7✔
271
        &self,
7✔
272
        leader: &Aggregator,
7✔
273
        helper: &Aggregator,
7✔
274
        protocol: &Protocol,
7✔
275
        errors: &mut ValidationErrors,
7✔
276
    ) -> Option<AggregatorVdaf> {
7✔
277
        let vdaf = self.vdaf.as_ref()?;
7✔
278

279
        let name = vdaf.name();
7✔
280
        let aggregator_vdaf = match vdaf.representation_for_protocol(protocol) {
7✔
281
            Ok(vdaf) => vdaf,
7✔
282
            Err(e) => {
×
NEW
283
                let errors = errors
×
NEW
284
                    .errors_mut()
×
NEW
285
                    .entry(std::borrow::Cow::Borrowed("vdaf"))
×
NEW
286
                    .or_insert_with(|| {
×
NEW
287
                        ValidationErrorsKind::Struct(Box::new(ValidationErrors::new()))
×
NEW
288
                    });
×
289
                match errors {
×
290
                    ValidationErrorsKind::Struct(errors) => {
×
291
                        errors.errors_mut().extend(e.into_errors())
×
292
                    }
293
                    other => *other = ValidationErrorsKind::Struct(Box::new(e)),
×
294
                };
295
                return None;
×
296
            }
297
        };
298

299
        if !leader.vdafs.contains(&name) || !helper.vdafs.contains(&name) {
7✔
300
            let errors = errors
×
301
                .errors_mut()
×
NEW
302
                .entry(std::borrow::Cow::Borrowed("vdaf"))
×
303
                .or_insert_with(|| ValidationErrorsKind::Struct(Box::new(ValidationErrors::new())));
×
304
            match errors {
×
305
                ValidationErrorsKind::Struct(errors) => {
×
306
                    errors.add("type", ValidationError::new("not-supported"));
×
307
                }
×
308
                other => {
×
309
                    let mut e = ValidationErrors::new();
×
310
                    e.add("type", ValidationError::new("not-supported"));
×
311
                    *other = ValidationErrorsKind::Struct(Box::new(e));
×
312
                }
×
313
            };
314
        }
7✔
315

316
        Some(aggregator_vdaf)
7✔
317
    }
7✔
318

319
    fn populate_chunk_length(&mut self, protocol: &Protocol) {
7✔
320
        if let Some(vdaf) = &mut self.vdaf {
7✔
321
            vdaf.populate_chunk_length(protocol);
7✔
322
        }
7✔
323
    }
7✔
324

325
    fn validate_query_type_is_supported(
7✔
326
        &self,
7✔
327
        leader: &Aggregator,
7✔
328
        helper: &Aggregator,
7✔
329
        errors: &mut ValidationErrors,
7✔
330
    ) {
7✔
331
        let name = self.query_type().name();
7✔
332
        if !leader.query_types.contains(&name) || !helper.query_types.contains(&name) {
7✔
333
            errors.add("max_batch_size", ValidationError::new("not-supported"));
×
334
        }
7✔
335
    }
7✔
336

337
    pub async fn normalize_and_validate(
26✔
338
        &mut self,
26✔
339
        account: Account,
26✔
340
        db: &impl ConnectionTrait,
26✔
341
    ) -> Result<ProvisionableTask, ValidationErrors> {
26✔
342
        let mut errors = Validate::validate(self).err().unwrap_or_default();
26✔
343
        self.validate_min_lte_max(&mut errors);
26✔
344
        self.validate_batch_time_window_size(&mut errors);
26✔
345
        let aggregators = self.validate_aggregators(&account, db, &mut errors).await;
26✔
346
        let collector_credential = self
26✔
347
            .validate_collector_credential(
26✔
348
                &account,
26✔
349
                aggregators.as_ref().map(|(leader, ..)| leader),
26✔
350
                db,
26✔
351
                &mut errors,
26✔
352
            )
26✔
353
            .await;
26✔
354

355
        let aggregator_vdaf = if let Some((leader, helper, protocol)) = aggregators.as_ref() {
26✔
356
            self.validate_query_type_is_supported(leader, helper, &mut errors);
7✔
357
            self.populate_chunk_length(protocol);
7✔
358
            self.validate_vdaf_is_supported(leader, helper, protocol, &mut errors)
7✔
359
        } else {
360
            None
19✔
361
        };
362

363
        if errors.is_empty() {
26✔
364
            // Unwrap safety: All of these unwraps below have previously
365
            // been checked by the above validations. The fact that we
366
            // have to check them twice is a consequence of the
367
            // disharmonious combination of Validate and the fact that we
368
            // need to use options for all fields so serde doesn't bail on
369
            // the first error.
370
            let (leader_aggregator, helper_aggregator, protocol) = aggregators.unwrap();
7✔
371

7✔
372
            let (vdaf_verify_key, id) = generate_vdaf_verify_key_and_expected_task_id();
7✔
373

7✔
374
            Ok(ProvisionableTask {
7✔
375
                account,
7✔
376
                id,
7✔
377
                vdaf_verify_key,
7✔
378
                name: self.name.clone().unwrap(),
7✔
379
                leader_aggregator,
7✔
380
                helper_aggregator,
7✔
381
                vdaf: self.vdaf.clone().unwrap(),
7✔
382
                aggregator_vdaf: aggregator_vdaf.unwrap(),
7✔
383
                min_batch_size: self.min_batch_size.unwrap(),
7✔
384
                max_batch_size: self.max_batch_size,
7✔
385
                batch_time_window_size_seconds: self.batch_time_window_size_seconds,
7✔
386
                expiration: Some(OffsetDateTime::now_utc() + DEFAULT_EXPIRATION_DURATION),
7✔
387
                time_precision_seconds: self.time_precision_seconds.unwrap(),
7✔
388
                collector_credential: collector_credential.unwrap(),
7✔
389
                aggregator_auth_token: None,
7✔
390
                protocol,
7✔
391
            })
7✔
392
        } else {
393
            Err(errors)
19✔
394
        }
395
    }
26✔
396

397
    pub fn query_type(&self) -> QueryType {
7✔
398
        if let Some(max_batch_size) = self.max_batch_size {
7✔
399
            QueryType::FixedSize {
1✔
400
                max_batch_size,
1✔
401
                batch_time_window_size: self.batch_time_window_size_seconds,
1✔
402
            }
1✔
403
        } else {
404
            QueryType::TimeInterval
6✔
405
        }
406
    }
7✔
407
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc