• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 2863

05 Nov 2024 06:20PM UTC coverage: 30.754% (-16.6%) from 47.311%
2863

push

buildkite

web-flow
Merge pull request #16296 from MinaProtocol/dkijania/more_multi_jobs

more multi jobs in CI

20276 of 65930 relevant lines covered (30.75%)

8631.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.08
/src/lib/parallel_scan/parallel_scan.ml
1
open Core_kernel
3✔
2
open Async_kernel
3
open Pipe_lib
4

5
(*Glossary of type variables used in this file:
6
  1. 'base: polymorphic type for jobs in the leaves of the scan state tree
7
  2. 'merge: polymorphic type for jobs in the intermediate nodes of the scan state tree
8
  3. 'base_t: 'base Base.t
9
  4. 'merge_t: 'merge Merge.t
10
  5. 'base_job: Base.Job.t
11
  6. 'merge_job: Merge.Job.t
12
*)
13

14
(*Note: some general-purpose functions that may be useful in the future are prefixed with "_" so that they do not trigger an "unused function" error*)
15

16
(**Sequence number for jobs in the scan state that corresponds to the order in
17
which they were added*)
18
module Sequence_number = struct
  [%%versioned
  module Stable = struct
    module V1 = struct
      (* Sequence numbers are plain integers. *)
      type t = int [@@deriving sexp]

      (* V1 is the only version defined here, so converting to the latest
         version is the identity. *)
      let to_latest = Fn.id
    end
  end]
end
28

29
(**Each node on the tree is viewed as a job that needs to be completed. When a
30
job is completed, it creates a new "Todo" job and marks the old job as "Done"*)
31
module Job_status = struct
  [%%versioned
  module Stable = struct
    module V1 = struct
      (* [Todo]: the job still has to be done; [Done]: it has been completed. *)
      type t = Todo | Done [@@deriving sexp]

      let to_latest = Fn.id
    end
  end]

  (* Name of the status as text; the string is the constructor name. *)
  let to_string status = match status with Done -> "Done" | Todo -> "Todo"
end
43

44
(**The number of new jobs- base and merge that can be added to this tree.
45
 * Each node has a weight associated to it and the
46
 * new jobs received are distributed across the tree based on this number. *)
47
module Weight = struct
  [%%versioned
  module Stable = struct
    [@@@no_toplevel_latest_type]

    module V1 = struct
      (* Two independent counters, one for base jobs and one for merge jobs. *)
      type t =
        { base : int
        ; merge : int
        }
      [@@deriving sexp]

      let to_latest = Fn.id
    end
  end]

  (* Re-export of the latest stable record. The [lens] deriver additionally
     provides the [Weight.base] and [Weight.merge] lenses that the rest of the
     file uses to read or replace a single counter. *)
  type t = Stable.Latest.t =
    { base : int
    ; merge : int
    }
  [@@deriving sexp, lens]
end
61

62
(**For base proofs (Proving new transactions)*)
63
module Base = struct
  module Record = struct
    [%%versioned
    module Stable = struct
      module V1 = struct
        (* A leaf job together with its sequence number and its status. *)
        type 'base t =
          { job : 'base
          ; seq_no : Sequence_number.Stable.V1.t
          ; status : Job_status.Stable.V1.t
          }
        [@@deriving sexp]
      end
    end]

    (* Transforms the payload with [f]; [seq_no] and [status] are carried over
       unchanged. *)
    let map ({ job; seq_no; status } : 'a t) ~(f : 'a -> 'b) : 'b t =
      { job = f job; seq_no; status }
  end

  module Job = struct
    [%%versioned
    module Stable = struct
      module V1 = struct
        (* A leaf slot: either unoccupied or holding a job record. *)
        type 'base t = Empty | Full of 'base Record.Stable.V1.t
        [@@deriving sexp]
      end
    end]

    (* Maps the payload of a [Full] slot; [Empty] stays [Empty]. *)
    let map (t : 'a t) ~(f : 'a -> 'b) : 'b t =
      match t with
      | Full record ->
          Full (Record.map record ~f)
      | Empty ->
          Empty

    (* Short constructor name, used in error messages. *)
    let job_str slot =
      match slot with Full _ -> "Base.Full" | Empty -> "Base.Empty"
  end

  [%%versioned
  module Stable = struct
    module V1 = struct
      (* A leaf of the tree: its weight paired with its job slot. *)
      type 'base t = Weight.Stable.V1.t * 'base Job.Stable.V1.t
      [@@deriving sexp]
    end
  end]

  (* Maps the job payload of a leaf and leaves its weight untouched. *)
  let map ((weight, slot) : 'a t) ~(f : 'a -> 'b) : 'b t =
    (weight, Job.map slot ~f)
end
105

106
(** For merge proofs: Merging two base proofs or two merge proofs*)
107
module Merge = struct
  module Record = struct
    [%%versioned
    module Stable = struct
      module V1 = struct
        (* A merge job whose two inputs are both present, with its sequence
           number and status. *)
        type 'merge t =
          { left : 'merge
          ; right : 'merge
          ; seq_no : Sequence_number.Stable.V1.t
          ; status : Job_status.Stable.V1.t
          }
        [@@deriving sexp]
      end
    end]

    (* Applies [f] to both inputs; [seq_no] and [status] are carried over
       unchanged. *)
    let map ({ left; right; seq_no; status } : 'a t) ~(f : 'a -> 'b) : 'b t =
      { left = f left; right = f right; seq_no; status }
  end

  module Job = struct
    [%%versioned
    module Stable = struct
      module V1 = struct
        type 'merge t =
          | Empty
          | Part of 'merge (*When only the left component of the job is available since we always complete the jobs from left to right*)
          | Full of 'merge Record.Stable.V1.t
        [@@deriving sexp]
      end
    end]

    (* Maps every payload held by the slot: none for [Empty], the single left
       input for [Part], both inputs for [Full]. *)
    let map (t : 'a t) ~(f : 'a -> 'b) : 'b t =
      match t with
      | Full record ->
          Full (Record.map record ~f)
      | Part left ->
          Part (f left)
      | Empty ->
          Empty

    (* Short constructor name, used in error messages. *)
    let job_str slot =
      match slot with
      | Full _ ->
          "Merge.Full"
      | Part _ ->
          "Merge.Part"
      | Empty ->
          "Merge.Empty"
  end

  [%%versioned
  module Stable = struct
    module V1 = struct
      (* An intermediate node: a pair of weights (the update code treats them
         as the weights of the left and right subtrees) and the job slot. *)
      type 'merge t =
        (Weight.Stable.V1.t * Weight.Stable.V1.t) * 'merge Job.Stable.V1.t
      [@@deriving sexp]
    end
  end]

  (* Maps the job payloads of a node and leaves its weights untouched. *)
  let map ((weights, slot) : 'a t) ~(f : 'a -> 'b) : 'b t =
    (weights, Job.map slot ~f)
end
167

168
(**All the jobs on a tree that can be done. Base.Full and Merge.Full*)
169
module Available_job = struct
  (* [Base] carries the payload of a leaf job; [Merge] carries the left and
     right inputs of a merge job. *)
  type ('merge, 'base) t =
    | Base of 'base
    | Merge of 'merge * 'merge
  [@@deriving sexp]
end
173

174
(**New jobs to be added (including new transactions or new merge jobs)*)
175
module Job = struct
  (* A job tagged by kind: [Base] for leaves, [Merge] for intermediate
     nodes. *)
  type ('merge, 'base) t =
    | Base of 'base
    | Merge of 'merge
  [@@deriving sexp]
end
178

179
(**Space available and number of jobs required to enqueue data.
180
 first = space on the current tree and number of jobs required
181
 to be completed
182
 second = If the current-tree space is less than <max_base_jobs>
183
 then remaining number of slots on a new tree and the corresponding
184
 job count.*)
185
module Space_partition = struct
  [%%versioned
  module Stable = struct
    module V1 = struct
      (* Each pair is (available slots, number of jobs required). [first]
         describes the current tree; [second], when present, describes a new
         tree. *)
      type t =
        { first : int * int
        ; second : (int * int) option
        }
      [@@deriving sexp]

      let to_latest = Fn.id
    end
  end]
end
196

197
(**View of a job for json output*)
198
module Job_view = struct
  module Extra = struct
    [%%versioned
    module Stable = struct
      module V1 = struct
        (* Bookkeeping shown next to a full job: its sequence number and
           status. *)
        type t =
          { seq_no : Sequence_number.Stable.V1.t
          ; status : Job_status.Stable.V1.t
          }
        [@@deriving sexp]

        let to_latest = Fn.id
      end
    end]
  end

  module Node = struct
    [%%versioned
    module Stable = struct
      module V1 = struct
        (* One constructor per possible slot state: [B*] for base (leaf)
           slots, [M*] for merge slots. *)
        type 'a t =
          | BEmpty
          | BFull of ('a * Extra.Stable.V1.t)
          | MEmpty
          | MPart of 'a
          | MFull of ('a * 'a * Extra.Stable.V1.t)
        [@@deriving sexp]
      end
    end]
  end

  [%%versioned
  module Stable = struct
    module V1 = struct
      (* A node view together with its position in the tree traversal. *)
      type 'a t =
        { position : int
        ; value : 'a Node.Stable.V1.t
        }
      [@@deriving sexp]
    end
  end]
end
237

238
module Hash = struct
  (* A SHA-256 digest, with a derived [equal]. *)
  type t = Digestif.SHA256.t [@@deriving equal]
end
241

242
(**A single tree with number of leaves = max_base_jobs = 2**transaction_capacity_log_2 *)
243
module Tree = struct
244
  [%%versioned
  module Stable = struct
    module V1 = struct
      (* A nested (non-regular) tree type: each [Node] stores the value for
         one level and a [sub_tree] whose merge and base parameters are pairs
         of the current ones, so the amount of data doubles at every level
         down. The innermost constructor is a single [Leaf] holding all the
         base values as nested pairs. *)
      type ('merge_t, 'base_t) t =
        | Leaf of 'base_t
        | Node of
            { depth : int
            ; value : 'merge_t
            ; sub_tree : ('merge_t * 'merge_t, 'base_t * 'base_t) t
            }
      [@@deriving sexp]
    end
  end]
257

258
  (*Eg: Tree depth = 3
259

260
    Node M
261
    |
262
    Node (M,M)
263
    |
264
    Node ((M,M),(M,M))
265
    |
266
    Leaf (((B,B),(B,B)),((B,B),(B,B)))
267
  *)
268

269
  (*mapi where i is the level of the tree*)
270
  (** [map_depth ~f_merge ~f_base tree] rebuilds [tree] with [f_merge] applied
      to every merge value (it also receives the [depth] stored in the
      enclosing node) and [f_base] applied to every base value. The shape of
      the tree and the stored depths are preserved. *)
  let rec map_depth :
      type a_merge b_merge c_base d_base.
         f_merge:(int -> a_merge -> b_merge)
      -> f_base:(c_base -> d_base)
      -> (a_merge, c_base) t
      -> (b_merge, d_base) t =
   fun ~f_merge ~f_base tree ->
    match tree with
    | Node { depth; value; sub_tree } ->
        (* One level down every value is a pair, so lift both functions to
           act component-wise before recursing. *)
        let merge_pair level (left, right) =
          (f_merge level left, f_merge level right)
        in
        let base_pair (left, right) = (f_base left, f_base right) in
        Node
          { depth
          ; value = f_merge depth value
          ; sub_tree = map_depth ~f_merge:merge_pair ~f_base:base_pair sub_tree
          }
    | Leaf base ->
        Leaf (f_base base)
290

291
  (** Same as [map_depth], for an [f_merge] that does not need the depth. *)
  let map :
      type a_merge b_merge c_base d_base.
         f_merge:(a_merge -> b_merge)
      -> f_base:(c_base -> d_base)
      -> (a_merge, c_base) t
      -> (b_merge, d_base) t =
   fun ~f_merge ~f_base tree ->
    map_depth ~f_merge:(fun _depth node -> f_merge node) ~f_base tree
1,480✔
299

300
  (* Like foldi, where the index i is the depth of the current level *)
301
  (** Folds over a tree whose step functions run in the monad [M]. Merge
      values are visited first, one level at a time starting from the
      outermost node and left before right within a level; the base values
      are visited last, also left to right. A step returning [Stop]
      short-circuits the rest of the traversal. *)
  module Make_foldable (M : Monad.S) = struct
    let rec fold_depth_until' :
        type merge_t accum base_t final.
           f_merge:
             (int -> accum -> merge_t -> (accum, final) Continue_or_stop.t M.t)
        -> f_base:(accum -> base_t -> (accum, final) Continue_or_stop.t M.t)
        -> init:accum
        -> (merge_t, base_t) t
        -> (accum, final) Continue_or_stop.t M.t =
     fun ~f_merge ~f_base ~init:acc t ->
      let open Container.Continue_or_stop in
      let open M.Let_syntax in
      match t with
      | Leaf base ->
          f_base acc base
      | Node { depth; value; sub_tree } -> (
          (* Step functions for the next level, where every value is a pair:
             run the left component, then the right one unless the left
             asked to stop. *)
          let merge_pair level acc (left, right) =
            match%bind f_merge level acc left with
            | Continue acc ->
                f_merge level acc right
            | Stop _ as stop ->
                M.return stop
          in
          let base_pair acc (left, right) =
            match%bind f_base acc left with
            | Continue acc ->
                f_base acc right
            | Stop _ as stop ->
                M.return stop
          in
          match%bind f_merge depth acc value with
          | Continue acc ->
              fold_depth_until' ~f_merge:merge_pair ~f_base:base_pair ~init:acc
                sub_tree
          | Stop _ as stop ->
              M.return stop )

    (** Runs [fold_depth_until'] and turns its outcome into a [final] value:
        [finish] is applied when the traversal ran to completion, and the
        payload of [Stop] is returned as-is otherwise. *)
    let fold_depth_until :
        type merge_t base_t accum final.
           f_merge:
             (int -> accum -> merge_t -> (accum, final) Continue_or_stop.t M.t)
        -> f_base:(accum -> base_t -> (accum, final) Continue_or_stop.t M.t)
        -> init:accum
        -> finish:(accum -> final M.t)
        -> (merge_t, base_t) t
        -> final M.t =
     fun ~f_merge ~f_base ~init ~finish t ->
      let open M.Let_syntax in
      match%bind fold_depth_until' ~f_merge ~f_base ~init t with
      | Stop early ->
          M.return early
      | Continue acc ->
          finish acc
  end

  (* The fold specialised to the identity monad, i.e. plain values. *)
  module Foldable_ident = Make_foldable (Monad.Ident)
355

356
  (** Total fold over the tree, with no early exit. [f_merge] receives the
      depth stored in the node that holds the merge value. Visiting order is
      that of [Foldable_ident.fold_depth_until]. *)
  let fold_depth :
      type merge_t base_t accum.
         f_merge:(int -> accum -> merge_t -> accum)
      -> f_base:(accum -> base_t -> accum)
      -> init:accum
      -> (merge_t, base_t) t
      -> accum =
   fun ~f_merge ~f_base ~init t ->
    (* Every step is wrapped in [Continue], so the traversal never stops
       early and [finish] is simply the identity. *)
    Foldable_ident.fold_depth_until
      ~f_merge:(fun depth acc node ->
        Continue_or_stop.Continue (f_merge depth acc node) )
      ~f_base:(fun acc base -> Continue_or_stop.Continue (f_base acc base))
      ~init ~finish:Fn.id t
368

369
  (** Same as [fold_depth], for an [f_merge] that does not need the depth. *)
  let fold :
      type merge_t base_t accum.
         f_merge:(accum -> merge_t -> accum)
      -> f_base:(accum -> base_t -> accum)
      -> init:accum
      -> (merge_t, base_t) t
      -> accum =
   fun ~f_merge ~f_base ~init t ->
    fold_depth ~f_merge:(fun _depth acc node -> f_merge acc node) ~f_base ~init t
1,480✔
378

379
  (** Fold with early exit, for step functions that do not need the depth.
      [finish] converts the accumulator when no step returned [Stop]. *)
  let _fold_until :
      type merge_t base_t accum final.
         f_merge:(accum -> merge_t -> (accum, final) Continue_or_stop.t)
      -> f_base:(accum -> base_t -> (accum, final) Continue_or_stop.t)
      -> init:accum
      -> finish:(accum -> final)
      -> (merge_t, base_t) t
      -> final =
   fun ~f_merge ~f_base ~init ~finish t ->
    Foldable_ident.fold_depth_until
      ~f_merge:(fun _depth acc node -> f_merge acc node)
      ~f_base ~init ~finish t
391

392
  (*
393
    result -> final proof
394
    f_merge, f_base are to update the nodes with new jobs and mark old jobs as "Done"*)
395
  (** Top-down update of the tree. At every node, [f_merge] is given the jobs
      routed to that node, the node's depth and its value, and returns the new
      value plus an optional result; [jobs_split] then divides the jobs between
      the two children according to the weights that [weight_merge] read from
      the node {e before} it was updated. Levels below [update_level] are left
      untouched. The first error aborts the whole update.

      The returned option is the result produced by [f_merge] on the node the
      function was called on; results from deeper levels are discarded. *)
  let rec update_split :
      type merge_t base_t data weight result.
         f_merge:(data -> int -> merge_t -> (merge_t * result option) Or_error.t)
      -> f_base:(data -> base_t -> base_t Or_error.t)
      -> weight_merge:(merge_t -> weight * weight)
      -> jobs:data
      -> update_level:int
      -> jobs_split:(weight * weight -> data -> data * data)
      -> (merge_t, base_t) t
      -> ((merge_t, base_t) t * result option) Or_error.t =
   fun ~f_merge ~f_base ~weight_merge ~jobs ~update_level ~jobs_split t ->
    let open Or_error.Let_syntax in
    match t with
    | Leaf base ->
        Or_error.map (f_base jobs base) ~f:(fun base' -> (Leaf base', None))
    | Node { depth; value; sub_tree } ->
        (* Read the weights first: [f_merge] may change them. *)
        let child_weights = weight_merge value in
        let%bind value', scan_result = f_merge jobs depth value in
        let%map sub_tree' =
          if update_level = depth then Ok sub_tree
          else
            let child_jobs = jobs_split child_weights jobs in
            (* One level down every value, weight and job batch is a pair, so
               each callback is lifted to work on both halves. *)
            let%map updated, _ =
              update_split
                ~f_merge:(fun (jobs_l, jobs_r) level (node_l, node_r) ->
                  let%bind node_l', result_l = f_merge jobs_l level node_l in
                  let%map node_r', result_r = f_merge jobs_r level node_r in
                  ((node_l', node_r'), Option.both result_l result_r) )
                ~f_base:(fun (jobs_l, jobs_r) (base_l, base_r) ->
                  let%bind base_l' = f_base jobs_l base_l in
                  let%map base_r' = f_base jobs_r base_r in
                  (base_l', base_r') )
                ~weight_merge:(fun (node_l, node_r) ->
                  (weight_merge node_l, weight_merge node_r) )
                ~update_level
                ~jobs_split:(fun (weights_l, weights_r) (jobs_l, jobs_r) ->
                  (jobs_split weights_l jobs_l, jobs_split weights_r jobs_r) )
                ~jobs:child_jobs sub_tree
            in
            updated
        in
        (Node { depth; value = value'; sub_tree = sub_tree' }, scan_result)
480✔
438

439
  (** Bottom-up update of the tree. [f_base] rewrites a base value and
      returns a piece of [data] alongside it; each merge value is then
      rewritten by [f_merge], which receives the pair of [data] produced by
      its two children and returns its own [data] for the level above. The
      result is the rewritten tree and the [data] of the outermost node. *)
  let rec update_accumulate :
      type merge_t base_t data.
         f_merge:(data * data -> merge_t -> merge_t * data)
      -> f_base:(base_t -> base_t * data)
      -> (merge_t, base_t) t
      -> (merge_t, base_t) t * data =
   fun ~f_merge ~f_base t ->
    (* Turns a pair of (value, data) results into (pair of values, pair of
       data), which is the shape expected one level down. *)
    let regroup ((value_l, data_l), (value_r, data_r)) =
      ((value_l, value_r), (data_l, data_r))
    in
    match t with
    | Node { depth; value; sub_tree } ->
        (* Children first: their accumulated data feeds this node. *)
        let sub_tree', child_data =
          update_accumulate
            ~f_merge:(fun (data_l, data_r) (node_l, node_r) ->
              regroup (f_merge data_l node_l, f_merge data_r node_r) )
            ~f_base:(fun (base_l, base_r) ->
              regroup (f_base base_l, f_base base_r) )
            sub_tree
        in
        let value', data = f_merge child_data value in
        (Node { depth; value = value'; sub_tree = sub_tree' }, data)
    | Leaf base ->
        let base', data = f_base base in
        (Leaf base', data)
480✔
462

463
  (** Applies a batch of jobs to the tree.

      - On level [update_level], a [Todo] merge job that receives a completed
        job is marked [Done]. At level 0 the completed merge value is also
        returned as the result and both weights are set to 0.
      - On level [update_level - 1], the jobs are turned into new merge jobs
        ([Empty] to [Part] or [Full], [Part] to [Full]) tagged with
        [sequence_no], and the weights seen through [weight_lens] decrease
        accordingly.
      - On the levels above, only the weights seen through [weight_lens] are
        decreased, by the number of jobs routed to each side.
      - On the leaves, a base job fills an [Empty] slot, and a merge job marks
        a [Full] slot as [Done].

      Any other combination of jobs and slot state is reported as an error. *)
  let update :
         ('merge_job, 'base_job) Job.t list
      -> update_level:int
      -> sequence_no:int
      -> weight_lens:(Weight.t, int) Lens.t
      -> ('merge_t, 'base_t) t
      -> (('merge_t, 'base_t) t * 'b option) Or_error.t =
   fun completed_jobs ~update_level ~sequence_no:seq_no ~weight_lens tree ->
    let open Or_error.Let_syntax in
    (* Error raised when the jobs routed to a merge node do not fit its
       current state. *)
    let unexpected_merge_jobs xs cur_level m =
      Or_error.errorf
        "Got %d jobs when updating level %d and when one of the merge \
         nodes at level %d is %s"
        (List.length xs) update_level cur_level (Merge.Job.job_str m)
    in
    let add_merges (jobs : ('b, 'c) Job.t list) cur_level
        (((w1, w2) as weight), m) =
      let left, right = (weight_lens.get w1, weight_lens.get w2) in
      (* The node's weight pair with the selected counters replaced. *)
      let with_counts l r = (weight_lens.set l w1, weight_lens.set r w2) in
      let todo_merge a b =
        Merge.Job.Full { left = a; right = b; seq_no; status = Job_status.Todo }
      in
      if cur_level = update_level - 1 then (
        (*Create new jobs from the completed ones*)
        let%map new_weight, m' =
          match (jobs, m) with
          | [], _ ->
              Ok (weight, m)
          | [ Job.Merge a; Job.Merge b ], Merge.Job.Empty ->
              Ok (with_counts (left - 1) (right - 1), todo_merge a b)
          | [ Job.Merge a ], Merge.Job.Empty ->
              Ok (with_counts (left - 1) right, Merge.Job.Part a)
          | [ Job.Merge b ], Merge.Job.Part a ->
              Ok (with_counts left (right - 1), todo_merge a b)
          | [ Job.Base _ ], Merge.Job.Empty ->
              (*Depending on whether this is the first or second of the two base jobs*)
              let weight =
                if left = 0 then with_counts left (right - 1)
                else with_counts (left - 1) right
              in
              Ok (weight, m)
          | [ Job.Base _; Job.Base _ ], Merge.Job.Empty ->
              Ok (with_counts (left - 1) (right - 1), m)
          | xs, m ->
              unexpected_merge_jobs xs cur_level m
        in
        ((new_weight, m'), None) )
      else if cur_level = update_level then (
        (*Mark completed jobs as Done*)
        match (jobs, m) with
        | ( [ Job.Merge a ]
          , Merge.Job.Full ({ status = Job_status.Todo; _ } as record) ) ->
            let done_job =
              Merge.Job.Full { record with status = Job_status.Done }
            in
            (* Completing the job at level 0 yields the scan result and
               zeroes the selected counters. *)
            if cur_level = 0 then Ok ((with_counts 0 0, done_job), Some a)
            else Ok ((weight, done_job), None)
        | [], m ->
            Ok ((weight, m), None)
        | xs, m ->
            unexpected_merge_jobs xs cur_level m )
      else if cur_level < update_level - 1 then (
        (*Update the job count for all the level above*)
        match jobs with
        | [] ->
            Ok ((weight, m), None)
        | _ :: _ ->
            let job_count = List.length jobs in
            let jobs_sent_left = min job_count left in
            let jobs_sent_right = min (job_count - jobs_sent_left) right in
            Ok
              ( ( with_counts (left - jobs_sent_left) (right - jobs_sent_right)
                , m )
              , None ) )
      else Ok ((weight, m), None)
    in
    let add_bases jobs (w, d) =
      let weight = weight_lens.get w in
      match (jobs, d) with
      | [], _ ->
          Ok (w, d)
      | [ Job.Base job ], Base.Job.Empty ->
          Ok
            ( weight_lens.set (weight - 1) w
            , Base.Job.Full { job; seq_no; status = Job_status.Todo } )
      | [ Job.Merge _ ], Base.Job.Full record ->
          Ok (w, Base.Job.Full { record with status = Job_status.Done })
      | xs, _ ->
          Or_error.errorf
            "Got %d jobs when updating level %d and when one of the base nodes \
             is %s"
            (List.length xs) update_level (Base.Job.job_str d)
    in
    (* The left child takes as many jobs as its weight allows; the right
       child then takes up to its own weight from what remains. *)
    let split_by_weight (w1, w2) pending =
      let l = weight_lens.get w1 in
      let r = weight_lens.get w2 in
      let for_left, rest = List.split_n pending l in
      (for_left, List.take rest r)
    in
    update_split ~f_merge:add_merges ~f_base:add_bases ~weight_merge:fst
      ~jobs:completed_jobs ~update_level ~jobs_split:split_by_weight tree
1,120✔
572

573
  (** Recomputes the weights of the whole tree from its leaves upwards.
      [weight_type] selects which counter(s) are recomputed: [`Base], [`Merge]
      or [`Both].

      - Leaf, base counter: 1 for an [Empty] slot, 0 otherwise.
      - Leaf, merge counter: 1 for a [Full] slot with status [Todo], 0
        otherwise.
      - Merge node: each of its two weights gets the sum of the corresponding
        child's pair of counters, except that under [`Merge] a [Full] node
        with status [Todo] gets merge counters [(1, 0)]. *)
  let reset_weights :
         [ `Base | `Merge | `Both ]
      -> ('merge_t, 'base_t) t
      -> ('merge_t, 'base_t) t =
   fun weight_type tree ->
    let set_one (lens : (Weight.t, int) Lens.t) weight = lens.set 1 weight in
    let set_zero (lens : (Weight.t, int) Lens.t) weight = lens.set 0 weight in
    let set_all_zero weight = Weight.base.set 0 (Weight.merge.set 0 weight) in
    let f_base (weight, job) =
      let update_merge_weight w =
        (*When updating the merge-weight of base nodes, only the nodes with
          "Todo" status needs to be included*)
        match job with
        | Base.Job.Full { status = Job_status.Todo; _ } ->
            set_one Weight.merge w
        | Base.Job.Full { status = Job_status.Done; _ } | Base.Job.Empty ->
            set_zero Weight.merge w
      in
      let update_base_weight w =
        (*When updating the base-weight of base nodes, only the Empty nodes
          need to be included*)
        match job with
        | Base.Job.Empty ->
            set_one Weight.base w
        | Base.Job.Full _ ->
            set_zero Weight.base w
      in
      (* A leaf has a single weight, but the level above expects a pair per
         child: the second component is a zeroed placeholder. *)
      let new_weight, dummy_right_for_base_nodes =
        match weight_type with
        | `Both ->
            let w' = update_base_weight weight in
            (update_merge_weight w', set_all_zero w')
        | `Base ->
            (update_base_weight weight, set_zero Weight.base weight)
        | `Merge ->
            (update_merge_weight weight, set_zero Weight.merge weight)
      in
      ((new_weight, job), (new_weight, dummy_right_for_base_nodes))
    in
    let f_merge ((w1, w2), (w3, w4)) (weights, job) =
      let reset (lens : (Weight.t, int) Lens.t) (w, w') =
        (* Weights of all other jobs is sum of weights of its children*)
        ( lens.set (lens.get w1 + lens.get w2) w
        , lens.set (lens.get w3 + lens.get w4) w' )
      in
      let weights' =
        match weight_type with
        | `Both ->
            reset Weight.merge (reset Weight.base weights)
        | `Base ->
            (* The base-weight of merge nodes is the sum of weights of its
               children*)
            reset Weight.base weights
        | `Merge -> (
            (*When updating the merge-weight of merge nodes, only the nodes
              with "Todo" status needs to be included*)
            match job with
            | Merge.Job.Full { status = Job_status.Todo; _ } ->
                let w1', w2' = weights in
                (Weight.merge.set 1 w1', Weight.merge.set 0 w2')
            | Merge.Job.Full { status = Job_status.Done; _ }
            | Merge.Job.Part _
            | Merge.Job.Empty ->
                reset Weight.merge weights )
      in
      ((weights', job), weights')
    in
    let tree', _ = update_accumulate ~f_merge ~f_base tree in
    tree'
160✔
640

641
  (** Lists the jobs with status [Todo] found on [level], in traversal order.
      Merge jobs are collected from the nodes whose stored depth equals
      [level]; base jobs are collected only when [level] equals [depth]. *)
  let jobs_on_level :
         depth:int
      -> level:int
      -> ('merge_t, 'base_t) t
      -> ('merge_job, 'base_job) Available_job.t list =
   fun ~depth ~level tree ->
    let on_merge cur_level acc (_weight, job) =
      if cur_level <> level then acc
      else
        match job with
        | Merge.Job.Full { left; right; status = Job_status.Todo; _ } ->
            Available_job.Merge (left, right) :: acc
        | Merge.Job.Full { status = Job_status.Done; _ }
        | Merge.Job.Part _
        | Merge.Job.Empty ->
            acc
    in
    let on_base acc (_weight, job) =
      if level <> depth then acc
      else
        match job with
        | Base.Job.Full { job; status = Job_status.Todo; _ } ->
            Available_job.Base job :: acc
        | Base.Job.Full { status = Job_status.Done; _ } | Base.Job.Empty ->
            acc
    in
    (* Jobs are consed while folding, hence the final reversal. *)
    let newest_first = fold_depth ~init:[] ~f_merge:on_merge ~f_base:on_base tree in
    List.rev newest_first
662

663
  (** Lists every node of the tree (weight included) in traversal order,
      leaving out the [Full] nodes whose status is [Done]. *)
  let to_hashable_jobs :
      ('merge_t, 'base_t) t -> ('merge_job, 'base_job) Job.t list =
   fun tree ->
    let keep_merge acc node =
      match node with
      | _, Merge.Job.Full { status = Job_status.Done; _ } ->
          acc
      | ( _
        , ( Merge.Job.Full { status = Job_status.Todo; _ }
          | Merge.Job.Part _
          | Merge.Job.Empty ) ) ->
          Job.Merge node :: acc
    in
    let keep_base acc leaf =
      match leaf with
      | _, Base.Job.Full { status = Job_status.Done; _ } ->
          acc
      | _, (Base.Job.Full { status = Job_status.Todo; _ } | Base.Job.Empty) ->
          Job.Base leaf :: acc
    in
    (* Nodes are consed while folding, hence the final reversal. *)
    let newest_first = fold ~init:[] ~f_merge:keep_merge ~f_base:keep_base tree in
    List.rev newest_first
681

682
  (** Lists the records of all [Full] slots, merge and base alike and
      whatever their status, in traversal order. *)
  let jobs_records : ('merge_t, 'base_t) t -> ('merge_job, 'base_job) Job.t list
      =
   fun tree ->
    let collect_merge acc (_weight, slot) =
      match slot with
      | Merge.Job.Full record ->
          Job.Merge record :: acc
      | Merge.Job.Part _ | Merge.Job.Empty ->
          acc
    in
    let collect_base acc (_weight, slot) =
      match slot with
      | Base.Job.Full record ->
          Job.Base record :: acc
      | Base.Job.Empty ->
          acc
    in
    (* Records are consed while folding, hence the final reversal. *)
    let newest_first =
      fold ~init:[] ~f_merge:collect_merge ~f_base:collect_base tree
    in
    List.rev newest_first
696

697
  (** Payloads of all [Full] leaves, whatever their status, in left-to-right
      order. [Empty] leaves are skipped. *)
  let base_jobs : ('merge_t, _ * 'base_job Base.Job.t) t -> 'base_job list =
   fun tree ->
    fold_depth ~init:[]
      (* Merge nodes contribute nothing. The accumulator is passed through
         rather than reset to [], so the result does not depend on merge
         nodes being visited before the leaves. *)
      ~f_merge:(fun _depth acc _node -> acc)
      ~f_base:(fun acc (_weight, slot) ->
        match slot with
        | Base.Job.Full { job; _ } ->
            job :: acc
        | Base.Job.Empty ->
            acc )
      tree
    |> List.rev
705

706
  (*calculates the number of base and merge jobs that is currently with the Todo status*)
707
  let todo_job_count :
      (_ * 'merge_job Merge.Job.t, _ * 'base_job Base.Job.t) t -> int * int =
   fun tree ->
    (* The accumulator is (base count, merge count); only [Full] slots whose
       status is [Todo] are counted. *)
    let count_merge _depth ((bases, merges) as counts) (_weight, slot) =
      match slot with
      | Merge.Job.Full { status = Job_status.Todo; _ } ->
          (bases, merges + 1)
      | Merge.Job.Full { status = Job_status.Done; _ }
      | Merge.Job.Part _
      | Merge.Job.Empty ->
          counts
    in
    let count_base ((bases, merges) as counts) (_weight, slot) =
      match slot with
      | Base.Job.Full { status = Job_status.Todo; _ } ->
          (bases + 1, merges)
      | Base.Job.Full { status = Job_status.Done; _ } | Base.Job.Empty ->
          counts
    in
    fold_depth ~init:(0, 0) ~f_merge:count_merge ~f_base:count_base tree
724

725
  (** Leaves (weight and slot) whose slot is [Full], in left-to-right order.
      Despite the name, [Empty] leaves are {e not} included. *)
  let leaves : ('merge_t, 'base_t) t -> 'base_t list =
   fun tree ->
    fold_depth ~init:[]
      (* Merge nodes contribute nothing. The accumulator is passed through
         rather than reset to [], so the result does not depend on merge
         nodes being visited before the leaves. *)
      ~f_merge:(fun _depth acc _node -> acc)
      ~f_base:(fun acc leaf ->
        match leaf with
        | _, Base.Job.Full _ ->
            leaf :: acc
        | _, Base.Job.Empty ->
            acc )
      tree
    |> List.rev
733

734
  (** Renders the tree as text, one line per level: a ["Node ..."] line for
      each merge level, outermost first, followed by a final ["Leaf ..."]
      line. Values on the same level are separated by two spaces. *)
  let rec _view_tree :
      type merge_t base_t.
         (merge_t, base_t) t
      -> show_merge:(merge_t -> string)
      -> show_base:(base_t -> string)
      -> string =
   fun tree ~show_merge ~show_base ->
    match tree with
    | Node { value; sub_tree; _ } ->
        let this_level = sprintf "Node %s\n" (show_merge value) in
        (* One level down every value is a pair: show both halves. *)
        let lower_levels =
          _view_tree sub_tree
            ~show_merge:(fun (left, right) ->
              sprintf "%s  %s" (show_merge left) (show_merge right) )
            ~show_base:(fun (left, right) ->
              sprintf "%s  %s" (show_base left) (show_base right) )
        in
        this_level ^ lower_levels
    | Leaf base ->
        sprintf "Leaf %s\n" (show_base base)
×
754

755
  (** Merge counter of the outermost element of the tree: the sum over both
      weights for a [Node], the single weight for a [Leaf]. *)
  let required_job_count tree =
    match tree with
    | Leaf (w, _) ->
        Weight.merge.get w
    | Node { value = (w1, w2), _; _ } ->
        Weight.merge.get w1 + Weight.merge.get w2
760

761
  (** Base counter of the outermost element of the tree: the sum over both
      weights for a [Node], the single weight for a [Leaf]. *)
  let available_space tree =
    match tree with
    | Leaf (w, _) ->
        Weight.base.get w
    | Node { value = (w1, w2), _; _ } ->
        Weight.base.get w1 + Weight.base.get w2
766

767
  (** Builds a view of every slot of the tree, merge nodes first and then the
      leaves, each tagged with its 0-based position in that traversal. [fa]
      converts merge payloads and [fd] converts base payloads. *)
  let view_jobs_with_position (tree : ('a, 'd) t) fa fd : 'c Job_view.t list =
    let merge_view (_weight, slot) =
      match slot with
      | Merge.Job.Empty ->
          Job_view.Node.MEmpty
      | Merge.Job.Part a ->
          Job_view.Node.MPart (fa a)
      | Merge.Job.Full { left; right; seq_no; status } ->
          Job_view.Node.MFull
            (fa left, fa right, { Job_view.Extra.status; seq_no })
    in
    let base_view (_weight, slot) =
      match slot with
      | Base.Job.Empty ->
          Job_view.Node.BEmpty
      | Base.Job.Full { seq_no; status; job } ->
          Job_view.Node.BFull (fd job, { Job_view.Extra.seq_no; status })
    in
    (* Views are consed while folding, so the list is reversed back into
       traversal order before being numbered. *)
    fold
      ~f_merge:(fun acc node -> merge_view node :: acc)
      ~f_base:(fun acc leaf -> base_view leaf :: acc)
      ~init:[] tree
    |> List.rev
    |> List.mapi ~f:(fun position value -> { Job_view.position; value })
×
793
end
794

795
(*This structure works well because all the nodes on a given level are always completed before proceeding to the next level*)
796
module T = struct
  (* Record layout used as the binable representation: [Stable.V1] below is
     (de)serialized through this type via [Binable.Of_binable2_without_uuid]. *)
  module Binable_arg = struct
    [%%versioned
    module Stable = struct
      [@@@no_toplevel_latest_type]

      module V1 = struct
        type ('merge, 'base) t =
          { trees :
              ( 'merge Merge.Stable.V1.t
              , 'base Base.Stable.V1.t )
              Tree.Stable.V1.t
              Mina_stdlib.Nonempty_list.Stable.V1.t
          ; acc : ('merge * 'base list) option
          ; curr_job_seq_no : int
          ; max_base_jobs : int
          ; delay : int
          }
      end
    end]
  end

  [%%versioned_binable
  module Stable = struct
    module V1 = struct
      type ('merge, 'base) t = ('merge, 'base) Binable_arg.Stable.V1.t =
        { trees :
            ('merge Merge.Stable.V1.t, 'base Base.Stable.V1.t) Tree.Stable.V1.t
            Mina_stdlib.Nonempty_list.Stable.V1.t
              (*non-empty list of trees; the head is the tree new data is
                added to*)
        ; acc : ('merge * 'base list) option
              (*last emitted result together with its base-job data*)
        ; curr_job_seq_no : int
              (*sequence number given to jobs added by the current update*)
        ; max_base_jobs : int
              (*upper bound on the data accepted per update; the tree depth
                is [Int.ceil_log2 max_base_jobs]*)
        ; delay : int
        }
      [@@deriving sexp]

      (* Replaces every merge job whose status is [Done] by [Empty], keeping
         the node weights. Base jobs are left untouched. This is the form
         that gets serialized (see [to_binable] below) and hashed. *)
      let with_leaner_trees ({ trees; _ } as t) =
        let drop_if_done = function
          | weights, Merge.Job.Full { status = Job_status.Done; _ } ->
              (weights, Merge.Job.Empty)
          | unfinished ->
              unfinished
        in
        let trees =
          Mina_stdlib.Nonempty_list.map trees ~f:(fun tree ->
              Tree.map tree ~f_merge:drop_if_done ~f_base:Fn.id )
        in
        { t with trees }

      include
        Binable.Of_binable2_without_uuid
          (Binable_arg.Stable.V1)
          (struct
            type nonrec ('merge, 'base) t = ('merge, 'base) t

            let to_binable = with_leaner_trees

            let of_binable = Fn.id
          end)
    end
  end]

  [%%define_locally Stable.Latest.(with_leaner_trees)]

  (** Builds a tree with [depth] levels of merge nodes above the leaves, every
      merge node holding [merge_job] and every leaf holding [base_job].

      Weights: when [level = -1] all weights are zero. Otherwise each leaf
      has base weight 1 and each side of a merge node at depth [d] has base
      weight [2^level / 2^(d + 1)]; merge weights always start at 0. *)
  let create_tree_for_level ~level ~depth ~merge_job ~base_job =
    (* Polymorphic recursion: each level down pairs up the element types. *)
    let rec build :
        type merge_t base_t.
        int -> (int -> merge_t) -> base_t -> (merge_t, base_t) Tree.t =
     fun current_depth merge_at leaf ->
      if current_depth < depth then
        let sub_tree =
          build (current_depth + 1)
            (fun i -> (merge_at i, merge_at i))
            (leaf, leaf)
        in
        Tree.Node
          { depth = current_depth; value = merge_at current_depth; sub_tree }
      else Tree.Leaf leaf
    in
    let mk_weight base merge = { Weight.base; merge } in
    let weightless = level = -1 in
    let leaf_weight = mk_weight (if weightless then 0 else 1) 0 in
    let merge_node_at d =
      let slots =
        if weightless then 0 else Int.pow 2 level / Int.pow 2 (d + 1)
      in
      ((mk_weight slots 0, mk_weight slots 0), merge_job)
    in
    build 0 merge_node_at (leaf_weight, base_job)

  (** A tree of the given [depth] in which every job is [Empty]. *)
  let create_tree ~depth =
    create_tree_for_level ~depth ~level:depth ~base_job:Base.Job.Empty
      ~merge_job:Merge.Job.Empty

  (** Initial state: a single empty tree of depth
      [Int.ceil_log2 max_base_jobs], nothing emitted yet, and sequence
      number 0. *)
  let empty : type merge base. max_base_jobs:int -> delay:int -> (merge, base) t
      =
   fun ~max_base_jobs ~delay ->
    let initial_tree :
        ( (Weight.t * Weight.t) * merge Merge.Job.t
        , Weight.t * base Base.Job.t )
        Tree.t =
      create_tree ~depth:(Int.ceil_log2 max_base_jobs)
    in
    { trees = Mina_stdlib.Nonempty_list.singleton initial_tree
    ; acc = None
    ; curr_job_seq_no = 0
    ; max_base_jobs
    ; delay
    }
end
915

916
module State = struct
  include T
  module Hash = Hash

  (** Applies [f1] to every merge payload and [f2] to every base payload,
      both inside the trees and inside the last emitted value. *)
  let map (type a1 a2 b1 b2) (t : (a1, a2) t) ~(f1 : a1 -> b1) ~(f2 : a2 -> b2)
      : (b1, b2) t =
    let acc = Option.map t.acc ~f:(fun (m, bs) -> (f1 m, List.map bs ~f:f2)) in
    let trees =
      Mina_stdlib.Nonempty_list.map t.trees
        ~f:
          (Tree.map_depth
             ~f_merge:(fun _ -> Merge.map ~f:f1)
             ~f_base:(Base.map ~f:f2) )
    in
    { t with trees; acc }

  (** SHA256 digest of the state after [with_leaner_trees]. [f_merge] and
      [f_base] turn merge and base payloads into the strings that are fed to
      the digest. The order of the feeds below defines the hash and must not
      be changed. *)
  let hash t f_merge f_base =
    let { trees; acc; max_base_jobs; curr_job_seq_no; delay; _ } =
      with_leaner_trees t
    in
    let ctx = ref (Digestif.SHA256.init ()) in
    let feed s = ctx := Digestif.SHA256.feed_string !ctx s in
    let feed_int i = feed (Int.to_string i) in
    let feed_weight { Weight.base = b; merge = m } = feed_int b ; feed_int m in
    let feed_weight_pair (w1, w2) = feed_weight w1 ; feed_weight w2 in
    (* Weights first, then a tag naming the job variant, then its contents. *)
    let feed_merge_node (weights, job) =
      feed_weight_pair weights ;
      match job with
      | Merge.Job.Empty ->
          feed "Empty"
      | Merge.Job.Full { left; right; status; seq_no } ->
          feed "Full" ;
          feed_int seq_no ;
          feed (Job_status.to_string status) ;
          feed (f_merge left) ;
          feed (f_merge right)
      | Merge.Job.Part j ->
          feed "Part" ;
          feed (f_merge j)
    in
    let feed_base_node (weight, job) =
      feed_weight weight ;
      match job with
      | Base.Job.Empty ->
          feed "Empty"
      | Base.Job.Full { job; status; seq_no } ->
          feed "Full" ;
          feed_int seq_no ;
          feed (Job_status.to_string status) ;
          feed (f_base job)
    in
    Mina_stdlib.Nonempty_list.iter trees ~f:(fun tree ->
        List.iter (Tree.to_hashable_jobs tree) ~f:(function
          | Job.Merge m ->
              feed_merge_node m
          | Base d ->
              feed_base_node d ) ) ;
    ( match acc with
    | None ->
        feed "None"
    | Some (emitted, data) ->
        feed (f_merge emitted) ;
        List.iter data ~f:(fun d -> feed (f_base d)) ) ;
    feed_int curr_job_seq_no ;
    feed_int max_base_jobs ;
    feed_int delay ;
    Digestif.SHA256.get !ctx

  module Make_foldable (M : Monad.S) = struct
    module Tree_foldable = Tree.Make_foldable (M)

    (** Monadic fold over all jobs, visiting the trees from the last element
        of [t.trees] to the head. Folding stops as soon as [f_merge] or
        [f_base] returns [Stop]; otherwise [finish] is applied to the final
        accumulator. *)
    let fold_chronological_until :
           ('merge, 'base) t
        -> init:'acc
        -> f_merge:
             ('acc -> 'merge Merge.t -> ('acc, 'final) Continue_or_stop.t M.t)
        -> f_base:('acc -> 'base Base.t -> ('acc, 'final) Continue_or_stop.t M.t)
        -> finish:('acc -> 'final M.t)
        -> 'final M.t =
     fun t ~init ~f_merge ~f_base ~finish ->
      let open M.Let_syntax in
      let open Container.Continue_or_stop in
      let rec fold_trees acc = function
        | [] ->
            M.return (Continue acc)
        | tree :: rest -> (
            let%bind step =
              Tree_foldable.fold_depth_until'
                ~f_merge:(fun _ -> f_merge)
                ~f_base ~init:acc tree
            in
            match step with
            | Continue acc' ->
                fold_trees acc' rest
            | Stop early ->
                M.return (Stop early) )
      in
      let oldest_first =
        t.trees |> Mina_stdlib.Nonempty_list.rev
        |> Mina_stdlib.Nonempty_list.to_list
      in
      let%bind outcome = fold_trees init oldest_first in
      match outcome with
      | Continue final_acc ->
          finish final_acc
      | Stop early ->
          M.return early
  end

  module Foldable_ident = Make_foldable (Monad.Ident)

  (** Non-monadic fold over all jobs in the same order as
      [fold_chronological_until]; it never stops early. *)
  let fold_chronological t ~init ~f_merge ~f_base =
    let continue_with f acc x = Container.Continue_or_stop.Continue (f acc x) in
    Foldable_ident.fold_chronological_until t ~init ~finish:Fn.id
      ~f_merge:(continue_with f_merge)
      ~f_base:(continue_with f_base)
end
1037

1038
include T
1039
module State_or_error = State_or_error.Make3 (T)
1040

1041
(** [check failed ~message] forwards to [State_or_error.error_if] with a unit
    value; callers in this file pass the failure condition as [failed]. *)
let check failed ~message = State_or_error.error_if failed ~value:() ~message
1,360✔
1042

1043
(** Calls [State_or_error.error_if] with a condition that is always [true],
    using the human-readable rendering of [e] as the message. [a] is only
    supplied as the [~value] argument. *)
let return_error e a =
  let message = Error.to_string_hum e in
  State_or_error.error_if true ~value:a ~message
×
1045

1046
(** Upper bound on the number of trees in the state:
    [(ceil_log2 max_base_jobs + 1) * (delay + 1) + 1]. *)
let max_trees : ('merge, 'base) t -> int =
 fun { max_base_jobs; delay; _ } ->
  let levels = Int.ceil_log2 max_base_jobs + 1 in
  (levels * (delay + 1)) + 1
800✔
1048

1049
(** Collects the jobs of the [i]-th tree of [trees] at level [depth - i],
    where [depth = ceil_log2 max_base_jobs], and concatenates them in tree
    order. *)
let work_to_do :
    type merge base.
       (merge Merge.t, base Base.t) Tree.t list
    -> max_base_jobs:int
    -> (merge, base) Available_job.t list =
 fun trees ~max_base_jobs ->
  let depth = Int.ceil_log2 max_base_jobs in
  let jobs_per_tree =
    List.mapi trees ~f:(fun idx tree ->
        Tree.jobs_on_level tree ~depth ~level:(depth - idx) )
  in
  List.concat jobs_per_tree
5,900✔
1058

1059
(** Keeps every [delay]-th tree of [trees] (those at index [i] with
    [i mod delay = delay - 1]), retains at most [depth + 1] of them, and
    returns their jobs via [work_to_do]. *)
let work :
    type merge base.
       (merge Merge.t, base Base.t) Tree.t list
    -> delay:int
    -> max_base_jobs:int
    -> (merge, base) Available_job.t list =
 fun trees ~delay ~max_base_jobs ->
  let depth = Int.ceil_log2 max_base_jobs in
  trees
  |> List.filteri ~f:(fun idx _ -> idx % delay = delay - 1)
  |> (fun due_trees -> List.take due_trees (depth + 1))
  |> fun work_trees -> work_to_do work_trees ~max_base_jobs
6,800✔
1073

1074
(** Jobs returned by [work] for the state [t]. With [`Current] the head tree
    is excluded from the trees passed to [work]; with [`Next] all trees are
    passed. *)
let work_for_tree t ~data_tree =
  let candidate_trees =
    match data_tree with
    | `Next ->
        Mina_stdlib.Nonempty_list.to_list t.trees
    | `Current ->
        Mina_stdlib.Nonempty_list.tail t.trees
  in
  work candidate_trees ~delay:(t.delay + 1) ~max_base_jobs:t.max_base_jobs
1084

1085
(*Work on all the levels of all the trees*)
1086
(** All available job sets. The first set is the work for the state as it
    is. Further sets come from prepending an empty tree [t.delay + 1] times
    and asking for the work after each prepend. Empty sets are left out. *)
let all_work :
    type merge base. (merge, base) t -> (merge, base) Available_job.t list list
    =
 fun t ->
  let depth = Int.ceil_log2 t.max_base_jobs in
  let current_set = work_for_tree t ~data_tree:`Current in
  let rounds = List.init (t.delay + 1) ~f:Fn.id in
  let _, rev_later_sets =
    List.fold rounds ~init:(t, []) ~f:(fun (state, rev_sets) _ ->
        let state =
          { state with
            trees =
              Mina_stdlib.Nonempty_list.cons (create_tree ~depth) state.trees
          }
        in
        match work_for_tree state ~data_tree:`Current with
        | [] ->
            (state, rev_sets)
        | jobs ->
            (state, jobs :: rev_sets) )
  in
  let later_sets = List.rev rev_later_sets in
  match current_set with
  | [] ->
      later_sets
  | _ :: _ ->
      current_set :: later_sets
440✔
1108

1109
(** Job sets for adding [data_count] items (capped at [t.max_base_jobs]).

    If the head tree has room for all of them, the result is at most one
    set: the first [2 * count] jobs computed without the head tree.
    Otherwise it is that full job list plus up to [2 * overflow] jobs
    computed over all trees, where [overflow] is the data that does not fit
    in the head tree. Empty sets are left out. *)
let work_for_next_update :
    type merge base.
    (merge, base) t -> data_count:int -> (merge, base) Available_job.t list list
    =
 fun t ~data_count ->
  let delay = t.delay + 1 in
  let jobs_from trees = work trees ~max_base_jobs:t.max_base_jobs ~delay in
  let keep_non_empty sets =
    List.filter sets ~f:(fun set -> not (List.is_empty set))
  in
  let current_tree_space =
    Tree.available_space (Mina_stdlib.Nonempty_list.head t.trees)
  in
  let set1 = jobs_from (Mina_stdlib.Nonempty_list.tail t.trees) in
  let count = min data_count t.max_base_jobs in
  if count <= current_tree_space then
    keep_non_empty [ List.take set1 (2 * count) ]
  else
    let overflow = count - current_tree_space in
    let set2 =
      List.take
        (jobs_from (Mina_stdlib.Nonempty_list.to_list t.trees))
        (overflow * 2)
    in
    keep_non_empty [ set1; set2 ]
140✔
1136

1137
(** [Tree.available_space] of the head tree of [t.trees]. *)
let free_space_on_current_tree t =
  t.trees |> Mina_stdlib.Nonempty_list.head |> Tree.available_space
1,400✔
1140

1141
(** Prepends [b] to the plain list [bs], producing a non-empty list. *)
let cons b bs =
  match Mina_stdlib.Nonempty_list.of_list_opt bs with
  | None ->
      Mina_stdlib.Nonempty_list.singleton b
  | Some rest ->
      Mina_stdlib.Nonempty_list.cons b rest
120✔
1145

1146
(** Appends the plain list [bs'] to the non-empty list [bs]; [bs] is returned
    unchanged when [bs'] is empty. *)
let append bs bs' =
  match Mina_stdlib.Nonempty_list.of_list_opt bs' with
  | None ->
      bs
  | Some tail ->
      Mina_stdlib.Nonempty_list.append bs tail
120✔
1149

1150
(** Feeds [completed_jobs] into the trees behind the head tree.

    Returns [None] without touching the state when [completed_jobs] is
    empty. Otherwise:
    - it is an error to supply more jobs than
      [work_for_tree ~data_tree:`Current] asks for;
    - every [delay + 1]-th tree (counting from the one after the head)
      consumes [Tree.required_job_count] jobs while jobs remain;
    - if the last [Tree.update] that ran produced a result, the last tree
      visited is removed from the state and its base jobs are returned
      together with that result;
    - merge weights of the remaining updated trees are reset when a result
      was produced, or when the tree count is below [max_trees] and exactly
      the required number of jobs was supplied. *)
let add_merge_jobs :
    completed_jobs:'merge list -> ('base, 'merge, _) State_or_error.t =
 fun ~completed_jobs ->
  let open State_or_error.Let_syntax in
  if List.is_empty completed_jobs then return None
  else
    let%bind state = State_or_error.get in
    let delay = state.delay + 1 in
    let depth = Int.ceil_log2 state.max_base_jobs in
    let merge_jobs = List.map completed_jobs ~f:(fun j -> Job.Merge j) in
    let required_count =
      List.length (work_for_tree state ~data_tree:`Current)
    in
    let completed_count = List.length merge_jobs in
    let%bind () =
      check
        (completed_count > required_count)
        ~message:
          (sprintf
             !"More work than required: Required- %d got- %d"
             required_count completed_count )
    in
    let curr_tree = Mina_stdlib.Nonempty_list.head state.trees in
    let older_trees = Mina_stdlib.Nonempty_list.tail state.trees in
    (* One fold step: the accumulator holds the trees processed so far (in
       reverse), the result of the most recent [Tree.update], and the jobs
       not yet consumed. *)
    let feed_tree i acc tree =
      Or_error.bind acc ~f:(fun (rev_trees, emitted, pending) ->
          let is_due = i % delay = delay - 1 && not (List.is_empty pending) in
          if not is_due then Ok (tree :: rev_trees, emitted, pending)
          else
            let for_this_tree, rest =
              List.split_n pending (Tree.required_job_count tree)
            in
            match
              Tree.update for_this_tree
                ~update_level:(depth - (i / delay))
                ~sequence_no:state.curr_job_seq_no ~weight_lens:Weight.merge
                tree
            with
            | Ok (tree', emitted') ->
                Ok (tree' :: rev_trees, emitted', rest)
            | Error e ->
                Error
                  (Error.tag_arg e "Error while adding merge jobs to tree"
                     ("tree_number", i) [%sexp_of: string * int] ) )
    in
    let%bind rev_trees, emitted, _ =
      match
        List.foldi older_trees ~init:(Ok ([], None, merge_jobs)) ~f:feed_tree
      with
      | Ok res ->
          State_or_error.return res
      | Error e ->
          return_error e ([], None, [])
    in
    let updated_trees, result_opt =
      match emitted with
      | None ->
          (List.rev rev_trees, None)
      | Some res -> (
          match rev_trees with
          | [] ->
              ([], None)
          | last_tree :: rest ->
              (* The last tree visited leaves the state; its base jobs
                 accompany the result. *)
              let tree_data = Tree.base_jobs last_tree in
              (List.rev rest, Some (res, tree_data)) )
    in
    let updated_trees =
      if
        Option.is_some result_opt
        || List.length (curr_tree :: updated_trees) < max_trees state
           && completed_count = required_count
      then List.map updated_trees ~f:(Tree.reset_weights `Merge)
      else updated_trees
    in
    let%map _ =
      State_or_error.put { state with trees = cons curr_tree updated_trees }
    in
    result_opt
×
1230

1231
(** Adds [data] as base jobs to the head tree.

    Does nothing when [data] is empty. It is an error for [data] to be
    longer than the head tree's available space. When [data] fills the tree
    exactly, both weights of the tree are reset and a fresh empty tree
    becomes the new head; otherwise only the merge weights are reset. *)
let add_data : data:'base list -> (_, _, 'base) State_or_error.t =
 fun ~data ->
  let open State_or_error.Let_syntax in
  if List.is_empty data then return ()
  else
    let%bind state = State_or_error.get in
    let depth = Int.ceil_log2 state.max_base_jobs in
    let current_tree = Mina_stdlib.Nonempty_list.head state.trees in
    let older_trees = Mina_stdlib.Nonempty_list.tail state.trees in
    let data_count = List.length data in
    let available_space = Tree.available_space current_tree in
    let%bind () =
      check
        (data_count > available_space)
        ~message:
          (sprintf
             !"Data count (%d) exceeded available space (%d)"
             data_count available_space )
    in
    let base_jobs = List.map data ~f:(fun d -> Job.Base d) in
    let%bind filled_tree, _ =
      match
        Tree.update base_jobs ~update_level:depth
          ~sequence_no:state.curr_job_seq_no ~weight_lens:Weight.base
          current_tree
      with
      | Ok res ->
          State_or_error.return res
      | Error e ->
          return_error
            (Error.tag ~tag:"Error while adding base jobs to the tree" e)
            (current_tree, None)
    in
    let newest_trees =
      if data_count = available_space then
        cons (create_tree ~depth) [ Tree.reset_weights `Both filled_tree ]
      else
        Mina_stdlib.Nonempty_list.singleton
          (Tree.reset_weights `Merge filled_tree)
    in
    let%map _ =
      State_or_error.put
        { state with trees = append newest_trees older_trees }
    in
    ()
160✔
1274

1275
(** Renumbers every [Full] job so that sequence numbers restart from 1.

    The reference value is the sequence number of the first leaf of the last
    tree (0 if that leaf is not a [Full] base job); each job's number
    becomes [seq_no - reference + 1]. [curr_job_seq_no] is then set to the
    largest renumbered value found on the last leaf of any tree (0 if there
    is none). *)
let reset_seq_no : type a b. (a, b) t -> (a, b) t =
 fun state ->
  let oldest_seq_no =
    let oldest_tree = Mina_stdlib.Nonempty_list.last state.trees in
    match List.hd (Tree.leaves oldest_tree) with
    | Some (_, Base.Job.Full { seq_no; _ }) ->
        seq_no
    | _ ->
        0
  in
  let rebase seq_no = seq_no - oldest_seq_no + 1 in
  let f_merge : a Merge.t -> a Merge.t = function
    | w, Merge.Job.Full ({ seq_no; _ } as job) ->
        (w, Merge.Job.Full { job with seq_no = rebase seq_no })
    | untouched ->
        untouched
  in
  let f_base : b Base.t -> b Base.t = function
    | w, Base.Job.Full ({ seq_no; _ } as job) ->
        (w, Base.Job.Full { job with seq_no = rebase seq_no })
    | untouched ->
        untouched
  in
  let trees =
    Mina_stdlib.Nonempty_list.map state.trees ~f:(fun tree ->
        Tree.map tree ~f_base ~f_merge )
  in
  let curr_job_seq_no =
    List.fold (Mina_stdlib.Nonempty_list.to_list trees) ~init:0
      ~f:(fun highest tree ->
        match List.last (Tree.leaves tree) with
        | Some (_, Base.Job.Full { seq_no; _ }) ->
            max seq_no highest
        | _ ->
            highest )
  in
  { state with curr_job_seq_no; trees }
1320

1321
(** Stores [state] with its sequence number incremented by one. If the
    incremented value would equal [Int.max_value], the result of
    [reset_seq_no state] is stored instead. *)
let incr_sequence_no : type a b. (a, b) t -> (unit, a, b) State_or_error.t =
 fun state ->
  let open State_or_error in
  let next_seq_no = state.curr_job_seq_no + 1 in
  put
    ( if next_seq_no = Int.max_value then reset_seq_no state
    else { state with curr_job_seq_no = next_seq_no } )
400✔
1328

1329
(** Publishes three gauges per tree: available space, todo base jobs and
    todo merge jobs. Trees are named ["tree<i>"], counting from the last
    element of [t.trees]. Any exception raised while doing so is returned as
    an [Error]. *)
let update_metrics t =
  let set_gauge gauge count = Mina_metrics.Gauge.set gauge (Int.to_float count) in
  let report idx tree =
    let name = sprintf "tree%d" idx in
    set_gauge
      (Mina_metrics.Scan_state_metrics.scan_state_available_space ~name)
      (Tree.available_space tree) ;
    let base_job_count, merge_job_count = Tree.todo_job_count tree in
    set_gauge
      (Mina_metrics.Scan_state_metrics.scan_state_base_snarks ~name)
      base_job_count ;
    set_gauge
      (Mina_metrics.Scan_state_metrics.scan_state_merge_snarks ~name)
      merge_job_count
  in
  Or_error.try_with (fun () ->
      t.trees |> Mina_stdlib.Nonempty_list.to_list |> List.rev
      |> List.iteri ~f:report )
740✔
1344

1345
(** One scan-state update inside the [State_or_error] monad.

    Steps, in order:
    + reject [data] longer than [max_base_jobs];
    + reject the update when too few jobs were completed for this much data;
    + increment the sequence number;
    + split [data] at the head tree's available space, and split
      [completed_jobs] at the number of jobs the current trees require;
    + add the first halves, then the second halves (the second halves are
      empty when everything fits in the head tree);
    + record the emitted value, keeping the previous one if none is new;
    + reject the result if the number of trees exceeds [max_trees].

    Returns the value emitted by the first [add_merge_jobs] call. *)
let update_helper :
       data:'base list
    -> completed_jobs:'merge list
    -> ('a, 'merge, 'base) State_or_error.t =
 fun ~data ~completed_jobs ->
  let open State_or_error in
  let open State_or_error.Let_syntax in
  let%bind t = get in
  let data_count = List.length data in
  let%bind () =
    check
      (data_count > t.max_base_jobs)
      ~message:
        (sprintf
           !"Data count (%d) exceeded maximum (%d)"
           data_count t.max_base_jobs )
  in
  let required_jobs = List.concat (work_for_next_update t ~data_count) in
  let%bind () =
    (* Both counts are halved, rounding up. *)
    let required = (List.length required_jobs + 1) / 2 in
    let got = (List.length completed_jobs + 1) / 2 in
    let too_few_jobs =
      got < required && data_count > t.max_base_jobs - required + got
    in
    check too_few_jobs
      ~message:
        (sprintf
           !"Insufficient jobs (Data count %d): Required- %d got- %d"
           data_count required got )
  in
  let delay = t.delay + 1 in
  let%bind () = incr_sequence_no t in
  let available_space =
    Tree.available_space (Mina_stdlib.Nonempty_list.head t.trees)
  in
  (* Data may straddle two trees: whatever does not fit in the head tree
     goes to a new one, and the completed jobs are split accordingly. *)
  let data1, data2 = List.split_n data available_space in
  let required_jobs_for_current_tree =
    List.length
      (work
         (Mina_stdlib.Nonempty_list.tail t.trees)
         ~max_base_jobs:t.max_base_jobs ~delay )
  in
  let jobs1, jobs2 =
    List.split_n completed_jobs required_jobs_for_current_tree
  in
  let%bind result_opt = add_merge_jobs ~completed_jobs:jobs1 in
  let%bind () = add_data ~data:data1 in
  let%bind _ = add_merge_jobs ~completed_jobs:jobs2 in
  let%bind () = add_data ~data:data2 in
  let%bind state = State_or_error.get in
  (* A new result replaces the stored one; otherwise the old one is kept. *)
  let%bind () =
    State_or_error.put
      { state with acc = Option.merge result_opt state.acc ~f:Fn.const }
  in
  let tree_count = Mina_stdlib.Nonempty_list.length state.trees in
  let tree_limit = max_trees state in
  let%map () =
    check (tree_count > tree_limit)
      ~message:
        (sprintf
           !"Tree list length (%d) exceeded maximum (%d)"
           tree_count tree_limit )
  in
  result_opt
400✔
1412

1413
(** Runs [update_helper] on [state]; on success returns the emitted value (if
    any) together with the new state. *)
let update :
       data:'base list
    -> completed_jobs:'merge list
    -> ('merge, 'base) t
    -> (('merge * 'base list) option * ('merge, 'base) t) Or_error.t =
 fun ~data ~completed_jobs state ->
  let computation = update_helper ~data ~completed_jobs in
  State_or_error.run_state computation ~state
400✔
1420

1421
(** Public name for [all_work]. *)
let all_jobs t = t |> all_work
600✔
1422

1423
(** [work_for_next_update] with [data_count] set to [t.max_base_jobs]. *)
let jobs_for_next_update t =
  let data_count = t.max_base_jobs in
  work_for_next_update t ~data_count
1,000✔
1424

1425
(** [work_for_next_update] with [data_count] set to [slots]. *)
let jobs_for_slots t ~slots =
  let data_count = slots in
  work_for_next_update t ~data_count
×
1426

1427
(** Returns the state's [max_base_jobs] field. *)
let free_space { max_base_jobs; _ } = max_base_jobs
1,280✔
1428

1429
(** Returns the state's [acc] field (the last emitted value, if any). *)
let last_emitted_value { acc; _ } = acc
520✔
1430

1431
(** Returns the state's [curr_job_seq_no] field. *)
let current_job_sequence_number { curr_job_seq_no; _ } = curr_job_seq_no
×
1432

1433
(** Payloads of the [Base] jobs reported by [Tree.jobs_on_level] at the leaf
    level of the head tree. *)
let base_jobs_on_latest_tree t =
  let depth = Int.ceil_log2 t.max_base_jobs in
  let latest_tree = Mina_stdlib.Nonempty_list.head t.trees in
  let jobs = Tree.jobs_on_level latest_tree ~depth ~level:depth in
  List.filter_map jobs ~f:(function Base d -> Some d | Merge _ -> None)
×
1439

1440
(* 0-based indexing, so 0 indicates next-to-latest tree *)
1441
(** Payloads of the [Base] jobs reported by [Tree.jobs_on_level] at the leaf
    level of the tree at [index] among the trees after the head. Returns
    [[]] when there is no tree at that index. *)
let base_jobs_on_earlier_tree t ~index =
  let depth = Int.ceil_log2 t.max_base_jobs in
  let earlier_trees = Mina_stdlib.Nonempty_list.tail t.trees in
  let jobs =
    match List.nth earlier_trees index with
    | Some tree ->
        Tree.jobs_on_level tree ~depth ~level:depth
    | None ->
        []
  in
  List.filter_map jobs ~f:(function Base d -> Some d | Merge _ -> None)
×
1451

1452
(** Describes how a full update would be split across trees.

    [first] is the head tree's free space paired with the number of jobs
    from [work_for_tree ~data_tree:`Current]. [second] is present only when
    the head tree has less free space than [max_base_jobs]: it holds the
    remaining slots and the job count from [work_for_tree ~data_tree:`Next],
    capped at twice the slots. *)
let partition_if_overflowing : ('merge, 'base) t -> Space_partition.t =
 fun t ->
  let job_count_for data_tree = List.length (work_for_tree t ~data_tree) in
  let cur_tree_space = free_space_on_current_tree t in
  (* Count the actual work, since there is none initially. *)
  let work_count = job_count_for `Current in
  let work_count_new_tree = job_count_for `Next in
  let second =
    if cur_tree_space >= t.max_base_jobs then None
    else
      let slots = t.max_base_jobs - cur_tree_space in
      Some (slots, min work_count_new_tree (2 * slots))
  in
  { first = (cur_tree_space, work_count); second }
1465

1466
(** [true] when the head tree's free space equals [max_base_jobs]. *)
let next_on_new_tree t = free_space_on_current_tree t = t.max_base_jobs
400✔
1469

1470
(** [Tree.base_jobs] of every tree, listed from the last element of
    [t.trees] to the head. *)
let pending_data t =
  t.trees |> Mina_stdlib.Nonempty_list.rev |> Mina_stdlib.Nonempty_list.to_list
  |> List.map ~f:Tree.base_jobs
20✔
1472

1473
(** [Tree.view_jobs_with_position] for every tree, listed from the last
    element of [state.trees] to the head. *)
let view_jobs_with_position (state : ('merge, 'base) State.t) fa fd =
  List.rev_map (Mina_stdlib.Nonempty_list.to_list state.trees) ~f:(fun tree ->
      Tree.view_jobs_with_position tree fa fd )
×
1476

1477
(** Returns [(todo, done)] totals over all jobs in [t]. A [Full] job counts
    1 toward the total matching its status, a [Part] merge job counts 0.5
    toward todo, and [Empty] jobs count nothing. *)
let job_count t =
  let tally (todo, finished) (extra_todo, extra_finished) =
    (todo +. extra_todo, finished +. extra_finished)
  in
  State.fold_chronological t ~init:(0., 0.)
    ~f_merge:(fun totals (_, job) ->
      tally totals
        ( match job with
        | Merge.Job.Part _ ->
            (0.5, 0.)
        | Full { status = Job_status.Todo; _ } ->
            (1., 0.)
        | Full { status = Job_status.Done; _ } ->
            (0., 1.)
        | Empty ->
            (0., 0.) ) )
    ~f_base:(fun totals (_, job) ->
      tally totals
        ( match job with
        | Base.Job.Empty ->
            (0., 0.)
        | Full { status = Job_status.Todo; _ } ->
            (1., 0.)
        | Full { status = Job_status.Done; _ } ->
            (0., 1.) ) )
1503

1504
(** Test helper comparing the state before ([t]) and after ([t']) an update.

    Asserts that:
    - the number of jobs [all_jobs t'] hands out equals the number of job
      records in [t'] whose status is [Todo];
    - the todo total changed by [base_job_count - completed_job_count] plus
      the new merge jobs (half of the completed jobs, one fewer being
      counted when a value was emitted);
    - the done total grew by [completed_job_count], minus
      [2 * max_base_jobs - 1] when a value was emitted. *)
let assert_job_count t t' ~completed_job_count ~base_job_count ~value_emitted =
  let todo_before, done_before = job_count t in
  let todo_after, done_after = job_count t' in
  (* Jobs actually handed out when distributing work. *)
  let distributed_count = List.length (List.concat (all_jobs t')) in
  (* Job records still marked [Todo]. *)
  let is_todo = function
    | Job.Base { status = Job_status.Todo; _ }
    | Job.Merge { status = Todo; _ } ->
        true
    | _ ->
        false
  in
  let todo_record_count =
    List.sum
      (module Int)
      (Mina_stdlib.Nonempty_list.to_list t'.trees)
      ~f:(fun tree -> List.count (Tree.jobs_records tree) ~f:is_todo)
  in
  assert (distributed_count = todo_record_count) ;
  let new_jobs =
    if value_emitted then (completed_job_count -. 1.) /. 2.
    else completed_job_count /. 2.
  in
  let expected_todo_after =
    todo_before +. base_job_count -. completed_job_count +. new_jobs
  in
  let jobs_from_delete_tree =
    if value_emitted then Float.of_int ((2 * t.max_base_jobs) - 1) else 0.
  in
  let expected_done_after =
    done_before +. completed_job_count -. jobs_from_delete_tree
  in
  assert (
    Float.equal todo_after expected_todo_after
    && Float.equal done_after expected_done_after )
×
1538

1539
(** Runs [update] on [t] with [data] and [completed_jobs], checks the
    resulting job counts with [assert_job_count], and returns the pair
    produced by [update]. Raises if [update] returns an error. *)
let test_update t ~data ~completed_jobs =
  let float_length xs = Float.of_int (List.length xs) in
  let ((emitted, updated) as outcome) =
    Or_error.ok_exn (update ~data ~completed_jobs t)
  in
  assert_job_count t updated
    ~value_emitted:(Option.is_some emitted)
    ~completed_job_count:(float_length completed_jobs)
    ~base_job_count:(float_length data) ;
  outcome
×
1546

1547
let%test_module "test" =
  ( module struct
    (* Adds a full batch of [max_base_jobs] ints on each of 100 updates,
       completing all the work requested for that update, and checks every
       emitted result against the oldest batch not yet emitted. *)
    let%test_unit "always max base jobs" =
      let max_base_jobs = 512 in
      let initial = empty ~max_base_jobs ~delay:3 in
      let _unemitted, _final =
        List.fold (List.range 0 100) ~init:([], initial)
          ~f:(fun (pending, t) i ->
            let data = List.init max_base_jobs ~f:(fun j -> i + j) in
            (* batches awaiting emission, newest first *)
            let pending = data :: pending in
            let work =
              List.concat
                (work_for_next_update t ~data_count:(List.length data))
            in
            let completed_jobs =
              List.map work ~f:(fun job ->
                  match job with Base v -> v | Merge (l, r) -> l + r )
            in
            let emitted, t' = test_update ~data ~completed_jobs t in
            match emitted with
            | None ->
                (* nothing emitted: nothing to compare, nothing consumed *)
                (pending, t')
            | Some actual ->
                let expected, still_pending =
                  match List.rev pending with
                  | [] ->
                      ((0, []), [])
                  | oldest :: newer ->
                      ( (List.sum (module Int) oldest ~f:Fn.id, oldest)
                      , List.rev newer )
                in
                assert ([%equal: int * int list] actual expected) ;
                (still_pending, t') )
      in
      ()

    (* Adds a random number (at most [max_base_jobs]) of ones per update and
       completes at most twice as many jobs as data items added; any emitted
       result must be [max_base_jobs] ones together with their sum. *)
    let%test_unit "Random base jobs" =
      let max_base_jobs = 512 in
      let state = ref (empty ~max_base_jobs ~delay:3) in
      let expected = (max_base_jobs, List.init max_base_jobs ~f:(fun _ -> 1)) in
      let ones_gen = Quickcheck.Generator.list (Int.gen_incl 1 1) in
      Quickcheck.test ones_gen ~f:(fun ones ->
          let t = !state in
          let data = List.take ones max_base_jobs in
          let data_count = List.length data in
          let work =
            List.take
              (List.concat (work_for_next_update t ~data_count))
              (data_count * 2)
          in
          let completed_jobs =
            List.map work ~f:(fun job ->
                match job with Base v -> v | Merge (l, r) -> l + r )
          in
          let emitted, t' = test_update ~data ~completed_jobs t in
          Option.iter emitted ~f:(fun actual ->
              assert ([%equal: int * int list] actual expected) ) ;
          state := t' )
  end )
1615

1616
(** Quickcheck generator of scan states.

    Picks a depth in [2, 5] (so [max_base_jobs = 2 ^ depth]) and a delay in
    [0, 3], then folds a non-empty list of full data batches into an empty
    state. For each batch, all the work requested for it is completed with
    [f_job_done]. Whenever an update emits a result, [f_acc] combines it with
    the accumulator held before the update (or, if there was none, with the
    accumulator of the updated state) and the outcome is stored in [acc]. *)
let gen :
       gen_data:'d Quickcheck.Generator.t
    -> f_job_done:(('a, 'd) Available_job.t -> 'a)
    -> f_acc:(('a * 'd list) option -> 'a * 'd list -> ('a * 'd list) option)
    -> ('a, 'd) State.t Quickcheck.Generator.t =
 fun ~gen_data ~f_job_done ~f_acc ->
  let open Quickcheck.Generator.Let_syntax in
  let%bind depth, delay =
    Quickcheck.Generator.tuple2 (Int.gen_incl 2 5) (Int.gen_incl 0 3)
  in
  let max_base_jobs = Int.pow 2 depth in
  let%map batches =
    Quickcheck.Generator.list_non_empty
      (Quickcheck.Generator.list_with_length max_base_jobs gen_data)
  in
  List.fold batches
    ~init:(State.empty ~max_base_jobs ~delay)
    ~f:(fun before batch ->
      let completed_jobs =
        List.map ~f:f_job_done
          (List.concat
             (work_for_next_update before ~data_count:(List.length batch)) )
      in
      match Or_error.ok_exn (update ~data:batch before ~completed_jobs) with
      | None, after ->
          after
      | Some emitted, after ->
          let previous =
            match before.acc with Some _ as acc -> acc | None -> after.acc
          in
          { after with acc = f_acc previous emitted } )
×
1644

1645
let%test_module "scans" =
1646
  ( module struct
1647
    (* Re-binds [Queue] inside this test module; [do_steps] calls
       [Queue.to_list] on the batches it reads from its data pipe.
       NOTE(review): presumably this pins which [Queue] implementation the
       helpers in this module see -- confirm. *)
    module Queue = Queue
1648

1649
    (* Feeds [ds] into [state] in chunks of at most [max_base_jobs] items.
       For each chunk: completes all the work requested for it using [f],
       runs [test_update], folds any emitted result into the accumulator
       with [f_acc] (only when an accumulator existed before the update;
       otherwise the updated state's own accumulator is kept), and writes
       the resulting accumulator to [w]. Returns the final state. *)
    let rec step_on_free_space state w ds f f_acc =
      let data = List.take ds state.max_base_jobs in
      let completed_jobs =
        List.map ~f
          (List.concat
             (work_for_next_update state ~data_count:(List.length data)) )
      in
      let acc_before = state.acc in
      let state =
        match test_update ~data state ~completed_jobs with
        | None, next ->
            next
        | Some emitted, next ->
            let acc =
              match acc_before with
              | Some _ ->
                  f_acc acc_before emitted
              | None ->
                  next.acc
            in
            { next with acc }
      in
      let%bind () = Linear_pipe.write w state.acc in
      match List.drop ds state.max_base_jobs with
      | [] ->
          return state
      | remaining ->
          step_on_free_space state w remaining f f_acc
×
1668

1669
    (* Repeatedly reads a batch from [data], runs [step_on_free_space] on it
       starting from the state stored in [state], and stores the resulting
       state back into [state]. Finishes when [data] reaches end of file. *)
    let do_steps ~state ~data ~f ~f_acc w =
      let rec drain () =
        let%bind batch = Linear_pipe.read' data in
        match batch with
        | `Eof ->
            return ()
        | `Ok queue ->
            let%bind next =
              step_on_free_space !state w (Queue.to_list queue) f f_acc
            in
            state := next ;
            drain ()
      in
      drain ()
1681

1682
    (* Reader of accumulators obtained by running [do_steps] over [data],
       starting from an empty state with [2 ^ depth] base jobs and a delay
       of 1. *)
    let scan ~data ~depth ~f ~f_acc =
      let run writer =
        let max_base_jobs = Int.pow 2 depth in
        let state = ref (empty ~max_base_jobs ~delay:1) in
        do_steps ~state ~data ~f ~f_acc writer
      in
      Linear_pipe.create_reader ~close_on_exception:true run
1686

1687
    (* Like [scan], but continues from the caller-supplied [state]
       reference, which [do_steps] keeps updating as it goes. *)
    let step_repeatedly ~state ~data ~f ~f_acc =
      Linear_pipe.create_reader ~close_on_exception:true
        (do_steps ~state ~data ~f ~f_acc)
×
1690

1691
    let%test_module "scan (+) over ints" =
1692
      ( module struct
1693
        (* Combines an emitted (sum, data) pair into the accumulator: sums
           are added and data lists appended. [None] stays [None]. *)
        let f_merge_up (state : (int64 * int64 list) option) (delta, items) =
          Option.map state ~f:(fun (total, seen) ->
              (Int64.(total + delta), seen @ items) )
×
1697

1698
        (* Result of completing a job: a base job yields its datum, a merge
           job yields the sum of its two inputs. *)
        let job_done : (Int64.t, Int64.t) Available_job.t -> Int64.t = function
          | Base datum ->
              datum
          | Merge (left, right) ->
              Int64.(left + right)
×
1700

1701
        (* Every trial starts from the same empty state (the outer [state]
           is never replaced), adds [i] items, and checks whether the number
           of trees grew by one as predicted by [partition_if_overflowing]:
           - no second partition: grows iff [i] equals the first slot count
           - a second partition: grows iff [i] exceeds the first slot count *)
        let%test_unit "Split only if enqueuing onto the next queue" =
          let p = 4 in
          let max_base_jobs = Int.pow 2 p in
          let state = State.empty ~max_base_jobs ~delay:1 in
          Quickcheck.test (Int.gen_incl 0 max_base_jobs) ~trials:1000
            ~f:(fun i ->
              let data = List.init i ~f:Int64.of_int in
              let partition = partition_if_overflowing state in
              let completed_jobs =
                List.map ~f:job_done
                  (List.concat
                     (work_for_next_update state
                        ~data_count:(List.length data) ) )
              in
              let trees_before =
                Mina_stdlib.Nonempty_list.length state.trees
              in
              let _, updated = test_update ~data state ~completed_jobs in
              let trees_after =
                Mina_stdlib.Nonempty_list.length updated.trees
              in
              let first_slots = fst partition.first in
              let tree_added =
                match partition.second with
                | None ->
                    i = first_slots
                | Some _ ->
                    i > first_slots
              in
              let expected_trees =
                if tree_added then trees_before + 1 else trees_before
              in
              assert (trees_after = expected_trees) )
×
1739

1740
        let%test_unit "sequence number reset" =
          (* Jobs are created with unique sequence numbers starting from 1.
             At any point, after a reset, the jobs should be labelled
             starting from 1 again. *)
          Backtrace.elide := false ;
          let p = 3 in
          let max_base_jobs = Int.pow 2 p in
          let trial_gen = Int.gen_incl 0 max_base_jobs in
          (* Job records of each tree, one list per tree, in the reverse of
             the order of [state.trees]. *)
          let jobs state =
            List.rev_map
              (Mina_stdlib.Nonempty_list.to_list state.trees)
              ~f:Tree.jobs_records
          in
          let verify_sequence_number state =
            let depth = Int.ceil_log2 max_base_jobs + 1 in
            let per_tree_jobs = jobs (reset_seq_no state) in
            List.iteri per_tree_jobs ~f:(fun i tree_jobs ->
                (* Each tree has jobs up to one level below the older tree,
                   with the following sequence numbers after a reset:
                   *         4
                   *     3       3
                   *   2   2   2   2
                   *  1 1 1 1 1 1 1 1
                   Level [l] holds [2 ^ l] jobs numbered [depth - l] (before
                   the per-tree offset [i] is added); only levels [i] and
                   below are expected to be populated. *)
                let expected_sum =
                  List.init (depth - i) ~f:(fun k -> k + i)
                  |> List.sum
                       (module Int)
                       ~f:(fun level -> Int.pow 2 level * (depth - level))
                in
                let actual_sum =
                  List.sum
                    (module Int)
                    tree_jobs
                    ~f:(fun (job :
                              (int64 Merge.Record.t, int64 Base.Record.t) Job.t
                              ) ->
                      match job with
                      | Job.Merge { seq_no; _ } ->
                          seq_no - i
                      | Base { seq_no; _ } ->
                          seq_no - i )
                in
                assert (actual_sum = expected_sum) )
          in
          let state = ref (State.empty ~max_base_jobs ~delay:0) in
          let emissions_seen = ref 0 in
          Quickcheck.test trial_gen ~trials:50 ~f:(fun _ ->
              let completed_jobs =
                List.map ~f:job_done
                  (List.concat (jobs_for_next_update !state))
              in
              let data = List.init max_base_jobs ~f:Int64.of_int in
              let emitted, next = test_update ~data !state ~completed_jobs in
              state := next ;
              match emitted with
              | None ->
                  ()
              | Some _ ->
                  (* only start verifying once enough jobs were created *)
                  if !emissions_seen > p then verify_sequence_number next
                  else emissions_seen := !emissions_seen + 1 )
×
1802

1803
        (* A generated state must hash to the same value after a bin_prot
           write/read round trip. *)
        let%test_unit "serialize, deserialize scan state" =
          Backtrace.elide := false ;
          let state_gen =
            gen
              ~gen_data:
                (Quickcheck.Generator.map Int.quickcheck_generator
                   ~f:Int64.of_int )
              ~f_job_done:job_done ~f_acc:f_merge_up
          in
          let digest state = State.hash state Int64.to_string Int64.to_string in
          let round_trip state =
            let size =
              State.Stable.Latest.bin_size_t Int64.bin_size_t Int64.bin_size_t
                state
            in
            let buf = Bin_prot.Common.create_buf size in
            let (_ : int) =
              State.Stable.Latest.bin_write_t Int64.bin_write_t
                Int64.bin_write_t buf ~pos:0 state
            in
            State.Stable.Latest.bin_read_t Int64.bin_read_t Int64.bin_read_t
              ~pos_ref:(ref 0) buf
          in
          Quickcheck.test state_gen
            ~sexp_of:[%sexp_of: (int64, int64) State.t] ~trials:50
            ~f:(fun s ->
              let hash_before = digest s in
              let hash_after = digest (round_trip s) in
              assert (Hash.equal hash_before hash_after) )
×
1832

1833
        let%test_unit "scan can be initialized from intermediate state" =
          Backtrace.elide := false ;
          let state_gen =
            gen
              ~gen_data:
                (Quickcheck.Generator.map Int.quickcheck_generator
                   ~f:Int64.of_int )
              ~f_job_done:job_done ~f_acc:f_merge_up
          in
          Quickcheck.test state_gen
            ~sexp_of:[%sexp_of: (int64, int64) State.t] ~trials:10
            ~f:(fun initial ->
              let s = ref initial in
              Async.Thread_safe.block_on_async_exn (fun () ->
                  (* when set, the next value written to the source is a
                     single 1 instead of 0 *)
                  let emit_one = ref false in
                  (* For any arbitrary intermediate state, *)
                  (* if we then add 1 and a bunch of zeros *)
                  let one_then_zeros =
                    Linear_pipe.create_reader ~close_on_exception:true
                      (fun w ->
                        let rec feed () =
                          let value =
                            match !emit_one with
                            | true ->
                                emit_one := false ;
                                Int64.one
                            | false ->
                                Int64.zero
                          in
                          let%bind () = Pipe.write w value in
                          feed ()
                        in
                        feed () )
                  in
                  let pipe state_ref =
                    step_repeatedly ~state:state_ref ~data:one_then_zeros
                      ~f:job_done ~f_acc:f_merge_up
                  in
                  let base_jobs = !s.max_base_jobs in
                  let parallelism = base_jobs * Int.ceil_log2 base_jobs in
                  (* Performs [parallelism ^ 2] reads, each one on a reader
                     freshly created by [pipe], keeping the most recent
                     accumulated sum that was read. *)
                  let fill_some_zeros start state_ref =
                    List.init (parallelism * parallelism) ~f:(fun _ -> ())
                    |> Deferred.List.fold ~init:start ~f:(fun latest () ->
                           match%map Linear_pipe.read (pipe state_ref) with
                           | `Ok (Some (total, _)) ->
                               total
                           | `Ok None | `Eof ->
                               latest )
                  in
                  (* after we flush intermediate work *)
                  let old_total =
                    match !s.acc with
                    | Some (total, _) ->
                        total
                    | None ->
                        Int64.zero
                  in
                  let%bind flushed = fill_some_zeros Int64.zero s in
                  emit_one := true ;
                  let acc = Option.value_exn !s.acc in
                  assert (not (Int64.equal (fst acc) old_total)) ;
                  (* eventually we'll emit the acc+1 element *)
                  let%map (_ : int64) = fill_some_zeros flushed s in
                  let acc_plus_one = Option.value_exn !s.acc in
                  assert (
                    Int64.equal (fst acc_plus_one) Int64.(fst acc + one) ) ) )
×
1893
      end )
1894

1895
    let%test_module "scan (+) over ints, map from string" =
      ( module struct
        (* Combines an emitted (sum, data) pair into the accumulator: sums
           are added and data lists appended. [None] stays [None]. *)
        let f_merge_up (tuple : (int64 * string list) option) (delta, items) =
          Option.map tuple ~f:(fun (total, seen) ->
              (Int64.(total + delta), seen @ items) )

        (* A base job parses its string datum as an int64; a merge job adds
           its two inputs. *)
        let job_done : (Int64.t, string) Available_job.t -> Int64.t = function
          | Base raw ->
              Int64.of_string raw
          | Merge (left, right) ->
              Int64.(left + right)

        let%test_unit "scan behaves like a fold long-term" =
          let depth = 7 in
          let n = 1000 in
          (* Endless source yielding "0", "1", ..., [limit - 1] as strings,
             followed by "0" forever. *)
          let count_up_then_zeros limit =
            let source =
              Pipe.unfold ~init:limit ~f:(fun remaining ->
                  let item =
                    if remaining > 0 then Int.to_string (limit - remaining)
                    else "0"
                  in
                  return (Some (item, remaining - 1)) )
            in
            { Linear_pipe.Reader.pipe = source; has_reader = false }
          in
          let result =
            scan ~data:(count_up_then_zeros n) ~depth ~f:job_done
              ~f_acc:f_merge_up
          in
          Async.Thread_safe.block_on_async_exn (fun () ->
              (* last accumulated sum seen over [4 * n] reads *)
              let%map after_4n =
                List.init (4 * n) ~f:(fun _ -> ())
                |> Deferred.List.fold ~init:Int64.zero ~f:(fun latest () ->
                       match%map Linear_pipe.read result with
                       | `Ok (Some (total, _)) ->
                           total
                       | `Ok None | `Eof ->
                           latest )
              in
              let expected =
                List.init n ~f:Int64.of_int
                |> List.fold ~init:Int64.zero ~f:Int64.( + )
              in
              assert (Int64.equal after_4n expected) )
      end )
1946

1947
    let%test_module "scan (concat) over strings" =
      ( module struct
        (* Combines an emitted (string, data) pair into the accumulator:
           strings are concatenated (accumulator first) and data lists
           appended. [None] stays [None]. *)
        let f_merge_up (tuple : (string * string list) option) (suffix, items) =
          Option.map tuple ~f:(fun (prefix, seen) ->
              (prefix ^ suffix, seen @ items) )

        (* A base job yields its string; a merge job concatenates its two
           inputs, left first. *)
        let job_done : (string, string) Available_job.t -> string = function
          | Base s ->
              s
          | Merge (left, right) ->
              left ^ right

        let%test_unit "scan performs operation in correct order with \
                       non-commutative semigroup" =
          Backtrace.elide := false ;
          let n = 100 in
          (* Endless source yielding "0,", "1,", ..., up to [limit - 1],
             followed by empty strings forever. *)
          let nums_then_empties limit =
            let source =
              Pipe.unfold ~init:limit ~f:(fun remaining ->
                  let item =
                    if remaining > 0 then
                      Int.to_string (limit - remaining) ^ ","
                    else ""
                  in
                  return (Some (item, remaining - 1)) )
            in
            { Linear_pipe.Reader.pipe = source; has_reader = false }
          in
          let result =
            scan ~data:(nums_then_empties n) ~depth:7 ~f:job_done
              ~f_acc:f_merge_up
          in
          Async.Thread_safe.block_on_async_exn (fun () ->
              (* last accumulated string seen over [42 * n] reads *)
              let%map after_42n =
                List.init (42 * n) ~f:(fun _ -> ())
                |> Deferred.List.fold ~init:"" ~f:(fun latest () ->
                       match%map Linear_pipe.read result with
                       | `Ok (Some (text, _)) ->
                           text
                       | `Ok None | `Eof ->
                           latest )
              in
              let expected =
                String.concat (List.init n ~f:(fun i -> Int.to_string i ^ ","))
              in
              assert (String.equal after_42n expected) )
      end )
1995
  end )
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc