• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 411

24 Jul 2025 03:14PM UTC coverage: 33.188% (-27.7%) from 60.871%
411

push

buildkite

web-flow
Merge pull request #17541 from MinaProtocol/brian/merge-compatible-into-develop

Merge compatible into develop

164 of 702 new or added lines in 96 files covered. (23.36%)

18243 existing lines in 393 files now uncovered.

23983 of 72264 relevant lines covered (33.19%)

24667.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.67
/src/lib/work_selector/work_lib.ml
1
open Core_kernel
56✔
2
open Currency
3
open Async
4

5
module Make (Inputs : Intf.Inputs_intf) = struct
6
  module Inputs = Inputs
7
  module Work_spec = Snark_work_lib.Work.Single.Spec
8

9
  module Job_status = struct
10
    type t = Assigned of Time.t
11

12
    let is_old (Assigned at_time) ~now ~reassignment_wait =
UNCOV
13
      let max_age = Time.Span.of_ms (Float.of_int reassignment_wait) in
×
UNCOV
14
      let delta = Time.diff now at_time in
×
UNCOV
15
      Time.Span.( > ) delta max_age
×
16
  end
17

18
  module State = struct
19
    module Seen_key = struct
20
      module T = struct
21
        type t = Transaction_snark.Statement.t One_or_two.t
×
UNCOV
22
        [@@deriving compare, sexp, to_yojson, hash]
×
23
      end
24

25
      include T
26
      include Comparable.Make (T)
27
    end
28

29
    type t =
30
      { mutable available_jobs :
31
          ( Inputs.Transaction_witness.t
32
          , Inputs.Ledger_proof.Cached.t )
33
          Work_spec.t
34
          One_or_two.t
35
          list
36
            (** Jobs received from [frontier_broadcast_pipe], would be updated
37
                whenever the pipe has broadcasted new frontier. The works
38
                between consecutive frontier broadcasts should be largely
39
                identical. *)
40
      ; mutable jobs_seen : Job_status.t Seen_key.Map.t
41
      ; reassignment_wait : int
42
      }
43

44
    let init :
45
           reassignment_wait:int
46
        -> frontier_broadcast_pipe:
47
             Inputs.Transition_frontier.t option
48
             Pipe_lib.Broadcast_pipe.Reader.t
49
        -> logger:Logger.t
50
        -> t =
51
     fun ~reassignment_wait ~frontier_broadcast_pipe ~logger ->
UNCOV
52
      let t =
×
53
        { available_jobs = []
54
        ; jobs_seen = Seen_key.Map.empty
55
        ; reassignment_wait
56
        }
57
      in
58
      Pipe_lib.Broadcast_pipe.Reader.iter frontier_broadcast_pipe
59
        ~f:(fun frontier_opt ->
UNCOV
60
          ( match frontier_opt with
×
UNCOV
61
          | None ->
×
UNCOV
62
              [%log debug] "No frontier, setting available work to be empty" ;
×
UNCOV
63
              t.available_jobs <- []
×
UNCOV
64
          | Some frontier ->
×
65
              Pipe_lib.Broadcast_pipe.Reader.iter
UNCOV
66
                (Inputs.Transition_frontier.best_tip_pipe frontier) ~f:(fun _ ->
×
UNCOV
67
                  let best_tip_staged_ledger =
×
68
                    Inputs.Transition_frontier.best_tip_staged_ledger frontier
69
                  in
UNCOV
70
                  let start_time = Time.now () in
×
UNCOV
71
                  ( match
×
72
                      Inputs.Staged_ledger.all_work_pairs best_tip_staged_ledger
73
                        ~get_state:
UNCOV
74
                          (Inputs.Transition_frontier.get_protocol_state
×
75
                             frontier )
76
                    with
77
                  | Error e ->
×
78
                      [%log fatal]
×
79
                        "Error occured when updating available work: $error"
80
                        ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
UNCOV
81
                  | Ok new_available_jobs ->
×
82
                      let end_time = Time.now () in
UNCOV
83
                      [%log info] "Updating new available work took $time ms"
×
84
                        ~metadata:
85
                          [ ( "time"
86
                            , `Float
87
                                ( Time.diff end_time start_time
UNCOV
88
                                |> Time.Span.to_ms ) )
×
89
                          ] ;
UNCOV
90
                      t.available_jobs <- new_available_jobs ) ;
×
91
                  Deferred.unit )
UNCOV
92
              |> Deferred.don't_wait_for ) ;
×
93
          Deferred.unit )
UNCOV
94
      |> Deferred.don't_wait_for ;
×
UNCOV
95
      t
×
96

97
    let all_unseen_works t =
UNCOV
98
      O1trace.sync_thread "work_lib_all_unseen_works" (fun () ->
×
UNCOV
99
          List.filter t.available_jobs ~f:(fun js ->
×
UNCOV
100
              not
×
UNCOV
101
              @@ Map.mem t.jobs_seen (One_or_two.map ~f:Work_spec.statement js) ) )
×
102

103
    let remove_old_assignments t ~logger =
UNCOV
104
      O1trace.sync_thread "work_lib_remove_old_assignments" (fun () ->
×
UNCOV
105
          let now = Time.now () in
×
UNCOV
106
          t.jobs_seen <-
×
UNCOV
107
            Map.filteri t.jobs_seen ~f:(fun ~key:work ~data:status ->
×
UNCOV
108
                if
×
109
                  Job_status.is_old status ~now
110
                    ~reassignment_wait:t.reassignment_wait
UNCOV
111
                then (
×
UNCOV
112
                  [%log info]
×
UNCOV
113
                    ~metadata:[ ("work", Seen_key.to_yojson work) ]
×
114
                    "Waited too long to get work for $work. Ready to be \
115
                     reassigned" ;
UNCOV
116
                  Mina_metrics.(
×
UNCOV
117
                    Counter.inc_one Snark_work.snark_work_timed_out_rpc) ;
×
118
                  false )
UNCOV
119
                else true ) )
×
120

121
    let remove t statement = t.jobs_seen <- Map.remove t.jobs_seen statement
×
122

123
    let set t x =
UNCOV
124
      t.jobs_seen <-
×
UNCOV
125
        Map.set t.jobs_seen
×
UNCOV
126
          ~key:(One_or_two.map ~f:Work_spec.statement x)
×
UNCOV
127
          ~data:(Job_status.Assigned (Time.now ()))
×
128
  end
129

130
  let does_not_have_better_fee ~snark_pool ~fee
131
      (statements : Inputs.Transaction_snark_work.Statement.t) : bool =
UNCOV
132
    Option.value_map ~default:true
×
UNCOV
133
      (Inputs.Snark_pool.get_completed_work snark_pool statements)
×
134
      ~f:(fun priced_proof ->
UNCOV
135
        let competing_fee =
×
136
          Inputs.Transaction_snark_work.Checked.fee priced_proof
137
        in
UNCOV
138
        Fee.compare fee competing_fee < 0 )
×
139

140
  module For_tests = struct
141
    let does_not_have_better_fee = does_not_have_better_fee
142
  end
143

144
  let get_expensive_work ~snark_pool ~fee
145
      (jobs : ('a, 'b) Work_spec.t One_or_two.t list) :
146
      ('a, 'b) Work_spec.t One_or_two.t list =
UNCOV
147
    O1trace.sync_thread "work_lib_get_expensive_work" (fun () ->
×
UNCOV
148
        List.filter jobs ~f:(fun job ->
×
UNCOV
149
            does_not_have_better_fee ~snark_pool ~fee
×
UNCOV
150
              (One_or_two.map job ~f:Work_spec.statement) ) )
×
151

152
  let all_pending_work ~snark_pool statements =
153
    List.filter statements ~f:(fun st ->
×
154
        Option.is_none (Inputs.Snark_pool.get_completed_work snark_pool st) )
×
155

156
  let all_work ~snark_pool (state : State.t) =
157
    O1trace.sync_thread "work_lib_all_unseen_works" (fun () ->
×
158
        List.map state.available_jobs ~f:(fun job ->
×
159
            let statement = One_or_two.map ~f:Work_spec.statement job in
×
160
            let fee_prover_opt =
×
161
              Option.map
162
                (Inputs.Snark_pool.get_completed_work snark_pool statement)
×
163
                ~f:(fun (p : Inputs.Transaction_snark_work.Checked.t) ->
164
                  ( Inputs.Transaction_snark_work.Checked.fee p
×
165
                  , Inputs.Transaction_snark_work.Checked.prover p ) )
×
166
            in
167
            (job, fee_prover_opt) ) )
×
168

169
  let all_completed_work ~snark_pool statements =
170
    List.filter_map statements ~f:(fun st ->
×
171
        Inputs.Snark_pool.get_completed_work snark_pool st )
×
172

173
  (*Seen/Unseen jobs that are not in the snark pool yet*)
174
  let pending_work_statements ~snark_pool ~fee_opt (state : State.t) =
175
    let all_todo_statements =
×
176
      List.map state.available_jobs ~f:(One_or_two.map ~f:Work_spec.statement)
177
    in
178
    let expensive_work statements ~fee =
×
179
      List.filter statements ~f:(does_not_have_better_fee ~snark_pool ~fee)
×
180
    in
181
    match fee_opt with
182
    | None ->
×
183
        all_pending_work ~snark_pool all_todo_statements
184
    | Some fee ->
×
185
        expensive_work all_todo_statements ~fee
186

187
  let completed_work_statements ~snark_pool (state : State.t) =
188
    let all_todo_statements =
×
189
      List.map state.available_jobs ~f:(One_or_two.map ~f:Work_spec.statement)
190
    in
191
    all_completed_work ~snark_pool all_todo_statements
×
192
end
56✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc