• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 1265

09 Apr 2026 08:59PM UTC coverage: 32.927% (-28.3%) from 61.266%
1265

push

buildkite

web-flow
Merge pull request #18726 from MinaProtocol/dkijania/fix-connect-to-mesa-dependency

24719 of 75073 relevant lines covered (32.93%)

16068.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.44
/src/lib/parallel_scan/parallel_scan.ml
1
open Core_kernel
147✔
2
open Async_kernel
3
open Pipe_lib
4

5
(*Glossary of type variables used in this file:
6
  1. 'base: polymorphic type for jobs in the leaves of the scan state tree
7
  2. 'merge: polymorphic type for jobs in the intermediate nodes of the scan state tree
8
  3. 'base_t: 'base Base.t
9
  4. 'merge_t: 'merge Merge.t
10
  5. 'base_job: Base.Job.t
11
  6. 'merge_job: Merge.Job.t
12
*)
13

14
(*Note: some general-purpose functions that may be useful in the future are prefixed with "_" so that they do not trigger the "unused function" error*)
15

16
(**Sequence number for jobs in the scan state that corresponds to the order in
17
which they were added*)
18
module Sequence_number = struct
18
  (* A sequence number is represented as a plain [int]. [to_latest] is the
     identity: V1 is the only version defined in this module. *)
  [%%versioned
19
  module Stable = struct
20
    module V1 = struct
21
      type t = int [@@deriving sexp]
147✔
22

23
      let to_latest = Fn.id
24
    end
25
  end]
26
end
28

29
(**Each node on the tree is viewed as a job that needs to be completed. When a
30
job is completed, it creates a new "Todo" job and marks the old job as "Done"*)
31
module Job_status = struct
  [%%versioned
  module Stable = struct
    module V1 = struct
      type t = Todo | Done [@@deriving sexp]

      let to_latest = Fn.id
    end
  end]

  (* Name of a status as a string; it is spelled exactly like the
     constructor. *)
  let to_string status = match status with Done -> "Done" | Todo -> "Todo"
end
43

44
(**The number of new jobs (base and merge) that can be added to this tree.
45
 * Each node has a weight associated to it and the
46
 * new jobs received are distributed across the tree based on this number. *)
47
module Weight = struct
47
  (* A weight is a pair of integer counters, [base] and [merge]. The stable
     module does not export a top-level latest type
     ([@@@no_toplevel_latest_type]); [t] is re-exported by hand below. *)
  [%%versioned
48
  module Stable = struct
49
    [@@@no_toplevel_latest_type]
50

51
    module V1 = struct
52
      type t = { base : int; merge : int } [@@deriving sexp]
×
53

54
      let to_latest = Fn.id
55
    end
56
  end]
57

58
  (* Same record as [Stable.Latest.t], additionally deriving lenses; the rest
     of this file accesses them as [Weight.base] and [Weight.merge]. *)
  type t = Stable.Latest.t = { base : int; merge : int } [@@deriving sexp, lens]
×
59
end
61

62
(**For base proofs (Proving new transactions)*)
63
module Base = struct
  (* Contents of an occupied leaf: the job together with its sequence number
     and completion status. *)
  module Record = struct
    [%%versioned
    module Stable = struct
      module V1 = struct
        type 'base t =
          { job : 'base
          ; seq_no : Sequence_number.Stable.V1.t
          ; status : Job_status.Stable.V1.t
          }
        [@@deriving sexp]
      end
    end]

    (* Transform the job with [f]; [seq_no] and [status] are carried over. *)
    let map ({ job; seq_no; status } : 'a t) ~(f : 'a -> 'b) : 'b t =
      { job = f job; seq_no; status }
  end

  (* A leaf slot: either vacant or holding a record. *)
  module Job = struct
    [%%versioned
    module Stable = struct
      module V1 = struct
        type 'base t = Empty | Full of 'base Record.Stable.V1.t
        [@@deriving sexp]
      end
    end]

    (* Map over the job stored in a [Full] slot; [Empty] stays [Empty]. *)
    let map (t : 'a t) ~(f : 'a -> 'b) : 'b t =
      match t with
      | Full record ->
          Full (Record.map ~f record)
      | Empty ->
          Empty

    (* Constructor name, used when building error messages. *)
    let job_str job =
      match job with Full _ -> "Base.Full" | Empty -> "Base.Empty"
  end

  [%%versioned
  module Stable = struct
    module V1 = struct
      type 'base t = Weight.Stable.V1.t * 'base Job.Stable.V1.t
      [@@deriving sexp]
    end
  end]

  (* Map over the job of a leaf node; the weight component is left alone. *)
  let map ((weight, job) : 'a t) ~(f : 'a -> 'b) : 'b t =
    (weight, Job.map ~f job)
end
105

106
(** For merge proofs: Merging two base proofs or two merge proofs*)
107
module Merge = struct
  (* Both inputs of a merge job plus its sequence number and status. *)
  module Record = struct
    [%%versioned
    module Stable = struct
      module V1 = struct
        type 'merge t =
          { left : 'merge
          ; right : 'merge
          ; seq_no : Sequence_number.Stable.V1.t
          ; status : Job_status.Stable.V1.t
          }
        [@@deriving sexp]
      end
    end]

    (* Apply [f] to both inputs; [seq_no] and [status] are unchanged. *)
    let map (record : 'a t) ~(f : 'a -> 'b) : 'b t =
      { record with left = f record.left; right = f record.right }
  end

  (* A merge slot: vacant, holding only its left input, or holding both. *)
  module Job = struct
    [%%versioned
    module Stable = struct
      module V1 = struct
        type 'merge t =
          | Empty
          | Part of 'merge (*When only the left component of the job is available since we always complete the jobs from left to right*)
          | Full of 'merge Record.Stable.V1.t
        [@@deriving sexp]
      end
    end]

    (* Map [f] over every input stored in the slot. *)
    let map (t : 'a t) ~(f : 'a -> 'b) : 'b t =
      match t with
      | Full record ->
          Full (Record.map ~f record)
      | Part left ->
          Part (f left)
      | Empty ->
          Empty

    (* Constructor name, used when building error messages. *)
    let job_str job =
      match job with
      | Full _ ->
          "Merge.Full"
      | Part _ ->
          "Merge.Part"
      | Empty ->
          "Merge.Empty"
  end

  [%%versioned
  module Stable = struct
    module V1 = struct
      type 'merge t =
        (Weight.Stable.V1.t * Weight.Stable.V1.t) * 'merge Job.Stable.V1.t
      [@@deriving sexp]
    end
  end]

  (* Map over the job of a merge node; the pair of weights is left alone. *)
  let map ((weights, job) : 'a t) ~(f : 'a -> 'b) : 'b t =
    (weights, Job.map ~f job)
end
167

168
(**All the jobs on a tree that can be done. Base.Full and Merge.Full*)
169
module Available_job = struct
170
  (* [Base b] carries a single base value; [Merge (l, r)] carries the two
     values that are to be merged with each other. *)
  type ('merge, 'base) t = Base of 'base | Merge of 'merge * 'merge
×
171
  [@@deriving sexp]
172
end
173

174
(**New jobs to be added (including new transactions or new merge jobs)*)
175
module Job = struct
176
  (* [Base] wraps a base-level value and [Merge] wraps a merge-level value.
     Unlike [Available_job.Merge], [Merge] here carries a single value. *)
  type ('merge, 'base) t = Base of 'base | Merge of 'merge [@@deriving sexp]
×
177
end
178

179
(**Space available and number of jobs required to enqueue data.
180
 first = space on the current tree and number of jobs required
181
 to be completed
182
 second = If the current-tree space is less than <max_base_jobs>
183
 then remaining number of slots on a new tree and the corresponding
184
 job count.*)
185
module Space_partition = struct
186
  (* [first] is always present; [second] is optional.
     NOTE(review): each [int * int] presumably reads (space, job count) -
     confirm the component order against the code that builds this record. *)
  [%%versioned
187
  module Stable = struct
188
    module V1 = struct
189
      type t = { first : int * int; second : (int * int) option }
×
190
      [@@deriving sexp]
735✔
191

192
      let to_latest = Fn.id
193
    end
194
  end]
195
end
196

197
(**View of a job for json output*)
198
module Job_view = struct
199
  (* Bookkeeping data shown next to a full job: its sequence number and its
     status. *)
  module Extra = struct
200
    [%%versioned
201
    module Stable = struct
202
      module V1 = struct
203
        type t =
294✔
204
          { seq_no : Sequence_number.Stable.V1.t
×
205
          ; status : Job_status.Stable.V1.t
×
206
          }
207
        [@@deriving sexp]
735✔
208

209
        let to_latest = Fn.id
210
      end
211
    end]
212
  end
213

214
  (* One constructor per slot state: the [B*] constructors mirror
     [Base.Job.t] and the [M*] constructors mirror [Merge.Job.t]. The full
     variants additionally carry an [Extra.t]. *)
  module Node = struct
215
    [%%versioned
216
    module Stable = struct
217
      module V1 = struct
218
        type 'a t =
147✔
219
          | BEmpty
×
220
          | BFull of ('a * Extra.Stable.V1.t)
×
221
          | MEmpty
×
222
          | MPart of 'a
×
223
          | MFull of ('a * 'a * Extra.Stable.V1.t)
×
224
        [@@deriving sexp]
441✔
225
      end
226
    end]
227
  end
228

229
  (* A node view paired with an integer [position]. *)
  [%%versioned
230
  module Stable = struct
231
    module V1 = struct
232
      type 'a t = { position : int; value : 'a Node.Stable.V1.t }
×
233
      [@@deriving sexp]
147✔
234
    end
235
  end]
236
end
237

238
module Hash = struct
239
  (* A hash is a [Digestif.SHA256.t]; only an equality function is derived. *)
  type t = Digestif.SHA256.t [@@deriving equal]
×
240
end
241

242
(**A single tree with number of leaves = max_base_jobs = 2**transaction_capacity_log_2 *)
243
module Tree = struct
244
  (* Nested (non-regular) tree type. A [Node] stores the value for one level
     and a [sub_tree] whose payload types are pairs of the current payload
     types, so each level below holds twice as many values in a single
     nested tuple. The chain ends in a [Leaf] holding the base-level
     payload. *)
  [%%versioned
245
  module Stable = struct
246
    module V1 = struct
247
      type ('merge_t, 'base_t) t =
147✔
248
        | Leaf of 'base_t
×
249
        | Node of
×
250
            { depth : int
×
251
            ; value : 'merge_t
×
252
            ; sub_tree : ('merge_t * 'merge_t, 'base_t * 'base_t) t
×
253
            }
254
      [@@deriving sexp]
441✔
255
    end
256
  end]
257

258
  (*Eg: Tree depth = 3
259

260
    Node M
261
    |
262
    Node (M,M)
263
    |
264
    Node ((M,M),(M,M))
265
    |
266
    Leaf (((B,B),(B,B)),((B,B),(B,B)))
267
  *)
268

269
  (*mapi where i is the level of the tree*)
270
  (* Rebuild the tree, applying [f_merge] (which also receives the [depth]
     stored in the node) to every merge value and [f_base] to every base
     value. The recursion is polymorphic: one level down, both callbacks are
     lifted to operate on pairs. *)
  let rec map_depth :
      type a_merge b_merge c_base d_base.
         f_merge:(int -> a_merge -> b_merge)
      -> f_base:(c_base -> d_base)
      -> (a_merge, c_base) t
      -> (b_merge, d_base) t =
   fun ~f_merge ~f_base tree ->
    match tree with
    | Leaf leaf ->
        Leaf (f_base leaf)
    | Node { depth; value; sub_tree } ->
        let f_merge_pair level (l, r) = (f_merge level l, f_merge level r) in
        let f_base_pair (l, r) = (f_base l, f_base r) in
        Node
          { depth
          ; value = f_merge depth value
          ; sub_tree =
              map_depth ~f_merge:f_merge_pair ~f_base:f_base_pair sub_tree
          }
290

291
  (* [map_depth] for callers that do not need the depth of a merge node. *)
  let map :
      type a_merge b_merge c_base d_base.
         f_merge:(a_merge -> b_merge)
      -> f_base:(c_base -> d_base)
      -> (a_merge, c_base) t
      -> (b_merge, d_base) t =
   fun ~f_merge ~f_base tree ->
    map_depth ~f_merge:(fun _depth merge -> f_merge merge) ~f_base tree
760✔
299

300
  (* foldi where i is the cur_level*)
301
  (* Folds over a tree inside an arbitrary monad [M], with the option of
     stopping early. Merge values are visited from the outermost node inwards
     and, within a level, from left to right; the leaf values come last. *)
  module Make_foldable (M : Monad.S) = struct
    (* Worker: returns [Continue acc] when the whole tree was traversed and
       [Stop final] as soon as one of the callbacks asks to stop. *)
    let rec fold_depth_until' :
        type merge_t accum base_t final.
           f_merge:
             (int -> accum -> merge_t -> (accum, final) Continue_or_stop.t M.t)
        -> f_base:(accum -> base_t -> (accum, final) Continue_or_stop.t M.t)
        -> init:accum
        -> (merge_t, base_t) t
        -> (accum, final) Continue_or_stop.t M.t =
     fun ~f_merge ~f_base ~init tree ->
      let open Container.Continue_or_stop in
      let open M.Let_syntax in
      match tree with
      | Leaf leaf ->
          f_base init leaf
      | Node { depth; value; sub_tree } -> (
          (* Callbacks for the next level down: handle the left component
             first and only touch the right one if the left one continued. *)
          let f_merge_pair level acc (l, r) =
            match%bind f_merge level acc l with
            | Continue acc_l ->
                f_merge level acc_l r
            | Stop _ as stop ->
                M.return stop
          in
          let f_base_pair acc (l, r) =
            match%bind f_base acc l with
            | Continue acc_l ->
                f_base acc_l r
            | Stop _ as stop ->
                M.return stop
          in
          match%bind f_merge depth init value with
          | Continue acc' ->
              fold_depth_until' ~f_merge:f_merge_pair ~f_base:f_base_pair
                ~init:acc' sub_tree
          | Stop _ as stop ->
              M.return stop )

    (* Entry point: run the worker and apply [finish] to the accumulator if
       the traversal was not stopped early. *)
    let fold_depth_until :
        type merge_t base_t accum final.
           f_merge:
             (int -> accum -> merge_t -> (accum, final) Continue_or_stop.t M.t)
        -> f_base:(accum -> base_t -> (accum, final) Continue_or_stop.t M.t)
        -> init:accum
        -> finish:(accum -> final M.t)
        -> (merge_t, base_t) t
        -> final M.t =
     fun ~f_merge ~f_base ~init ~finish tree ->
      let open M.Let_syntax in
      match%bind fold_depth_until' ~f_merge ~f_base ~init tree with
      | Stop final ->
          M.return final
      | Continue acc ->
          finish acc
  end

  (* The same folds specialised to the identity monad. *)
  module Foldable_ident = Make_foldable (Monad.Ident)
355

356
  (* Fold over the whole tree without early exit. [f_merge] receives the
     depth stored in the node that is being visited. *)
  let fold_depth :
      type merge_t base_t accum.
         f_merge:(int -> accum -> merge_t -> accum)
      -> f_base:(accum -> base_t -> accum)
      -> init:accum
      -> (merge_t, base_t) t
      -> accum =
   fun ~f_merge ~f_base ~init tree ->
    Foldable_ident.fold_depth_until ~init ~finish:Fn.id
      ~f_merge:(fun level acc merge -> Continue (f_merge level acc merge))
      ~f_base:(fun acc base -> Continue (f_base acc base))
      tree
368

369
  (* [fold_depth] for callers that do not need the depth of a merge node. *)
  let fold :
      type merge_t base_t accum.
         f_merge:(accum -> merge_t -> accum)
      -> f_base:(accum -> base_t -> accum)
      -> init:accum
      -> (merge_t, base_t) t
      -> accum =
   fun ~f_merge ~f_base ~init tree ->
    fold_depth
      ~f_merge:(fun _depth acc merge -> f_merge acc merge)
      ~f_base ~init tree
760✔
378

379
  (* Fold with early exit that ignores the depth of merge nodes. [finish]
     is applied to the accumulator only when no callback returned [Stop]. *)
  let _fold_until :
      type merge_t base_t accum final.
         f_merge:(accum -> merge_t -> (accum, final) Continue_or_stop.t)
      -> f_base:(accum -> base_t -> (accum, final) Continue_or_stop.t)
      -> init:accum
      -> finish:(accum -> final)
      -> (merge_t, base_t) t
      -> final =
   fun ~f_merge ~f_base ~init ~finish tree ->
    Foldable_ident.fold_depth_until ~init ~finish
      ~f_merge:(fun _depth acc merge -> f_merge acc merge)
      ~f_base tree
391

392
  (*
393
    result -> final proof
394
    f_merge, f_base are to update the nodes with new jobs and mark old jobs as "Done"*)
395
  (* Walk the tree from the outermost node inwards, handing [jobs] to
     [f_merge] at every merge value and to [f_base] at the leaf.

     Before descending, [jobs] is divided between the two halves of the next
     level with [jobs_split], using the weights that [weight_merge] reads from
     the node value as it was BEFORE [f_merge] updated it. Descent stops at
     the node whose [depth] equals [update_level]; the sub-tree below it is
     returned unchanged.

     The [result option] of the returned pair is the one produced by
     [f_merge] on the outermost node; results from deeper levels are
     discarded. The first error aborts the traversal. *)
  let rec update_split :
      type merge_t base_t data weight result.
         f_merge:(data -> int -> merge_t -> (merge_t * result option) Or_error.t)
      -> f_base:(data -> base_t -> base_t Or_error.t)
      -> weight_merge:(merge_t -> weight * weight)
      -> jobs:data
      -> update_level:int
      -> jobs_split:(weight * weight -> data -> data * data)
      -> (merge_t, base_t) t
      -> ((merge_t, base_t) t * result option) Or_error.t =
   fun ~f_merge ~f_base ~weight_merge ~jobs ~update_level ~jobs_split tree ->
    let open Or_error.Let_syntax in
    match tree with
    | Leaf leaf ->
        let%map leaf' = f_base jobs leaf in
        (Leaf leaf', None)
    | Node { depth; value; sub_tree } ->
        let subtree_weights = weight_merge value in
        (*update the jobs at the current level*)
        let%bind value', scan_result = f_merge jobs depth value in
        (*get the updated subtree*)
        let%map sub_tree', _ =
          if update_level = depth then Ok (sub_tree, None)
          else
            (*split the jobs for the next level*)
            let sub_jobs = jobs_split subtree_weights jobs in
            let f_merge_pair (jobs_l, jobs_r) level (l, r) =
              let%bind l', result_l = f_merge jobs_l level l in
              let%map r', result_r = f_merge jobs_r level r in
              ((l', r'), Option.both result_l result_r)
            in
            let f_base_pair (jobs_l, jobs_r) (l, r) =
              let%bind l' = f_base jobs_l l in
              let%map r' = f_base jobs_r r in
              (l', r')
            in
            update_split ~f_merge:f_merge_pair ~f_base:f_base_pair
              ~weight_merge:(fun (l, r) -> (weight_merge l, weight_merge r))
              ~update_level
              ~jobs_split:(fun (weights_l, weights_r) (jobs_l, jobs_r) ->
                (jobs_split weights_l jobs_l, jobs_split weights_r jobs_r) )
              ~jobs:sub_jobs sub_tree
        in
        (Node { depth; value = value'; sub_tree = sub_tree' }, scan_result)
480✔
438

439
  (* Bottom-up update. [f_base] rewrites the leaf value and produces a piece
     of data; every merge value is then rewritten by [f_merge], which is
     given the pair of data produced by the level directly below it and
     produces the data for the level above. Returns the rebuilt tree and the
     data produced by the outermost value. *)
  let rec update_accumulate :
      type merge_t base_t data.
         f_merge:(data * data -> merge_t -> merge_t * data)
      -> f_base:(base_t -> base_t * data)
      -> (merge_t, base_t) t
      -> (merge_t, base_t) t * data =
   fun ~f_merge ~f_base tree ->
    (* Turn a pair of (value, data) results into (value pair, data pair). *)
    let transpose ((v1, d1), (v2, d2)) = ((v1, v2), (d1, d2)) in
    match tree with
    | Leaf leaf ->
        let leaf', data = f_base leaf in
        (Leaf leaf', data)
    | Node { depth; value; sub_tree } ->
        (*get the updated subtree*)
        let sub_tree', child_data =
          update_accumulate
            ~f_merge:(fun (data_l, data_r) (l, r) ->
              transpose (f_merge data_l l, f_merge data_r r) )
            ~f_base:(fun (l, r) -> transpose (f_base l, f_base r))
            sub_tree
        in
        let value', data = f_merge child_data value in
        (Node { depth; value = value'; sub_tree = sub_tree' }, data)
480✔
462

463
  (* Feed [completed_jobs] into the tree.

     The jobs are distributed from the outermost node inwards according to
     the weights read through [weight_lens]. Relative to [update_level], a
     merge node is treated as follows:
     - level = [update_level - 1]: new jobs are created from the completed
       ones (an [Empty] node becomes [Part] or [Full], a [Part] node becomes
       [Full]) and the weights are decreased accordingly;
     - level = [update_level]: the [Todo] job of the node is marked [Done];
       at level 0 the completed value is also returned as the result and both
       weights are set to 0;
     - level < [update_level - 1]: only the weights are decreased by the
       number of jobs that pass through towards each side;
     - any other level: the node is left untouched.
     Leaves either receive a new base job or have their job marked [Done].
     New jobs are stamped with [sequence_no]. A combination of jobs and node
     state that is not listed yields an error. *)
  let update :
         ('merge_job, 'base_job) Job.t list
      -> update_level:int
      -> sequence_no:int
      -> weight_lens:(Weight.t, int) Lens.t
      -> ('merge_t, 'base_t) t
      -> (('merge_t, 'base_t) t * 'b option) Or_error.t =
   fun completed_jobs ~update_level ~sequence_no:seq_no ~weight_lens tree ->
    let open Or_error.Let_syntax in
    let add_merges (jobs : ('b, 'c) Job.t list) cur_level
        (((w1, w2) as weight), m) =
      let left, right = (weight_lens.get w1, weight_lens.get w2) in
      let untouched = Ok ((weight, m), None) in
      if cur_level = update_level - 1 then
        (*Create new jobs from the completed ones*)
        let%map new_weight, m' =
          match (jobs, m) with
          | [], _ ->
              Ok (weight, m)
          | [ Job.Merge a; Merge b ], Merge.Job.Empty ->
              Ok
                ( (weight_lens.set (left - 1) w1, weight_lens.set (right - 1) w2)
                , Full { left = a; right = b; seq_no; status = Job_status.Todo }
                )
          | [ Merge a ], Empty ->
              Ok
                ( (weight_lens.set (left - 1) w1, weight_lens.set right w2)
                , Part a )
          | [ Merge b ], Part a ->
              Ok
                ( (weight_lens.set left w1, weight_lens.set (right - 1) w2)
                , Full { left = a; right = b; seq_no; status = Job_status.Todo }
                )
          | [ Base _ ], Empty ->
              (*Depending on whether this is the first or second of the two base jobs*)
              let weight' =
                if left = 0 then
                  (weight_lens.set left w1, weight_lens.set (right - 1) w2)
                else (weight_lens.set (left - 1) w1, weight_lens.set right w2)
              in
              Ok (weight', m)
          | [ Base _; Base _ ], Empty ->
              Ok
                ( (weight_lens.set (left - 1) w1, weight_lens.set (right - 1) w2)
                , m )
          | xs, m ->
              Or_error.errorf
                "Got %d jobs when updating level %d and when one of the merge \
                 nodes at level %d is %s"
                (List.length xs) update_level cur_level (Merge.Job.job_str m)
        in
        ((new_weight, m'), None)
      else if cur_level = update_level then
        (*Mark completed jobs as Done*)
        match (jobs, m) with
        | [ Merge a ], Full ({ status = Job_status.Todo; _ } as record) ->
            let done_job =
              Merge.Job.Full { record with status = Job_status.Done }
            in
            let scan_result, weight' =
              if cur_level = 0 then
                (Some a, (weight_lens.set 0 w1, weight_lens.set 0 w2))
              else (None, weight)
            in
            Ok ((weight', done_job), scan_result)
        | [], _ ->
            untouched
        | xs, m ->
            Or_error.errorf
              "Got %d jobs when updating level %d and when one of the merge \
               nodes at level %d is %s"
              (List.length xs) update_level cur_level (Merge.Job.job_str m)
      else if cur_level < update_level - 1 then
        (*Update the job count for all the level above*)
        match jobs with
        | [] ->
            untouched
        | _ ->
            let job_count = List.length jobs in
            let sent_left = min job_count left in
            let sent_right = min (job_count - sent_left) right in
            let new_weight =
              ( weight_lens.set (left - sent_left) w1
              , weight_lens.set (right - sent_right) w2 )
            in
            Ok ((new_weight, m), None)
      else untouched
    in
    let add_bases jobs (w, leaf_job) =
      let weight = weight_lens.get w in
      match (jobs, leaf_job) with
      | [], _ ->
          Ok (w, leaf_job)
      | [ Job.Base job ], Base.Job.Empty ->
          Ok
            ( weight_lens.set (weight - 1) w
            , Base.Job.Full { job; seq_no; status = Job_status.Todo } )
      | [ Job.Merge _ ], Full b ->
          Ok (w, Full { b with status = Job_status.Done })
      | xs, _ ->
          Or_error.errorf
            "Got %d jobs when updating level %d and when one of the base nodes \
             is %s"
            (List.length xs) update_level (Base.Job.job_str leaf_job)
    in
    (* The left side takes as many jobs as its weight allows; the right side
       takes up to its own weight from what remains. *)
    let jobs_split (w_left, w_right) jobs =
      let for_left, rest = List.split_n jobs (weight_lens.get w_left) in
      (for_left, List.take rest (weight_lens.get w_right))
    in
    update_split ~f_merge:add_merges ~f_base:add_bases ~weight_merge:fst
      ~jobs:completed_jobs ~update_level ~jobs_split tree
1,120✔
572

573
  (* Recompute the weights of every node from the jobs currently stored in
     the tree. [weight_type] selects which counter(s) are recomputed.

     Leaves: the merge-weight becomes 1 for a [Full] job with [Todo] status
     and 0 otherwise; the base-weight becomes 1 for an [Empty] slot and 0
     otherwise.
     Merge nodes: each side's weight becomes the sum of the pair of weights
     reported by the corresponding child, except that for [`Merge] a node
     holding a [Todo] job gets merge-weights (1, 0). *)
  let reset_weights :
         [ `Base | `Merge | `Both ]
      -> ('merge_t, 'base_t) t
      -> ('merge_t, 'base_t) t =
   fun weight_type tree ->
    let set_to n (lens : (Weight.t, int) Lens.t) weight = lens.set n weight in
    let set_all_zero weight =
      weight |> Weight.merge.set 0 |> Weight.base.set 0
    in
    let f_base (weight, job) =
      let update_merge_weight w =
        (*When updating the merge-weight of base nodes, only the nodes with
          "Todo" status needs to be included*)
        match job with
        | Base.Job.Full { status = Job_status.Todo; _ } ->
            set_to 1 Weight.merge w
        | _ ->
            set_to 0 Weight.merge w
      in
      let update_base_weight w =
        (*When updating the base-weight of base nodes, only the Empty nodes
          need to be included*)
        match job with
        | Base.Job.Empty ->
            set_to 1 Weight.base w
        | _ ->
            set_to 0 Weight.base w
      in
      (* A leaf has a single weight, but its parent expects a pair; the
         second component is a zeroed placeholder. *)
      let new_weight, dummy_right_for_base_nodes =
        match weight_type with
        | `Merge ->
            (update_merge_weight weight, set_to 0 Weight.merge weight)
        | `Base ->
            (update_base_weight weight, set_to 0 Weight.base weight)
        | `Both ->
            let w' = update_base_weight weight in
            (update_merge_weight w', set_all_zero w')
      in
      ((new_weight, job), (new_weight, dummy_right_for_base_nodes))
    in
    let f_merge ((w1, w2), (w3, w4)) (weights, job) =
      let reset (lens : (Weight.t, int) Lens.t) (w, w') =
        (* Weights of all other jobs is sum of weights of its children*)
        ( lens.set (lens.get w1 + lens.get w2) w
        , lens.set (lens.get w3 + lens.get w4) w' )
      in
      let weights' =
        match weight_type with
        | `Merge -> (
            (*When updating the merge-weight of merge nodes, only the nodes
              with "Todo" status needs to be included*)
            let lens = Weight.merge in
            match job with
            | Merge.Job.Full { status = Job_status.Todo; _ } ->
                let w_left, w_right = weights in
                (lens.set 1 w_left, lens.set 0 w_right)
            | _ ->
                reset lens weights )
        | `Base ->
            (* The base-weight of merge nodes is the sum of weights of its
               children*)
            reset Weight.base weights
        | `Both ->
            reset Weight.merge (reset Weight.base weights)
      in
      ((weights', job), weights')
    in
    let tree', _ = update_accumulate ~f_merge ~f_base tree in
    tree'
160✔
640

641
  (* Collect, in traversal order, the jobs with [Todo] status on [level]:
     merge jobs from the merge nodes whose depth equals [level], and base
     jobs from the leaves when [level] equals [depth]. *)
  let jobs_on_level :
         depth:int
      -> level:int
      -> ('merge_t, 'base_t) t
      -> ('merge_job, 'base_job) Available_job.t list =
   fun ~depth ~level tree ->
    let collected_rev =
      fold_depth ~init:[]
        ~f_merge:(fun cur_level acc node ->
          if cur_level = level then
            match node with
            | _weight, Merge.Job.Full { left; right; status = Todo; _ } ->
                Available_job.Merge (left, right) :: acc
            | _ ->
                acc
          else acc )
        ~f_base:(fun acc leaf ->
          if level = depth then
            match leaf with
            | _weight, Base.Job.Full { job; status = Todo; _ } ->
                Available_job.Base job :: acc
            | _ ->
                acc
          else acc )
        tree
    in
    List.rev collected_rev
662

663
  (* List every node of the tree (weight included) in traversal order,
     leaving out the nodes whose job is [Full] with status [Done]. *)
  let to_hashable_jobs :
      ('merge_t, 'base_t) t -> ('merge_job, 'base_job) Job.t list =
   fun tree ->
    let nodes_rev =
      fold ~init:[]
        ~f_merge:(fun acc node ->
          match node with
          | _, Merge.Job.Full { status = Job_status.Done; _ } ->
              acc
          | _ ->
              Job.Merge node :: acc )
        ~f_base:(fun acc leaf ->
          match leaf with
          | _, Base.Job.Full { status = Job_status.Done; _ } ->
              acc
          | _ ->
              Job.Base leaf :: acc )
        tree
    in
    List.rev nodes_rev
681

682
  (* List the records of all [Full] jobs, merge and base, in traversal
     order. Empty and partially filled slots are skipped. *)
  let jobs_records : ('merge_t, 'base_t) t -> ('merge_job, 'base_job) Job.t list
      =
   fun tree ->
    let records_rev =
      fold ~init:[]
        ~f_merge:(fun acc node ->
          match node with
          | _weight, Merge.Job.Full record ->
              Job.Merge record :: acc
          | _ ->
              acc )
        ~f_base:(fun acc leaf ->
          match leaf with
          | _weight, Base.Job.Full record ->
              Job.Base record :: acc
          | _ ->
              acc )
        tree
    in
    List.rev records_rev
696

697
  (* The jobs stored in the [Full] leaves, from left to right. The merge
     callback always returns the empty list: merge nodes contribute nothing,
     and they are all visited before the leaves. *)
  let base_jobs : ('merge_t, _ * 'base_job Base.Job.t) t -> 'base_job list =
   fun tree ->
    let jobs_rev =
      fold_depth ~init:[]
        ~f_merge:(fun _level _acc _node -> [])
        ~f_base:(fun acc leaf ->
          match leaf with
          | _, Base.Job.Full { job; _ } ->
              job :: acc
          | _ ->
              acc )
        tree
    in
    List.rev jobs_rev
705

706
  (*Counts the base and merge jobs that currently have the Todo status; the result is (base count, merge count)*)
707
  (* Returns [(base, merge)]: how many [Full] base jobs and how many [Full]
     merge jobs have status [Todo]. *)
  let todo_job_count :
      (_ * 'merge_job Merge.Job.t, _ * 'base_job Base.Job.t) t -> int * int =
   fun tree ->
    fold_depth ~init:(0, 0)
      ~f_merge:(fun _level (base_count, merge_count) (_, job) ->
        match job with
        | Merge.Job.Full { status = Job_status.Todo; _ } ->
            (base_count, merge_count + 1)
        | _ ->
            (base_count, merge_count) )
      ~f_base:(fun (base_count, merge_count) (_, job) ->
        match job with
        | Base.Job.Full { status = Job_status.Todo; _ } ->
            (base_count + 1, merge_count)
        | _ ->
            (base_count, merge_count) )
      tree
724

725
  (* The leaf nodes (weight and job) whose job is [Full], from left to
     right. Leaves with an [Empty] slot are left out. *)
  let leaves : ('merge_t, 'base_t) t -> 'base_t list =
   fun tree ->
    let leaves_rev =
      fold_depth ~init:[]
        ~f_merge:(fun _level _acc _node -> [])
        ~f_base:(fun acc leaf ->
          match leaf with _, Base.Job.Full _ -> leaf :: acc | _ -> acc )
        tree
    in
    List.rev leaves_rev
733

734
  (* Render the tree as text, one line per level: a "Node" line for every
     merge level followed by a final "Leaf" line. The values of one level
     are separated by two spaces. *)
  let rec _view_tree :
      type merge_t base_t.
         (merge_t, base_t) t
      -> show_merge:(merge_t -> string)
      -> show_base:(base_t -> string)
      -> string =
   fun tree ~show_merge ~show_base ->
    match tree with
    | Leaf leaf ->
        sprintf !"Leaf %s\n" (show_base leaf)
    | Node { value; sub_tree; _ } ->
        (* Lift a printer to pairs for the next level down. *)
        let show_pair show (l, r) = sprintf !"%s  %s" (show l) (show r) in
        let curr = sprintf !"Node %s\n" (show_merge value) in
        let subtree =
          _view_tree sub_tree
            ~show_merge:(show_pair show_merge)
            ~show_base:(show_pair show_base)
        in
        curr ^ subtree
×
754

755
  (* Merge-weight recorded at the top of the tree: the sum of both sides for
     a [Node], the single weight for a [Leaf]. *)
  let required_job_count tree =
    match tree with
    | Leaf (w, _) ->
        Weight.merge.get w
    | Node { value = (w1, w2), _; _ } ->
        Weight.merge.get w1 + Weight.merge.get w2
760

761
  (* Base-weight recorded at the top of the tree: the sum of both sides for
     a [Node], the single weight for a [Leaf]. *)
  let available_space tree =
    match tree with
    | Leaf (w, _) ->
        Weight.base.get w
    | Node { value = (w1, w2), _; _ } ->
        Weight.base.get w1 + Weight.base.get w2
766

767
  (* Produce a [Job_view.t] for every node of the tree, in traversal order.
     [position] is the index of the node in that order, starting at 0. [fa]
     converts the values held by merge jobs and [fd] the job held by a base
     job. *)
  let view_jobs_with_position (tree : ('a, 'd) t) fa fd : 'c Job_view.t list =
    let f_merge acc (_, job) =
      let view =
        match job with
        | Merge.Job.Empty ->
            Job_view.Node.MEmpty
        | Part left ->
            MPart (fa left)
        | Full { left; right; seq_no; status } ->
            MFull (fa left, fa right, { Job_view.Extra.status; seq_no })
      in
      view :: acc
    in
    let f_base acc (_, job) =
      let view =
        match job with
        | Base.Job.Empty ->
            Job_view.Node.BEmpty
        | Full { seq_no; status; job } ->
            BFull (fd job, { seq_no; status })
      in
      view :: acc
    in
    (* The fold yields the views in reverse order; [rev_mapi] restores the
       order, and [last_index - i] turns the reversed index into the
       position. *)
    let views_rev = fold ~f_merge ~f_base ~init:[] tree in
    let last_index = List.length views_rev - 1 in
    List.rev_mapi views_rev ~f:(fun i value ->
        { Job_view.position = last_index - i; value } )
×
793
end
794

795
(*This structure works well because we always complete all the nodes on a specific level before proceeding to the next level*)
796
module T = struct
797
  module Binable_arg = struct
798
    [%%versioned
799
    module Stable = struct
800
      [@@@no_toplevel_latest_type]
801

802
      module V1 = struct
803
        type ('merge, 'base) t =
147✔
804
          { trees :
805
              ( 'merge Merge.Stable.V1.t
806
              , 'base Base.Stable.V1.t )
807
              Tree.Stable.V1.t
808
              Mina_stdlib.Nonempty_list.Stable.V1.t
809
          ; acc : ('merge * 'base list) option
810
          ; curr_job_seq_no : int
811
          ; max_base_jobs : int
812
          ; delay : int
813
          }
441✔
814
      end
815
    end]
816
  end
817

818
  [%%versioned_binable
819
  module Stable = struct
820
    module V1 = struct
821
      type ('merge, 'base) t = ('merge, 'base) Binable_arg.Stable.V1.t =
×
822
        { trees :
×
823
            ('merge Merge.Stable.V1.t, 'base Base.Stable.V1.t) Tree.Stable.V1.t
824
            Mina_stdlib.Nonempty_list.Stable.V1.t
825
              (*use non empty list*)
826
        ; acc : ('merge * 'base list) option
×
827
              (*last emitted proof and the corresponding transactions*)
828
        ; curr_job_seq_no : int
×
829
              (*Sequence number for the jobs added every block*)
830
        ; max_base_jobs : int (*transaction_capacity_log_2*)
×
831
        ; delay : int
×
832
        }
833
      [@@deriving sexp]
834

835
      (* Delete all the completed jobs because
836
         1. They are completed
837
         2. They are not required to create new jobs anymore
838
         3. We are not exposing these jobs for any sort of computation as of now
839
      *)
840
      let with_leaner_trees ({ trees; _ } as t) =
        (* A merge job whose status is [Done] is replaced by [Empty]; the
           weights paired with it are kept. Every other node is untouched. *)
        let drop_done_merge = function
          | weights, Merge.Job.Full { status = Job_status.Done; _ } ->
              (weights, Merge.Job.Empty)
          | node ->
              node
        in
        let prune tree =
          Tree.map tree ~f_merge:drop_done_merge ~f_base:Fn.id
        in
        { t with trees = Mina_stdlib.Nonempty_list.map trees ~f:prune }
220✔
853

854
      include
855
        Binable.Of_binable2_without_uuid
856
          (Binable_arg.Stable.V1)
857
          (struct
858
            type nonrec ('merge, 'base) t = ('merge, 'base) t
859

860
            let to_binable = with_leaner_trees
861

862
            let of_binable = Fn.id
863
          end)
864
    end
865
  end]
866

867
  [%%define_locally Stable.Latest.(with_leaner_trees)]
868

869
  (** Builds a tree with [depth] levels of [Node]s above a single [Leaf].

      Every merge node carries [merge_job] and the leaf carries [base_job].
      Weights depend on [level]: when [level = -1] all weights are zero;
      otherwise the leaf has base weight 1 and a node at depth [d] has two
      weights with base component [2^level / 2^(d + 1)]. Merge components of
      all weights are zero. *)
  let create_tree_for_level ~level ~depth ~merge_job ~base_job =
    let mk_weight base merge = { Weight.base; merge } in
    let is_blank = level = -1 in
    let leaf_weight = if is_blank then mk_weight 0 0 else mk_weight 1 0 in
    let node_value d =
      let per_child =
        if is_blank then 0 else Int.pow 2 level / Int.pow 2 (d + 1)
      in
      ((mk_weight per_child 0, mk_weight per_child 0), merge_job)
    in
    (* Each level down, node values and the leaf value are paired up with
       themselves, hence the polymorphic recursion. *)
    let rec build :
        type merge_t base_t.
        int -> (int -> merge_t) -> base_t -> (merge_t, base_t) Tree.t =
     fun d value_at leaf ->
      if d < depth then
        let sub_tree =
          build (d + 1) (fun i -> (value_at i, value_at i)) (leaf, leaf)
        in
        Tree.Node { depth = d; value = value_at d; sub_tree }
      else Tree.Leaf leaf
    in
    build 0 node_value (leaf_weight, base_job)
893

894
  (** A tree of the given [depth] in which every merge and base job is
      [Empty] and the weights are those of level [depth]. *)
  let create_tree ~depth =
    create_tree_for_level ~merge_job:Merge.Job.Empty ~base_job:Base.Job.Empty
      ~depth ~level:depth
897

898
  (** Initial scan state: a single empty tree of depth
      [Int.ceil_log2 max_base_jobs], no emitted value and sequence number 0. *)
  let empty : type merge base. max_base_jobs:int -> delay:int -> (merge, base) t
      =
   fun ~max_base_jobs ~delay ->
    let initial_tree :
        ( (Weight.t * Weight.t) * merge Merge.Job.t
        , Weight.t * base Base.Job.t )
        Tree.t =
      create_tree ~depth:(Int.ceil_log2 max_base_jobs)
    in
    let trees = Mina_stdlib.Nonempty_list.singleton initial_tree in
    { trees; acc = None; curr_job_seq_no = 0; max_base_jobs; delay }
914
end
915

916
module State = struct
917
  include T
918
  module Hash = Hash
919

920
  (** Converts the payloads of a scan state: [f1] is applied to merge
      payloads (in the trees and in [acc]) and [f2] to base payloads. All
      other fields are copied unchanged. *)
  let map (type a1 a2 b1 b2) (t : (a1, a2) t) ~(f1 : a1 -> b1) ~(f2 : a2 -> b2)
      : (b1, b2) t =
    let convert_tree tree =
      Tree.map_depth
        ~f_merge:(fun _ -> Merge.map ~f:f1)
        ~f_base:(Base.map ~f:f2) tree
    in
    let convert_acc (m, bs) = (f1 m, List.map bs ~f:f2) in
    { t with
      trees = Mina_stdlib.Nonempty_list.map t.trees ~f:convert_tree
    ; acc = Option.map t.acc ~f:convert_acc
    }
931

932
  (** SHA256 digest of a scan state. [f_merge] and [f_base] render merge and
      base payloads as strings.

      The state is first passed through [with_leaner_trees]. The digest is
      then fed, in this order: for every tree, each job returned by
      [Tree.to_hashable_jobs] (weights, then a tag and the job's contents);
      the emitted value [acc] (or ["None"]); [curr_job_seq_no];
      [max_base_jobs]; [delay]. The sequence of [feed] calls defines the
      digest, so it must not be reordered. *)
  let hash t f_merge f_base =
    let { trees; acc; max_base_jobs; curr_job_seq_no; delay; _ } =
      with_leaner_trees t
    in
    let ctx = ref (Digestif.SHA256.init ()) in
    let feed s = ctx := Digestif.SHA256.feed_string !ctx s in
    let feed_int i = feed (Int.to_string i) in
    let feed_weight { Weight.base; merge } = feed_int base ; feed_int merge in
    let feed_weight_pair (w1, w2) = feed_weight w1 ; feed_weight w2 in
    let feed_merge_node (weights, job) =
      feed_weight_pair weights ;
      match job with
      | Merge.Job.Empty ->
          feed "Empty"
      | Merge.Job.Full { left; right; status; seq_no } ->
          feed "Full" ;
          feed_int seq_no ;
          feed (Job_status.to_string status) ;
          feed (f_merge left) ;
          feed (f_merge right)
      | Merge.Job.Part j ->
          feed "Part" ;
          feed (f_merge j)
    in
    let feed_base_node (weight, job) =
      feed_weight weight ;
      match job with
      | Base.Job.Empty ->
          feed "Empty"
      | Base.Job.Full { job; status; seq_no } ->
          feed "Full" ;
          feed_int seq_no ;
          feed (Job_status.to_string status) ;
          feed (f_base job)
    in
    Mina_stdlib.Nonempty_list.iter trees ~f:(fun tree ->
        List.iter (Tree.to_hashable_jobs tree) ~f:(function
          | Job.Merge node ->
              feed_merge_node node
          | Base node ->
              feed_base_node node ) ) ;
    ( match acc with
    | None ->
        feed "None"
    | Some (proof, data) ->
        feed (f_merge proof) ;
        List.iter data ~f:(fun d -> feed (f_base d)) ) ;
    feed_int curr_job_seq_no ;
    feed_int max_base_jobs ;
    feed_int delay ;
    Digestif.SHA256.get !ctx
220✔
988

989
  (** Folds over a scan state inside an arbitrary monad [M]. *)
  module Make_foldable (M : Monad.S) = struct
    module Tree_foldable = Tree.Make_foldable (M)

    (** Folds over the trees starting from the last element of [t.trees],
        visiting each tree with [Tree_foldable.fold_depth_until'].
        [f_merge] / [f_base] may return [Stop final] to end the fold early
        with [final]; if the fold runs to completion the accumulator is
        passed to [finish]. *)
    let fold_chronological_until :
           ('merge, 'base) t
        -> init:'acc
        -> f_merge:
             ('acc -> 'merge Merge.t -> ('acc, 'final) Continue_or_stop.t M.t)
        -> f_base:('acc -> 'base Base.t -> ('acc, 'final) Continue_or_stop.t M.t)
        -> finish:('acc -> 'final M.t)
        -> 'final M.t =
     fun t ~init ~f_merge ~f_base ~finish ->
      let open M.Let_syntax in
      let open Container.Continue_or_stop in
      let ordered_trees =
        Mina_stdlib.Nonempty_list.to_list
          (Mina_stdlib.Nonempty_list.rev t.trees)
      in
      let rec fold_trees acc remaining =
        match remaining with
        | [] ->
            M.return (Continue acc)
        | tree :: rest -> (
            let%bind step =
              Tree_foldable.fold_depth_until'
                ~f_merge:(fun _ -> f_merge)
                ~f_base ~init:acc tree
            in
            match step with
            | Stop final ->
                M.return (Stop final)
            | Continue acc' ->
                fold_trees acc' rest )
      in
      let%bind outcome = fold_trees init ordered_trees in
      match outcome with
      | Stop final ->
          M.return final
      | Continue acc ->
          finish acc
  end
1027

1028
  module Foldable_ident = Make_foldable (Monad.Ident)
1029

1030
  (** Non-monadic fold over every job of the scan state, in the order used
      by [Foldable_ident.fold_chronological_until]; it never stops early. *)
  let fold_chronological t ~init ~f_merge ~f_base =
    let always_continue step acc x =
      Container.Continue_or_stop.Continue (step acc x)
    in
    Foldable_ident.fold_chronological_until t ~init
      ~f_merge:(always_continue f_merge)
      ~f_base:(always_continue f_base)
      ~finish:Fn.id
1036
end
1037

1038
include T
1039
module State_or_error = Mina_stdlib.State_or_error.Make3 (T)
1040

1041
(** [check failed ~message] is [State_or_error.error_if failed ~message]
    with a unit value. Callers in this file pass the *failure* condition,
    i.e. [message] is the error reported when [failed] holds. *)
let check failed ~message =
  State_or_error.error_if failed ~message ~value:()
1,360✔
1042

1043
(** Unconditionally triggers [State_or_error.error_if] with the
    human-readable rendering of [err]; [placeholder] only fixes the type of
    the value the computation would otherwise have returned. *)
let return_error err placeholder =
  let message = Error.to_string_hum err in
  State_or_error.error_if true ~message ~value:placeholder
×
1045

1046
(** Upper bound on the number of trees in a scan state:
    [(ceil_log2 max_base_jobs + 1) * (delay + 1) + 1]. *)
let max_trees : ('merge, 'base) t -> int =
 fun { max_base_jobs; delay; _ } ->
  let levels = Int.ceil_log2 max_base_jobs + 1 in
  (levels * (delay + 1)) + 1
800✔
1048

1049
(** Collects the jobs of the given trees, taking from the [i]-th tree the
    jobs on level [depth - i], where [depth = ceil_log2 max_base_jobs]. The
    result keeps the order of [trees]. *)
let work_to_do :
    type merge base.
       (merge Merge.t, base Base.t) Tree.t list
    -> max_base_jobs:int
    -> (merge, base) Available_job.t list =
 fun trees ~max_base_jobs ->
  let depth = Int.ceil_log2 max_base_jobs in
  let jobs_of_tree i tree =
    Tree.jobs_on_level ~depth ~level:(depth - i) tree
  in
  trees |> List.mapi ~f:jobs_of_tree |> List.concat
5,380✔
1058

1059
(** Jobs available on [trees] given a spacing of [delay] trees: only trees
    whose index [i] satisfies [i % delay = delay - 1] are considered, and at
    most [ceil_log2 max_base_jobs + 1] of those. The selected trees are then
    handed to [work_to_do]. Callers in this file pass [t.delay + 1] as
    [delay]. *)
let work :
    type merge base.
       (merge Merge.t, base Base.t) Tree.t list
    -> delay:int
    -> max_base_jobs:int
    -> (merge, base) Available_job.t list =
 fun trees ~delay ~max_base_jobs ->
  let level_count = Int.ceil_log2 max_base_jobs + 1 in
  let is_selected i _tree = i % delay = delay - 1 in
  let selected = List.filteri trees ~f:is_selected in
  work_to_do (List.take selected level_count) ~max_base_jobs
6,160✔
1073

1074
(** Jobs returned by [work] for the scan state [t]. With [`Current] the
    first tree of [t.trees] is left out; with [`Next] all trees are
    considered. *)
let work_for_tree t ~data_tree =
  let candidates =
    match data_tree with
    | `Next ->
        Mina_stdlib.Nonempty_list.to_list t.trees
    | `Current ->
        Mina_stdlib.Nonempty_list.tail t.trees
  in
  work candidates ~max_base_jobs:t.max_base_jobs ~delay:(t.delay + 1)
1084

1085
(* Work on all the levels of all the trees *)
1086
(** All outstanding work, grouped in sets. The first set is the work of
    [work_for_tree t ~data_tree:`Current]. Each following set is obtained by
    prepending one more empty tree to the state and asking again, repeated
    [t.delay + 1] times. Empty sets are dropped. *)
let all_work :
    type merge base. (merge, base) t -> (merge, base) Available_job.t list list
    =
 fun t ->
  let depth = Int.ceil_log2 t.max_base_jobs in
  let current_set = work_for_tree t ~data_tree:`Current in
  (* One simulated round: prepend an empty tree, then record the work it
     exposes (if any). *)
  let step (state, rev_sets) _round =
    let state =
      { state with
        trees =
          Mina_stdlib.Nonempty_list.cons (create_tree ~depth) state.trees
      }
    in
    let rev_sets =
      match work_for_tree state ~data_tree:`Current with
      | [] ->
          rev_sets
      | jobs ->
          jobs :: rev_sets
    in
    (state, rev_sets)
  in
  let rounds = List.init (t.delay + 1) ~f:Fn.id in
  let _, rev_sets = List.fold rounds ~init:(t, []) ~f:step in
  let later_sets = List.rev rev_sets in
  if List.is_empty current_set then later_sets
  else current_set :: later_sets
440✔
1108

1109
(** Jobs to be completed before [data_count] new base jobs can be added
    (capped at [t.max_base_jobs]).

    If the first tree has room for all of them, the result is at most one
    set: the first [2 * count] jobs found on the remaining trees. Otherwise
    two sets are considered: all jobs found on the remaining trees, and the
    first [2 * (count - space)] jobs found when the first tree is included.
    Empty sets are omitted. *)
let work_for_next_update :
    type merge base.
    (merge, base) t -> data_count:int -> (merge, base) Available_job.t list list
    =
 fun t ~data_count ->
  let jobs_on trees =
    work trees ~max_base_jobs:t.max_base_jobs ~delay:(t.delay + 1)
  in
  let non_empty sets =
    List.filter sets ~f:(fun set -> not (List.is_empty set))
  in
  let space = Tree.available_space (Mina_stdlib.Nonempty_list.head t.trees) in
  let on_older_trees = jobs_on (Mina_stdlib.Nonempty_list.tail t.trees) in
  let count = min data_count t.max_base_jobs in
  if space >= count then non_empty [ List.take on_older_trees (2 * count) ]
  else
    let on_all_trees =
      List.take
        (jobs_on (Mina_stdlib.Nonempty_list.to_list t.trees))
        ((count - space) * 2)
    in
    non_empty [ on_older_trees; on_all_trees ]
100✔
1136

1137
(** [Tree.available_space] of the first tree in [t.trees]. *)
let free_space_on_current_tree t =
  t.trees |> Mina_stdlib.Nonempty_list.head |> Tree.available_space
1,400✔
1140

1141
(** Prepends [b] to the ordinary list [bs], producing a non-empty list. *)
let cons b bs =
  match Mina_stdlib.Nonempty_list.of_list_opt bs with
  | None ->
      Mina_stdlib.Nonempty_list.singleton b
  | Some rest ->
      Mina_stdlib.Nonempty_list.cons b rest
120✔
1145

1146
(** Appends the ordinary list [bs'] to the non-empty list [bs]; [bs] is
    returned as is when [bs'] is empty. *)
let append bs bs' =
  match Mina_stdlib.Nonempty_list.of_list_opt bs' with
  | None ->
      bs
  | Some suffix ->
      Mina_stdlib.Nonempty_list.append bs suffix
120✔
1149

1150
(** Adds the results of [completed_jobs] to the trees following the first
    one and stores the updated state.

    Returns [None] immediately when [completed_jobs] is empty. Fails when
    more jobs are supplied than [work_for_tree ~data_tree:`Current] reports.

    Only every [delay + 1]-th tree (index [i] with
    [i % delay' = delay' - 1]) is updated; each takes
    [Tree.required_job_count] jobs from the front of the remaining job list
    and is updated at level [depth - (i / delay')]. When an update yields a
    result, the most recently processed tree is removed from the state and
    its base jobs are returned together with that result. *)
let add_merge_jobs :
    completed_jobs:'merge list -> ('base, 'merge, _) State_or_error.t =
 fun ~completed_jobs ->
  let open State_or_error.Let_syntax in
  if List.is_empty completed_jobs then return None
  else
    let%bind state = State_or_error.get in
    let delay = state.delay + 1 in
    let depth = Int.ceil_log2 state.max_base_jobs in
    let merge_jobs = List.map completed_jobs ~f:(fun j -> Job.Merge j) in
    let jobs_required = work_for_tree state ~data_tree:`Current in
    (* Both lengths are needed several times below; compute them once. *)
    let completed_count = List.length completed_jobs in
    let required_count = List.length jobs_required in
    let%bind () =
      check
        (completed_count > required_count)
        ~message:
          (sprintf
             !"More work than required: Required- %d got- %d"
             required_count completed_count )
    in
    let curr_tree, to_be_updated_trees =
      ( Mina_stdlib.Nonempty_list.head state.trees
      , Mina_stdlib.Nonempty_list.tail state.trees )
    in
    let%bind updated_trees, result_opt, _ =
      let res =
        (* Accumulator: trees processed so far (reversed), the latest scan
           result, and the jobs not yet handed to any tree. *)
        List.foldi to_be_updated_trees
          ~init:(Ok ([], None, merge_jobs))
          ~f:(fun i acc tree ->
            let open Or_error.Let_syntax in
            let%bind trees, scan_result, jobs = acc in
            if i % delay = delay - 1 && not (List.is_empty jobs) then
              (*Every nth (n=delay) tree*)
              let job_count = Tree.required_job_count tree in
              match
                Tree.update
                  (List.take jobs job_count)
                  ~update_level:(depth - (i / delay))
                  ~sequence_no:state.curr_job_seq_no ~weight_lens:Weight.merge
                  tree
              with
              | Ok (tree', scan_result') ->
                  Ok (tree' :: trees, scan_result', List.drop jobs job_count)
              | Error e ->
                  Error
                    (Error.tag_arg e "Error while adding merge jobs to tree"
                       ("tree_number", i) [%sexp_of: string * int] )
            else Ok (tree :: trees, scan_result, jobs) )
      in
      match res with
      | Ok res ->
          State_or_error.return res
      | Error e ->
          return_error e ([], None, [])
    in
    let updated_trees, result_opt =
      let updated_trees, result_opt =
        match result_opt with
        | None ->
            (List.rev updated_trees, None)
        | Some res -> (
            (* [updated_trees] is reversed, so its head is the last tree
               processed; it is dropped and its base jobs are emitted. *)
            match updated_trees with
            | [] ->
                ([], None)
            | t :: ts ->
                let tree_data = Tree.base_jobs t in
                (List.rev ts, Some (res, tree_data)) )
      in
      if
        Option.is_some result_opt
        || List.length (curr_tree :: updated_trees) < max_trees state
           && completed_count = required_count
        (*exact number of jobs*)
      then (List.map updated_trees ~f:(Tree.reset_weights `Merge), result_opt)
      else (updated_trees, result_opt)
    in
    let all_trees = cons curr_tree updated_trees in
    let%map () = State_or_error.put { state with trees = all_trees } in
    result_opt
×
1230

1231
(** Adds [data] as base jobs to the first tree of the state and stores the
    updated state. Does nothing when [data] is empty, and fails when [data]
    is longer than the tree's available space.

    When [data] fills the tree exactly, the tree's weights are reset with
    [`Both] and a fresh empty tree is placed in front of it; otherwise the
    tree stays first and its weights are reset with [`Merge]. *)
let add_data : data:'base list -> (_, _, 'base) State_or_error.t =
 fun ~data ->
  let open State_or_error.Let_syntax in
  if List.is_empty data then return ()
  else
    let%bind state = State_or_error.get in
    let depth = Int.ceil_log2 state.max_base_jobs in
    let tree = Mina_stdlib.Nonempty_list.head state.trees in
    let base_jobs = List.map data ~f:(fun j -> Job.Base j) in
    let available_space = Tree.available_space tree in
    (* [base_jobs] has the same length as [data]; count once. *)
    let data_count = List.length data in
    let%bind () =
      check
        (data_count > available_space)
        ~message:
          (sprintf
             !"Data count (%d) exceeded available space (%d)"
             data_count available_space )
    in
    let%bind tree, _ =
      match
        Tree.update base_jobs ~update_level:depth
          ~sequence_no:state.curr_job_seq_no ~weight_lens:Weight.base tree
      with
      | Ok res ->
          State_or_error.return res
      | Error e ->
          return_error
            (Error.tag ~tag:"Error while adding base jobs to the tree" e)
            (tree, None)
    in
    let updated_trees =
      if data_count = available_space then
        cons (create_tree ~depth) [ Tree.reset_weights `Both tree ]
      else Mina_stdlib.Nonempty_list.singleton (Tree.reset_weights `Merge tree)
    in
    let%map () =
      State_or_error.put
        { state with
          trees =
            append updated_trees (Mina_stdlib.Nonempty_list.tail state.trees)
        }
    in
    ()
160✔
1274

1275
(** Renumbers every [Full] job so that sequence numbers restart near 1.

    The reference value is the sequence number of the first leaf of the last
    tree in [state.trees] (0 if that leaf is not a [Full] base job); every
    [seq_no] becomes [seq_no - reference + 1]. [curr_job_seq_no] is set to
    the largest renumbered value found on the last leaf of any tree, or 0 if
    there is none. *)
let reset_seq_no : type a b. (a, b) t -> (a, b) t =
 fun state ->
  let reference =
    let last_tree = Mina_stdlib.Nonempty_list.last state.trees in
    match List.hd (Tree.leaves last_tree) with
    | Some (_, Base.Job.Full { seq_no; _ }) ->
        seq_no
    | Some (_, Base.Job.Empty) | None ->
        0
  in
  let shift seq_no = seq_no - reference + 1 in
  let renumber_merge (node : a Merge.t) : a Merge.t =
    match node with
    | weights, Merge.Job.Full ({ seq_no; _ } as job) ->
        (weights, Merge.Job.Full { job with seq_no = shift seq_no })
    | unchanged ->
        unchanged
  in
  let renumber_base (node : b Base.t) : b Base.t =
    match node with
    | weight, Base.Job.Full ({ seq_no; _ } as job) ->
        (weight, Base.Job.Full { job with seq_no = shift seq_no })
    | unchanged ->
        unchanged
  in
  let trees =
    Mina_stdlib.Nonempty_list.map state.trees ~f:(fun tree ->
        Tree.map ~f_base:renumber_base ~f_merge:renumber_merge tree )
  in
  let curr_job_seq_no =
    List.fold (Mina_stdlib.Nonempty_list.to_list trees) ~init:0
      ~f:(fun highest tree ->
        match List.last (Tree.leaves tree) with
        | Some (_, Base.Job.Full { seq_no; _ }) ->
            max seq_no highest
        | Some (_, Base.Job.Empty) | None ->
            highest )
  in
  { state with curr_job_seq_no; trees }
1320

1321
(** Stores [state] with its sequence number advanced by one. If the next
    number would equal [Int.max_value], the sequence numbers are compacted
    with [reset_seq_no] instead. *)
let incr_sequence_no : type a b. (a, b) t -> (unit, a, b) State_or_error.t =
 fun state ->
  let next = state.curr_job_seq_no + 1 in
  let state' =
    if next = Int.max_value then reset_seq_no state
    else { state with curr_job_seq_no = next }
  in
  State_or_error.put state'
400✔
1328

1329
(** Publishes three gauges per tree (available space, todo base jobs and
    todo merge jobs) under the name ["tree<i>"], where [i] counts from the
    last element of [t.trees]. Any exception is captured in the returned
    [Or_error.t]. *)
let update_metrics t =
  let set_gauge gauge count =
    Mina_metrics.Gauge.set gauge (Int.to_float count)
  in
  let report index tree =
    let name = sprintf "tree%d" index in
    set_gauge
      (Mina_metrics.Scan_state_metrics.scan_state_available_space ~name)
      (Tree.available_space tree) ;
    let base_todo, merge_todo = Tree.todo_job_count tree in
    set_gauge
      (Mina_metrics.Scan_state_metrics.scan_state_base_snarks ~name)
      base_todo ;
    set_gauge
      (Mina_metrics.Scan_state_metrics.scan_state_merge_snarks ~name)
      merge_todo
  in
  Or_error.try_with (fun () ->
      Mina_stdlib.Nonempty_list.to_list t.trees
      |> List.rev |> List.iteri ~f:report )
740✔
1344

1345
(** One scan-state update, as a [State_or_error] computation: validates the
    inputs, advances the sequence number, adds completed merge jobs and new
    base [data], records the emitted value (if any) in [acc], and finally
    checks the number of trees against [max_trees]. Returns the value emitted
    by the first batch of merge jobs.

    Steps are deliberately kept in this order: each check may abort the
    computation before later expressions are evaluated. *)
let update_helper :
       data:'base list
    -> completed_jobs:'merge list
    -> ('a, 'merge, 'base) State_or_error.t =
 fun ~data ~completed_jobs ->
  let open State_or_error in
  let open State_or_error.Let_syntax in
  (* Half of a list's length, rounded up. *)
  let half_length xs = (List.length xs + 1) / 2 in
  let%bind t = get in
  let data_count = List.length data in
  let%bind () =
    check
      (data_count > t.max_base_jobs)
      ~message:
        (sprintf
           !"Data count (%d) exceeded maximum (%d)"
           data_count t.max_base_jobs )
  in
  let%bind () =
    let required =
      half_length (List.concat (work_for_next_update t ~data_count))
    in
    let got = half_length completed_jobs in
    check
      (got < required && data_count > t.max_base_jobs - required + got)
      ~message:
        (sprintf
           !"Insufficient jobs (Data count %d): Required- %d got- %d"
           data_count required got )
  in
  (*Increment the sequence number*)
  let%bind () = incr_sequence_no t in
  let space_on_current =
    Tree.available_space (Mina_stdlib.Nonempty_list.head t.trees)
  in
  (* New base jobs may straddle two trees within one update: the part that
     fits goes to the current tree and the rest to a new tree. This happens
     when throughput is below the maximum, and it also means merge jobs are
     completed on two different sets of trees. *)
  let data_now, data_overflow = List.split_n data space_on_current in
  let jobs_for_current_tree =
    List.length
      (work
         (Mina_stdlib.Nonempty_list.tail t.trees)
         ~max_base_jobs:t.max_base_jobs ~delay:(t.delay + 1) )
  in
  let jobs_now, jobs_overflow =
    List.split_n completed_jobs jobs_for_current_tree
  in
  (*update first set of jobs and data*)
  let%bind result_opt = add_merge_jobs ~completed_jobs:jobs_now in
  let%bind () = add_data ~data:data_now in
  (*update second set of jobs and data. This will be empty if all the data fit in the current tree*)
  let%bind _ = add_merge_jobs ~completed_jobs:jobs_overflow in
  let%bind () = add_data ~data:data_overflow in
  let%bind state = get in
  (*update the latest emitted value *)
  let%bind () =
    put { state with acc = Option.first_some result_opt state.acc }
  in
  (*Check the tree-list length is under max*)
  let tree_count = Mina_stdlib.Nonempty_list.length state.trees in
  let tree_limit = max_trees state in
  let%map () =
    check (tree_count > tree_limit)
      ~message:
        (sprintf
           !"Tree list length (%d) exceeded maximum (%d)"
           tree_count tree_limit )
  in
  result_opt
400✔
1412

1413
(** Runs [update_helper] against [state]; on success returns the emitted
    value (if any) together with the new state. *)
let update :
       data:'base list
    -> completed_jobs:'merge list
    -> ('merge, 'base) t
    -> (('merge * 'base list) option * ('merge, 'base) t) Or_error.t =
 fun ~data ~completed_jobs state ->
  let computation = update_helper ~data ~completed_jobs in
  State_or_error.run_state computation ~state
400✔
1420

1421
(** Same as [all_work]. *)
let all_jobs t = t |> all_work

(** Jobs required before a full batch of [t.max_base_jobs] base jobs can be
    added. *)
let jobs_for_next_update t =
  let data_count = t.max_base_jobs in
  work_for_next_update t ~data_count

(** Jobs required before [slots] base jobs can be added. *)
let jobs_for_slots t ~slots =
  let data_count = slots in
  work_for_next_update t ~data_count

(** Returns [max_base_jobs]. Note that this is a constant of the state, not
    the space left on the current tree (see [free_space_on_current_tree]). *)
let free_space { max_base_jobs; _ } = max_base_jobs

(** The [acc] field: the last emitted value, if any. *)
let last_emitted_value { acc; _ } = acc

(** The [curr_job_seq_no] field. *)
let current_job_sequence_number { curr_job_seq_no; _ } = curr_job_seq_no
×
1432

1433
(** Describes how a full batch of base jobs would be split across trees.

    [first] pairs the space left on the current tree with the number of jobs
    reported for [`Current]. [second] is present only when that space is
    smaller than [max_base_jobs]; it pairs the number of overflowing slots
    with the job count reported for [`Next], capped at twice the slots. *)
let partition_if_overflowing : ('merge, 'base) t -> Space_partition.t =
 fun t ->
  let count_work data_tree = List.length (work_for_tree t ~data_tree) in
  let space = free_space_on_current_tree t in
  (*Check actual work count because it would be zero initially*)
  let current_work = count_work `Current in
  let next_work = count_work `Next in
  let second =
    if space < t.max_base_jobs then
      let slots = t.max_base_jobs - space in
      Some (slots, min next_work (2 * slots))
    else None
  in
  { first = (space, current_work); second }
1446

1447
(** [true] when the space left on the current tree equals [max_base_jobs]. *)
let next_on_new_tree t =
  Int.equal (free_space_on_current_tree t) t.max_base_jobs
400✔
1450

1451
(** [Tree.base_jobs] of every tree, listed from the last element of
    [t.trees] to the first. *)
let pending_data t =
  t.trees |> Mina_stdlib.Nonempty_list.rev |> Mina_stdlib.Nonempty_list.to_list
  |> List.map ~f:Tree.base_jobs
20✔
1453

1454
(** Job views of every tree (see [Tree.view_jobs_with_position]); the
    resulting list is in the reverse order of [state.trees]. *)
let view_jobs_with_position (state : ('merge, 'base) State.t) fa fd =
  let view_tree tree = Tree.view_jobs_with_position tree fa fd in
  List.rev_map (Mina_stdlib.Nonempty_list.to_list state.trees) ~f:view_tree
×
1457

1458
(** Returns [(todo, done)] job totals as floats. A [Full] job counts 1
    towards the total matching its status, a [Part] merge job counts 0.5
    towards [todo], and [Empty] jobs count nothing. *)
let job_count t =
  let add (todo, finished) (d_todo, d_done) =
    (todo +. d_todo, finished +. d_done)
  in
  let merge_delta = function
    | Merge.Job.Part _ ->
        (0.5, 0.)
    | Full { status = Job_status.Todo; _ } ->
        (1., 0.)
    | Full { status = Job_status.Done; _ } ->
        (0., 1.)
    | Empty ->
        (0., 0.)
  in
  let base_delta = function
    | Base.Job.Empty ->
        (0., 0.)
    | Full { status = Job_status.Todo; _ } ->
        (1., 0.)
    | Full { status = Job_status.Done; _ } ->
        (0., 1.)
  in
  State.fold_chronological t ~init:(0., 0.)
    ~f_merge:(fun counts (_, job) -> add counts (merge_delta job))
    ~f_base:(fun counts (_, job) -> add counts (base_delta job))
1484

1485
(** Test helper: asserts that the job counts of [t'] (the state after an
    update) are consistent with those of [t] (the state before).

    It checks that [all_jobs t'] lists as many jobs as there are [Todo] job
    records in the trees of [t'], and that the todo/done totals changed by
    the amounts implied by [base_job_count], [completed_job_count] and
    [value_emitted]. *)
let assert_job_count t t' ~completed_job_count ~base_job_count ~value_emitted =
  let todo_before, done_before = job_count t in
  let todo_after, done_after = job_count t' in
  let is_todo = function
    | Job.Base { status = Job_status.Todo; _ }
    | Job.Merge { status = Todo; _ } ->
        true
    | _ ->
        false
  in
  (*ordered list of jobs that is actually called when distributing work*)
  let available_count = List.length (List.concat (all_jobs t')) in
  (*job records still marked Todo*)
  let todo_record_count =
    Mina_stdlib.Nonempty_list.to_list t'.trees
    |> List.concat_map ~f:Tree.jobs_records
    |> List.count ~f:is_todo
  in
  assert (available_count = todo_record_count) ;
  let new_jobs =
    if value_emitted then (completed_job_count -. 1.) /. 2.
    else completed_job_count /. 2.
  in
  let expected_todo_after =
    todo_before +. base_job_count -. completed_job_count +. new_jobs
  in
  let jobs_from_delete_tree =
    if value_emitted then Float.of_int ((2 * t.max_base_jobs) - 1) else 0.
  in
  let expected_done_after =
    done_before +. completed_job_count -. jobs_from_delete_tree
  in
  assert (
    Float.equal todo_after expected_todo_after
    && Float.equal done_after expected_done_after )
×
1519

1520
(** Test helper: runs [update] (raising on error), checks the result with
    [assert_job_count], and returns the emitted value and the new state. *)
let test_update t ~data ~completed_jobs =
  let count xs = Float.of_int (List.length xs) in
  let ((result_opt, t') as outcome) =
    Or_error.ok_exn (update ~data ~completed_jobs t)
  in
  assert_job_count t t' ~base_job_count:(count data)
    ~completed_job_count:(count completed_jobs)
    ~value_emitted:(Option.is_some result_opt) ;
  outcome
×
1527

1528
let%test_module "test" =
  ( module struct
    (* Stand-in for completing a job: a base job yields its own value and a
       merge job the sum of its two inputs. *)
    let complete_job (job : (int, int) Available_job.t) =
      match job with Base i -> i | Merge (i, j) -> i + j

    (* 100 updates, each adding [max_base_jobs] base jobs and completing all
       the work requested. Whenever a value is emitted it must be the sum of
       the oldest batch not yet emitted, together with that batch. *)
    let%test_unit "always max base jobs" =
      let max_base_jobs = 512 in
      let initial = empty ~max_base_jobs ~delay:3 in
      let run_round (pending, t) round =
        let data = List.init max_base_jobs ~f:(fun j -> round + j) in
        let pending = data :: pending in
        let completed_jobs =
          work_for_next_update t ~data_count:(List.length data)
          |> List.concat
          |> List.map ~f:complete_job
        in
        let result_opt, t' = test_update ~data ~completed_jobs t in
        let expected_result, remaining =
          match result_opt with
          | None ->
              ((0, []), pending)
          | Some _ -> (
              (* [pending] is newest-first, so the oldest batch is last. *)
              match List.rev pending with
              | [] ->
                  ((0, []), [])
              | oldest :: newer ->
                  ( (List.sum (module Int) oldest ~f:Fn.id, oldest)
                  , List.rev newer ) )
        in
        assert (
          [%equal: int * int list]
            (Option.value ~default:expected_result result_opt)
            expected_result ) ;
        (remaining, t')
      in
      let _t' =
        List.fold (List.init 100 ~f:Fn.id) ~init:([], initial) ~f:run_round
      in
      ()

    (* Batches of random size (at most [max_base_jobs]) made only of 1s: any
       emitted value must be [max_base_jobs] with a list of that many 1s. *)
    let%test_unit "Random base jobs" =
      let max_base_jobs = 512 in
      let state = ref (empty ~max_base_jobs ~delay:3) in
      let expected_result =
        (max_base_jobs, List.init max_base_jobs ~f:(fun _ -> 1))
      in
      Quickcheck.test
        (Quickcheck.Generator.list (Int.gen_incl 1 1))
        ~f:(fun ones ->
          let t = !state in
          let data = List.take ones max_base_jobs in
          let data_count = List.length data in
          let completed_jobs =
            List.take
              (List.concat (work_for_next_update t ~data_count))
              (data_count * 2)
            |> List.map ~f:complete_job
          in
          let result_opt, t' = test_update ~data ~completed_jobs t in
          assert (
            [%equal: int * int list]
              (Option.value ~default:expected_result result_opt)
              expected_result ) ;
          state := t' )
  end )
1596

1597
(** [gen ~gen_data ~f_job_done ~f_acc] is a Quickcheck generator of scan
    states.

    It draws a depth in [2, 5] and a delay in [0, 3], starts from
    [State.empty] with [max_base_jobs = 2 ^ depth], and folds a non-empty list
    of chunks (each holding exactly [max_base_jobs] values drawn from
    [gen_data]) into that state. For every chunk, all jobs reported by
    [work_for_next_update] are completed with [f_job_done] and passed to
    [update] together with the chunk. When [update] emits a result, [acc] is
    replaced by [f_acc prev result], where [prev] is the accumulator from
    before the update if there was one and the post-update accumulator
    otherwise.

    Raises (through [Or_error.ok_exn]) if [update] returns an error. *)
let gen :
       gen_data:'d Quickcheck.Generator.t
    -> f_job_done:(('a, 'd) Available_job.t -> 'a)
    -> f_acc:(('a * 'd list) option -> 'a * 'd list -> ('a * 'd list) option)
    -> ('a, 'd) State.t Quickcheck.Generator.t =
 fun ~gen_data ~f_job_done ~f_acc ->
  let open Quickcheck.Generator.Let_syntax in
  let%bind depth, delay =
    Quickcheck.Generator.tuple2 (Int.gen_incl 2 5) (Int.gen_incl 0 3)
  in
  let max_base_jobs = Int.pow 2 depth in
  let chunk_gen =
    Quickcheck.Generator.list_with_length max_base_jobs gen_data
  in
  let%map chunks = Quickcheck.Generator.list_non_empty chunk_gen in
  List.fold chunks
    ~init:(State.empty ~max_base_jobs ~delay)
    ~f:(fun before chunk ->
      let completed_jobs =
        work_for_next_update before ~data_count:(List.length chunk)
        |> List.concat
        |> List.map ~f:f_job_done
      in
      let acc_before = before.acc in
      let emitted, after =
        Or_error.ok_exn (update ~data:chunk before ~completed_jobs)
      in
      match emitted with
      | None ->
          after
      | Some result ->
          (* NOTE(review): [step_on_free_space] in the "scans" tests only
             calls [f_acc] when a previous accumulator exists, whereas here
             [f_acc] is also applied to the post-update accumulator. Confirm
             the difference is intended. *)
          let prev =
            match acc_before with Some _ -> acc_before | None -> after.acc
          in
          { after with acc = f_acc prev result } )
1625

1626
let%test_module "scans" =
1627
  ( module struct
1628
    module Queue = Queue
1629

1630
    (* [step_on_free_space state w ds f f_acc] feeds [ds] into the scan state
       in slices of at most [state.max_base_jobs] items. For each slice it
       completes the jobs reported by [work_for_next_update] with [f], runs
       [test_update], and, when a result is emitted and an accumulator already
       existed before the update, replaces [acc] with [f_acc old_acc result].
       The accumulator is written to [w] after every slice. Returns the state
       reached once all of [ds] has been consumed. *)
    let rec step_on_free_space state w ds f f_acc =
      let data = List.take ds state.max_base_jobs in
      let completed_jobs =
        work_for_next_update state ~data_count:(List.length data)
        |> List.concat |> List.map ~f
      in
      let acc_before = state.acc in
      let res_opt, state = test_update ~data state ~completed_jobs in
      let state =
        match (res_opt, acc_before) with
        | Some emitted, Some _ ->
            { state with acc = f_acc acc_before emitted }
        | Some _, None | None, _ ->
            (* Keep whatever accumulator the update itself left behind. *)
            state
      in
      let%bind () = Linear_pipe.write w state.acc in
      match List.drop ds state.max_base_jobs with
      | [] ->
          return state
      | _ :: _ as rest ->
          step_on_free_space state w rest f f_acc
1649

1650
    (* [do_steps ~state ~data ~f ~f_acc w] repeatedly reads a batch from
       [data], pushes it through [step_on_free_space] starting from the state
       currently held in [state], and stores the resulting state back into
       [state]. It finishes when [data] reports [`Eof]. *)
    let do_steps ~state ~data ~f ~f_acc w =
      let rec drain () =
        let%bind batch_or_eof = Linear_pipe.read' data in
        match batch_or_eof with
        | `Eof ->
            return ()
        | `Ok batch ->
            let%bind advanced =
              step_on_free_space !state w (Queue.to_list batch) f f_acc
            in
            state := advanced ;
            drain ()
      in
      drain ()
1662

1663
    (* [scan ~data ~depth ~f ~f_acc] returns a reader whose writer side runs
       [do_steps] over [data], starting from an empty scan state with
       [max_base_jobs = 2 ^ depth] and a delay of 1. *)
    let scan ~data ~depth ~f ~f_acc =
      let run writer =
        let max_base_jobs = Int.pow 2 depth in
        let state = ref (empty ~max_base_jobs ~delay:1) in
        do_steps ~state ~data ~f ~f_acc writer
      in
      Linear_pipe.create_reader ~close_on_exception:true run
1667

1668
    (* [step_repeatedly ~state ~data ~f ~f_acc] is like [scan] but continues
       from the caller-supplied mutable [state] instead of a fresh empty
       one. *)
    let step_repeatedly ~state ~data ~f ~f_acc =
      Linear_pipe.create_reader ~close_on_exception:true
        (do_steps ~state ~data ~f ~f_acc)
1671

1672
    let%test_module "scan (+) over ints" =
1673
      ( module struct
1674
        (* Folds an emitted result [x] into the running accumulator: sums the
           first components and appends the data lists. Yields [None] when
           there is no accumulator yet. *)
        let f_merge_up (state : (int64 * int64 list) option) x =
          Option.map state ~f:(fun (total, items) ->
              (Int64.( + ) total (fst x), items @ snd x) )
1678

1679
        (* Completes a job: a base job yields its value unchanged, a merge job
           yields the sum of its two inputs. *)
        let job_done (job : (Int64.t, Int64.t) Available_job.t) : Int64.t =
          match job with
          | Base leaf ->
              leaf
          | Merge (left, right) ->
              Int64.( + ) left right
1681

1682
        (* For a fresh state and every data count [i] in [0, max_base_jobs],
           checks how the number of trees changes after one update, based on
           the partition reported by [partition_if_overflowing]:
           - without a second partition, a tree is added only when [i] equals
             the first partition count;
           - with a second partition, a tree is added only when [i] exceeds
             the first partition count. *)
        let%test_unit "Split only if enqueuing onto the next queue" =
          let max_base_jobs = Int.pow 2 4 in
          let state = State.empty ~max_base_jobs ~delay:1 in
          Quickcheck.test
            (Int.gen_incl 0 max_base_jobs)
            ~trials:1000
            ~f:(fun i ->
              let data = List.init i ~f:Int64.of_int in
              let partition = partition_if_overflowing state in
              let completed_jobs =
                work_for_next_update state ~data_count:(List.length data)
                |> List.concat
                |> List.map ~f:job_done
              in
              let tree_count_before =
                Mina_stdlib.Nonempty_list.length state.trees
              in
              let _, updated = test_update ~data state ~completed_jobs in
              let tree_count_after =
                Mina_stdlib.Nonempty_list.length updated.trees
              in
              let first_count = fst partition.first in
              let expects_new_tree =
                match partition.second with
                | None ->
                    i = first_count
                | Some _ ->
                    i > first_count
              in
              let expected_tree_count =
                if expects_new_tree then tree_count_before + 1
                else tree_count_before
              in
              assert (tree_count_after = expected_tree_count) )
1720

1721
        (* Jobs are created with unique sequence numbers starting from 1. At
           any point, after a reset, the jobs should again be labelled
           starting from 1. *)
        let%test_unit "sequence number reset" =
          Backtrace.elide := false ;
          let p = 3 in
          let max_base_jobs = Int.pow 2 p in
          let g = Int.gen_incl 0 max_base_jobs in
          (* Job records of every tree, listed in reverse tree order. *)
          let jobs state =
            Mina_stdlib.Nonempty_list.to_list state.trees
            |> List.rev_map ~f:Tree.jobs_records
          in
          let verify_sequence_number state =
            let depth = Int.ceil_log2 max_base_jobs + 1 in
            reset_seq_no state |> jobs
            |> List.iteri ~f:(fun tree_index tree_jobs ->
                   (* Each tree has jobs up to one level below the older
                      tree, with these sequence numbers after a reset:
                      *         4
                      *     3       3
                      *   2   2   2   2
                      *  1 1 1 1 1 1 1 1
                   *)
                   let levels = depth - tree_index in
                   let expected_sum =
                     List.sum
                       (module Int)
                       (List.init levels ~f:(fun k -> k + tree_index))
                       ~f:(fun level -> Int.pow 2 level * (depth - level))
                   in
                   (* Every sequence number is shifted down by the index of
                      its tree before being summed. *)
                   let actual_sum =
                     List.sum
                       (module Int)
                       tree_jobs
                       ~f:(fun (job :
                                 ( int64 Merge.Record.t
                                 , int64 Base.Record.t )
                                 Job.t ) ->
                         match job with
                         | Job.Merge { seq_no; _ } ->
                             seq_no - tree_index
                         | Base { seq_no; _ } ->
                             seq_no - tree_index )
                   in
                   assert (actual_sum = expected_sum) )
          in
          let state = ref (State.empty ~max_base_jobs ~delay:0) in
          let emitted_count = ref 0 in
          Quickcheck.test g ~trials:50 ~f:(fun _ ->
              let completed_jobs =
                jobs_for_next_update !state
                |> List.concat
                |> List.map ~f:job_done
              in
              let data = List.init max_base_jobs ~f:Int64.of_int in
              let res_opt, updated =
                test_update ~data !state ~completed_jobs
              in
              state := updated ;
              match res_opt with
              | None ->
                  ()
              | Some _ ->
                  (* Only start verifying once more than [p] results have
                     been emitted, i.e. after enough jobs were created. *)
                  if !emitted_count > p then verify_sequence_number !state
                  else emitted_count := !emitted_count + 1 )
1783

1784
        (* Round-trips generated scan states through the latest stable
           bin_prot encoding and checks that the state hash is unchanged. *)
        let%test_unit "serialize, deserialize scan state" =
          Backtrace.elide := false ;
          let module Wire = State.Stable.Latest in
          let state_gen =
            gen
              ~gen_data:
                (Quickcheck.Generator.map Int.quickcheck_generator
                   ~f:Int64.of_int )
              ~f_job_done:job_done ~f_acc:f_merge_up
          in
          let hash_of state =
            State.hash state Int64.to_string Int64.to_string
          in
          Quickcheck.test state_gen
            ~sexp_of:[%sexp_of: (int64, int64) State.t]
            ~trials:50
            ~f:(fun original ->
              let original_hash = hash_of original in
              (* Allocate exactly as many bytes as the encoding needs. *)
              let buf =
                Bin_prot.Common.create_buf
                  (Wire.bin_size_t Int64.bin_size_t Int64.bin_size_t original)
              in
              let (_ : int) =
                Wire.bin_write_t Int64.bin_write_t Int64.bin_write_t buf
                  ~pos:0 original
              in
              let round_tripped =
                Wire.bin_read_t Int64.bin_read_t Int64.bin_read_t buf
                  ~pos_ref:(ref 0)
              in
              assert (Hash.equal original_hash (hash_of round_tripped)) )
1813

1814
        (* Starting from an arbitrary generated state, keeps stepping the scan
           with zeros, then injects a single one, and checks that the
           accumulator eventually grows by exactly one. *)
        let%test_unit "scan can be initialized from intermediate state" =
          Backtrace.elide := false ;
          let state_gen =
            gen
              ~gen_data:
                (Quickcheck.Generator.map Int.quickcheck_generator
                   ~f:Int64.of_int )
              ~f_job_done:job_done ~f_acc:f_merge_up
          in
          Quickcheck.test state_gen
            ~sexp_of:[%sexp_of: (int64, int64) State.t]
            ~trials:10
            ~f:(fun initial ->
              let s = ref initial in
              Async.Thread_safe.block_on_async_exn (fun () ->
                  let do_one_next = ref false in
                  (* Yields one exactly once after [do_one_next] is set, and
                     zero otherwise. *)
                  let next_input () =
                    if !do_one_next then (
                      do_one_next := false ;
                      Int64.one )
                    else Int64.zero
                  in
                  (* For any arbitrary intermediate state, the input is an
                     endless stream of zeros with an optional single one. *)
                  let one_then_zeros =
                    Linear_pipe.create_reader ~close_on_exception:true
                      (fun w ->
                        let rec feed () =
                          let%bind () = Pipe.write w (next_input ()) in
                          feed ()
                        in
                        feed () )
                  in
                  (* Note: every call builds a new reader over the same
                     input stream and state reference. *)
                  let pipe s =
                    step_repeatedly ~state:s ~data:one_then_zeros ~f:job_done
                      ~f_acc:f_merge_up
                  in
                  let parallelism =
                    let width = !s.max_base_jobs in
                    width * Int.ceil_log2 width
                  in
                  let rounds = parallelism * parallelism in
                  (* Performs [rounds] reads, remembering the most recently
                     emitted first component (or [v] if none was emitted). *)
                  let fill_some_zeros v s =
                    Deferred.List.fold
                      (List.init rounds ~f:ignore)
                      ~init:v
                      ~f:(fun latest () ->
                        let%map item = Linear_pipe.read (pipe s) in
                        match item with
                        | `Ok (Some (emitted, _)) ->
                            emitted
                        | `Ok None | `Eof ->
                            latest )
                  in
                  (* after we flush intermediate work *)
                  let old_acc =
                    Option.value !s.acc ~default:Int64.(zero, [])
                  in
                  let%bind v = fill_some_zeros Int64.zero s in
                  do_one_next := true ;
                  let acc = Option.value_exn !s.acc in
                  assert (not ([%equal: int64] (fst acc) (fst old_acc))) ;
                  (* eventually we will emit the acc+1 element *)
                  let%map _ = fill_some_zeros v s in
                  let acc_plus_one = Option.value_exn !s.acc in
                  assert (Int64.(equal (fst acc_plus_one) (fst acc + one))) ) )
1874
      end )
1875

1876
    let%test_module "scan (+) over ints, map from string" =
1877
      ( module struct
1878
        (* Folds an emitted result [x] into the running accumulator: sums the
           first components and appends the string lists. Yields [None] when
           there is no accumulator yet. *)
        let f_merge_up (tuple : (int64 * string list) option) x =
          match tuple with
          | None ->
              None
          | Some (total, items) ->
              Some (Int64.( + ) total (fst x), items @ snd x)
1882

1883
        (* Completes a job: a base job parses its string as an int64, a merge
           job yields the sum of its two inputs. *)
        let job_done (job : (Int64.t, string) Available_job.t) : Int64.t =
          match job with
          | Base text ->
              Int64.of_string text
          | Merge (left, right) ->
              Int64.( + ) left right
1889

1890
        (* Scans over the inputs "0", "1", ..., "n - 1" followed by endless
           "0"s and checks that, after [4 * n] reads of the result pipe, the
           last emitted total equals the plain sum of 0 .. n - 1. *)
        let%test_unit "scan behaves like a fold long-term" =
          let depth = 7 in
          let n = 1000 in
          let counting_then_zeros limit =
            (* [remaining] counts down from [limit]; once it reaches zero
               only "0" is produced. The stream never ends. *)
            let emit remaining =
              let next =
                if remaining <= 0 then "0"
                else Int.to_string (limit - remaining)
              in
              return (Some (next, remaining - 1))
            in
            { Linear_pipe.Reader.pipe = Pipe.unfold ~init:limit ~f:emit
            ; has_reader = false
            }
          in
          let result =
            scan
              ~data:(counting_then_zeros n)
              ~depth ~f:job_done ~f_acc:f_merge_up
          in
          Async.Thread_safe.block_on_async_exn (fun () ->
              let%map last_emitted =
                Deferred.List.fold
                  (List.init (4 * n) ~f:ignore)
                  ~init:Int64.zero
                  ~f:(fun latest () ->
                    let%map item = Linear_pipe.read result in
                    match item with
                    | `Ok (Some (total, _)) ->
                        total
                    | `Ok None | `Eof ->
                        latest )
              in
              let expected =
                List.init n ~f:Int64.of_int
                |> List.fold ~init:Int64.zero ~f:Int64.( + )
              in
              assert ([%equal: int64] last_emitted expected) )
1926
      end )
1927

1928
    let%test_module "scan (concat) over strings" =
1929
      ( module struct
1930
        (* Folds an emitted result [x] into the running accumulator:
           concatenates the first components and appends the string lists.
           Yields [None] when there is no accumulator yet. *)
        let f_merge_up (tuple : (string * string list) option) x =
          Option.map tuple ~f:(fun (joined, items) ->
              (String.( ^ ) joined (fst x), items @ snd x) )
1934

1935
        (* Completes a job: a base job yields its string unchanged, a merge
           job yields the left input followed by the right input. *)
        let job_done (job : (string, string) Available_job.t) : string =
          match job with
          | Base leaf ->
              leaf
          | Merge (left, right) ->
              String.( ^ ) left right
1937

1938
        (* Scans string concatenation over "0,", "1,", ..., followed by
           endless empty strings, and checks that after [42 * n] reads the
           last emitted value is the in-order concatenation of the first [n]
           numbered inputs. *)
        let%test_unit "scan performs operation in correct order with \
                       non-commutative semigroup" =
          Backtrace.elide := false ;
          let n = 100 in
          let numbered_then_empties limit =
            (* [remaining] counts down from [limit]; once it reaches zero
               only empty strings are produced. The stream never ends. *)
            let emit remaining =
              let next =
                if remaining <= 0 then ""
                else Int.to_string (limit - remaining) ^ ","
              in
              return (Some (next, remaining - 1))
            in
            { Linear_pipe.Reader.pipe = Pipe.unfold ~init:limit ~f:emit
            ; has_reader = false
            }
          in
          let result =
            scan
              ~data:(numbered_then_empties n)
              ~depth:7 ~f:job_done ~f_acc:f_merge_up
          in
          Async.Thread_safe.block_on_async_exn (fun () ->
              let%map last_emitted =
                Deferred.List.fold
                  (List.init (42 * n) ~f:ignore)
                  ~init:""
                  ~f:(fun latest () ->
                    let%map item = Linear_pipe.read result in
                    match item with
                    | `Ok (Some (joined, _)) ->
                        joined
                    | `Ok None | `Eof ->
                        latest )
              in
              let expected =
                List.init n ~f:(fun i -> Int.to_string i ^ ",")
                |> List.fold ~init:"" ~f:String.( ^ )
              in
              assert (String.equal last_emitted expected) )
1975
      end )
1976
  end )
294✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc