• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 3311

08 Feb 2025 09:10PM UTC coverage: 36.017% (-24.8%) from 60.828%
3311

push

buildkite

web-flow
Merge pull request #16591 from MinaProtocol/feature/stop-unknown-stream_idx

Stop sending data on libp2p streams after the first error

7 of 14 new or added lines in 1 file covered. (50.0%)

16388 existing lines in 340 files now uncovered.

25649 of 71214 relevant lines covered (36.02%)

26723.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.18
/src/lib/sync_handler/sync_handler.ml
1
open Core_kernel
5✔
2
open Async
3
open Mina_base
4
module Ledger = Mina_ledger.Ledger
5
module Sync_ledger = Mina_ledger.Sync_ledger
6
open Frontier_base
7
open Network_peer
8

9
(** Ambient values every sync-handler entry point expects from its caller. *)
module type CONTEXT = sig
  (** Logger forwarded to the ledger-sync responder and extended with a
      ["selection_context"] entry by the [Root] functions. *)
  val logger : Logger.t

  (** Supplies the [compile_config] and [genesis_constants] read by the
      handlers in this file. *)
  val precomputed_values : Precomputed_values.t

  (* The two constants below are not read directly in this file; presumably
     they are consumed by the consensus hooks that receive the whole context
     -- TODO confirm. *)
  val constraint_constants : Genesis_constants.Constraint_constants.t

  val consensus_constants : Consensus.Constants.t
end
18

19
(** Modules the [Make] functor is parameterised over. *)
module type Inputs_intf = sig
  (** Frontier implementation; constrained to the shape of the real
      [Transition_frontier] module. *)
  module Transition_frontier : module type of Transition_frontier

  (** Best-tip prover whose frontier type is the one declared above. *)
  module Best_tip_prover :
    Mina_intf.Best_tip_prover_intf
      with type transition_frontier := Transition_frontier.t
end
26

27
module Make (Inputs : Inputs_intf) :
28
  Mina_intf.Sync_handler_intf
29
    with type transition_frontier := Inputs.Transition_frontier.t = struct
30
  open Inputs
31

32
  (** [find_in_root_history frontier state_hash] looks [state_hash] up in the
      [Root_history] extension registered on [frontier] and returns whatever
      [Root_history.lookup] yields for it. *)
  let find_in_root_history frontier state_hash =
    Transition_frontier.Extensions.(
      Root_history.lookup
        (get_extension (Transition_frontier.extensions frontier) Root_history)
        state_hash)
38

39
  (** [protocol_states_in_root_history frontier state_hash] asks the
      [Root_history] extension of [frontier] for the protocol states needed by
      the scan state recorded at [state_hash]. *)
  let protocol_states_in_root_history frontier state_hash =
    Transition_frontier.Extensions.(
      Root_history.protocol_states_for_scan_state
        (get_extension (Transition_frontier.extensions frontier) Root_history)
        state_hash)
45

46
  (** [get_ledger_by_hash ~frontier ledger_hash] finds the ledger whose merkle
      root equals [ledger_hash] among, in this order: the root snarked ledger,
      the staking epoch ledger and the next epoch ledger.

      Returns [None] when no candidate matches, and also when the matching
      epoch ledger is a [Genesis_epoch_ledger] snapshot rather than a
      [Ledger_db] one. *)
  let get_ledger_by_hash ~frontier ledger_hash =
    let module Snapshot = Consensus.Data.Local_state.Snapshot.Ledger_snapshot in
    let as_any_ledger db = Ledger.Any_ledger.cast (module Ledger.Db) db in
    (* Only database-backed snapshots are handed out. *)
    let servable_snapshot = function
      | Snapshot.Genesis_epoch_ledger _ ->
          None
      | Snapshot.Ledger_db db ->
          Some (as_any_ledger db)
    in
    let is_requested root = Ledger_hash.equal ledger_hash root in
    let root_ledger =
      as_any_ledger (Transition_frontier.root_snarked_ledger frontier)
    in
    let staking_epoch_ledger =
      Consensus.Data.Local_state.staking_epoch_ledger
        (Transition_frontier.consensus_local_state frontier)
    in
    let next_epoch_ledger =
      Consensus.Data.Local_state.next_epoch_ledger
        (Transition_frontier.consensus_local_state frontier)
    in
    if is_requested (Ledger.Any_ledger.M.merkle_root root_ledger) then
      Some root_ledger
    else if is_requested (Snapshot.merkle_root staking_epoch_ledger) then
      servable_snapshot staking_epoch_ledger
    else if is_requested (Snapshot.merkle_root next_epoch_ledger) then
      servable_snapshot next_epoch_ledger
    else None
86

87
  (** [answer_query ~frontier hash query ~context ~trust_system] answers a
      sync-ledger [query] against the ledger identified by [hash].

      The ledger is located with [get_ledger_by_hash]; if none is found the
      result is an [Error] naming the hash. Otherwise a responder is created
      over that ledger and asked to answer the query. *)
  let answer_query :
         frontier:Inputs.Transition_frontier.t
      -> Ledger_hash.t
      -> Sync_ledger.Query.t Envelope.Incoming.t
      -> context:(module CONTEXT)
      -> trust_system:Trust_system.t
      -> Sync_ledger.Answer.t Or_error.t Deferred.t =
   fun ~frontier hash query ~context:(module Context) ~trust_system ->
    let module Responder = Sync_ledger.Any_ledger.Responder in
    (* Narrow the caller's context to the values the syncable ledger needs. *)
    let module Syncable_context : Syncable_ledger.CONTEXT = struct
      let logger = Context.logger

      let compile_config = Context.precomputed_values.compile_config
    end in
    match get_ledger_by_hash ~frontier hash with
    | Some ledger ->
        let responder =
          Responder.create ledger ignore
            ~context:(module Syncable_context)
            ~trust_system
        in
        Responder.answer_query responder query
    | None ->
        let message =
          sprintf !"Failed to find ledger for hash %{sexp:Ledger_hash.t}" hash
        in
        Deferred.return (Or_error.error_string message)
116

117
  (** [get_staged_ledger_aux_and_pending_coinbases_at_hash ~logger ~frontier
      state_hash] collects, for the block at [state_hash], a 4-tuple of: the
      scan state, a ledger hash, the pending coinbase collection and the
      protocol states the scan state requires.

      The frontier is tried first. If the breadcrumb is missing there, or one
      of the required protocol states cannot be resolved, the root history is
      consulted instead. Returns [None] when both sources fail. *)
  let get_staged_ledger_aux_and_pending_coinbases_at_hash ~logger ~frontier
      state_hash =
    (* Resolve every state hash the scan state depends on, giving up at the
       first one the frontier cannot resolve. States are prepended as they are
       found, so the list is in reverse iteration order. *)
    let protocol_states scan_state =
      let rec collect found = function
        | [] ->
            Some found
        | hash :: remaining -> (
            match Transition_frontier.find_protocol_state frontier hash with
            | None ->
                None
            | Some state ->
                collect (state :: found) remaining )
      in
      Staged_ledger.Scan_state.required_state_hashes scan_state
      |> State_hash.Set.to_list |> collect []
    in
    let from_frontier () =
      Option.bind (Transition_frontier.find frontier state_hash)
        ~f:(fun breadcrumb ->
          let staged_ledger =
            Transition_frontier.Breadcrumb.staged_ledger breadcrumb
          in
          let scan_state = Staged_ledger.scan_state staged_ledger in
          let staged_ledger_hash = Breadcrumb.staged_ledger_hash breadcrumb in
          let merkle_root =
            Staged_ledger_hash.ledger_hash staged_ledger_hash
          in
          Option.map (protocol_states scan_state)
            ~f:(fun scan_state_protocol_states ->
              let pending_coinbase =
                Staged_ledger.pending_coinbase_collection staged_ledger
              in
              [%log debug]
                ~metadata:
                  [ ( "staged_ledger_hash"
                    , Staged_ledger_hash.to_yojson staged_ledger_hash )
                  ]
                "sending scan state and pending coinbase" ;
              ( scan_state
              , merkle_root
              , pending_coinbase
              , scan_state_protocol_states ) ) )
    in
    let from_root_history () =
      match find_in_root_history frontier state_hash with
      | None ->
          None
      | Some root ->
          Option.map
            (protocol_states_in_root_history frontier state_hash)
            ~f:(fun scan_state_protocol_states ->
              ( Root_data.Historical.scan_state root
              , Root_data.Historical.staged_ledger_target_ledger_hash root
              , Root_data.Historical.pending_coinbase root
              , scan_state_protocol_states ) )
    in
    match from_frontier () with
    | Some _ as found ->
        found
    | None ->
        from_root_history ()
168

169
  (** [get_transition_chain ~frontier hashes] returns the blocks for [hashes],
      each looked up in the frontier and in the root history (the frontier
      result wins when both have it).

      - More than [Transition_frontier.max_catchup_chunk_length] hashes: the
        request is logged at trace level and refused with [None].
      - Catchup state [Full _]: hashes that cannot be found are dropped, so the
        result is always [Some].
      - Catchup state [Hash _]: a single missing hash makes the result [None]. *)
  let get_transition_chain ~frontier hashes =
    let limit = Transition_frontier.max_catchup_chunk_length in
    let requested = List.length hashes in
    if requested > limit then (
      [%log' trace (Logger.create ())]
        ~metadata:[ ("n", `Int requested) ]
        "get_transition_chain requested $n > %d hashes" limit ;
      None )
    else
      let lookup hash =
        let in_frontier =
          Option.map
            (Transition_frontier.find frontier hash)
            ~f:Transition_frontier.Breadcrumb.validated_transition
        in
        let in_root_history =
          Option.map
            (find_in_root_history frontier hash)
            ~f:Root_data.Historical.transition
        in
        let validated_transition =
          match in_frontier with
          | Some _ ->
              in_frontier
          | None ->
              in_root_history
        in
        Option.map validated_transition ~f:(fun validated ->
            With_hash.data (Mina_block.Validated.forget validated) )
      in
      match Transition_frontier.catchup_state frontier with
      | Full _ ->
          (* Super catchup *)
          Some (List.filter_map hashes ~f:lookup)
      | Hash _ ->
          (* Normal catchup *)
          Option.all (List.map hashes ~f:lookup)
199

200
  (** [best_tip_path ~frontier] lists the state hashes on the path that ends at
      the frontier's best tip. The walk starts at the best tip and follows
      parent hashes until a parent is no longer found in the frontier; each
      hash is prepended, so the best tip's hash is the last element. *)
  let best_tip_path ~frontier =
    let rec ascend path crumb =
      let path = Breadcrumb.state_hash crumb :: path in
      Breadcrumb.parent_hash crumb
      |> Transition_frontier.find frontier
      |> function None -> path | Some parent -> ascend path parent
    in
    Transition_frontier.best_tip frontier |> ascend []
210

211
  module Root = struct
212
    (** [prove ~context ~frontier seen_consensus_state] asks [Best_tip_prover]
        for a best-tip proof over [frontier].

        The proof is returned only when consensus selection, run with our best
        tip as [~existing] and [seen_consensus_state] as [~candidate], answers
        [`Keep]; otherwise the result is [None]. It is also [None] when the
        prover produces nothing. In the returned value the [data] field no
        longer carries its hash wrapper. *)
    let prove ~context:(module Context : CONTEXT) ~frontier seen_consensus_state
        =
      (* Caller's context plus [compile_config], with the logger tagged so
         selection logs can be traced back to this function. *)
      let module Selection_context = struct
        include Context

        let compile_config = precomputed_values.compile_config

        let logger =
          Logger.extend logger [ ("selection_context", `String "Root.prove") ]
      end in
      match Best_tip_prover.prove ~context:(module Selection_context) frontier with
      | None ->
          None
      | Some best_tip_with_witness ->
          let best_tip_consensus_state =
            With_hash.map ~f:Mina_block.consensus_state
              best_tip_with_witness.data
          in
          let decision =
            Consensus.Hooks.select
              ~context:(module Selection_context)
              ~existing:best_tip_consensus_state
              ~candidate:seen_consensus_state
          in
          if Consensus.Hooks.equal_select_status decision `Keep then
            Some
              { best_tip_with_witness with
                data = With_hash.data best_tip_with_witness.data
              }
          else None
240

241
    (** [verify ~context ~verifier observed_state peer_root] checks a peer's
        root proof with [Best_tip_prover.verify], then runs consensus
        selection with the proven best tip as [~existing] and [observed_state]
        as [~candidate].

        The verified witness is returned when selection answers [`Keep]. Any
        other answer yields an [Error] naming the best tip's state hash, and a
        failure of the prover's own verification is propagated unchanged. *)
    let verify ~context:(module Context : CONTEXT) ~verifier observed_state
        peer_root =
      (* Caller's context plus [compile_config], with the logger tagged so
         selection logs can be traced back to this function. *)
      let module Selection_context = struct
        include Context

        let compile_config = precomputed_values.compile_config

        let logger =
          Logger.extend logger [ ("selection_context", `String "Root.verify") ]
      end in
      let precomputed_values = Selection_context.precomputed_values in
      (* TODO: let the prover take genesis_constants from the
         precomputed_values it is already given. *)
      Deferred.Result.bind
        (Best_tip_prover.verify ~verifier
           ~genesis_constants:precomputed_values.genesis_constants
           ~precomputed_values peer_root )
        ~f:(fun ( (`Root _, `Best_tip (best_tip_transition, _)) as
                  verified_witness ) ->
          let best_tip_consensus_state =
            With_hash.map ~f:Mina_block.consensus_state best_tip_transition
          in
          let keeps_best_tip =
            Consensus.Hooks.equal_select_status
              (Consensus.Hooks.select
                 ~context:(module Selection_context)
                 ~existing:best_tip_consensus_state ~candidate:observed_state )
              `Keep
          in
          let lied =
            Error.createf
              !"Peer lied about it's best tip %{sexp:State_hash.t}"
              (State_hash.With_state_hashes.state_hash best_tip_transition)
          in
          Deferred.Result.map
            (Deferred.return (Result.ok_if_true keeps_best_tip ~error:lied))
            ~f:(fun () -> verified_witness) )
279
  end
280
end
281

282
(* Default instantiation: the handler over the real transition frontier and
   best-tip prover, re-exported at the top level of this file. *)
include Make (struct
  module Best_tip_prover = Best_tip_prover
  module Transition_frontier = Transition_frontier
end)
286

287
(* TODO: port these tests *)
288
(*
289
let%test_module "Sync_handler" =
290
  ( module struct
291
    let logger = Logger.null ()
292

293
    let hb_logger = Logger.create ()
294

295
    let pids = Child_processes.Termination.create_pid_table ()
296

297
    let trust_system = Trust_system.null ()
298

299
    let f_with_verifier ~f ~logger ~pids =
300
      let%map verifier = Verifier.create ~logger ~pids in
301
      f ~logger ~verifier
302

303
    let%test "sync with ledgers from another peer via glue_sync_ledger" =
304
      Backtrace.elide := false ;
305
      Printexc.record_backtrace true ;
306
      heartbeat_flag := true ;
307
      Ledger.with_ephemeral_ledger ~f:(fun dest_ledger ->
308
          Thread_safe.block_on_async_exn (fun () ->
309
              print_heartbeat hb_logger |> don't_wait_for ;
310
              let%bind frontier =
311
                create_root_frontier ~logger ~pids Test_genesis_ledger.accounts
312
              in
313
              let source_ledger =
314
                Transition_frontier.For_tests.root_snarked_ledger frontier
315
                |> Ledger.of_database
316
              in
317
              let desired_root = Ledger.merkle_root source_ledger in
318
              let sync_ledger =
319
                Sync_ledger.Mask.create dest_ledger ~logger ~trust_system
320
              in
321
              let query_reader = Sync_ledger.Mask.query_reader sync_ledger in
322
              let answer_writer = Sync_ledger.Mask.answer_writer sync_ledger in
323
              let peer =
324
                Network_peer.Peer.create Unix.Inet_addr.localhost
325
                  ~discovery_port:0 ~communication_port:1
326
              in
327
              let network =
328
                Network.create_stub ~logger
329
                  ~ip_table:
330
                    (Hashtbl.of_alist_exn
331
                       (module Unix.Inet_addr)
332
                       [(peer.host, frontier)])
333
                  ~peers:(Hash_set.of_list (module Network_peer.Peer) [peer])
334
              in
335
              Network.glue_sync_ledger network query_reader answer_writer ;
336
              match%map
337
                Sync_ledger.Mask.fetch sync_ledger desired_root ~data:()
338
                  ~equal:(fun () () -> true)
339
              with
340
              | `Ok synced_ledger ->
341
                  heartbeat_flag := false ;
342
                  Ledger_hash.equal
343
                    (Ledger.merkle_root dest_ledger)
344
                    (Ledger.merkle_root source_ledger)
345
                  && Ledger_hash.equal
346
                       (Ledger.merkle_root synced_ledger)
347
                       (Ledger.merkle_root source_ledger)
348
              | `Target_changed _ ->
349
                  heartbeat_flag := false ;
350
                  failwith "target of sync_ledger should not change" ) )
351

352
    let to_external_transition breadcrumb =
353
      Transition_frontier.Breadcrumb.validated_transition breadcrumb
354
      |> Mina_block.Validated.forget
355

356
    let%test "a node should be able to give a valid proof of their root" =
357
      heartbeat_flag := true ;
358
      let max_length = 4 in
359
      (* Generating this many breadcrumbs will ensure the transition_frontier is full *)
360
      let num_breadcrumbs = max_length + 2 in
361
      Thread_safe.block_on_async_exn (fun () ->
362
          print_heartbeat hb_logger |> don't_wait_for ;
363
          let%bind frontier =
364
            create_root_frontier ~logger ~pids Test_genesis_ledger.accounts
365
          in
366
          let%bind () =
367
            build_frontier_randomly frontier
368
              ~gen_root_breadcrumb_builder:
369
                (gen_linear_breadcrumbs ~logger ~pids ~trust_system
370
                   ~size:num_breadcrumbs
371
                   ~accounts_with_secret_keys:Test_genesis_ledger.accounts)
372
          in
373
          let seen_transition =
374
            Transition_frontier.(
375
              all_breadcrumbs frontier |> List.permute |> List.hd_exn
376
              |> Breadcrumb.validated_transition)
377
          in
378
          let observed_state =
379
            Mina_block.Validated.protocol_state seen_transition
380
            |> Protocol_state.consensus_state
381
          in
382
          let root_with_proof =
383
            Option.value_exn ~message:"Could not produce an ancestor proof"
384
              (Sync_handler.Root.prove ~logger ~frontier observed_state)
385
          in
386
          let%bind verify =
387
            f_with_verifier ~f:Sync_handler.Root.verify ~logger ~pids
388
          in
389
          let%map `Root (root_transition, _), `Best_tip (best_tip_transition, _)
390
              =
391
            verify observed_state root_with_proof |> Deferred.Or_error.ok_exn
392
          in
393
          heartbeat_flag := false ;
394
          Mina_block.(
395
            equal
396
              (With_hash.data root_transition)
397
              (to_external_transition (Transition_frontier.root frontier))
398
            && equal
399
                 (With_hash.data best_tip_transition)
400
                 (to_external_transition
401
                    (Transition_frontier.best_tip frontier))) )
402
  end )
403
*)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc