• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 2863

05 Nov 2024 06:20PM UTC coverage: 30.754% (-16.6%) from 47.311%
2863

push

buildkite

web-flow
Merge pull request #16296 from MinaProtocol/dkijania/more_multi_jobs

more multi jobs in CI

20276 of 65930 relevant lines covered (30.75%)

8631.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/lib/transition_frontier_controller/transition_frontier_controller.ml
1
open Core_kernel
2
open Async_kernel
3
open Pipe_lib
4
open Mina_block
5

6
module type CONTEXT = sig
7
  val logger : Logger.t
8

9
  val precomputed_values : Precomputed_values.t
10

11
  val constraint_constants : Genesis_constants.Constraint_constants.t
12

13
  val consensus_constants : Consensus.Constants.t
14

15
  val compile_config : Mina_compile_config.t
16
end
17

18
let run ~context:(module Context : CONTEXT) ~trust_system ~verifier ~network
19
    ~time_controller ~collected_transitions ~frontier ~get_completed_work
20
    ~network_transition_reader ~producer_transition_reader ~clear_reader
21
    ~cache_exceptions =
22
  let open Context in
×
23
  let valid_transition_pipe_capacity = 50 in
24
  let start_time = Time.now () in
25
  let f_drop_head name head valid_cb =
×
26
    let block : Mina_block.initial_valid_block =
×
27
      Network_peer.Envelope.Incoming.data @@ Cache_lib.Cached.peek head
×
28
    in
29
    Mina_block.handle_dropped_transition
30
      (Validation.block_with_hash block |> With_hash.hash)
×
31
      ?valid_cb ~pipe_name:name ~logger
32
  in
33
  let valid_transition_reader, valid_transition_writer =
34
    let name = "valid transitions" in
35
    Strict_pipe.create ~name
×
36
      (Buffered
37
         ( `Capacity valid_transition_pipe_capacity
38
         , `Overflow
39
             (Drop_head
40
                (fun (`Block head, `Valid_cb vc) ->
41
                  Mina_metrics.(
×
42
                    Counter.inc_one
×
43
                      Pipe.Drop_on_overflow
44
                      .transition_frontier_valid_transitions) ;
45
                  f_drop_head name head vc ) ) ) )
46
  in
47
  let primary_transition_pipe_capacity =
48
    valid_transition_pipe_capacity + List.length collected_transitions
×
49
  in
50
  (* Ok to drop on overflow- catchup will be triggered if required*)
51
  let primary_transition_reader, primary_transition_writer =
52
    let name = "primary transitions" in
53
    Strict_pipe.create ~name
×
54
      (Buffered
55
         ( `Capacity primary_transition_pipe_capacity
56
         , `Overflow
57
             (Drop_head
58
                (fun (`Block head, `Valid_cb vc) ->
59
                  Mina_metrics.(
×
60
                    Counter.inc_one
×
61
                      Pipe.Drop_on_overflow
62
                      .transition_frontier_primary_transitions) ;
63
                  f_drop_head name head vc ) ) ) )
64
  in
65
  let processed_transition_reader, processed_transition_writer =
66
    Strict_pipe.create ~name:"processed transitions"
67
      (Buffered (`Capacity 30, `Overflow Crash))
68
  in
69
  let catchup_job_reader, catchup_job_writer =
×
70
    Strict_pipe.create ~name:"catchup jobs"
71
      (Buffered (`Capacity 30, `Overflow Crash))
72
  in
73
  let catchup_breadcrumbs_reader, catchup_breadcrumbs_writer =
×
74
    Strict_pipe.create ~name:"catchup breadcrumbs"
75
      (Buffered (`Capacity 30, `Overflow Crash))
76
  in
77
  let unprocessed_transition_cache =
×
78
    Transition_handler.Unprocessed_transition_cache.create ~logger
79
      ~cache_exceptions
80
  in
81
  List.iter collected_transitions ~f:(fun t ->
82
      (* since the cache was just built, it's safe to assume
83
       * registering these will not fail, so long as there
84
       * are no duplicates in the list *)
85
      let block_cached =
×
86
        Transition_handler.Unprocessed_transition_cache.register_exn
87
          unprocessed_transition_cache t
88
      in
89
      Strict_pipe.Writer.write primary_transition_writer
×
90
        (`Block block_cached, `Valid_cb None) ) ;
91
  let initial_state_hashes =
×
92
    List.map collected_transitions ~f:(fun envelope ->
93
        Network_peer.Envelope.Incoming.data envelope
×
94
        |> Validation.block_with_hash
×
95
        |> Mina_base.State_hash.With_state_hashes.state_hash )
×
96
    |> Mina_base.State_hash.Set.of_list
×
97
  in
98
  let extensions = Transition_frontier.extensions frontier in
×
99
  don't_wait_for
×
100
  @@ Pipe_lib.Broadcast_pipe.Reader.iter_until
×
101
       (Transition_frontier.Extensions.get_view_pipe extensions New_breadcrumbs)
×
102
       ~f:(fun new_breadcrumbs ->
103
         let open Mina_base.State_hash in
×
104
         let new_state_hashes =
105
           List.map new_breadcrumbs ~f:Transition_frontier.Breadcrumb.state_hash
106
           |> Set.of_list
×
107
         in
108
         if Set.is_empty @@ Set.inter initial_state_hashes new_state_hashes then
×
109
           Deferred.return false
×
110
         else (
×
111
           Mina_metrics.(
112
             Gauge.set Catchup.initial_catchup_time
×
113
               Time.(Span.to_min @@ diff (now ()) start_time)) ;
×
114
           Deferred.return true ) ) ;
115
  Transition_handler.Validator.run
×
116
    ~context:(module Context)
117
    ~trust_system ~time_controller ~frontier
118
    ~transition_reader:network_transition_reader ~valid_transition_writer
119
    ~unprocessed_transition_cache ;
120
  Strict_pipe.Reader.iter_without_pushback valid_transition_reader
121
    ~f:(fun (`Block b, `Valid_cb vc) ->
122
      Strict_pipe.Writer.write primary_transition_writer (`Block b, `Valid_cb vc) )
×
123
  |> don't_wait_for ;
×
124
  let clean_up_catchup_scheduler = Ivar.create () in
×
125
  Transition_handler.Processor.run
×
126
    ~context:(module Context)
127
    ~time_controller ~trust_system ~verifier ~frontier ~get_completed_work
128
    ~primary_transition_reader ~producer_transition_reader
129
    ~clean_up_catchup_scheduler ~catchup_job_writer ~catchup_breadcrumbs_reader
130
    ~catchup_breadcrumbs_writer ~processed_transition_writer ;
131
  Ledger_catchup.run
132
    ~context:(module Context)
133
    ~trust_system ~verifier ~network ~frontier ~catchup_job_reader
134
    ~catchup_breadcrumbs_writer ~unprocessed_transition_cache ;
135
  upon (Strict_pipe.Reader.read clear_reader) (fun _ ->
×
136
      let open Strict_pipe.Writer in
×
137
      kill valid_transition_writer ;
138
      kill primary_transition_writer ;
×
139
      kill processed_transition_writer ;
×
140
      kill catchup_job_writer ;
×
141
      kill catchup_breadcrumbs_writer ;
×
142
      if Ivar.is_full clean_up_catchup_scheduler then
×
143
        [%log error] "Ivar.fill bug is here!" ;
×
144
      Ivar.fill clean_up_catchup_scheduler () ) ;
×
145
  processed_transition_reader
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc