• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 2863

05 Nov 2024 06:20PM UTC coverage: 30.754% (-16.6%) from 47.311%
2863

push

buildkite

web-flow
Merge pull request #16296 from MinaProtocol/dkijania/more_multi_jobs

more multi jobs in CI

20276 of 65930 relevant lines covered (30.75%)

8631.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.13
/src/lib/work_selector/work_lib.ml
1
open Core_kernel
1✔
2
open Currency
3
open Async
4

5
module Make (Inputs : Intf.Inputs_intf) = struct
6
  module Work_spec = Snark_work_lib.Work.Single.Spec
7

8
  module Job_status = struct
9
    type t = Assigned of Time.t
10

11
    let is_old (Assigned at_time) ~now ~reassignment_wait =
12
      let max_age = Time.Span.of_ms (Float.of_int reassignment_wait) in
×
13
      let delta = Time.diff now at_time in
×
14
      Time.Span.( > ) delta max_age
×
15
  end
16

17
  module State = struct
18
    module Seen_key = struct
19
      module T = struct
20
        type t = Transaction_snark.Statement.t One_or_two.t
×
21
        [@@deriving compare, sexp, to_yojson, hash]
×
22
      end
23

24
      include T
25
      include Comparable.Make (T)
26
    end
27

28
    type t =
29
      { mutable available_jobs :
30
          (Inputs.Transaction_witness.t, Inputs.Ledger_proof.t) Work_spec.t
31
          One_or_two.t
32
          list
33
      ; mutable jobs_seen : Job_status.t Seen_key.Map.t
34
      ; reassignment_wait : int
35
      }
36

37
    let init :
38
           reassignment_wait:int
39
        -> frontier_broadcast_pipe:
40
             Inputs.Transition_frontier.t option
41
             Pipe_lib.Broadcast_pipe.Reader.t
42
        -> logger:Logger.t
43
        -> t =
44
     fun ~reassignment_wait ~frontier_broadcast_pipe ~logger ->
45
      let t =
×
46
        { available_jobs = []
47
        ; jobs_seen = Seen_key.Map.empty
48
        ; reassignment_wait
49
        }
50
      in
51
      Pipe_lib.Broadcast_pipe.Reader.iter frontier_broadcast_pipe
52
        ~f:(fun frontier_opt ->
53
          ( match frontier_opt with
×
54
          | None ->
×
55
              [%log debug] "No frontier, setting available work to be empty" ;
×
56
              t.available_jobs <- []
×
57
          | Some frontier ->
×
58
              Pipe_lib.Broadcast_pipe.Reader.iter
59
                (Inputs.Transition_frontier.best_tip_pipe frontier) ~f:(fun _ ->
×
60
                  let best_tip_staged_ledger =
×
61
                    Inputs.Transition_frontier.best_tip_staged_ledger frontier
62
                  in
63
                  let start_time = Time.now () in
×
64
                  ( match
×
65
                      Inputs.Staged_ledger.all_work_pairs best_tip_staged_ledger
66
                        ~get_state:
67
                          (Inputs.Transition_frontier.get_protocol_state
×
68
                             frontier )
69
                    with
70
                  | Error e ->
×
71
                      [%log fatal]
×
72
                        "Error occured when updating available work: $error"
73
                        ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
74
                  | Ok new_available_jobs ->
×
75
                      let end_time = Time.now () in
76
                      [%log info] "Updating new available work took $time ms"
×
77
                        ~metadata:
78
                          [ ( "time"
79
                            , `Float
80
                                ( Time.diff end_time start_time
81
                                |> Time.Span.to_ms ) )
×
82
                          ] ;
83
                      t.available_jobs <- new_available_jobs ) ;
×
84
                  Deferred.unit )
85
              |> Deferred.don't_wait_for ) ;
×
86
          Deferred.unit )
87
      |> Deferred.don't_wait_for ;
×
88
      t
×
89

90
    let all_unseen_works t =
91
      O1trace.sync_thread "work_lib_all_unseen_works" (fun () ->
×
92
          List.filter t.available_jobs ~f:(fun js ->
×
93
              not
×
94
              @@ Map.mem t.jobs_seen (One_or_two.map ~f:Work_spec.statement js) ) )
×
95

96
    let remove_old_assignments t ~logger =
97
      O1trace.sync_thread "work_lib_remove_old_assignments" (fun () ->
×
98
          let now = Time.now () in
×
99
          t.jobs_seen <-
×
100
            Map.filteri t.jobs_seen ~f:(fun ~key:work ~data:status ->
×
101
                if
×
102
                  Job_status.is_old status ~now
103
                    ~reassignment_wait:t.reassignment_wait
104
                then (
×
105
                  [%log info]
×
106
                    ~metadata:[ ("work", Seen_key.to_yojson work) ]
×
107
                    "Waited too long to get work for $work. Ready to be \
108
                     reassigned" ;
109
                  Mina_metrics.(
×
110
                    Counter.inc_one Snark_work.snark_work_timed_out_rpc) ;
×
111
                  false )
112
                else true ) )
×
113

114
    let remove t x =
115
      t.jobs_seen <-
×
116
        Map.remove t.jobs_seen (One_or_two.map ~f:Work_spec.statement x)
×
117

118
    let set t x =
119
      t.jobs_seen <-
×
120
        Map.set t.jobs_seen
×
121
          ~key:(One_or_two.map ~f:Work_spec.statement x)
×
122
          ~data:(Job_status.Assigned (Time.now ()))
×
123
  end
124

125
  let does_not_have_better_fee ~snark_pool ~fee
126
      (statements : Inputs.Transaction_snark_work.Statement.t) : bool =
127
    Option.value_map ~default:true
×
128
      (Inputs.Snark_pool.get_completed_work snark_pool statements)
×
129
      ~f:(fun priced_proof ->
130
        let competing_fee = Inputs.Transaction_snark_work.fee priced_proof in
×
131
        Fee.compare fee competing_fee < 0 )
×
132

133
  module For_tests = struct
134
    let does_not_have_better_fee = does_not_have_better_fee
135
  end
136

137
  let get_expensive_work ~snark_pool ~fee
138
      (jobs : ('a, 'b) Work_spec.t One_or_two.t list) :
139
      ('a, 'b) Work_spec.t One_or_two.t list =
140
    O1trace.sync_thread "work_lib_get_expensive_work" (fun () ->
×
141
        List.filter jobs ~f:(fun job ->
×
142
            does_not_have_better_fee ~snark_pool ~fee
×
143
              (One_or_two.map job ~f:Work_spec.statement) ) )
×
144

145
  let all_pending_work ~snark_pool statements =
146
    List.filter statements ~f:(fun st ->
×
147
        Option.is_none (Inputs.Snark_pool.get_completed_work snark_pool st) )
×
148

149
  (*Seen/Unseen jobs that are not in the snark pool yet*)
150
  let pending_work_statements ~snark_pool ~fee_opt (state : State.t) =
151
    let all_todo_statements =
×
152
      List.map state.available_jobs ~f:(One_or_two.map ~f:Work_spec.statement)
153
    in
154
    let expensive_work statements ~fee =
×
155
      List.filter statements ~f:(does_not_have_better_fee ~snark_pool ~fee)
×
156
    in
157
    match fee_opt with
158
    | None ->
×
159
        all_pending_work ~snark_pool all_todo_statements
160
    | Some fee ->
×
161
        expensive_work all_todo_statements ~fee
162
end
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc