• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 3409

26 Feb 2025 01:10PM UTC coverage: 32.353% (-28.4%) from 60.756%
3409

push

buildkite

web-flow
Merge pull request #16687 from MinaProtocol/dw/merge-compatible-into-develop-20250225

Merge compatible into develop [20250224]

23144 of 71535 relevant lines covered (32.35%)

16324.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.6
/src/lib/work_selector/work_lib.ml
1
open Core_kernel
6✔
2
open Currency
3
open Async
4

5
module Make (Inputs : Intf.Inputs_intf) = struct
6
  module Inputs = Inputs
7
  module Work_spec = Snark_work_lib.Work.Single.Spec
8

9
  module Job_status = struct
    (** A job that has been handed out, tagged with the time at which it was
        assigned. *)
    type t = Assigned of Time.t

    (** [is_old status ~now ~reassignment_wait] is [true] when strictly more
        than [reassignment_wait] milliseconds separate [now] from the
        assignment time recorded in [status]. *)
    let is_old status ~now ~reassignment_wait =
      let (Assigned assigned_at) = status in
      let threshold = Time.Span.of_ms (Float.of_int reassignment_wait) in
      let elapsed = Time.diff now assigned_at in
      Time.Span.(elapsed > threshold)
  end
17

18
  module State = struct
19
    (** Key under which handed-out jobs are remembered: the statement(s) of a
        one-or-two work bundle. [Comparable.Make] supplies the [Map] used by
        the enclosing state. *)
    module Seen_key = struct
      module T = struct
        type t = Transaction_snark.Statement.t One_or_two.t
        [@@deriving compare, sexp, to_yojson, hash]
      end

      include T
      include Comparable.Make (T)
    end
28

29
    (** Mutable work-selection state. *)
    type t =
      { mutable available_jobs :
          (Inputs.Transaction_witness.t, Inputs.Ledger_proof.t) Work_spec.t
          One_or_two.t
          list
            (** Jobs currently on offer; replaced wholesale by the background
                subscription started in [init]. *)
      ; mutable jobs_seen : Job_status.t Seen_key.Map.t
            (** Statements of jobs that have been handed out, with the time of
                assignment. *)
      ; reassignment_wait : int
            (** Age, in milliseconds, past which a handed-out job is dropped
                from [jobs_seen]. *)
      }
37

38
    (** [init ~reassignment_wait ~frontier_broadcast_pipe ~logger] builds a
        state with no available jobs and no seen jobs, subscribes to
        [frontier_broadcast_pipe] in the background (the subscription is not
        awaited), and returns the state immediately.

        - When the pipe yields [None], the available jobs are cleared.
        - When it yields [Some frontier], a further background subscription to
          that frontier's best-tip pipe is started. Each value read from it
          recomputes the work pairs of the best-tip staged ledger, passes
          every proof through
          [Inputs.Ledger_proof.Cached.read_proof_from_disk], and replaces
          [available_jobs] with the result. If the work pairs cannot be
          computed the error is logged at fatal level and [available_jobs] is
          left untouched. *)
    let init :
           reassignment_wait:int
        -> frontier_broadcast_pipe:
             Inputs.Transition_frontier.t option
             Pipe_lib.Broadcast_pipe.Reader.t
        -> logger:Logger.t
        -> t =
     fun ~reassignment_wait ~frontier_broadcast_pipe ~logger ->
      let state =
        { available_jobs = []
        ; jobs_seen = Seen_key.Map.empty
        ; reassignment_wait
        }
      in
      (* Recompute the work pairs for [frontier]'s best tip and publish them
         in [state]. *)
      let refresh_available_jobs frontier =
        let staged_ledger =
          Inputs.Transition_frontier.best_tip_staged_ledger frontier
        in
        let started_at = Time.now () in
        let get_state =
          Inputs.Transition_frontier.get_protocol_state frontier
        in
        match Inputs.Staged_ledger.all_work_pairs staged_ledger ~get_state with
        | Ok cached_jobs ->
            let finished_at = Time.now () in
            let elapsed_ms =
              Time.Span.to_ms (Time.diff finished_at started_at)
            in
            [%log info] "Updating new available work took $time ms"
              ~metadata:[ ("time", `Float elapsed_ms) ] ;
            let unwrap_spec =
              Work_spec.map ~f_witness:ident
                ~f_proof:Inputs.Ledger_proof.Cached.read_proof_from_disk
            in
            state.available_jobs <-
              List.map cached_jobs ~f:(fun pair ->
                  One_or_two.map pair ~f:unwrap_spec )
        | Error err ->
            [%log fatal] "Error occured when updating available work: $error"
              ~metadata:[ ("error", Error_json.error_to_yojson err) ]
      in
      (* React to the frontier appearing or disappearing. *)
      let on_frontier_change frontier_opt =
        ( match frontier_opt with
        | Some frontier ->
            Deferred.don't_wait_for
              (Pipe_lib.Broadcast_pipe.Reader.iter
                 (Inputs.Transition_frontier.best_tip_pipe frontier)
                 ~f:(fun _ ->
                   refresh_available_jobs frontier ;
                   Deferred.unit ) )
        | None ->
            [%log debug] "No frontier, setting available work to be empty" ;
            state.available_jobs <- [] ) ;
        Deferred.unit
      in
      Deferred.don't_wait_for
        (Pipe_lib.Broadcast_pipe.Reader.iter frontier_broadcast_pipe
           ~f:on_frontier_change ) ;
      state
×
103

104
    (** [all_unseen_works t] is the sub-list of [t.available_jobs] whose
        statements have no entry in [t.jobs_seen]. *)
    let all_unseen_works t =
      let already_seen job =
        Map.mem t.jobs_seen (One_or_two.map job ~f:Work_spec.statement)
      in
      O1trace.sync_thread "work_lib_all_unseen_works" (fun () ->
          List.filter t.available_jobs ~f:(Fn.non already_seen) )
×
109

110
    (** [remove_old_assignments t ~logger] drops from [t.jobs_seen] every
        entry that [Job_status.is_old] reports as older than
        [t.reassignment_wait]. Each dropped entry is logged at info level and
        bumps the [snark_work_timed_out_rpc] counter. *)
    let remove_old_assignments t ~logger =
      let report_timeout work =
        [%log info]
          ~metadata:[ ("work", Seen_key.to_yojson work) ]
          "Waited too long to get work for $work. Ready to be reassigned" ;
        Mina_metrics.(Counter.inc_one Snark_work.snark_work_timed_out_rpc)
      in
      O1trace.sync_thread "work_lib_remove_old_assignments" (fun () ->
          let now = Time.now () in
          let still_fresh ~key ~data =
            let expired =
              Job_status.is_old data ~now
                ~reassignment_wait:t.reassignment_wait
            in
            if expired then report_timeout key ;
            not expired
          in
          t.jobs_seen <- Map.filteri t.jobs_seen ~f:still_fresh )
×
127

128
    (** [remove t x] forgets that job [x] was handed out by deleting its
        statements from [t.jobs_seen]. *)
    let remove t x =
      let key = One_or_two.map x ~f:Work_spec.statement in
      t.jobs_seen <- Map.remove t.jobs_seen key
×
131

132
    (** [set t x] records job [x] in [t.jobs_seen] as assigned at the current
        time, overwriting any earlier entry for the same statements. *)
    let set t x =
      let data = Job_status.Assigned (Time.now ()) in
      let key = One_or_two.map x ~f:Work_spec.statement in
      t.jobs_seen <- Map.set t.jobs_seen ~key ~data
×
137
  end
138

139
  (** [does_not_have_better_fee ~snark_pool ~fee statements] is [true] when
      the snark pool holds no completed work for [statements], or when [fee]
      is strictly lower than the fee of the work it does hold. *)
  let does_not_have_better_fee ~snark_pool ~fee
      (statements : Inputs.Transaction_snark_work.Statement.t) : bool =
    match Inputs.Snark_pool.get_completed_work snark_pool statements with
    | None ->
        true
    | Some priced_proof ->
        let competing_fee =
          Inputs.Transaction_snark_work.Checked.fee priced_proof
        in
        Fee.compare fee competing_fee < 0
×
148

149
  (** Re-exports of internals for use by tests. *)
  module For_tests = struct
    let does_not_have_better_fee ~snark_pool ~fee statements =
      does_not_have_better_fee ~snark_pool ~fee statements
  end
152

153
  (** [get_expensive_work ~snark_pool ~fee jobs] keeps the jobs whose
      statements satisfy [does_not_have_better_fee ~snark_pool ~fee]. *)
  let get_expensive_work ~snark_pool ~fee
      (jobs : ('a, 'b) Work_spec.t One_or_two.t list) :
      ('a, 'b) Work_spec.t One_or_two.t list =
    let worth_doing job =
      let statements = One_or_two.map job ~f:Work_spec.statement in
      does_not_have_better_fee ~snark_pool ~fee statements
    in
    O1trace.sync_thread "work_lib_get_expensive_work" (fun () ->
        List.filter jobs ~f:worth_doing )
×
160

161
  (** [all_pending_work ~snark_pool statements] keeps the statements for which
      the snark pool holds no completed work. *)
  let all_pending_work ~snark_pool statements =
    let is_pending st =
      match Inputs.Snark_pool.get_completed_work snark_pool st with
      | None ->
          true
      | Some _ ->
          false
    in
    List.filter statements ~f:is_pending
×
164

165
  (** [all_work ~snark_pool state] pairs every job in [state.available_jobs]
      with [Some (fee, prover)] taken from the completed work the snark pool
      holds for that job's statements, or with [None] when the pool holds
      none.

      The computation runs under its own O1trace thread label,
      ["work_lib_all_work"]. It used to share the label
      ["work_lib_all_unseen_works"] with [State.all_unseen_works], which made
      the two indistinguishable in traces. *)
  let all_work ~snark_pool (state : State.t) =
    let fee_and_prover (work : Inputs.Transaction_snark_work.Checked.t) =
      ( Inputs.Transaction_snark_work.Checked.fee work
      , Inputs.Transaction_snark_work.Checked.prover work )
    in
    O1trace.sync_thread "work_lib_all_work" (fun () ->
        List.map state.available_jobs ~f:(fun job ->
            let statement = One_or_two.map job ~f:Work_spec.statement in
            let fee_prover_opt =
              Option.map
                (Inputs.Snark_pool.get_completed_work snark_pool statement)
                ~f:fee_and_prover
            in
            (job, fee_prover_opt) ) )
×
177

178
  (** [all_completed_work ~snark_pool statements] collects, in order, the
      completed work the snark pool holds for each of [statements]; statements
      with no completed work contribute nothing. *)
  let all_completed_work ~snark_pool statements =
    List.concat_map statements ~f:(fun st ->
        Option.to_list (Inputs.Snark_pool.get_completed_work snark_pool st) )
×
181

182
  (** Statements of the available jobs (whether or not they have already been
      handed out) that still need work. With [fee_opt = None] these are the
      statements with no completed work in the snark pool; with
      [fee_opt = Some fee] they are the statements satisfying
      [does_not_have_better_fee ~snark_pool ~fee]. *)
  let pending_work_statements ~snark_pool ~fee_opt (state : State.t) =
    let todo =
      List.map state.available_jobs ~f:(fun job ->
          One_or_two.map job ~f:Work_spec.statement )
    in
    match fee_opt with
    | Some fee ->
        List.filter todo ~f:(fun st ->
            does_not_have_better_fee ~snark_pool ~fee st )
    | None ->
        all_pending_work ~snark_pool todo
195

196
  (** [completed_work_statements ~snark_pool state] is the completed work the
      snark pool holds for the statements of [state.available_jobs]. *)
  let completed_work_statements ~snark_pool (state : State.t) =
    state.available_jobs
    |> List.map ~f:(One_or_two.map ~f:Work_spec.statement)
    |> all_completed_work ~snark_pool
×
201
end
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc