• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 678

09 Oct 2025 05:29PM UTC coverage: 32.327% (-4.8%) from 37.108%
678

push

buildkite

web-flow
Merge pull request #17927 from MinaProtocol/dkijana/port_publish_fix_master_dev

port publish fix master to dev

23380 of 72324 relevant lines covered (32.33%)

23979.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.56
/src/lib/bootstrap_controller/bootstrap_controller.ml
1
(* Only show stdout for failed inline tests. *)
112✔
2
open Core
3
open Async
4
open Mina_base
5
open Mina_state
6
open Pipe_lib.Strict_pipe
7
open Network_peer
8
open Mina_stdlib
9
module Ledger = Mina_ledger.Ledger
10
module Sync_ledger = Mina_ledger.Sync_ledger
11
module Transition_cache = Transition_cache
12

13
(* Ambient values threaded through the bootstrap controller as a first-class
   module. Functions below unpack it with [(module Context)] and usually
   [open Context] to reach these values. *)
module type CONTEXT = sig
14
  (* Logger used by the [%log ...] statements in this file. *)
  val logger : Logger.t
15

16
  (* Source of [consensus_constants] and [protocol_state_with_hashes] in
     [run_cycle]. *)
  val precomputed_values : Precomputed_values.t
17

18
  (* Passed to the staged ledger construction in [run_cycle]. *)
  val constraint_constants : Genesis_constants.Constraint_constants.t
19

20
  (* Not read directly in the visible part of this file; presumably consumed
     by the modules that receive the packed context - TODO confirm. *)
  val consensus_constants : Consensus.Constants.t
21

22
  (* Not read directly in the visible part of this file; presumably consumed
     by the sync ledger that receives the packed context - TODO confirm. *)
  val ledger_sync_config : Syncable_ledger.daemon_config
23

24
  (* Passed to the [write_all_proofs_to_disk] calls below. *)
  val proof_cache_db : Proof_cache_tag.cache_db
25

26
  (* Passed to the [write_all_proofs_to_disk] calls and to the staged ledger
     construction below. *)
  val signature_kind : Mina_signature_kind.t
27
end
28

29
(* Structured log event emitted with [%str_log info] by [run_cycle] right
   after the new transition frontier has been loaded. *)
type Structured_log_events.t += Bootstrap_complete
30
  [@@deriving register_event { msg = "Bootstrap state: complete." }]
×
31

32
(* Mutable state of one bootstrap cycle. A fresh value is built at the start
   of [run_cycle]. *)
type t =
33
  (* Packed [CONTEXT]; unpacked by most functions in this file. *)
  { context : (module CONTEXT)
34
  (* Receives a record for every peer interaction judged here. *)
  ; trust_system : Trust_system.t
35
  (* Used to verify ancestry proofs and to construct the staged ledger. *)
  ; verifier : Verifier.t
36
  (* Block that new candidates are compared against in
     [worth_getting_root]; replaced in [start_sync_job_with_peer]. *)
  ; mutable best_seen_transition : Mina_block.initial_valid_block
37
  (* Root block whose snarked ledger is the current sync target; replaced
     in [start_sync_job_with_peer]. *)
  ; mutable current_root : Mina_block.initial_valid_block
38
  (* Handle used for ancestry, staged ledger and sync ledger requests. *)
  ; network : Mina_networking.t
39
  (* Incremented each time [Sync_ledger.Root.new_goal] answers [`New];
     exported as a gauge by [run_cycle]. *)
  ; mutable num_of_root_snarked_ledger_retargeted : int
40
  }
41

42
(** Timing and outcome metrics for a single bootstrap cycle. [run_cycle]
    builds one value per cycle (successful or not) and [run] logs the whole
    list as JSON via the derived [bootstrap_cycle_stats_to_yojson]. *)
43
type bootstrap_cycle_stats =
×
44
  (* Free-form outcome label; ["success"] or a failure description. *)
  { cycle_result : string
×
45
  (* Time spent in [download_snarked_ledger]. *)
  ; sync_ledger_time : Time.Span.t
×
46
  (* Time spent fetching scan state and pending coinbases from the peer. *)
  ; staged_ledger_data_download_time : Time.Span.t
×
47
  (* [None] when the cycle failed before the staged ledger was built. *)
  ; staged_ledger_construction_time : Time.Span.t option
×
48
  (* Whether [Consensus.Hooks.required_local_state_sync] returned jobs. *)
  ; local_state_sync_required : bool
×
49
  (* [None] when the cycle failed before the local state sync step. *)
  ; local_state_sync_time : Time.Span.t option
×
50
  }
51
[@@deriving to_yojson]
52

53
(** [time_deferred deferred] waits for [deferred] and returns the pair
    [(elapsed, result)], where [elapsed] is the wall-clock span between the
    call to [time_deferred] and the moment [deferred] became determined.

    The argument is an already-created deferred, so any synchronous work done
    while building it happens before the clock starts. *)
let time_deferred deferred =
  let started_at = Time.now () in
  let%map outcome = deferred in
  let elapsed = Time.diff (Time.now ()) started_at in
  (elapsed, outcome)
×
58

59
(** [worth_getting_root t candidate] is [true] when consensus selection
    prefers [candidate] (a consensus state with hashes) over the consensus
    state of [t.best_seen_transition], i.e. when
    [Consensus.Hooks.select] answers [`Take].

    Selection runs with a logger tagged with a [selection_context] entry so
    that its log lines can be attributed to this call site. *)
let worth_getting_root ({ context = (module Context); _ } as t) candidate =
  let module Selection_context = struct
    include Context

    let logger =
      Logger.extend logger
        [ ( "selection_context"
          , `String "Bootstrap_controller.worth_getting_root" )
        ]
  end in
  let existing =
    With_hash.map ~f:Mina_block.consensus_state
      (Mina_block.Validation.block_with_hash t.best_seen_transition)
  in
  let verdict =
    Consensus.Hooks.select
      ~context:(module Selection_context)
      ~existing ~candidate
  in
  Consensus.Hooks.equal_select_status `Take verdict
76

77
(** [received_bad_proof t host e] records a [Violated_protocol] action
    against [host] in [t.trust_system], attaching [e] as the [error]
    metadata of the message. Returns the deferred produced by
    [Trust_system.record]. *)
let received_bad_proof ({ context = (module Context); _ } as t) host e =
  let violation =
    Trust_system.Actions.
      ( Violated_protocol
      , Some
          ( "Bad ancestor proof: $error"
          , [ ("error", Error_json.error_to_yojson e) ] ) )
  in
  Trust_system.record t.trust_system Context.logger host violation
×
86

87
(** [done_syncing_root root_sync_ledger] is [true] once
    [Sync_ledger.Root.peek_valid_tree] has a tree to offer. *)
let done_syncing_root root_sync_ledger =
  root_sync_ledger |> Sync_ledger.Root.peek_valid_tree |> Option.is_some
×
89

90
(** [should_sync ~root_sync_ledger t candidate_state] is [true] when the root
    ledger sync is still in progress and [candidate_state] is preferred over
    the best transition seen so far. The consensus comparison is skipped when
    the sync has already finished. *)
let should_sync ~root_sync_ledger t candidate_state =
  if done_syncing_root root_sync_ledger then false
  else worth_getting_root t candidate_state
×
93

94
(** Retarget the root ledger sync at [peer_root], whose proof has already
    been verified by the caller.

    The function first credits [sender] with a [Fulfilled_request] in the
    trust system, then stores [peer_best_tip] and [peer_root] in
    [t.best_seen_transition] and [t.current_root], and finally submits the
    snarked ledger hash of the new root to [Sync_ledger.Root.new_goal]. Two
    goals are considered equal when their root state hashes are equal.

    The deferred result reports what the sync ledger did with the goal:
    - [`Syncing_new_snarked_ledger]: a new target was accepted; the retarget
      counter in [t] is incremented;
    - [`Updating_root_transition]: only the data attached to the goal changed;
    - [`Ignored]: the goal repeats the current one. *)
let start_sync_job_with_peer ~sender ~root_sync_ledger
    ({ context = (module Context); _ } as t) peer_best_tip peer_root =
  let open Context in
  let fulfilled =
    Trust_system.Actions.
      ( Fulfilled_request
      , Some ("Received verified peer root and best tip", []) )
  in
  let%bind () = Trust_system.record t.trust_system logger sender fulfilled in
  t.best_seen_transition <- peer_best_tip ;
  t.current_root <- peer_root ;
  let root_blockchain_state =
    Protocol_state.blockchain_state
      (Mina_block.Header.protocol_state
         (Mina_block.header (Mina_block.Validation.block t.current_root)) )
  in
  let expected_staged_ledger_hash =
    Blockchain_state.staged_ledger_hash root_blockchain_state
  in
  let target_ledger_hash =
    Frozen_ledger_hash.to_ledger_hash
      (Blockchain_state.snarked_ledger_hash root_blockchain_state)
  in
  let root_state_hash =
    State_hash.With_state_hashes.state_hash
      (Mina_block.Validation.block_with_hash t.current_root)
  in
  let same_root (hash1, _, _) (hash2, _, _) = State_hash.equal hash1 hash2 in
  let outcome =
    match
      Sync_ledger.Root.new_goal root_sync_ledger target_ledger_hash
        ~data:(root_state_hash, sender, expected_staged_ledger_hash)
        ~equal:same_root
    with
    | `Repeat ->
        `Ignored
    | `Update_data ->
        `Updating_root_transition
    | `New ->
        t.num_of_root_snarked_ledger_retargeted <-
          t.num_of_root_snarked_ledger_retargeted + 1 ;
        `Syncing_new_snarked_ledger
  in
  return outcome
137

138
(** [to_consensus_state h] projects the consensus state out of the protocol
    state carried by header [h]. *)
let to_consensus_state h =
  h |> Mina_block.Header.protocol_state |> Protocol_state.consensus_state
×
140

141
(** Consider [candidate_header], received from [sender], as a possible new
    bootstrap target.

    If the root ledger sync has finished, or the candidate is not preferred
    over the best transition seen so far (see [should_sync]), the result is
    [`Ignored]. Otherwise the ancestry proof from the candidate down to its
    root is requested from [sender] and verified. On success the sync ledger
    is retargeted through [start_sync_job_with_peer], whose result is
    returned; if the sync finished while the proof was being fetched the
    result is [`Ignored].

    Failures never raise: a network error is logged and yields [`Ignored]; a
    proof that fails verification is reported to the trust system via
    [received_bad_proof] and also yields [`Ignored]. The trust system record
    is awaited before the result is returned, like every other
    [Trust_system.record] call in this file. *)
let on_transition ({ context = (module Context); _ } as t) ~sender
    ~root_sync_ledger candidate_header =
  let open Context in
  let candidate_consensus_state =
    With_hash.map ~f:to_consensus_state candidate_header
  in
  if not @@ should_sync ~root_sync_ledger t candidate_consensus_state then
    Deferred.return `Ignored
  else
    match%bind
      Mina_networking.get_ancestry t.network sender.Peer.peer_id
        (With_hash.map_hash candidate_consensus_state
           ~f:State_hash.State_hashes.state_hash )
    with
    | Error e ->
        [%log error]
          ~metadata:[ ("error", Error_json.error_to_yojson e) ]
          !"Could not get the proof of the root transition from the network: \
            $error" ;
        Deferred.return `Ignored
    | Ok peer_root_with_proof -> (
        (* Move the proofs of both the carried block and the block inside the
           ancestry proof into the proof cache before verification. *)
        let pcd =
          peer_root_with_proof.data
          |> Proof_carrying_data.map
               ~f:
                 (Mina_block.write_all_proofs_to_disk ~signature_kind
                    ~proof_cache_db )
          |> Proof_carrying_data.map_proof
               ~f:
                 (Tuple2.map_snd
                    ~f:
                      (Mina_block.write_all_proofs_to_disk ~signature_kind
                         ~proof_cache_db ) )
        in
        match%bind
          Mina_block.verify_on_header
            ~verify:
              (Sync_handler.Root.verify
                 ~context:(module Context)
                 ~verifier:t.verifier candidate_consensus_state )
            pcd
        with
        | Ok (`Root root, `Best_tip best_tip) ->
            (* The sync may have completed while the proof was in flight. *)
            if done_syncing_root root_sync_ledger then return `Ignored
            else
              start_sync_job_with_peer ~sender ~root_sync_ledger t best_tip root
        | Error e ->
            (* Wait for the trust system to record the violation instead of
               dropping the deferred. *)
            let%map () = received_bad_proof t sender e in
            `Ignored )
×
194

195
(** Drive the root ledger sync from the stream of incoming transitions.

    The sync ledger's query reader and answer writer are first connected to
    the network with [Mina_networking.glue_sync_ledger]. Then every item read
    from [sync_ledger_reader], a block or a header together with its
    validation callback, is stored in [transition_graph] under its parent
    state hash. Items that are preferred over the best transition seen so far
    are additionally passed to [on_transition], whose result is discarded.

    The returned deferred is the one produced by
    [Choosable_synchronous_pipe.iter]. The sender of each item is obtained
    with [Envelope.Incoming.remote_sender_exn]. *)
let sync_ledger ({ context = (module Context); _ } as t) ~preferred
    ~root_sync_ledger ~transition_graph ~sync_ledger_reader =
  let open Context in
  let queries = Sync_ledger.Root.query_reader root_sync_ledger in
  let answers = Sync_ledger.Root.answer_writer root_sync_ledger in
  Mina_networking.glue_sync_ledger ~preferred t.network queries answers ;
  let handle_incoming (incoming, `Valid_cb valid_cb) =
    let header_with_hash, sender, cache_entry =
      match incoming with
      | `Header env ->
          let header =
            Mina_block.Validation.header_with_hash (Envelope.Incoming.data env)
          in
          ( header
          , Envelope.Incoming.remote_sender_exn env
          , Envelope.Incoming.map env ~f:(fun h -> `Header h) )
      | `Block env ->
          let header =
            With_hash.map ~f:Mina_block.header
              (Mina_block.Validation.block_with_hash
                 (Envelope.Incoming.data env) )
          in
          ( header
          , Envelope.Incoming.remote_sender_exn env
          , Envelope.Incoming.map env ~f:(fun b -> `Block b) )
    in
    let parent =
      Protocol_state.previous_state_hash
        (Mina_block.Header.protocol_state (With_hash.data header_with_hash))
    in
    Transition_cache.add transition_graph ~parent (cache_entry, valid_cb) ;
    (* TODO: Efficiently limiting the number of green threads in #1337 *)
    let candidate = With_hash.map ~f:to_consensus_state header_with_hash in
    if not (worth_getting_root t candidate) then Deferred.unit
    else (
      [%log trace] "Added the transition from sync_ledger_reader into cache"
        ~metadata:
          [ ( "state_hash"
            , State_hash.to_yojson
                (State_hash.With_state_hashes.state_hash header_with_hash) )
          ; ( "header"
            , Mina_block.Header.to_yojson (With_hash.data header_with_hash) )
          ] ;
      Deferred.ignore_m
        (on_transition t ~sender ~root_sync_ledger header_with_hash) )
  in
  Pipe_lib.Choosable_synchronous_pipe.iter sync_ledger_reader
    ~f:handle_incoming
×
245

246
(** Comparison function over headers with hashes, derived from consensus
    selection.

    Two headers with the same state hash compare as [0]. Otherwise the first
    header is treated as the existing chain and the second as the candidate:
    the result is [-1] when [Consensus.Hooks.select] answers [`Keep] and [1]
    otherwise.

    NOTE(review): an earlier comment here said the logger is set to null to
    avoid log spam, but [Context] is forwarded to the selection hook
    unchanged. Confirm whether callers are expected to silence it. *)
let external_transition_compare ~context:(module Context : CONTEXT) =
  let consensus_state_of_header header =
    Protocol_state.consensus_state (Mina_block.Header.protocol_state header)
  in
  let compare_consensus_states existing candidate =
    let same_state =
      State_hash.equal
        (State_hash.With_state_hashes.state_hash existing)
        (State_hash.With_state_hashes.state_hash candidate)
    in
    if same_state then 0
    else
      let verdict =
        Consensus.Hooks.select ~context:(module Context) ~existing ~candidate
      in
      if Consensus.Hooks.equal_select_status `Keep verdict then -1 else 1
  in
  Comparable.lift compare_consensus_states
    ~f:(With_hash.map ~f:consensus_state_of_header)
264

265
(** Sync [temp_snarked_ledger] to the root snarked ledger chosen during
    bootstrap and time the operation.

    A root sync ledger is created over [temp_snarked_ledger] and
    [sync_ledger] is started in the background to feed it with targets from
    [sync_ledger_reader]. Once the sync ledger reports a valid tree, it is
    destroyed and the data attached to the final goal is returned together
    with the elapsed time, as [(elapsed, data)]. *)
let download_snarked_ledger ~trust_system ~preferred_peers ~transition_graph
    ~sync_ledger_reader ~context t temp_snarked_ledger =
  let root_sync_ledger =
    Sync_ledger.Root.create temp_snarked_ledger ~context ~trust_system
  in
  don't_wait_for
    (sync_ledger t ~preferred:preferred_peers ~root_sync_ledger
       ~transition_graph ~sync_ledger_reader ) ;
  let goal_data =
    (* The ledger returned alongside the data is dropped: the sync works on
       a db ledger, so it is the ledger that was passed in. *)
    let%map _synced_ledger, data =
      Sync_ledger.Root.valid_tree root_sync_ledger
    in
    Sync_ledger.Root.destroy root_sync_ledger ;
    data
  in
  time_deferred goal_data
×
280

281
(** Run one bootstrap cycle.

    The cycle downloads the root snarked ledger (step 1), downloads the scan
    state and pending coinbases from the peer that supplied the root
    (step 2), constructs the staged ledger to validate them (step 3),
    synchronizes the consensus local state if required (step 4) and finally
    resets the persistent frontier and loads a new transition frontier from
    disk (step 5).

    [previous_cycles] is the list of stats of earlier cycles; the stats of
    this cycle are consed onto it. The deferred result is either
    [`Repeat stats] when step 2, 3 or 4 failed and another cycle should be
    run, or [`Finished (stats, (frontier, transitions))] where [transitions]
    are the cached transitions preferred over the new root, sorted with
    [external_transition_compare].

    Failing to load the frontier in step 5 raises via [failwith]. *)
282
let run_cycle ~context:(module Context : CONTEXT) ~trust_system ~verifier
283
    ~network ~consensus_local_state ~network_transition_pipe ~preferred_peers
284
    ~persistent_root ~persistent_frontier ~initial_root_transition ~catchup_mode
285
    previous_cycles =
286
  let open Context in
×
287
  (* The short-lived pipe allocated here will be closed
288
     when a follow-up pipe is allocated: in the next cycle of bootstrap
289
     or when control is passed to the transition frontier controller.
290
     Because [Choosable_synchronous_pipe.t] is used, no data will be lost: any
291
     successful read is followed by non-blocking handling, and if the read/write
292
     pair is not consumed, it will continue in a follow-up pipe allocated in the
293
     next call to [Swappable.swap_reader].
294
  *)
295
  let%bind sync_ledger_reader = Swappable.swap_reader network_transition_pipe in
×
296
  let initial_root_transition =
×
297
    initial_root_transition |> Mina_block.Validated.remember
×
298
    |> Mina_block.Validation.reset_frontier_dependencies_validation
×
299
    |> Mina_block.Validation.reset_staged_ledger_diff_validation
300
  in
301
  (* Fresh controller state for this cycle: the best seen transition and the
     current root both start at the initial root transition. *)
  let t =
×
302
    { network
303
    ; context = (module Context)
304
    ; trust_system
305
    ; verifier
306
    ; best_seen_transition = initial_root_transition
307
    ; current_root = initial_root_transition
308
    ; num_of_root_snarked_ledger_retargeted = 0
309
    }
310
  in
311
  let transition_graph = Transition_cache.create () in
312
  let temp_persistent_root_instance =
×
313
    Transition_frontier.Persistent_root.create_instance_exn persistent_root
314
  in
315
  let temp_snarked_ledger =
×
316
    Transition_frontier.Persistent_root.Instance.snarked_ledger
317
      temp_persistent_root_instance
318
  in
319
  (* step 1. download snarked_ledger *)
320
  let%bind sync_ledger_time, (hash, sender, expected_staged_ledger_hash) =
321
    download_snarked_ledger
×
322
      ~context:(module Context)
323
      ~trust_system ~preferred_peers ~transition_graph ~sync_ledger_reader t
324
      temp_snarked_ledger
325
  in
326
  Mina_metrics.(
×
327
    Counter.inc Bootstrap.root_snarked_ledger_sync_ms
×
328
      Time.Span.(to_ms sync_ledger_time)) ;
×
329
  Mina_metrics.(
330
    Gauge.set Bootstrap.num_of_root_snarked_ledger_retargeted
×
331
      (Float.of_int t.num_of_root_snarked_ledger_retargeted)) ;
×
332
  (* step 2. Download scan state and pending coinbases. *)
333
  let%bind ( staged_ledger_data_download_time
334
           , staged_ledger_construction_time
335
           , staged_ledger_aux_result ) =
336
    let%bind ( staged_ledger_data_download_time
337
             , staged_ledger_data_download_result ) =
338
      time_deferred
×
339
        (Mina_networking.get_staged_ledger_aux_and_pending_coinbases_at_hash
×
340
           t.network sender.peer_id hash )
341
    in
342
    match staged_ledger_data_download_result with
×
343
    | Error err ->
×
344
        Deferred.return (staged_ledger_data_download_time, None, Error err)
345
    | Ok
×
346
        ( scan_state_uncached
347
        , expected_merkle_root
348
        , pending_coinbases
349
        , protocol_states ) -> (
350
        let%map staged_ledger_construction_result =
351
          O1trace.thread "construct_root_staged_ledger" (fun () ->
×
352
              let open Deferred.Or_error.Let_syntax in
×
353
              let received_staged_ledger_hash =
354
                Staged_ledger_hash.of_aux_ledger_and_coinbase_hash
355
                  (Staged_ledger.Scan_state.Stable.Latest.hash
×
356
                     scan_state_uncached )
357
                  expected_merkle_root pending_coinbases
358
              in
359
              [%log debug]
×
360
                ~metadata:
361
                  [ ( "expected_staged_ledger_hash"
362
                    , Staged_ledger_hash.to_yojson expected_staged_ledger_hash
×
363
                    )
364
                  ; ( "received_staged_ledger_hash"
365
                    , Staged_ledger_hash.to_yojson received_staged_ledger_hash
×
366
                    )
367
                  ]
368
                "Comparing $expected_staged_ledger_hash to \
369
                 $received_staged_ledger_hash" ;
370
              let%bind new_root =
371
                t.current_root
372
                |> Mina_block.Validation.skip_frontier_dependencies_validation
×
373
                     `This_block_belongs_to_a_detached_subtree
374
                |> Mina_block.Validation.validate_staged_ledger_hash
×
375
                     (`Staged_ledger_already_materialized
376
                       received_staged_ledger_hash )
377
                |> Result.map_error ~f:(fun _ ->
×
378
                       Error.of_string "received faulty scan state from peer" )
×
379
                |> Deferred.return
×
380
              in
381
              let protocol_states =
×
382
                List.map protocol_states
383
                  ~f:(With_hash.of_data ~hash_data:Protocol_state.hashes)
384
              in
385
              let scan_state =
×
386
                Staged_ledger.Scan_state.write_all_proofs_to_disk
387
                  ~signature_kind ~proof_cache_db scan_state_uncached
388
              in
389
              let%bind protocol_states =
390
                Staged_ledger.Scan_state.check_required_protocol_states
×
391
                  scan_state ~protocol_states
392
                |> Deferred.return
×
393
              in
394
              let protocol_states_map =
×
395
                protocol_states
396
                |> List.map ~f:(fun ps ->
×
397
                       (State_hash.With_state_hashes.state_hash ps, ps) )
×
398
                |> State_hash.Map.of_alist_exn
399
              in
400
              (* Lookup handed to the staged ledger construction; a missing
                 state is logged and reported as an error. *)
              let get_state hash =
×
401
                match Map.find protocol_states_map hash with
×
402
                | None ->
×
403
                    let new_state_hash =
404
                      State_hash.With_state_hashes.state_hash (fst new_root)
×
405
                    in
406
                    [%log error]
×
407
                      ~metadata:
408
                        [ ("new_root", State_hash.to_yojson new_state_hash)
×
409
                        ; ("state_hash", State_hash.to_yojson hash)
×
410
                        ]
411
                      "Protocol state (for scan state transactions) for \
412
                       $state_hash not found when bootstrapping to the new \
413
                       root $new_root" ;
414
                    Or_error.errorf
×
415
                      !"Protocol state (for scan state transactions) for \
×
416
                        %{sexp:State_hash.t} not found when bootstrapping to \
417
                        the new root %{sexp:State_hash.t}"
418
                      hash new_state_hash
419
                | Some protocol_state ->
×
420
                    Ok (With_hash.data protocol_state)
×
421
              in
422
              (* step 3. Construct staged ledger from snarked ledger, scan state
423
                 and pending coinbases. *)
424
              (* Construct the staged ledger before constructing the transition
425
               * frontier in order to verify the scan state we received.
426
               * TODO: reorganize the code to avoid doing this twice (#3480) *)
427
              let open Deferred.Let_syntax in
428
              let%map staged_ledger_construction_time, construction_result =
429
                time_deferred
×
430
                  (let open Deferred.Let_syntax in
431
                  let temp_mask = Ledger.Root.as_masked temp_snarked_ledger in
432
                  let%map result =
433
                    Staged_ledger
434
                    .of_scan_state_pending_coinbases_and_snarked_ledger ~logger
435
                      ~snarked_local_state:
436
                        Mina_block.(
437
                          t.current_root |> Validation.block |> header
×
438
                          |> Header.protocol_state
×
439
                          |> Protocol_state.blockchain_state
×
440
                          |> Blockchain_state.snarked_local_state)
×
441
                      ~verifier ~constraint_constants ~scan_state
442
                      ~snarked_ledger:temp_mask ~expected_merkle_root
443
                      ~pending_coinbases ~get_state ~signature_kind
444
                  in
445
                  (* The mask is unregistered whether or not construction
                     succeeded; only the Ok/Error status of [result] is kept
                     below. *)
                  ignore
×
446
                    ( Ledger.Maskable.unregister_mask_exn ~loc:__LOC__ temp_mask
×
447
                      : Ledger.unattached_mask ) ;
448
                  Result.map result
449
                    ~f:
450
                      (const
×
451
                         ( scan_state
452
                         , pending_coinbases
453
                         , new_root
454
                         , protocol_states ) ))
455
              in
456
              Ok (staged_ledger_construction_time, construction_result) )
×
457
        in
458
        match staged_ledger_construction_result with
×
459
        | Error err ->
×
460
            (staged_ledger_data_download_time, None, Error err)
461
        | Ok (staged_ledger_construction_time, result) ->
×
462
            ( staged_ledger_data_download_time
463
            , Some staged_ledger_construction_time
464
            , result ) )
465
  in
466
  (* The temporary root instance is not used past this point; step 5 opens
     the persistent root again through [with_instance_exn]. *)
  Transition_frontier.Persistent_root.Instance.close
×
467
    temp_persistent_root_instance ;
468
  match staged_ledger_aux_result with
×
469
  | Error e ->
×
470
      (* Steps 2 or 3 failed: record it against the peer, log, and ask the
         caller to run another cycle. *)
      let%map () =
471
        Trust_system.(
472
          record t.trust_system logger sender
×
473
            Actions.
474
              ( Outgoing_connection_error
475
              , Some
476
                  ( "Can't find scan state from the peer or received faulty \
477
                     scan state from the peer."
478
                  , [] ) ))
479
      in
480
      [%log error]
×
481
        ~metadata:
482
          [ ("error", Error_json.error_to_yojson e)
×
483
          ; ("state_hash", State_hash.to_yojson hash)
×
484
          ; ( "expected_staged_ledger_hash"
485
            , Staged_ledger_hash.to_yojson expected_staged_ledger_hash )
×
486
          ]
487
        "Failed to find scan state for the transition with hash $state_hash \
488
         from the peer or received faulty scan state: $error. Retry bootstrap" ;
489
      let this_cycle =
×
490
        { cycle_result = "failed to download and construct scan state"
491
        ; sync_ledger_time
492
        ; staged_ledger_data_download_time
493
        ; staged_ledger_construction_time
494
        ; local_state_sync_required = false
495
        ; local_state_sync_time = None
496
        }
497
      in
498
      `Repeat (this_cycle :: previous_cycles)
499
  | Ok (scan_state, pending_coinbase, new_root, protocol_states) -> (
×
500
      let%bind () =
501
        Trust_system.(
502
          record t.trust_system logger sender
×
503
            Actions.
504
              ( Fulfilled_request
505
              , Some ("Received valid scan state from peer", []) ))
506
      in
507
      let best_seen_block_with_hash, _ = t.best_seen_transition in
×
508
      let consensus_state =
509
        With_hash.data best_seen_block_with_hash
×
510
        |> Mina_block.header |> Mina_block.Header.protocol_state
×
511
        |> Protocol_state.consensus_state
512
      in
513
      (* step 4. Synchronize consensus local state if necessary *)
514
      let%bind ( local_state_sync_time
515
               , (local_state_sync_required, local_state_sync_result) ) =
516
        time_deferred
×
517
          ( match
518
              Consensus.Hooks.required_local_state_sync
519
                ~constants:precomputed_values.consensus_constants
520
                ~consensus_state ~local_state:consensus_local_state
521
            with
522
          | None ->
×
523
              [%log debug]
×
524
                ~metadata:
525
                  [ ( "local_state"
526
                    , Consensus.Data.Local_state.to_yojson consensus_local_state
×
527
                    )
528
                  ; ( "consensus_state"
529
                    , Consensus.Data.Consensus_state.Value.to_yojson
×
530
                        consensus_state )
531
                  ]
532
                "Not synchronizing consensus local state" ;
533
              Deferred.return (false, Or_error.return ())
×
534
          | Some sync_jobs ->
×
535
              [%log info] "Synchronizing consensus local state" ;
×
536
              let%map result =
537
                Consensus.Hooks.sync_local_state
×
538
                  ~context:(module Context)
539
                  ~local_state:consensus_local_state ~trust_system
540
                  ~glue_sync_ledger:(Mina_networking.glue_sync_ledger t.network)
×
541
                  sync_jobs
542
              in
543
              (true, result) )
×
544
      in
545
      match local_state_sync_result with
×
546
      | Error e ->
×
547
          [%log error]
×
548
            ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
549
            "Local state sync failed: $error. Retry bootstrap" ;
550
          let this_cycle =
×
551
            { cycle_result = "failed to synchronize local state"
552
            ; sync_ledger_time
553
            ; staged_ledger_data_download_time
554
            ; staged_ledger_construction_time
555
            ; local_state_sync_required
556
            ; local_state_sync_time = Some local_state_sync_time
557
            }
558
          in
559
          Deferred.return (`Repeat (this_cycle :: previous_cycles))
560
      | Ok () ->
×
561
          (* step 5. Close the old frontier and reload a new one from disk. *)
562
          let new_root_data : Transition_frontier.Root_data.Limited.t =
563
            Transition_frontier.Root_data.Limited.create
564
              ~transition:(Mina_block.Validated.lift new_root)
×
565
              ~scan_state ~pending_coinbase ~protocol_states
566
          in
567
          let%bind () =
568
            Transition_frontier.Persistent_frontier.reset_database_exn
×
569
              persistent_frontier ~root_data:new_root_data
570
              ~genesis_state_hash:
571
                (State_hash.With_state_hashes.state_hash
×
572
                   precomputed_values.protocol_state_with_hashes )
573
          in
574
          (* TODO: lazy load db in persistent root to avoid unnecessary opens like this *)
575
          Transition_frontier.Persistent_root.(
×
576
            with_instance_exn persistent_root ~f:(fun instance ->
×
577
                Instance.set_root_state_hash instance
×
578
                @@ Mina_block.Validated.state_hash
×
579
                @@ Mina_block.Validated.lift new_root )) ;
×
580
          let%map new_frontier =
581
            let fail msg =
582
              failwith
×
583
                ( "failed to initialize transition frontier after \
584
                   bootstrapping: " ^ msg )
585
            in
586
            Transition_frontier.load
×
587
              ~context:(module Context)
588
              ~retry_with_fresh_db:false ~verifier ~consensus_local_state
589
              ~persistent_root ~persistent_frontier ~catchup_mode ()
590
            >>| function
×
591
            | Ok frontier ->
×
592
                frontier
593
            | Error (`Failure msg) ->
×
594
                fail msg
595
            | Error `Bootstrap_required ->
×
596
                fail
597
                  "bootstrap still required (indicates logical error in code)"
598
            | Error `Persistent_frontier_malformed ->
×
599
                fail "persistent frontier was malformed"
600
            | Error `Snarked_ledger_mismatch ->
×
601
                fail
602
                  "this should not happen, because we just reset the \
603
                   snarked_ledger"
604
          in
605
          [%str_log info] Bootstrap_complete ;
×
606
          let collected_transitions = Transition_cache.data transition_graph in
×
607
          let logger =
×
608
            Logger.extend logger
609
              [ ("context", `String "Filter collected transitions in bootstrap")
610
              ]
611
          in
612
          let root_consensus_state =
×
613
            Transition_frontier.(
614
              Breadcrumb.consensus_state_with_hashes (root new_frontier))
×
615
          in
616
          (* Keep only the cached transitions that consensus selection
             prefers over the root of the new frontier. *)
          let filtered_collected_transitions =
617
            List.filter collected_transitions
618
              ~f:(fun (incoming_transition, _) ->
619
                let transition =
×
620
                  Envelope.Incoming.data incoming_transition
×
621
                  |> Transition_cache.header_with_hash
622
                in
623
                Consensus.Hooks.equal_select_status `Take
×
624
                @@ Consensus.Hooks.select
625
                     ~context:(module Context)
626
                     ~existing:root_consensus_state
627
                     ~candidate:
628
                       (With_hash.map
×
629
                          ~f:
630
                            (Fn.compose Protocol_state.consensus_state
×
631
                               Mina_block.Header.protocol_state )
632
                          transition ) )
633
          in
634
          [%log debug] "Sorting filtered transitions by consensus state"
×
635
            ~metadata:[] ;
636
          (* Order the survivors with [external_transition_compare] applied
             to their headers. *)
          let sorted_filtered_collected_transitions =
×
637
            O1trace.sync_thread "sorting_collected_transitions" (fun () ->
638
                List.sort filtered_collected_transitions
×
639
                  ~compare:
640
                    (Comparable.lift
×
641
                       ~f:(fun (x, _) ->
642
                         Transition_cache.header_with_hash
×
643
                         @@ Envelope.Incoming.data x )
×
644
                       (external_transition_compare ~context:(module Context)) ) )
645
          in
646
          let this_cycle =
×
647
            { cycle_result = "success"
648
            ; sync_ledger_time
649
            ; staged_ledger_data_download_time
650
            ; staged_ledger_construction_time
651
            ; local_state_sync_required
652
            ; local_state_sync_time = Some local_state_sync_time
653
            }
654
          in
655
          `Finished
656
            ( this_cycle :: previous_cycles
657
            , (new_frontier, sorted_filtered_collected_transitions) ) )
658

659
(** Entry point of the bootstrap controller.

    Bootstrap cycles are run with [run_cycle] until one of them finishes.
    The deferred result is the pair produced by that cycle: the new
    transition frontier and the list of transitions collected during
    bootstrap.

    Each cycle performs the following steps to construct the transition
    frontier:
    1. Download the root snarked_ledger.
    2. Download the scan state and pending coinbases.
    3. Construct the staged ledger from the snarked ledger, scan state and
       pending coinbases.
    4. Synchronize the consensus local state if necessary.
    5. Close the old frontier and reload a new one from disk.

    On completion the total elapsed time and the per-cycle stats are logged,
    and the elapsed time is exported through the [bootstrap_time_ms] gauge.
 *)
let run ~context:(module Context : CONTEXT) ~trust_system ~verifier ~network
    ~consensus_local_state ~network_transition_pipe ~preferred_peers
    ~persistent_root ~persistent_frontier ~initial_root_transition ~catchup_mode
    =
  let open Context in
  let one_cycle previous_cycles =
    run_cycle
      ~context:(module Context : CONTEXT)
      ~trust_system ~verifier ~network ~consensus_local_state
      ~network_transition_pipe ~preferred_peers ~persistent_root
      ~persistent_frontier ~initial_root_transition ~catchup_mode
      previous_cycles
  in
  O1trace.thread "bootstrap" (fun () ->
      let all_cycles = Deferred.repeat_until_finished [] one_cycle in
      let%map time_elapsed, (cycles, result) = time_deferred all_cycles in
      let bootstrap_stats =
        `List (List.map cycles ~f:bootstrap_cycle_stats_to_yojson)
      in
      [%log info] "Bootstrap completed in $time_elapsed: $bootstrap_stats"
        ~metadata:
          [ ("time_elapsed", Time.Span.to_yojson_hum time_elapsed)
          ; ("bootstrap_stats", bootstrap_stats)
          ] ;
      Mina_metrics.(
        Gauge.set Bootstrap.bootstrap_time_ms
          (Core.Time.Span.to_ms time_elapsed)) ;
      result )
698

699
let%test_module "Bootstrap_controller tests" =
700
  ( module struct
701
    (* Frontier length used by every test below, derived from the unit-test
       genesis constants. *)
    let max_frontier_length =
      Transition_frontier.global_max_length Genesis_constants.For_unit_tests.t

    (* The tests use the null logger. *)
    let logger = Logger.null ()

    let () =
      (* Disable log messages from best_tip_diff logger: register a consumer
         whose transport throws every message away. *)
      let module Discard = struct
        type t = unit

        let transport () _ = ()
      end in
      let silent_transport = Logger.Transport.create (module Discard) () in
      Logger.Consumer_registry.register ~commit_id:""
        ~id:Logger.Logger_id.best_tip_diff ~processor:(Logger.Processor.raw ())
        ~transport:silent_transport ()
719

720
    (* Trust system shared by the tests. Its upcall pipe is drained in the
       background by a reader that ignores every element (presumably so that
       nothing accumulates in, or blocks on, the unread pipe). *)
    let trust_system =
      let null_trust_system = Trust_system.null () in
      let drain_upcalls =
        Pipe_lib.Strict_pipe.Reader.iter
          (Trust_system.upcall_pipe null_trust_system)
          ~f:(fun _ -> Deferred.unit)
      in
      don't_wait_for drain_upcalls ;
      null_trust_system

    (* Unit-test precomputed values and the settings read from them. *)
    let precomputed_values = Lazy.force Precomputed_values.for_unit_tests

    let proof_level = precomputed_values.proof_level

    let constraint_constants = precomputed_values.constraint_constants

    (* Ledger sync configuration built from the unit-test compile config,
       with no explicit subtree depths. *)
    let ledger_sync_config =
      Syncable_ledger.create_config
        ~compile_config:Mina_compile_config.For_unit_tests.t
        ~max_subtree_depth:None ~default_subtree_depth:None ()
738

739
    (* [CONTEXT] implementation handed to the bootstrap controller by the
       tests in this module. *)
    module Context = struct
      let logger = logger

      let precomputed_values = precomputed_values

      let constraint_constants =
        Genesis_constants.For_unit_tests.Constraint_constants.t

      let consensus_constants = precomputed_values.consensus_constants

      (* Reuse the fixture-level configuration instead of building a second,
         identical one, so that the fake network and the controller are
         guaranteed to agree on the ledger sync settings. *)
      let ledger_sync_config = ledger_sync_config

      let proof_cache_db = Proof_cache_tag.For_tests.create_db ()

      let signature_kind = Mina_signature_kind.Testnet
    end
758

759
    (* Verifier shared by all tests; it is created once, blocking on the
       asynchronous constructor. *)
    let verifier =
      let make_verifier () =
        Verifier.For_tests.default ~constraint_constants ~logger ~proof_level ()
      in
      Async.Thread_safe.block_on_async_exn make_verifier

    (* Genesis ledger module unpacked from the unit-test precomputed values. *)
    module Genesis_ledger = (val precomputed_values.genesis_ledger)
765

766
    (* Turns a validated transition into an incoming envelope sent by the
       remote peer [sender]: the validation is "remembered", then the
       frontier-dependencies and staged-ledger-diff validations are reset. *)
    let downcast_transition ~sender transition =
      let remembered = Mina_block.Validated.remember transition in
      let without_frontier_dependencies =
        Mina_block.Validation.reset_frontier_dependencies_validation remembered
      in
      let data =
        Mina_block.Validation.reset_staged_ledger_diff_validation
          without_frontier_dependencies
      in
      Envelope.Incoming.wrap ~data ~sender:(Envelope.Sender.Remote sender)
774

775
    (* Same as [downcast_transition], applied to the validated transition
       carried by [breadcrumb]. *)
    let downcast_breadcrumb ~sender breadcrumb =
      breadcrumb
      |> Transition_frontier.Breadcrumb.validated_transition
      |> downcast_transition ~sender
×
778

779
    (* Builds a bootstrap controller state without starting any bootstrap
       cycle. [genesis_root], with its frontier-dependencies and
       staged-ledger-diff validations reset, is used both as the best seen
       transition and as the current root; the retarget counter starts at 0. *)
    let make_non_running_bootstrap ~genesis_root ~network =
      let initial_valid_root =
        Mina_block.Validation.reset_staged_ledger_diff_validation
          (Mina_block.Validation.reset_frontier_dependencies_validation
             genesis_root )
      in
      { context = (module Context)
      ; trust_system
      ; verifier
      ; network
      ; best_seen_transition = initial_valid_root
      ; current_root = initial_valid_root
      ; num_of_root_snarked_ledger_retargeted = 0
      }
793

794
    (* Feeds a whole branch of blocks to [sync_ledger] through its pipe and
       checks that exactly those blocks end up in the transition cache. *)
    let%test_unit "Bootstrap controller caches all transitions it is passed \
                   through the transition_reader" =
      let branch_size = (max_frontier_length * 2) + 2 in
      (* Generates a fake network together with a branch of [branch_size]
         breadcrumbs built on top of the first peer's frontier root. *)
      let network_and_branch_gen =
        let open Quickcheck.Generator.Let_syntax in
        (* we only need one node for this test, but we need more than one peer so that mina_networking does not throw an error *)
        let%bind fake_network =
          Fake_network.Generator.(
            gen ~precomputed_values ~verifier ~max_frontier_length
              ~ledger_sync_config [ fresh_peer; fresh_peer ])
        in
        let%map make_branch =
          Transition_frontier.Breadcrumb.For_tests.gen_seq ~precomputed_values
            ~verifier
            ~accounts_with_secret_keys:(Lazy.force Genesis_ledger.accounts)
            branch_size
        in
        let [ me; _ ] = fake_network.peer_networks in
        let branch =
          Async.Thread_safe.block_on_async_exn (fun () ->
              make_branch (Transition_frontier.root me.state.frontier) )
        in
        (fake_network, branch)
      in
      Quickcheck.test ~trials:1 network_and_branch_gen
        ~f:(fun (fake_network, branch) ->
          let [ me; other ] = fake_network.peer_networks in
          let my_frontier = me.state.frontier in
          let genesis_root =
            Transition_frontier.root my_frontier
            |> Transition_frontier.Breadcrumb.validated_transition
            |> Mina_block.Validated.remember
          in
          let transition_graph = Transition_cache.create () in
          let sync_ledger_reader, sync_ledger_writer =
            Pipe_lib.Choosable_synchronous_pipe.create ()
          in
          let bootstrap =
            make_non_running_bootstrap ~genesis_root ~network:me.network
          in
          let root_sync_ledger =
            Sync_ledger.Root.create
              (Transition_frontier.root_snarked_ledger my_frontier)
              ~context:(module Context)
              ~trust_system
          in
          Async.Thread_safe.block_on_async_exn (fun () ->
              (* Start the consumer before writing any block to the pipe. *)
              let sync_deferred =
                sync_ledger bootstrap ~root_sync_ledger ~transition_graph
                  ~preferred:[] ~sync_ledger_reader
              in
              (* Each write yields the writer to use for the next block. *)
              let%bind last_writer =
                Deferred.List.fold branch ~init:sync_ledger_writer
                  ~f:(fun writer breadcrumb ->
                    let block =
                      downcast_breadcrumb ~sender:other.peer breadcrumb
                    in
                    Pipe_lib.Choosable_synchronous_pipe.write writer
                      (`Block block, `Valid_cb None) )
              in
              Pipe_lib.Choosable_synchronous_pipe.close last_writer ;
              sync_deferred ) ;
          (* Headers (with hashes) of every block of the generated branch. *)
          let expected_transitions =
            List.map branch ~f:(fun breadcrumb ->
                Transition_frontier.Breadcrumb.validated_transition breadcrumb
                |> Mina_block.Validated.remember
                |> Mina_block.Validation.block_with_hash
                |> With_hash.map ~f:Mina_block.header )
          in
          (* Headers (with hashes) actually stored in the cache. *)
          let saved_transitions =
            List.map (Transition_cache.data transition_graph)
              ~f:(fun (envelope, _) ->
                Transition_cache.header_with_hash
                  (Envelope.Incoming.data envelope) )
          in
          (* Sets of headers ordered by [external_transition_compare], so that
             the comparison below ignores order and duplicates. *)
          let module Header_with_hash = struct
            module T = struct
              type t = Mina_block.Header.t State_hash.With_state_hashes.t
              [@@deriving sexp]

              let compare = external_transition_compare ~context:(module Context)
            end

            include Comparable.Make (T)
          end in
          [%test_result: Header_with_hash.Set.t]
            (Header_with_hash.Set.of_list saved_transitions)
            ~expect:(Header_with_hash.Set.of_list expected_transitions) )
×
879

880
    (* Runs the bootstrap controller for the fake peer [my_net]. The peer's
       persistent root, persistent frontier and root transition are read from
       its current frontier, which is then closed before [run] is started.
       The run is awaited through [Block_time.Timeout.await_exn] with
       [timeout_duration] (presumably raising if the timeout elapses first). *)
    let run_bootstrap ~timeout_duration ~my_net ~network_transition_pipe =
      let open Fake_network in
      let old_frontier = my_net.state.frontier in
      let time_controller = Block_time.Controller.basic ~logger in
      let persistent_root = Transition_frontier.persistent_root old_frontier in
      let persistent_frontier =
        Transition_frontier.persistent_frontier old_frontier
      in
      let initial_root_transition =
        Transition_frontier.root old_frontier
        |> Transition_frontier.Breadcrumb.validated_transition
      in
      let%bind () = Transition_frontier.close ~loc:__LOC__ old_frontier in
      [%log info] "bootstrap begin" ;
      let bootstrap_run =
        run
          ~context:(module Context)
          ~trust_system ~verifier ~network:my_net.network ~preferred_peers:[]
          ~consensus_local_state:my_net.state.consensus_local_state
          ~network_transition_pipe ~persistent_root ~persistent_frontier
          ~catchup_mode:`Super ~initial_root_transition
      in
      Block_time.Timeout.await_exn time_controller ~timeout_duration
        bootstrap_run
904

905
    (* Checks that the blockchain lengths of [incoming_transitions] never
       decrease from one element to the next, starting from the length of
       the block held by the [root] breadcrumb. Raises (through
       [Or_error.ok_exn]) at the first element that is shorter than its
       predecessor. *)
    let assert_transitions_increasingly_sorted ~root
        (incoming_transitions : Transition_cache.element list) =
      let root_header =
        Mina_block.header (Transition_frontier.Breadcrumb.block root)
      in
      let header_len h =
        Mina_block.Header.protocol_state h
        |> Protocol_state.consensus_state
        |> Consensus.Data.Consensus_state.blockchain_length
      in
      (* Fold step: succeed with the current header when it is at least as
         long as the previous one, fail otherwise. *)
      let check_not_shorter previous_header (envelope, _) =
        let With_hash.{ data = header; _ } =
          Transition_cache.header_with_hash (Envelope.Incoming.data envelope)
        in
        if
          Mina_numbers.Length.(
            header_len previous_header <= header_len header)
        then Ok header
        else
          Error
            (Error.of_string "The blocks are not sorted in increasing order")
      in
      let (_ : Mina_block.Header.t) =
        List.fold_result incoming_transitions ~init:root_header
          ~f:check_not_shorter
        |> Or_error.ok_exn
      in
      ()
933

934
    (* A fresh peer receives the best tip of a peer that has a long branch,
       bootstraps from it, and must end up with the same root snarked ledger
       as that peer. The transitions returned by the bootstrap must be sorted
       by non-decreasing blockchain length. *)
    let%test_unit "sync with one node after receiving a transition" =
      Quickcheck.test ~trials:1
        Fake_network.Generator.(
          gen ~precomputed_values ~verifier ~max_frontier_length
            ~ledger_sync_config
            [ fresh_peer
            ; peer_with_branch
                ~frontier_branch_size:((max_frontier_length * 2) + 2)
            ])
        ~f:(fun fake_network ->
          let [ my_net; peer_net ] = fake_network.peer_networks in
          (* Use the shared helper rather than repeating its validation-reset
             pipeline and envelope wrapping by hand. *)
          let block =
            downcast_breadcrumb ~sender:peer_net.peer
              (Transition_frontier.best_tip peer_net.state.frontier)
          in
          let network_transition_pipe =
            Swappable.create ~name:(__MODULE__ ^ __LOC__)
              (Buffered (`Capacity 10, `Overflow (Drop_head ignore)))
          in
          (* The block is queued before the bootstrap starts reading. *)
          Swappable.write network_transition_pipe (`Block block, `Valid_cb None) ;
          let new_frontier, sorted_external_transitions =
            Async.Thread_safe.block_on_async_exn (fun () ->
                run_bootstrap
                  ~timeout_duration:(Block_time.Span.of_ms 30_000L)
                  ~my_net ~network_transition_pipe )
          in
          assert_transitions_increasingly_sorted
            ~root:(Transition_frontier.root new_frontier)
            sorted_external_transitions ;
          [%test_result: Ledger_hash.t]
            ( Ledger.Root.merkle_root
            @@ Transition_frontier.root_snarked_ledger new_frontier )
            ~expect:
              ( Ledger.Root.merkle_root
              @@ Transition_frontier.root_snarked_ledger peer_net.state.frontier
              ) )
976

977
    (* For every breadcrumb of a generated frontier, rebuilds the staged
       ledger from its scan state, its pending coinbases and the root snarked
       ledger, and checks that the rebuilt staged ledger has the hash recorded
       in the breadcrumb. *)
    let%test_unit "reconstruct staged_ledgers using \
                   of_scan_state_and_snarked_ledger" =
      Quickcheck.test ~trials:1
        (Transition_frontier.For_tests.gen ~precomputed_values ~verifier
           ~max_length:max_frontier_length ~size:max_frontier_length () )
        ~f:(fun frontier ->
          (* Protocol state lookup handed to the staged ledger constructor. *)
          let get_state hash =
            match Transition_frontier.find_protocol_state frontier hash with
            | None ->
                Or_error.errorf
                  !"Protocol state (for scan state transactions) for \
                    %{sexp:State_hash.t} not found"
                  hash
            | Some protocol_state ->
                Ok protocol_state
          in
          let check_breadcrumb breadcrumb =
            let staged_ledger =
              Transition_frontier.Breadcrumb.staged_ledger breadcrumb
            in
            let expected_merkle_root =
              Ledger.merkle_root (Staged_ledger.ledger staged_ledger)
            in
            (* A masked view of the root snarked ledger is taken anew for
               every breadcrumb. *)
            let snarked_ledger =
              Ledger.Root.as_masked
                (Transition_frontier.root_snarked_ledger frontier)
            in
            let root_protocol_state =
              Transition_frontier.Breadcrumb.protocol_state
                (Transition_frontier.root frontier)
            in
            let snarked_local_state =
              Blockchain_state.snarked_local_state
                (Protocol_state.blockchain_state root_protocol_state)
            in
            let scan_state = Staged_ledger.scan_state staged_ledger in
            let pending_coinbases =
              Staged_ledger.pending_coinbase_collection staged_ledger
            in
            let%map actual_staged_ledger =
              Staged_ledger.of_scan_state_pending_coinbases_and_snarked_ledger
                ~scan_state ~logger ~verifier ~constraint_constants
                ~snarked_ledger ~snarked_local_state ~expected_merkle_root
                ~pending_coinbases ~get_state ~signature_kind:Testnet
              |> Deferred.Or_error.ok_exn
            in
            (* The height is only used to make a failure message readable. *)
            let height =
              Mina_numbers.Length.to_int
                (Consensus.Data.Consensus_state.blockchain_length
                   (Transition_frontier.Breadcrumb.consensus_state breadcrumb) )
            in
            [%test_eq: Staged_ledger_hash.t]
              ~message:
                (sprintf "mismatch of staged ledger hash height %d" height)
              (Transition_frontier.Breadcrumb.staged_ledger_hash breadcrumb)
              (Staged_ledger.hash actual_staged_ledger)
          in
          Thread_safe.block_on_async_exn (fun () ->
              Deferred.List.iter
                (Transition_frontier.all_breadcrumbs frontier)
                ~f:check_breadcrumb ) )
×
1034

1035
    (*
1036
    let%test_unit "if we see a new transition that is better than the \
1037
                   transition that we are syncing from, then we should \
1038
                   retarget our root" =
1039
      Quickcheck.test ~trials:1
1040
        Fake_network.Generator.(
1041
          gen ~max_frontier_length
1042
            [ fresh_peer
1043
            ; peer_with_branch ~frontier_branch_size:max_frontier_length
1044
            ; peer_with_branch
1045
                ~frontier_branch_size:((max_frontier_length * 2) + 2) ])
1046
        ~f:(fun fake_network ->
1047
          let [me; weaker_chain; stronger_chain] =
1048
            fake_network.peer_networks
1049
          in
1050
          let transition_reader, transition_writer =
1051
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
1052
              (Buffered (`Capacity 10, `Overflow Drop_head))
1053
          in
1054
          Envelope.Incoming.wrap
1055
            ~data:
1056
              ( Transition_frontier.best_tip weaker_chain.state.frontier
1057
              |> Transition_frontier.Breadcrumb.validated_transition
1058
              |> Mina_block.Validated.to_initial_validated )
1059
            ~sender:
1060
              (Envelope.Sender.Remote
1061
                 (weaker_chain.peer.host, weaker_chain.peer.peer_id))
1062
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1063
          Envelope.Incoming.wrap
1064
            ~data:
1065
              ( Transition_frontier.best_tip stronger_chain.state.frontier
1066
              |> Transition_frontier.Breadcrumb.validated_transition
1067
              |> Mina_block.Validated.to_initial_validated )
1068
            ~sender:
1069
              (Envelope.Sender.Remote
1070
                 (stronger_chain.peer.host, stronger_chain.peer.peer_id))
1071
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1072
          let new_frontier, sorted_external_transitions =
1073
            Async.Thread_safe.block_on_async_exn (fun () ->
1074
                run_bootstrap
1075
                  ~timeout_duration:(Block_time.Span.of_ms 60_000L)
1076
                  ~my_net:me ~transition_reader )
1077
          in
1078
          assert_transitions_increasingly_sorted
1079
            ~root:(Transition_frontier.root new_frontier)
1080
            sorted_external_transitions ;
1081
          [%test_result: Ledger_hash.t]
1082
            ( Ledger.Db.merkle_root
1083
            @@ Transition_frontier.root_snarked_ledger new_frontier )
1084
            ~expect:
1085
              ( Ledger.Db.merkle_root
1086
              @@ Transition_frontier.root_snarked_ledger
1087
                   stronger_chain.state.frontier ) )
1088
*)
1089
  end )
224✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc