• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 2817

23 Oct 2024 05:55PM UTC coverage: 33.411% (-27.7%) from 61.089%
2817

push

buildkite

web-flow
Merge pull request #16270 from MinaProtocol/dkijania/fix_promotion_job

Fix verify promoted docker check

22271 of 66658 relevant lines covered (33.41%)

131054.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.17
/src/lib/transition_frontier/persistent_frontier/diff_buffer.ml
1
(* TODO: flush on timeout interval in addition to meeting flush capacity *)
3✔
2
open Async_kernel
3
open Core_kernel
4
open Frontier_base
5

6
let max_latency
7
    { Genesis_constants.Constraint_constants.block_window_duration_ms; _ } =
8
  Block_time.Span.(
×
9
    (block_window_duration_ms |> Int64.of_int |> Block_time.Span.of_ms)
×
10
    * of_ms 5L)
×
11

12
module Capacity = struct
13
  let flush = 30
14

15
  let max = flush * 4
16
end
17

18
(* TODO: lift up as Block_time utility *)
19
module Timer = struct
20
  open Block_time
21

22
  type t =
23
    { time_controller : Controller.t
24
    ; f : unit -> unit
25
    ; span : Span.t
26
    ; mutable timeout : unit Timeout.t option
27
    }
28

29
  let create ~time_controller ~f span =
30
    { time_controller; span; f; timeout = None }
×
31

32
  let start t =
33
    assert (Option.is_none t.timeout) ;
×
34
    let rec run_timeout t =
35
      t.timeout <-
×
36
        Some
37
          (Timeout.create t.time_controller t.span ~f:(fun _ ->
×
38
               t.f () ; run_timeout t ) )
×
39
    in
40
    run_timeout t
41

42
  let stop t =
43
    Option.iter t.timeout ~f:(fun timeout ->
×
44
        Timeout.cancel t.time_controller timeout () ) ;
×
45
    t.timeout <- None
×
46

47
  let reset t = stop t ; start t
×
48
end
49

50
type work = { diffs : Diff.Lite.E.t list }
51

52
module Rev_dyn_array : sig
53
  type 'a t
54

55
  val create : unit -> _ t
56

57
  val length : _ t -> int
58

59
  val clear : _ t -> unit
60

61
  val to_list : 'a t -> 'a list
62

63
  val add : 'a t -> 'a -> unit
64
end = struct
65
  type 'a t = { mutable length : int; mutable rev_list : 'a list }
66

67
  let create () = { length = 0; rev_list = [] }
×
68

69
  let length { length; _ } = length
×
70

71
  let to_list { rev_list; _ } = List.rev rev_list
×
72

73
  let clear t =
74
    t.length <- 0 ;
×
75
    t.rev_list <- []
76

77
  let add t x =
78
    t.length <- t.length + 1 ;
×
79
    t.rev_list <- x :: t.rev_list
80
end
81

82
type t =
83
  { diff_array : Diff.Lite.E.t Rev_dyn_array.t
84
  ; worker : Worker.t
85
        (* timer unfortunately needs to be mutable to break recursion *)
86
  ; mutable timer : Timer.t option
87
  ; mutable flush_job : unit Deferred.t option
88
  ; mutable closed : bool
89
  }
90

91
let check_for_overflow t =
92
  if Rev_dyn_array.length t.diff_array > Capacity.max then
×
93
    failwith "persistence buffer overflow"
×
94

95
let should_flush t = Rev_dyn_array.length t.diff_array >= Capacity.flush
×
96

97
let flush t =
98
  let rec flush_job t =
×
99
    let diffs = Rev_dyn_array.to_list t.diff_array in
×
100
    Rev_dyn_array.clear t.diff_array ;
×
101
    let%bind () = Worker.dispatch t.worker diffs in
×
102
    if should_flush t then flush_job t
×
103
    else (
×
104
      t.flush_job <- None ;
105
      Deferred.unit )
106
  in
107
  assert (Option.is_none t.flush_job) ;
×
108
  if Rev_dyn_array.length t.diff_array > 0 then
×
109
    t.flush_job <- Some (flush_job t)
×
110

111
let create ~(constraint_constants : Genesis_constants.Constraint_constants.t)
112
    ~time_controller ~worker =
113
  let t =
×
114
    { diff_array = Rev_dyn_array.create ()
×
115
    ; worker
116
    ; timer = None
117
    ; flush_job = None
118
    ; closed = false
119
    }
120
  in
121
  let timer =
122
    Timer.create ~time_controller
123
      ~f:(fun () -> if Option.is_none t.flush_job then flush t)
×
124
      (max_latency constraint_constants)
×
125
  in
126
  t.timer <- Some timer ;
×
127
  t
128

129
let write t ~diffs =
130
  if t.closed then failwith "attempt to write to diff buffer after closed" ;
×
131
  List.iter diffs ~f:(Rev_dyn_array.add t.diff_array) ;
×
132
  if should_flush t && Option.is_none t.flush_job then flush t
×
133
  else check_for_overflow t
×
134

135
let close_and_finish_copy t =
136
  ( match t.timer with
×
137
  | None ->
×
138
      failwith "diff buffer timer was never initialized"
139
  | Some timer ->
×
140
      Timer.stop timer ) ;
×
141
  t.closed <- true ;
142
  let%bind () = Option.value t.flush_job ~default:Deferred.unit in
×
143
  flush t ;
×
144
  Option.value t.flush_job ~default:Deferred.unit
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc