• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 745

29 Oct 2025 09:20AM UTC coverage: 37.022%. First build
745

push

buildkite

web-flow
Merge pull request #18015 from MinaProtocol/lyh/compat-into-dev-oct28

Merge compatible into develop Oct. 28th

68 of 134 new or added lines in 15 files covered. (50.75%)

26927 of 72733 relevant lines covered (37.02%)

36514.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.84
/src/lib/bootstrap_controller/bootstrap_controller.ml
1
(* Only show stdout for failed inline tests. *)
113✔
2
open Core
3
open Async
4
open Mina_base
5
open Mina_state
6
open Pipe_lib.Strict_pipe
7
open Network_peer
8
open Mina_stdlib
9
module Ledger = Mina_ledger.Ledger
10
module Root_ledger = Mina_ledger.Root
11
module Sync_ledger = Mina_ledger.Sync_ledger
12
module Transition_cache = Transition_cache
13

14
module type CONTEXT = sig
15
  val logger : Logger.t
16

17
  val precomputed_values : Precomputed_values.t
18

19
  val constraint_constants : Genesis_constants.Constraint_constants.t
20

21
  val consensus_constants : Consensus.Constants.t
22

23
  val ledger_sync_config : Syncable_ledger.daemon_config
24

25
  val proof_cache_db : Proof_cache_tag.cache_db
26

27
  val signature_kind : Mina_signature_kind.t
28
end
29

30
type Structured_log_events.t += Bootstrap_complete
31
  [@@deriving register_event { msg = "Bootstrap state: complete." }]
3✔
32

33
type t =
34
  { context : (module CONTEXT)
35
  ; trust_system : Trust_system.t
36
  ; verifier : Verifier.t
37
  ; mutable best_seen_transition : Mina_block.initial_valid_block
38
  ; mutable current_root : Mina_block.initial_valid_block
39
  ; network : Mina_networking.t
40
  ; mutable num_of_root_snarked_ledger_retargeted : int
41
  }
42

43
(** An auxiliary data structure for collecting various metrics for bootstrap controller. *)
44
type bootstrap_cycle_stats =
×
45
  { cycle_result : string
×
46
  ; sync_ledger_time : Time.Span.t
×
47
  ; staged_ledger_data_download_time : Time.Span.t
×
48
  ; staged_ledger_construction_time : Time.Span.t option
×
49
  ; local_state_sync_required : bool
×
50
  ; local_state_sync_time : Time.Span.t option
×
51
  }
52
[@@deriving to_yojson]
53

54
let time_deferred deferred =
55
  let start_time = Time.now () in
×
56
  let%map result = deferred in
57
  let end_time = Time.now () in
×
58
  (Time.diff end_time start_time, result)
×
59

60
let worth_getting_root ({ context = (module Context); _ } as t) candidate =
61
  let module Consensus_context = struct
×
62
    include Context
63

64
    let logger =
65
      Logger.extend logger
×
66
        [ ( "selection_context"
67
          , `String "Bootstrap_controller.worth_getting_root" )
68
        ]
69
  end in
70
  Consensus.Hooks.equal_select_status `Take
71
  @@ Consensus.Hooks.select
72
       ~context:(module Consensus_context)
73
       ~existing:
74
         ( t.best_seen_transition |> Mina_block.Validation.block_with_hash
×
75
         |> With_hash.map ~f:Mina_block.consensus_state )
×
76
       ~candidate
77

78
let received_bad_proof ({ context = (module Context); _ } as t) host e =
79
  let open Context in
×
80
  Trust_system.(
81
    record t.trust_system logger host
82
      Actions.
83
        ( Violated_protocol
84
        , Some
85
            ( "Bad ancestor proof: $error"
86
            , [ ("error", Error_json.error_to_yojson e) ] ) ))
×
87

88
let done_syncing_root root_sync_ledger =
89
  Option.is_some (Sync_ledger.Root.peek_valid_tree root_sync_ledger)
×
90

91
let should_sync ~root_sync_ledger t candidate_state =
92
  (not @@ done_syncing_root root_sync_ledger)
×
93
  && worth_getting_root t candidate_state
×
94

95
(** Update [Synced_ledger]'s target and [best_seen_transition] and [current_root] accordingly. *)
96
let start_sync_job_with_peer ~sender ~root_sync_ledger
97
    ({ context = (module Context); _ } as t) peer_best_tip peer_root =
98
  let open Context in
×
99
  let%bind () =
100
    Trust_system.(
101
      record t.trust_system logger sender
×
102
        Actions.
103
          ( Fulfilled_request
104
          , Some ("Received verified peer root and best tip", []) ))
105
  in
106
  t.best_seen_transition <- peer_best_tip ;
×
107
  t.current_root <- peer_root ;
108
  let blockchain_state =
109
    t.current_root |> Mina_block.Validation.block |> Mina_block.header
×
110
    |> Mina_block.Header.protocol_state |> Protocol_state.blockchain_state
×
111
  in
112
  let expected_staged_ledger_hash =
×
113
    blockchain_state |> Blockchain_state.staged_ledger_hash
114
  in
115
  let snarked_ledger_hash =
×
116
    blockchain_state |> Blockchain_state.snarked_ledger_hash
117
  in
118
  return
×
119
  @@
120
  match
121
    Sync_ledger.Root.new_goal root_sync_ledger
122
      (Frozen_ledger_hash.to_ledger_hash snarked_ledger_hash)
×
123
      ~data:
124
        ( State_hash.With_state_hashes.state_hash
×
125
          @@ Mina_block.Validation.block_with_hash t.current_root
×
126
        , sender
127
        , expected_staged_ledger_hash )
128
      ~equal:(fun (hash1, _, _) (hash2, _, _) -> State_hash.equal hash1 hash2)
×
129
  with
130
  | `New ->
×
131
      t.num_of_root_snarked_ledger_retargeted <-
132
        t.num_of_root_snarked_ledger_retargeted + 1 ;
133
      `Syncing_new_snarked_ledger
134
  | `Update_data ->
×
135
      `Updating_root_transition
136
  | `Repeat ->
×
137
      `Ignored
138

139
let to_consensus_state h =
140
  Mina_block.Header.protocol_state h |> Protocol_state.consensus_state
×
141

142
(** For each transition, this function would compare it with the existing one.
143
    If the incoming transition is better, then download the merkle list from
144
    that transition to its root and verify it. If we get a better root than
145
    the existing one, then reset the Sync_ledger's target by calling
146
    [start_sync_job_with_peer] function. *)
147
let on_transition ({ context = (module Context); _ } as t) ~sender
148
    ~root_sync_ledger candidate_header =
149
  let open Context in
×
150
  let candidate_consensus_state =
151
    With_hash.map ~f:to_consensus_state candidate_header
152
  in
153
  if not @@ should_sync ~root_sync_ledger t candidate_consensus_state then
×
154
    Deferred.return `Ignored
×
155
  else
156
    match%bind
157
      Mina_networking.get_ancestry t.network sender.Peer.peer_id
×
158
        (With_hash.map_hash candidate_consensus_state
×
159
           ~f:State_hash.State_hashes.state_hash )
160
    with
161
    | Error e ->
×
162
        [%log error]
×
163
          ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
164
          !"Could not get the proof of the root transition from the network: \
165
            $error" ;
166
        Deferred.return `Ignored
×
167
    | Ok peer_root_with_proof -> (
×
168
        let pcd =
169
          peer_root_with_proof.data
170
          |> Proof_carrying_data.map
×
171
               ~f:
172
                 (Mina_block.write_all_proofs_to_disk ~signature_kind
173
                    ~proof_cache_db )
174
          |> Proof_carrying_data.map_proof
175
               ~f:
176
                 (Tuple2.map_snd
177
                    ~f:
178
                      (Mina_block.write_all_proofs_to_disk ~signature_kind
179
                         ~proof_cache_db ) )
180
        in
181
        match%bind
182
          Mina_block.verify_on_header
×
183
            ~verify:
184
              (Sync_handler.Root.verify
×
185
                 ~context:(module Context)
186
                 ~verifier:t.verifier candidate_consensus_state )
187
            pcd
188
        with
189
        | Ok (`Root root, `Best_tip best_tip) ->
×
190
            if done_syncing_root root_sync_ledger then return `Ignored
×
191
            else
192
              start_sync_job_with_peer ~sender ~root_sync_ledger t best_tip root
×
193
        | Error e ->
×
194
            return (received_bad_proof t sender e |> Fn.const `Ignored) )
×
195

196
(** A helper function that wraps the calls to Sync_ledger and iterate through
197
    incoming transitions, add those to the transition_cache and calls
198
    [on_transition] function. *)
199
let sync_ledger ({ context = (module Context); _ } as t) ~preferred
200
    ~root_sync_ledger ~transition_graph ~sync_ledger_reader =
201
  let open Context in
×
202
  let query_reader = Sync_ledger.Root.query_reader root_sync_ledger in
203
  let response_writer = Sync_ledger.Root.answer_writer root_sync_ledger in
×
204
  Mina_networking.glue_sync_ledger ~preferred t.network query_reader
×
205
    response_writer ;
206
  Pipe_lib.Choosable_synchronous_pipe.iter sync_ledger_reader
×
207
    ~f:(fun (b_or_h, `Valid_cb vc) ->
208
      let header_with_hash, sender, transition_cache_element =
×
209
        match b_or_h with
210
        | `Block b_env ->
×
211
            ( Envelope.Incoming.data b_env
×
212
              |> Mina_block.Validation.block_with_hash
×
213
              |> With_hash.map ~f:Mina_block.header
×
214
            , Envelope.Incoming.remote_sender_exn b_env
×
215
            , Envelope.Incoming.map ~f:(fun x -> `Block x) b_env )
×
216
        | `Header h_env ->
×
217
            ( Envelope.Incoming.data h_env
×
218
              |> Mina_block.Validation.header_with_hash
×
219
            , Envelope.Incoming.remote_sender_exn h_env
×
220
            , Envelope.Incoming.map ~f:(fun x -> `Header x) h_env )
×
221
      in
222
      let previous_state_hash =
223
        With_hash.data header_with_hash
×
224
        |> Mina_block.Header.protocol_state
×
225
        |> Protocol_state.previous_state_hash
226
      in
227
      Transition_cache.add transition_graph ~parent:previous_state_hash
×
228
        (transition_cache_element, vc) ;
229
      (* TODO: Efficiently limiting the number of green threads in #1337 *)
230
      if
×
231
        worth_getting_root t
232
          (With_hash.map ~f:to_consensus_state header_with_hash)
×
233
      then (
×
234
        [%log trace] "Added the transition from sync_ledger_reader into cache"
×
235
          ~metadata:
236
            [ ( "state_hash"
237
              , State_hash.to_yojson
×
238
                  (State_hash.With_state_hashes.state_hash header_with_hash) )
×
239
            ; ( "header"
240
              , Mina_block.Header.to_yojson (With_hash.data header_with_hash) )
×
241
            ] ;
242

243
        Deferred.ignore_m
×
244
        @@ on_transition t ~sender ~root_sync_ledger header_with_hash )
×
245
      else Deferred.unit )
×
246

247
let external_transition_compare ~context:(module Context : CONTEXT) =
248
  let get_consensus_state =
×
249
    Fn.compose Protocol_state.consensus_state Mina_block.Header.protocol_state
250
  in
251
  Comparable.lift
×
252
    (fun existing candidate ->
253
      (* To prevent the logger to spam a lot of messages, the logger input is set to null *)
254
      if
×
255
        State_hash.equal
256
          (State_hash.With_state_hashes.state_hash existing)
×
257
          (State_hash.With_state_hashes.state_hash candidate)
×
258
      then 0
×
259
      else if
×
260
        Consensus.Hooks.equal_select_status `Keep
261
        @@ Consensus.Hooks.select ~context:(module Context) ~existing ~candidate
262
      then -1
×
263
      else 1 )
×
264
    ~f:(With_hash.map ~f:get_consensus_state)
265

266
let download_snarked_ledger ~trust_system ~preferred_peers ~transition_graph
267
    ~sync_ledger_reader ~context t temp_snarked_ledger =
268
  time_deferred
×
269
    (let root_sync_ledger =
270
       Sync_ledger.Root.create temp_snarked_ledger ~context ~trust_system
271
     in
272
     don't_wait_for
×
273
       (sync_ledger t ~preferred:preferred_peers ~root_sync_ledger
×
274
          ~transition_graph ~sync_ledger_reader ) ;
275
     (* We ignore the resulting ledger returned here since it will always
276
        * be the same as the ledger we started with because we are syncing
277
        * a db ledger. *)
278
     let%map _, data = Sync_ledger.Root.valid_tree root_sync_ledger in
×
279
     Sync_ledger.Root.destroy root_sync_ledger ;
×
280
     data )
×
281

282
(** Run one bootstrap cycle *)
283
let run_cycle ~context:(module Context : CONTEXT) ~trust_system ~verifier
284
    ~network ~consensus_local_state ~network_transition_pipe ~preferred_peers
285
    ~persistent_root ~persistent_frontier ~initial_root_transition ~catchup_mode
286
    previous_cycles =
287
  let open Context in
×
288
  (* The short-lived pipe allocated here will be closed
289
     when a follow-up pipe is allocated: in the next cycle of bootstrap
290
     or when controll is passed to the transition frontier controller.staged_ledger_construction_time
291
     Because [Choosable_synchronous_pipe.t] is used, no data will be lost: any
292
     successful read is followed by mom-blocking handling, and if the read/write
293
     pair is not consumed, it will continue in a follow-up pipe allocated in the
294
     next call to [Swappable.swap_reader].
295
  *)
296
  let%bind sync_ledger_reader = Swappable.swap_reader network_transition_pipe in
×
297
  let initial_root_transition =
×
298
    initial_root_transition |> Mina_block.Validated.remember
×
299
    |> Mina_block.Validation.reset_frontier_dependencies_validation
×
300
    |> Mina_block.Validation.reset_staged_ledger_diff_validation
301
  in
302
  let t =
×
303
    { network
304
    ; context = (module Context)
305
    ; trust_system
306
    ; verifier
307
    ; best_seen_transition = initial_root_transition
308
    ; current_root = initial_root_transition
309
    ; num_of_root_snarked_ledger_retargeted = 0
310
    }
311
  in
312
  let transition_graph = Transition_cache.create () in
313
  let temp_persistent_root_instance =
×
314
    Transition_frontier.Persistent_root.create_instance_exn persistent_root
315
  in
316
  let temp_snarked_ledger =
×
317
    Transition_frontier.Persistent_root.Instance.snarked_ledger
318
      temp_persistent_root_instance
319
  in
320
  (* step 1. download snarked_ledger *)
321
  let%bind sync_ledger_time, (hash, sender, expected_staged_ledger_hash) =
322
    download_snarked_ledger
×
323
      ~context:(module Context)
324
      ~trust_system ~preferred_peers ~transition_graph ~sync_ledger_reader t
325
      temp_snarked_ledger
326
  in
327
  Mina_metrics.(
×
328
    Counter.inc Bootstrap.root_snarked_ledger_sync_ms
×
329
      Time.Span.(to_ms sync_ledger_time)) ;
×
330
  Mina_metrics.(
331
    Gauge.set Bootstrap.num_of_root_snarked_ledger_retargeted
×
332
      (Float.of_int t.num_of_root_snarked_ledger_retargeted)) ;
×
333
  (* step 2. Download scan state and pending coinbases. *)
334
  let%bind ( staged_ledger_data_download_time
335
           , staged_ledger_construction_time
336
           , staged_ledger_aux_result ) =
337
    let%bind ( staged_ledger_data_download_time
338
             , staged_ledger_data_download_result ) =
339
      time_deferred
×
340
        (Mina_networking.get_staged_ledger_aux_and_pending_coinbases_at_hash
×
341
           t.network sender.peer_id hash )
342
    in
343
    match staged_ledger_data_download_result with
×
344
    | Error err ->
×
345
        Deferred.return (staged_ledger_data_download_time, None, Error err)
346
    | Ok
×
347
        ( scan_state_uncached
348
        , expected_merkle_root
349
        , pending_coinbases
350
        , protocol_states ) -> (
351
        let%map staged_ledger_construction_result =
352
          O1trace.thread "construct_root_staged_ledger" (fun () ->
×
353
              let open Deferred.Or_error.Let_syntax in
×
354
              let received_staged_ledger_hash =
355
                Staged_ledger_hash.of_aux_ledger_and_coinbase_hash
356
                  (Staged_ledger.Scan_state.Stable.Latest.hash
×
357
                     scan_state_uncached )
358
                  expected_merkle_root pending_coinbases
359
              in
360
              [%log debug]
×
361
                ~metadata:
362
                  [ ( "expected_staged_ledger_hash"
363
                    , Staged_ledger_hash.to_yojson expected_staged_ledger_hash
×
364
                    )
365
                  ; ( "received_staged_ledger_hash"
366
                    , Staged_ledger_hash.to_yojson received_staged_ledger_hash
×
367
                    )
368
                  ]
369
                "Comparing $expected_staged_ledger_hash to \
370
                 $received_staged_ledger_hash" ;
371
              let%bind new_root =
372
                t.current_root
373
                |> Mina_block.Validation.skip_frontier_dependencies_validation
×
374
                     `This_block_belongs_to_a_detached_subtree
375
                |> Mina_block.Validation.validate_staged_ledger_hash
×
376
                     (`Staged_ledger_already_materialized
377
                       received_staged_ledger_hash )
378
                |> Result.map_error ~f:(fun _ ->
×
379
                       Error.of_string "received faulty scan state from peer" )
×
380
                |> Deferred.return
×
381
              in
382
              let protocol_states =
×
383
                List.map protocol_states
384
                  ~f:(With_hash.of_data ~hash_data:Protocol_state.hashes)
385
              in
386
              let scan_state =
×
387
                Staged_ledger.Scan_state.write_all_proofs_to_disk
388
                  ~signature_kind ~proof_cache_db scan_state_uncached
389
              in
390
              let%bind protocol_states =
391
                Staged_ledger.Scan_state.check_required_protocol_states
×
392
                  scan_state ~protocol_states
393
                |> Deferred.return
×
394
              in
395
              let protocol_states_map =
×
396
                protocol_states
397
                |> List.map ~f:(fun ps ->
×
398
                       (State_hash.With_state_hashes.state_hash ps, ps) )
×
399
                |> State_hash.Map.of_alist_exn
400
              in
401
              let get_state hash =
×
402
                match Map.find protocol_states_map hash with
×
403
                | None ->
×
404
                    let new_state_hash =
405
                      State_hash.With_state_hashes.state_hash (fst new_root)
×
406
                    in
407
                    [%log error]
×
408
                      ~metadata:
409
                        [ ("new_root", State_hash.to_yojson new_state_hash)
×
410
                        ; ("state_hash", State_hash.to_yojson hash)
×
411
                        ]
412
                      "Protocol state (for scan state transactions) for \
413
                       $state_hash not found when bootstrapping to the new \
414
                       root $new_root" ;
415
                    Or_error.errorf
×
416
                      !"Protocol state (for scan state transactions) for \
×
417
                        %{sexp:State_hash.t} not found when bootstrapping to \
418
                        the new root %{sexp:State_hash.t}"
419
                      hash new_state_hash
420
                | Some protocol_state ->
×
421
                    Ok (With_hash.data protocol_state)
×
422
              in
423
              (* step 3. Construct staged ledger from snarked ledger, scan state
424
                 and pending coinbases. *)
425
              (* Construct the staged ledger before constructing the transition
426
               * frontier in order to verify the scan state we received.
427
               * TODO: reorganize the code to avoid doing this twice (#3480) *)
428
              let open Deferred.Let_syntax in
429
              let%map staged_ledger_construction_time, construction_result =
430
                time_deferred
×
431
                  (let open Deferred.Let_syntax in
432
                  let temp_mask = Root_ledger.as_masked temp_snarked_ledger in
433
                  let%map result =
434
                    Staged_ledger
435
                    .of_scan_state_pending_coinbases_and_snarked_ledger ~logger
436
                      ~snarked_local_state:
437
                        Mina_block.(
438
                          t.current_root |> Validation.block |> header
×
439
                          |> Header.protocol_state
×
440
                          |> Protocol_state.blockchain_state
×
441
                          |> Blockchain_state.snarked_local_state)
×
442
                      ~verifier ~constraint_constants ~scan_state
443
                      ~snarked_ledger:temp_mask ~expected_merkle_root
444
                      ~pending_coinbases ~get_state ~signature_kind
445
                  in
446
                  ignore
×
447
                    ( Ledger.Maskable.unregister_mask_exn ~loc:__LOC__ temp_mask
×
448
                      : Ledger.unattached_mask ) ;
449
                  Result.map result
450
                    ~f:
451
                      (const
×
452
                         ( scan_state
453
                         , pending_coinbases
454
                         , new_root
455
                         , protocol_states ) ))
456
              in
457
              Ok (staged_ledger_construction_time, construction_result) )
×
458
        in
459
        match staged_ledger_construction_result with
×
460
        | Error err ->
×
461
            (staged_ledger_data_download_time, None, Error err)
462
        | Ok (staged_ledger_construction_time, result) ->
×
463
            ( staged_ledger_data_download_time
464
            , Some staged_ledger_construction_time
465
            , result ) )
466
  in
467
  Transition_frontier.Persistent_root.Instance.close
×
468
    temp_persistent_root_instance ;
469
  match staged_ledger_aux_result with
×
470
  | Error e ->
×
471
      let%map () =
472
        Trust_system.(
473
          record t.trust_system logger sender
×
474
            Actions.
475
              ( Outgoing_connection_error
476
              , Some
477
                  ( "Can't find scan state from the peer or received faulty \
478
                     scan state from the peer."
479
                  , [] ) ))
480
      in
481
      [%log error]
×
482
        ~metadata:
483
          [ ("error", Error_json.error_to_yojson e)
×
484
          ; ("state_hash", State_hash.to_yojson hash)
×
485
          ; ( "expected_staged_ledger_hash"
486
            , Staged_ledger_hash.to_yojson expected_staged_ledger_hash )
×
487
          ]
488
        "Failed to find scan state for the transition with hash $state_hash \
489
         from the peer or received faulty scan state: $error. Retry bootstrap" ;
490
      let this_cycle =
×
491
        { cycle_result = "failed to download and construct scan state"
492
        ; sync_ledger_time
493
        ; staged_ledger_data_download_time
494
        ; staged_ledger_construction_time
495
        ; local_state_sync_required = false
496
        ; local_state_sync_time = None
497
        }
498
      in
499
      `Repeat (this_cycle :: previous_cycles)
500
  | Ok (scan_state, pending_coinbase, new_root, protocol_states) -> (
×
501
      let%bind () =
502
        Trust_system.(
503
          record t.trust_system logger sender
×
504
            Actions.
505
              ( Fulfilled_request
506
              , Some ("Received valid scan state from peer", []) ))
507
      in
508
      let best_seen_block_with_hash, _ = t.best_seen_transition in
×
509
      let consensus_state =
510
        With_hash.data best_seen_block_with_hash
×
511
        |> Mina_block.header |> Mina_block.Header.protocol_state
×
512
        |> Protocol_state.consensus_state
513
      in
514
      (* step 4. Synchronize consensus local state if necessary *)
515
      let%bind ( local_state_sync_time
516
               , (local_state_sync_required, local_state_sync_result) ) =
517
        time_deferred
×
518
          ( match
519
              Consensus.Hooks.required_local_state_sync
520
                ~constants:precomputed_values.consensus_constants
521
                ~consensus_state ~local_state:consensus_local_state
522
            with
523
          | None ->
×
524
              [%log debug]
×
525
                ~metadata:
526
                  [ ( "local_state"
527
                    , Consensus.Data.Local_state.to_yojson consensus_local_state
×
528
                    )
529
                  ; ( "consensus_state"
530
                    , Consensus.Data.Consensus_state.Value.to_yojson
×
531
                        consensus_state )
532
                  ]
533
                "Not synchronizing consensus local state" ;
534
              Deferred.return (false, Or_error.return ())
×
535
          | Some sync_jobs ->
×
536
              [%log info] "Synchronizing consensus local state" ;
×
537
              let%map result =
538
                Consensus.Hooks.sync_local_state
×
539
                  ~context:(module Context)
540
                  ~local_state:consensus_local_state ~trust_system
541
                  ~glue_sync_ledger:(Mina_networking.glue_sync_ledger t.network)
×
542
                  sync_jobs
543
              in
544
              (true, result) )
×
545
      in
546
      match local_state_sync_result with
×
547
      | Error e ->
×
548
          [%log error]
×
549
            ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
550
            "Local state sync failed: $error. Retry bootstrap" ;
551
          let this_cycle =
×
552
            { cycle_result = "failed to synchronize local state"
553
            ; sync_ledger_time
554
            ; staged_ledger_data_download_time
555
            ; staged_ledger_construction_time
556
            ; local_state_sync_required
557
            ; local_state_sync_time = Some local_state_sync_time
558
            }
559
          in
560
          Deferred.return (`Repeat (this_cycle :: previous_cycles))
561
      | Ok () ->
×
562
          (* step 5. Close the old frontier and reload a new one from disk. *)
563
          let new_root_data : Transition_frontier.Root_data.Limited.t =
564
            Transition_frontier.Root_data.Limited.create
565
              ~transition:(Mina_block.Validated.lift new_root)
×
566
              ~scan_state ~pending_coinbase ~protocol_states
567
          in
568
          let%bind () =
569
            Transition_frontier.Persistent_frontier.reset_database_exn
×
570
              persistent_frontier ~root_data:new_root_data
571
              ~genesis_state_hash:
572
                (State_hash.With_state_hashes.state_hash
×
573
                   precomputed_values.protocol_state_with_hashes )
574
          in
575
          (* TODO: lazy load db in persistent root to avoid unnecessary opens like this *)
576
          Transition_frontier.Persistent_root.(
×
577
            with_instance_exn persistent_root ~f:(fun instance ->
×
578
                Instance.set_root_state_hash instance
×
579
                @@ Mina_block.Validated.state_hash
×
580
                @@ Mina_block.Validated.lift new_root )) ;
×
581
          let%map new_frontier =
582
            let fail msg =
583
              failwith
×
584
                ( "failed to initialize transition frontier after \
585
                   bootstrapping: " ^ msg )
586
            in
587
            Transition_frontier.load
×
588
              ~context:(module Context)
589
              ~retry_with_fresh_db:false ~verifier ~consensus_local_state
590
              ~persistent_root ~persistent_frontier ~catchup_mode ()
591
            >>| function
×
592
            | Ok frontier ->
×
593
                frontier
594
            | Error (`Failure msg) ->
×
595
                fail msg
596
            | Error `Bootstrap_required ->
×
597
                fail
598
                  "bootstrap still required (indicates logical error in code)"
599
            | Error `Persistent_frontier_malformed ->
×
600
                fail "persistent frontier was malformed"
601
            | Error `Snarked_ledger_mismatch ->
×
602
                fail
603
                  "this should not happen, because we just reset the \
604
                   snarked_ledger"
605
          in
606
          [%str_log info] Bootstrap_complete ;
×
607
          let collected_transitions = Transition_cache.data transition_graph in
×
608
          let logger =
×
609
            Logger.extend logger
610
              [ ("context", `String "Filter collected transitions in bootstrap")
611
              ]
612
          in
613
          let root_consensus_state =
×
614
            Transition_frontier.(
615
              Breadcrumb.consensus_state_with_hashes (root new_frontier))
×
616
          in
617
          let filtered_collected_transitions =
618
            List.filter collected_transitions
619
              ~f:(fun (incoming_transition, _) ->
620
                let transition =
×
621
                  Envelope.Incoming.data incoming_transition
×
622
                  |> Transition_cache.header_with_hash
623
                in
624
                Consensus.Hooks.equal_select_status `Take
×
625
                @@ Consensus.Hooks.select
626
                     ~context:(module Context)
627
                     ~existing:root_consensus_state
628
                     ~candidate:
629
                       (With_hash.map
×
630
                          ~f:
631
                            (Fn.compose Protocol_state.consensus_state
×
632
                               Mina_block.Header.protocol_state )
633
                          transition ) )
634
          in
635
          [%log debug] "Sorting filtered transitions by consensus state"
×
636
            ~metadata:[] ;
637
          let sorted_filtered_collected_transitions =
×
638
            O1trace.sync_thread "sorting_collected_transitions" (fun () ->
639
                List.sort filtered_collected_transitions
×
640
                  ~compare:
641
                    (Comparable.lift
×
642
                       ~f:(fun (x, _) ->
643
                         Transition_cache.header_with_hash
×
644
                         @@ Envelope.Incoming.data x )
×
645
                       (external_transition_compare ~context:(module Context)) ) )
646
          in
647
          let this_cycle =
×
648
            { cycle_result = "success"
649
            ; sync_ledger_time
650
            ; staged_ledger_data_download_time
651
            ; staged_ledger_construction_time
652
            ; local_state_sync_required
653
            ; local_state_sync_time = Some local_state_sync_time
654
            }
655
          in
656
          `Finished
657
            ( this_cycle :: previous_cycles
658
            , (new_frontier, sorted_filtered_collected_transitions) ) )
659

660
(** The entry point function for bootstrap controller. When bootstrap finished
661
    it would return a transition frontier with the root breadcrumb and a list
662
    of transitions collected during bootstrap.
663

664
    Bootstrap controller would do the following steps to contrust the
665
    transition frontier:
666
    1. Download the root snarked_ledger.
667
    2. Download the scan state and pending coinbases.
668
    3. Construct the staged ledger from the snarked ledger, scan state and
669
       pending coinbases.
670
    4. Synchronize the consensus local state if necessary.
671
    5. Close the old frontier and reload a new one from disk.
672
 *)
673
let run ~context:(module Context : CONTEXT) ~trust_system ~verifier ~network
674
    ~consensus_local_state ~network_transition_pipe ~preferred_peers
675
    ~persistent_root ~persistent_frontier ~initial_root_transition ~catchup_mode
676
    =
677
  let open Context in
×
678
  let run_cycle =
679
    run_cycle
680
      ~context:(module Context : CONTEXT)
681
      ~trust_system ~verifier ~network ~consensus_local_state
682
      ~network_transition_pipe ~preferred_peers ~persistent_root
683
      ~persistent_frontier ~initial_root_transition ~catchup_mode
684
  in
685
  O1trace.thread "bootstrap"
686
  @@ fun () ->
687
  let%map time_elapsed, (cycles, result) =
688
    time_deferred @@ Deferred.repeat_until_finished [] run_cycle
×
689
  in
690
  [%log info] "Bootstrap completed in $time_elapsed: $bootstrap_stats"
×
691
    ~metadata:
692
      [ ("time_elapsed", Time.Span.to_yojson_hum time_elapsed)
×
693
      ; ( "bootstrap_stats"
694
        , `List (List.map ~f:bootstrap_cycle_stats_to_yojson cycles) )
×
695
      ] ;
696
  Mina_metrics.(
×
697
    Gauge.set Bootstrap.bootstrap_time_ms Core.Time.(Span.to_ms @@ time_elapsed)) ;
×
698
  result
699

700
let%test_module "Bootstrap_controller tests" =
701
  ( module struct
702
    let max_frontier_length =
703
      Transition_frontier.global_max_length Genesis_constants.For_unit_tests.t
×
704

705
    let logger = Logger.null ()
×
706

707
    let () =
708
      (* Disable log messages from best_tip_diff logger. *)
709
      Logger.Consumer_registry.register ~commit_id:""
×
710
        ~id:Logger.Logger_id.best_tip_diff ~processor:(Logger.Processor.raw ())
×
711
        ~transport:
712
          (Logger.Transport.create
×
713
             ( module struct
714
               type t = unit
715

716
               let transport () _ = ()
×
717
             end )
718
             () )
719
        ()
720

721
    let trust_system =
722
      let s = Trust_system.null () in
723
      don't_wait_for
×
724
        (Pipe_lib.Strict_pipe.Reader.iter
×
725
           (Trust_system.upcall_pipe s)
×
726
           ~f:(const Deferred.unit) ) ;
×
727
      s
×
728

729
    let precomputed_values = Lazy.force Precomputed_values.for_unit_tests
×
730

731
    let proof_level = precomputed_values.proof_level
732

733
    let constraint_constants = precomputed_values.constraint_constants
734

735
    let ledger_sync_config =
736
      Syncable_ledger.create_config
×
737
        ~compile_config:Mina_compile_config.For_unit_tests.t
738
        ~max_subtree_depth:None ~default_subtree_depth:None ()
739

740
    module Context = struct
741
      let logger = logger
742

743
      let precomputed_values = precomputed_values
744

745
      let constraint_constants =
746
        Genesis_constants.For_unit_tests.Constraint_constants.t
747

748
      let consensus_constants = precomputed_values.consensus_constants
749

750
      let ledger_sync_config =
751
        Syncable_ledger.create_config
×
752
          ~compile_config:Mina_compile_config.For_unit_tests.t
753
          ~max_subtree_depth:None ~default_subtree_depth:None ()
754

755
      let proof_cache_db = Proof_cache_tag.For_tests.create_db ()
×
756

757
      let signature_kind = Mina_signature_kind.Testnet
758
    end
759

760
    let verifier =
761
      Async.Thread_safe.block_on_async_exn (fun () ->
×
762
          Verifier.For_tests.default ~constraint_constants ~logger ~proof_level
×
763
            () )
764

765
    module Genesis_ledger = (val precomputed_values.genesis_ledger)
766

767
    let downcast_transition ~sender transition =
768
      let transition =
×
769
        transition |> Mina_block.Validated.remember
×
770
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
771
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
772
      in
773
      Envelope.Incoming.wrap ~data:transition
×
774
        ~sender:(Envelope.Sender.Remote sender)
775

776
    let downcast_breadcrumb ~sender breadcrumb =
777
      downcast_transition ~sender
×
778
        (Transition_frontier.Breadcrumb.validated_transition breadcrumb)
×
779

780
    let make_non_running_bootstrap ~genesis_root ~network =
781
      let transition =
×
782
        genesis_root
783
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
784
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
785
      in
786
      { context = (module Context)
×
787
      ; trust_system
788
      ; verifier
789
      ; best_seen_transition = transition
790
      ; current_root = transition
791
      ; network
792
      ; num_of_root_snarked_ledger_retargeted = 0
793
      }
794

795
    let%test_unit "Bootstrap controller caches all transitions it is passed \
796
                   through the transition_reader" =
797
      let branch_size = (max_frontier_length * 2) + 2 in
×
798
      Quickcheck.test ~trials:1
799
        (let open Quickcheck.Generator.Let_syntax in
800
        (* we only need one node for this test, but we need more than one peer so that mina_networking does not throw an error *)
801
        let%bind fake_network =
802
          Fake_network.Generator.(
803
            gen ~precomputed_values ~verifier ~max_frontier_length
×
804
              ~ledger_sync_config [ fresh_peer; fresh_peer ])
805
        in
806
        let%map make_branch =
807
          Transition_frontier.Breadcrumb.For_tests.gen_seq ~precomputed_values
×
808
            ~verifier
809
            ~accounts_with_secret_keys:(Lazy.force Genesis_ledger.accounts)
×
810
            branch_size
811
        in
812
        let [ me; _ ] = fake_network.peer_networks in
×
813
        let branch =
814
          Async.Thread_safe.block_on_async_exn (fun () ->
815
              make_branch (Transition_frontier.root me.state.frontier) )
×
816
        in
817
        (fake_network, branch))
×
818
        ~f:(fun (fake_network, branch) ->
819
          let [ me; other ] = fake_network.peer_networks in
×
820
          let genesis_root =
821
            Transition_frontier.(
822
              Breadcrumb.validated_transition @@ root me.state.frontier)
×
823
            |> Mina_block.Validated.remember
824
          in
825
          let transition_graph = Transition_cache.create () in
×
826
          let sync_ledger_reader, sync_ledger_writer =
×
827
            Pipe_lib.Choosable_synchronous_pipe.create ()
828
          in
829
          let bootstrap =
×
830
            make_non_running_bootstrap ~genesis_root ~network:me.network
831
          in
832
          let root_sync_ledger =
833
            Sync_ledger.Root.create
834
              (Transition_frontier.root_snarked_ledger me.state.frontier)
×
835
              ~context:(module Context)
836
              ~trust_system
837
          in
838
          Async.Thread_safe.block_on_async_exn (fun () ->
×
839
              let sync_deferred =
×
840
                sync_ledger bootstrap ~root_sync_ledger ~transition_graph
841
                  ~preferred:[] ~sync_ledger_reader
842
              in
843
              let%bind sync_ledger_writer' =
844
                Deferred.List.fold ~init:sync_ledger_writer branch
×
845
                  ~f:(fun sync_ledger_writer breadcrumb ->
846
                    Pipe_lib.Choosable_synchronous_pipe.write sync_ledger_writer
×
847
                      ( `Block
848
                          (downcast_breadcrumb ~sender:other.peer breadcrumb)
×
849
                      , `Valid_cb None ) )
850
              in
851
              Pipe_lib.Choosable_synchronous_pipe.close sync_ledger_writer' ;
×
852
              sync_deferred ) ;
×
853
          let expected_transitions =
×
854
            List.map branch
855
              ~f:
856
                (Fn.compose
×
857
                   (With_hash.map ~f:Mina_block.header)
858
                   (Fn.compose Mina_block.Validation.block_with_hash
×
859
                      (Fn.compose Mina_block.Validated.remember
×
860
                         Transition_frontier.Breadcrumb.validated_transition ) ) )
861
          in
862
          let saved_transitions =
×
863
            Transition_cache.data transition_graph
×
864
            |> List.map ~f:(fun (x, _) ->
865
                   Transition_cache.header_with_hash @@ Envelope.Incoming.data x )
×
866
          in
867
          let module E = struct
×
868
            module T = struct
869
              type t = Mina_block.Header.t State_hash.With_state_hashes.t
×
870
              [@@deriving sexp]
871

872
              let compare = external_transition_compare ~context:(module Context)
873
            end
874

875
            include Comparable.Make (T)
876
          end in
877
          [%test_result: E.Set.t]
×
878
            (E.Set.of_list saved_transitions)
×
879
            ~expect:(E.Set.of_list expected_transitions) )
×
880

881
    let run_bootstrap ~timeout_duration ~my_net ~network_transition_pipe =
882
      let open Fake_network in
×
883
      let time_controller = Block_time.Controller.basic ~logger in
884
      let persistent_root =
885
        Transition_frontier.persistent_root my_net.state.frontier
886
      in
887
      let persistent_frontier =
×
888
        Transition_frontier.persistent_frontier my_net.state.frontier
889
      in
890
      let initial_root_transition =
×
891
        Transition_frontier.(
892
          Breadcrumb.validated_transition (root my_net.state.frontier))
×
893
      in
894
      let%bind () =
895
        Transition_frontier.close ~loc:__LOC__ my_net.state.frontier
×
896
      in
897
      [%log info] "bootstrap begin" ;
×
898
      Block_time.Timeout.await_exn time_controller ~timeout_duration
×
899
        (run
900
           ~context:(module Context)
901
           ~trust_system ~verifier ~network:my_net.network ~preferred_peers:[]
902
           ~consensus_local_state:my_net.state.consensus_local_state
903
           ~network_transition_pipe ~persistent_root ~persistent_frontier
904
           ~catchup_mode:`Super ~initial_root_transition )
905

906
    let assert_transitions_increasingly_sorted ~root
907
        (incoming_transitions : Transition_cache.element list) =
908
      let root =
×
909
        Transition_frontier.Breadcrumb.block root |> Mina_block.header
×
910
      in
911
      ignore
×
912
        ( List.fold_result ~init:root incoming_transitions
×
913
            ~f:(fun max_acc incoming_transition ->
914
              let With_hash.{ data = header; _ } =
×
915
                Transition_cache.header_with_hash
916
                  (Envelope.Incoming.data @@ fst incoming_transition)
×
917
              in
918
              let header_len h =
×
919
                Mina_block.Header.protocol_state h
×
920
                |> Protocol_state.consensus_state
×
921
                |> Consensus.Data.Consensus_state.blockchain_length
922
              in
923
              let open Result.Let_syntax in
924
              let%map () =
925
                Result.ok_if_true
×
926
                  Mina_numbers.Length.(header_len max_acc <= header_len header)
×
927
                  ~error:
928
                    (Error.of_string
×
929
                       "The blocks are not sorted in increasing order" )
930
              in
931
              header )
×
932
          |> Or_error.ok_exn
×
933
          : Mina_block.Header.t )
934

935
    let%test_unit "sync with one node after receiving a transition" =
936
      Quickcheck.test ~trials:1
×
937
        Fake_network.Generator.(
938
          gen ~precomputed_values ~verifier ~max_frontier_length
×
939
            ~ledger_sync_config
940
            [ fresh_peer
941
            ; peer_with_branch
942
                ~frontier_branch_size:((max_frontier_length * 2) + 2)
943
            ])
944
        ~f:(fun fake_network ->
945
          let [ my_net; peer_net ] = fake_network.peer_networks in
×
946
          let block =
947
            Envelope.Incoming.wrap
948
              ~data:
949
                ( Transition_frontier.best_tip peer_net.state.frontier
×
950
                |> Transition_frontier.Breadcrumb.validated_transition
×
951
                |> Mina_block.Validated.remember
×
952
                |> Mina_block.Validation.reset_frontier_dependencies_validation
×
953
                |> Mina_block.Validation.reset_staged_ledger_diff_validation )
×
954
              ~sender:(Envelope.Sender.Remote peer_net.peer)
955
          in
956
          let network_transition_pipe =
957
            Swappable.create ~name:(__MODULE__ ^ __LOC__)
958
              (Buffered (`Capacity 10, `Overflow (Drop_head ignore)))
959
          in
960
          Swappable.write network_transition_pipe (`Block block, `Valid_cb None) ;
×
961
          let new_frontier, sorted_external_transitions =
×
962
            Async.Thread_safe.block_on_async_exn (fun () ->
963
                run_bootstrap
×
964
                  ~timeout_duration:(Block_time.Span.of_ms 30_000L)
×
965
                  ~my_net ~network_transition_pipe )
966
          in
967
          assert_transitions_increasingly_sorted
×
968
            ~root:(Transition_frontier.root new_frontier)
×
969
            sorted_external_transitions ;
970
          [%test_result: Ledger_hash.t]
×
NEW
971
            ( Root_ledger.merkle_root
×
972
            @@ Transition_frontier.root_snarked_ledger new_frontier )
×
973
            ~expect:
NEW
974
              ( Root_ledger.merkle_root
×
975
              @@ Transition_frontier.root_snarked_ledger peer_net.state.frontier
×
976
              ) )
977

978
    let%test_unit "reconstruct staged_ledgers using \
979
                   of_scan_state_and_snarked_ledger" =
980
      Quickcheck.test ~trials:1
×
981
        (Transition_frontier.For_tests.gen ~precomputed_values ~verifier
×
982
           ~max_length:max_frontier_length ~size:max_frontier_length () )
983
        ~f:(fun frontier ->
984
          Thread_safe.block_on_async_exn
×
985
          @@ fun () ->
986
          Deferred.List.iter (Transition_frontier.all_breadcrumbs frontier)
×
987
            ~f:(fun breadcrumb ->
988
              let staged_ledger =
×
989
                Transition_frontier.Breadcrumb.staged_ledger breadcrumb
990
              in
991
              let expected_merkle_root =
×
992
                Staged_ledger.ledger staged_ledger |> Ledger.merkle_root
×
993
              in
994
              let snarked_ledger =
×
995
                Transition_frontier.root_snarked_ledger frontier
×
996
                |> Root_ledger.as_masked
997
              in
998
              let snarked_local_state =
×
999
                Transition_frontier.root frontier
×
1000
                |> Transition_frontier.Breadcrumb.protocol_state
×
1001
                |> Protocol_state.blockchain_state
×
1002
                |> Blockchain_state.snarked_local_state
1003
              in
1004
              let scan_state = Staged_ledger.scan_state staged_ledger in
×
1005
              let get_state hash =
×
1006
                match Transition_frontier.find_protocol_state frontier hash with
×
1007
                | Some protocol_state ->
×
1008
                    Ok protocol_state
1009
                | None ->
×
1010
                    Or_error.errorf
1011
                      !"Protocol state (for scan state transactions) for \
×
1012
                        %{sexp:State_hash.t} not found"
1013
                      hash
1014
              in
1015
              let pending_coinbases =
1016
                Staged_ledger.pending_coinbase_collection staged_ledger
1017
              in
1018
              let%map actual_staged_ledger =
1019
                Staged_ledger.of_scan_state_pending_coinbases_and_snarked_ledger
1020
                  ~scan_state ~logger ~verifier ~constraint_constants
1021
                  ~snarked_ledger ~snarked_local_state ~expected_merkle_root
1022
                  ~pending_coinbases ~get_state ~signature_kind:Testnet
1023
                |> Deferred.Or_error.ok_exn
×
1024
              in
1025
              let height =
×
1026
                Transition_frontier.Breadcrumb.consensus_state breadcrumb
×
1027
                |> Consensus.Data.Consensus_state.blockchain_length
×
1028
                |> Mina_numbers.Length.to_int
1029
              in
1030
              [%test_eq: Staged_ledger_hash.t]
×
1031
                ~message:
1032
                  (sprintf "mismatch of staged ledger hash height %d" height)
×
1033
                (Transition_frontier.Breadcrumb.staged_ledger_hash breadcrumb)
×
1034
                (Staged_ledger.hash actual_staged_ledger) ) )
×
1035

1036
    (*
1037
    let%test_unit "if we see a new transition that is better than the \
1038
                   transition that we are syncing from, than we should \
1039
                   retarget our root" =
1040
      Quickcheck.test ~trials:1
1041
        Fake_network.Generator.(
1042
          gen ~max_frontier_length
1043
            [ fresh_peer
1044
            ; peer_with_branch ~frontier_branch_size:max_frontier_length
1045
            ; peer_with_branch
1046
                ~frontier_branch_size:((max_frontier_length * 2) + 2) ])
1047
        ~f:(fun fake_network ->
1048
          let [me; weaker_chain; stronger_chain] =
1049
            fake_network.peer_networks
1050
          in
1051
          let transition_reader, transition_writer =
1052
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
1053
              (Buffered (`Capacity 10, `Overflow Drop_head))
1054
          in
1055
          Envelope.Incoming.wrap
1056
            ~data:
1057
              ( Transition_frontier.best_tip weaker_chain.state.frontier
1058
              |> Transition_frontier.Breadcrumb.validated_transition
1059
              |> Mina_block.Validated.to_initial_validated )
1060
            ~sender:
1061
              (Envelope.Sender.Remote
1062
                 (weaker_chain.peer.host, weaker_chain.peer.peer_id))
1063
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1064
          Envelope.Incoming.wrap
1065
            ~data:
1066
              ( Transition_frontier.best_tip stronger_chain.state.frontier
1067
              |> Transition_frontier.Breadcrumb.validated_transition
1068
              |> Mina_block.Validated.to_initial_validated )
1069
            ~sender:
1070
              (Envelope.Sender.Remote
1071
                 (stronger_chain.peer.host, stronger_chain.peer.peer_id))
1072
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1073
          let new_frontier, sorted_external_transitions =
1074
            Async.Thread_safe.block_on_async_exn (fun () ->
1075
                run_bootstrap
1076
                  ~timeout_duration:(Block_time.Span.of_ms 60_000L)
1077
                  ~my_net:me ~transition_reader )
1078
          in
1079
          assert_transitions_increasingly_sorted
1080
            ~root:(Transition_frontier.root new_frontier)
1081
            sorted_external_transitions ;
1082
          [%test_result: Ledger_hash.t]
1083
            ( Ledger.Db.merkle_root
1084
            @@ Transition_frontier.root_snarked_ledger new_frontier )
1085
            ~expect:
1086
              ( Ledger.Db.merkle_root
1087
              @@ Transition_frontier.root_snarked_ledger
1088
                   stronger_chain.state.frontier ) )
1089
*)
1090
  end )
226✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc