• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 3409

26 Feb 2025 01:10PM UTC coverage: 32.353% (-28.4%) from 60.756%
3409

push

buildkite

web-flow
Merge pull request #16687 from MinaProtocol/dw/merge-compatible-into-develop-20250225

Merge compatible into develop [20250224]

23144 of 71535 relevant lines covered (32.35%)

16324.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/lib/transition_frontier_controller/transition_frontier_controller.ml
1
open Core_kernel
2
open Async_kernel
3
open Pipe_lib
4
open Mina_block
5

6
(** Environment that [run] expects to receive as a first-class module.

    [run] opens this module locally and also forwards it unchanged, as
    [~context], to the components it starts. *)
module type CONTEXT = sig
  (** Logger used by [run] when reporting dropped transitions and when
      logging errors. *)
  val logger : Logger.t

  (** Proof cache database handle. Not read directly by [run]; presumably
      consumed by the components that receive the context -- confirm. *)
  val proof_cache_db : Proof_cache_tag.cache_db

  (** Precomputed values. Not read directly by [run]; presumably consumed by
      the components that receive the context -- confirm. *)
  val precomputed_values : Precomputed_values.t

  (** Constraint constants. Not read directly by [run]. *)
  val constraint_constants : Genesis_constants.Constraint_constants.t

  (** Consensus constants. Not read directly by [run]. *)
  val consensus_constants : Consensus.Constants.t
end
17

18
(** [run] creates the pipes that connect the transition validator, the
    transition processor and ledger catchup, starts those three components,
    and returns the reader end of the "processed transitions" pipe.

    - [context]: environment module; opened locally and forwarded to the
      validator, the processor and ledger catchup.
    - [trust_system]: forwarded to the validator, the processor and ledger
      catchup.
    - [verifier]: forwarded to the processor and ledger catchup.
    - [network]: forwarded to ledger catchup.
    - [time_controller]: forwarded to the validator and the processor.
    - [collected_transitions]: list of [(envelope, valid_cb)] pairs that are
      written to the primary pipe before any component is started.
    - [frontier]: forwarded to all three components; its [New_breadcrumbs]
      view pipe is also watched to record the initial catchup time.
    - [get_completed_work]: forwarded to the processor.
    - [network_transition_reader]: given to the validator as its
      [transition_reader].
    - [producer_transition_reader]: forwarded to the processor.
    - [clear_reader]: the first value read from it triggers shutdown of all
      writers created here.
    - [cache_exceptions]: forwarded to
      [Transition_handler.Unprocessed_transition_cache.create]. *)
let run ~context:(module Context : CONTEXT) ~trust_system ~verifier ~network
    ~time_controller ~collected_transitions ~frontier ~get_completed_work
    ~network_transition_reader ~producer_transition_reader ~clear_reader
    ~cache_exceptions =
  let open Context in
  let valid_capacity = 50 in
  let start_time = Time.now () in
  (* Reports a transition that was evicted from the head of the pipe called
     [pipe_name]; the optional validation callback is passed along as is. *)
  let report_dropped pipe_name dropped valid_cb =
    let dropped_hashes =
      match dropped with
      | `Header header ->
          Network_peer.Envelope.Incoming.data header
          |> Validation.header_with_hash |> With_hash.hash
      | `Block cached_block ->
          Cache_lib.Cached.peek cached_block
          |> Network_peer.Envelope.Incoming.data
          |> Validation.block_with_hash |> With_hash.hash
    in
    Mina_block.handle_dropped_transition dropped_hashes ?valid_cb ~pipe_name
      ~logger
  in
  let valid_transition_reader, valid_transition_writer =
    let pipe_name = "valid transitions" in
    (* Count the overflow, then report the evicted transition. *)
    let on_overflow (dropped, `Valid_cb valid_cb) =
      Mina_metrics.Counter.inc_one
        Mina_metrics.Pipe.Drop_on_overflow
        .transition_frontier_valid_transitions ;
      report_dropped pipe_name dropped valid_cb
    in
    Strict_pipe.create ~name:pipe_name
      (Buffered (`Capacity valid_capacity, `Overflow (Drop_head on_overflow)))
  in
  (* The primary pipe has room for every pre-collected transition on top of
     the capacity of the valid-transition pipe. *)
  let primary_capacity = List.length collected_transitions + valid_capacity in
  (* Dropping on overflow is acceptable here: catchup will be triggered if
     required. *)
  let primary_transition_reader, primary_transition_writer =
    let pipe_name = "primary transitions" in
    let on_overflow (dropped, `Valid_cb valid_cb) =
      Mina_metrics.Counter.inc_one
        Mina_metrics.Pipe.Drop_on_overflow
        .transition_frontier_primary_transitions ;
      report_dropped pipe_name dropped valid_cb
    in
    Strict_pipe.create ~name:pipe_name
      (Buffered (`Capacity primary_capacity, `Overflow (Drop_head on_overflow)))
  in
  let processed_transition_reader, processed_transition_writer =
    Strict_pipe.create ~name:"processed transitions"
      (Buffered (`Capacity 30, `Overflow Crash))
  in
  let catchup_job_reader, catchup_job_writer =
    Strict_pipe.create ~name:"catchup jobs"
      (Buffered (`Capacity 30, `Overflow Crash))
  in
  let catchup_breadcrumbs_reader, catchup_breadcrumbs_writer =
    Strict_pipe.create ~name:"catchup breadcrumbs"
      (Buffered (`Capacity 30, `Overflow Crash))
  in
  let unprocessed_transition_cache =
    Transition_handler.Unprocessed_transition_cache.create ~logger
      ~cache_exceptions
  in
  (* Feed the pre-collected transitions into the primary pipe. *)
  List.iter collected_transitions ~f:(fun (envelope, valid_cb) ->
      let transition =
        match Network_peer.Envelope.Incoming.data envelope with
        | `Header header ->
            `Header
              (Network_peer.Envelope.Incoming.map envelope ~f:(fun _ -> header))
        | `Block block ->
            (* The cache was created just above, so registration is assumed
               not to fail as long as the list holds no duplicates. *)
            let block_envelope =
              Network_peer.Envelope.Incoming.map envelope ~f:(fun _ -> block)
            in
            `Block
              (Transition_handler.Unprocessed_transition_cache.register_exn
                 unprocessed_transition_cache block_envelope )
      in
      Strict_pipe.Writer.write primary_transition_writer
        (transition, `Valid_cb valid_cb) ) ;
  (* State hashes of the pre-collected transitions. *)
  let initial_state_hashes =
    let state_hash_of (envelope, _) =
      Mina_base.State_hash.With_state_hashes.state_hash
        (Bootstrap_controller.Transition_cache.header_with_hash
           (Network_peer.Envelope.Incoming.data envelope) )
    in
    Mina_base.State_hash.Set.of_list
      (List.map collected_transitions ~f:state_hash_of)
  in
  let new_breadcrumbs_pipe =
    Transition_frontier.Extensions.get_view_pipe
      (Transition_frontier.extensions frontier)
      New_breadcrumbs
  in
  (* Watch new breadcrumbs until one of them matches a pre-collected
     transition, then record the minutes elapsed since [start_time] in the
     initial-catchup-time gauge and stop iterating. *)
  don't_wait_for
    (Pipe_lib.Broadcast_pipe.Reader.iter_until new_breadcrumbs_pipe
       ~f:(fun new_breadcrumbs ->
         let open Mina_base.State_hash in
         let arrived_hashes =
           Set.of_list
             (List.map new_breadcrumbs
                ~f:Transition_frontier.Breadcrumb.state_hash )
         in
         match
           Set.is_empty (Set.inter initial_state_hashes arrived_hashes)
         with
         | true ->
             Deferred.return false
         | false ->
             Mina_metrics.(
               Gauge.set Catchup.initial_catchup_time
                 Time.(Span.to_min (diff (now ()) start_time))) ;
             Deferred.return true ) ) ;
  Transition_handler.Validator.run
    ~context:(module Context)
    ~trust_system ~time_controller ~frontier
    ~transition_reader:network_transition_reader ~valid_transition_writer
    ~unprocessed_transition_cache ;
  (* Forward everything the validator emits into the primary pipe. *)
  don't_wait_for
    (Strict_pipe.Reader.iter_without_pushback valid_transition_reader
       ~f:(Strict_pipe.Writer.write primary_transition_writer) ) ;
  let clean_up_catchup_scheduler = Ivar.create () in
  Transition_handler.Processor.run
    ~context:(module Context)
    ~time_controller ~trust_system ~verifier ~frontier ~get_completed_work
    ~primary_transition_reader ~producer_transition_reader
    ~clean_up_catchup_scheduler ~catchup_job_writer ~catchup_breadcrumbs_reader
    ~catchup_breadcrumbs_writer ~processed_transition_writer ;
  Ledger_catchup.run
    ~context:(module Context)
    ~trust_system ~verifier ~network ~frontier ~catchup_job_reader
    ~catchup_breadcrumbs_writer ~unprocessed_transition_cache ;
  (* On the first value read from [clear_reader], kill every writer created
     above and signal the catchup scheduler clean-up. *)
  upon (Strict_pipe.Reader.read clear_reader) (fun _ ->
      Strict_pipe.Writer.kill valid_transition_writer ;
      Strict_pipe.Writer.kill primary_transition_writer ;
      Strict_pipe.Writer.kill processed_transition_writer ;
      Strict_pipe.Writer.kill catchup_job_writer ;
      Strict_pipe.Writer.kill catchup_breadcrumbs_writer ;
      (* Diagnostic only: the fill below is still attempted when the ivar is
         already full. NOTE(review): [Ivar.fill] presumably raises in that
         case -- confirm. *)
      if Ivar.is_full clean_up_catchup_scheduler then
        [%log error] "Ivar.fill bug is here!" ;
      Ivar.fill clean_up_catchup_scheduler () ) ;
  processed_transition_reader
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc